When I run this code, the same message is replayed multiple times in the stream. I have a tap with console.log on line 3, and the same message is logged 4 times (once for the original, plus once for each of the 3 predicates in the branch call).
After the first log, all subsequent values came through as [object Object]. I found that this happens because mapJSONConvenience mutates the original message value (in mapBufferValueToString). I was able to fix that by cloning the original object, but the duplication issue is still there.
The item in the parentMessages array was repeated 4 times on the console.
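The [object Object] symptom described above can be reproduced outside the library. The following is a minimal sketch, not the library's code: the four helper functions are hypothetical stand-ins for map steps that either rewrite message.value in place or return a copy, and it assumes the same message object passes through the conversion more than once.

```javascript
// Hypothetical stand-ins for map steps; these are NOT the library's functions.

// Mutating variants: overwrite message.value on the shared object.
function valueToStringInPlace(message) {
  message.value = message.value.toString("utf8");
  return message;
}
function valueToObjectInPlace(message) {
  message.value = JSON.parse(message.value);
  return message;
}

// Copying variants: leave the incoming message untouched.
function valueToStringCopy(message) {
  return Object.assign({}, message, { value: message.value.toString("utf8") });
}
function valueToObjectCopy(message) {
  return Object.assign({}, message, { value: JSON.parse(message.value) });
}

// First pass with mutation: Buffer -> string -> object, all on the same object.
const shared = { value: Buffer.from('{"id":1}') };
valueToObjectInPlace(valueToStringInPlace(shared));

// Second pass over the same object: value is now a plain object, so
// toString() falls back to Object.prototype.toString.
const secondPassMutated = valueToStringInPlace(shared).value;
console.log(secondPassMutated); // [object Object]

// With copies, the original Buffer survives, so a second pass still works.
const original = { value: Buffer.from('{"id":1}') };
const firstPassCopied = valueToObjectCopy(valueToStringCopy(original));
const secondPassCopied = valueToStringCopy(original).value;
console.log(secondPassCopied); // {"id":1}
```

Copying fixes the corrupted value but does not change how many times the steps run, which matches the observation that the duplication remains after the fix.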
UPDATE:
I believe it is because the stream that gets cloned (in branch) is the one that already has all the map functions applied, so the same message goes through mapWrapKafkaValue and myCustomFunction again. If the clone operation cloned the original stream (without the map functions applied), this problem might not happen.
How can I fix this? I know I can probably take the 3 branches first and then call all the map methods on each of them, but is there a better approach?
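If that hypothesis holds, the behavior resembles what happens with any lazily evaluated stream where each consumer re-runs the whole chain. The sketch below is a toy model written for illustration only: fromArray, map, and filter are hypothetical helpers, not the library's implementation, and the three always-true predicates stand in for the branch call.

```javascript
// Toy "cold" stream: every subscription re-runs the full upstream chain.
function fromArray(items) {
  return { subscribe(observer) { items.forEach((item) => observer(item)); } };
}
function map(stream, fn) {
  return { subscribe(observer) { stream.subscribe((item) => observer(fn(item))); } };
}
function filter(stream, predicate) {
  return {
    subscribe(observer) {
      stream.subscribe((item) => { if (predicate(item)) observer(item); });
    },
  };
}

// Count how often the map step runs for a single message.
let mapCalls = 0;
const mapped = map(fromArray(["message-1"]), (m) => { mapCalls += 1; return m; });

// Stand-in for branch: one filtered view per predicate, each built on top of
// the already-mapped stream.
const predicates = [() => true, () => true, () => true];
const branches = predicates.map((p) => filter(mapped, p));

mapped.subscribe(() => {});                      // the original stream
branches.forEach((b) => b.subscribe(() => {}));  // the three branches

console.log(mapCalls); // 4
```

One message produces four map calls here: one for the original stream plus one per branch, the same count as reported in the issue.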
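The workaround mentioned above (branch first, then map each branch) can be sketched with a self-contained toy model. This is an assumption-laden illustration, not the library's API: fromArray, map, and filter are hypothetical helpers, and the message shape with a type field is made up for the example.

```javascript
// Toy "cold" stream helpers (hypothetical, for illustration only).
function fromArray(items) {
  return { subscribe(observer) { items.forEach((item) => observer(item)); } };
}
function map(stream, fn) {
  return { subscribe(observer) { stream.subscribe((item) => observer(fn(item))); } };
}
function filter(stream, predicate) {
  return {
    subscribe(observer) {
      stream.subscribe((item) => { if (predicate(item)) observer(item); });
    },
  };
}

// A map step that counts its calls and returns a copy instead of mutating.
let mapCalls = 0;
const countingMap = (message) => {
  mapCalls += 1;
  return Object.assign({}, message, { mapped: true });
};

const source = fromArray([{ type: "a" }, { type: "b" }, { type: "c" }]);
const predicates = [
  (m) => m.type === "a",
  (m) => m.type === "b",
  (m) => m.type === "c",
];

// Branch on the raw stream first, then attach the map step to each branch.
const branches = predicates.map((p) => map(filter(source, p), countingMap));

const results = [];
branches.forEach((b, i) => b.subscribe((m) => results.push([i, m.type])));

console.log(mapCalls); // 3
console.log(results);  // [ [ 0, 'a' ], [ 1, 'b' ], [ 2, 'c' ] ]
```

With mutually exclusive predicates, each message reaches the map step once. The trade-offs in this model are that the predicates see unmapped messages and the source is still read once per branch.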
I have created a PR (#148) that prevents mapJSONConvenience from mutating the data.
Now, even though all the map functions are still called again for the same message, they receive the original message and not the modified one (no more [object Object]).
Note: The original issue, where all map functions are called extra times (once per item in the branch array), is still there.
I also ran the unit test with a tap and console.log added at https://github.com/nodefluent/kafka-streams/blob/master/test/unit/KStream.test.js#L27 and got the same result.