-
Notifications
You must be signed in to change notification settings - Fork 647
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Fixes the ingestion of physical messages containing multiple logical …
…messages (#7129) (#7130) * The IncomingPipelineMetricTags add a new tag only when the key is not yet in the collection for dealing with the use case of a payload containing multiple logical messages. * Remove acceptance test for now * Add failing test --------- Co-authored-by: Laila Bougria <[email protected]> Co-authored-by: sara pellegrini <[email protected]>
- Loading branch information
1 parent
a8cfa7e
commit 581218c
Showing
2 changed files
with
60 additions
and
2 deletions.
There are no files selected for viewing
57 changes: 57 additions & 0 deletions
57
src/NServiceBus.Core.Tests/Pipeline/IncomingPipelineMetricTagsTests.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,57 @@ | ||
namespace NServiceBus.Core.Tests.Pipeline.Incoming; | ||
|
||
using System; | ||
using System.Collections.Generic; | ||
using System.IO; | ||
using System.Threading.Tasks; | ||
using MessageInterfaces.MessageMapper.Reflection; | ||
using NServiceBus.Pipeline; | ||
using NUnit.Framework; | ||
using Serialization; | ||
using Testing; | ||
using Transport; | ||
using Unicast.Messages; | ||
|
||
[TestFixture] | ||
public class IncomingPipelineMetricTagsTests | ||
{ | ||
[Test] | ||
public async Task Should_not_fail_when_handling_more_than_one_logical_message() | ||
{ | ||
var registry = new MessageMetadataRegistry(new Conventions().IsMessageType, true); | ||
|
||
registry.RegisterMessageTypesFoundIn( | ||
[ | ||
typeof(MyMessage) | ||
]); | ||
|
||
var context = new TestableIncomingPhysicalMessageContext | ||
{ | ||
Message = new IncomingMessage("messageId", new Dictionary<string, string> | ||
{ | ||
{ Headers.EnclosedMessageTypes, typeof(MyMessage).AssemblyQualifiedName } | ||
}, new ReadOnlyMemory<byte>(new byte[] { 1 })) | ||
}; | ||
|
||
var messageMapper = new MessageMapper(); | ||
var behavior = new DeserializeMessageConnector(new MessageDeserializerResolver(new FakeSerializer(), []), new LogicalMessageFactory(registry, messageMapper), registry, messageMapper, false); | ||
|
||
await behavior.Invoke(context, c => | ||
{ | ||
c.Extensions.Get<IncomingPipelineMetricTags>().Add("Same", "Same"); | ||
return Task.CompletedTask; | ||
}); | ||
|
||
Assert.That(true, Is.True); | ||
} | ||
|
||
class MyMessage : IMessage { } | ||
|
||
class FakeSerializer : IMessageSerializer | ||
{ | ||
public string ContentType { get; } | ||
public void Serialize(object message, Stream stream) => throw new NotImplementedException(); | ||
|
||
public object[] Deserialize(ReadOnlyMemory<byte> body, IList<Type> messageTypes = null) => [new MyMessage(), new MyMessage()]; | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters