branch repeating the message to original stream #147

Open
saleel opened this issue Jul 25, 2020 · 1 comment


saleel commented Jul 25, 2020

I have something like this:

const [one$, two$, three$] = stream$
    .mapJSONConvenience()
    .tap((a) => console.log('raw', a))
    .mapWrapKafkaValue()
    .map(myCustomFunction)
    .map(myCustomFunction2)
    .branch([
      (message) => message.type === 'one',
      (message) => message.type === 'two',
      (message) => message.type === 'three',
    ]);

When I run this code, the same message is replayed multiple times in the stream. I have a tap with console.log on line 3, and the same message is logged 4 times (once for the original, plus once for each of the 3 predicates in the branch call).

After the first log line, every subsequent value was printed as [object Object]. I found that this happens because mapJSONConvenience mutates the original message value (in mapBufferValueToString). I was able to fix that part by cloning the original object, but the duplication issue is still there.
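The [object Object] symptom can be reproduced without Kafka. The following is only a minimal sketch: `makeMessage`, `convertInPlace`, and `convertCopy` are hypothetical stand-ins, not the actual code of mapJSONConvenience or mapBufferValueToString. It assumes the converter turns a Buffer value into a string and then tries to parse it as JSON.

```javascript
// Hypothetical stand-ins for illustration; not the kafka-streams source.
const makeMessage = () => ({
  value: Buffer.from(JSON.stringify({ type: 'one' })),
});

// Mutating converter: overwrites message.value on the shared object.
function convertInPlace(message) {
  const text = String(message.value);
  try {
    message.value = JSON.parse(text);
  } catch (err) {
    message.value = text; // not valid JSON, keep the string
  }
  return message;
}

// Non-mutating converter: leaves the input alone and returns a copy.
function convertCopy(message) {
  const text = String(message.value);
  let value;
  try {
    value = JSON.parse(text);
  } catch (err) {
    value = text;
  }
  return { ...message, value };
}

const shared = makeMessage();
convertInPlace(shared);                       // first pass: Buffer -> parsed object
const mutatedTwice = convertInPlace(shared);  // second pass sees the parsed object

const original = makeMessage();
convertCopy(original);                        // first pass returns a copy
const copiedTwice = convertCopy(original);    // second pass still sees the Buffer

console.log(mutatedTwice.value); // [object Object]
console.log(copiedTwice.value);  // { type: 'one' }
```

On the second pass the mutating version stringifies an already-parsed object, which is where the literal text [object Object] comes from. Returning a copy only hides that symptom; the converter still runs once per pass.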

I also ran the unit tests with a tap and console.log added here https://github.com/nodefluent/kafka-streams/blob/master/test/unit/KStream.test.js#L27 and got the same result.

const [
    streamA,
    streamB,
    streamTrue
] = parent
    .tap(m => console.log('raw', m))
    .branch([
        (message) => message.startsWith("a"),
        (message) => message.startsWith("b"),
        (message) => !!message
    ]);

Each item in the parentMessages array was printed 4 times on the console.

UPDATE:
I believe this happens because the stream that gets cloned (in branch) is the one that already has all the map functions attached, so the same message goes through mapWrapKafkaValue and myCustomFunction again. If the clone operation cloned the original stream (without the map functions applied), this problem might not occur.

How can I fix this? I know I could probably create the 3 branches first and then call all the map methods on each of them, but is there a better approach?
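To make the suspected mechanism concrete, here is a toy model. It is an assumption-laden sketch, not the kafka-streams implementation: `ToyStream` and `drain` are hypothetical, and the model simply assumes that every branch re-subscribes to the full upstream chain. Under that assumption it counts how often a map function runs when mapping before branching versus branching before mapping.

```javascript
// Toy model only: NOT the kafka-streams implementation.
// A "cold" stream re-runs its whole operator chain for every subscriber.
class ToyStream {
  constructor(subscribe) {
    this.subscribe = subscribe;
  }
  static from(items) {
    return new ToyStream((sink) => items.forEach((item) => sink(item)));
  }
  map(fn) {
    return new ToyStream((sink) => this.subscribe((x) => sink(fn(x))));
  }
  filter(predicate) {
    return new ToyStream((sink) =>
      this.subscribe((x) => { if (predicate(x)) sink(x); }));
  }
  // Each branch is an independent filtered view of the same upstream chain.
  branch(predicates) {
    return predicates.map((predicate) => this.filter(predicate));
  }
}

const predicates = [
  (m) => m.type === 'one',
  (m) => m.type === 'two',
  (m) => m.type === 'three',
];
const drain = (stream) => stream.subscribe(() => {});

// Variant A: map first, then branch (the shape used in the issue).
let callsA = 0;
const countA = (m) => { callsA += 1; return m; };
const mapped = ToyStream.from([{ type: 'one' }]).map(countA);
drain(mapped);                            // the original stream is consumed once
mapped.branch(predicates).forEach(drain); // plus once per branch

// Variant B: branch first, then map each branch.
let callsB = 0;
const countB = (m) => { callsB += 1; return m; };
ToyStream.from([{ type: 'one' }])
  .branch(predicates)                     // returns a plain array of streams
  .map((branch) => branch.map(countB))
  .forEach(drain);

console.log(callsA, callsB); // 4 1
```

In this model, variant A runs the map function once for the original stream and once per predicate, matching the 4 log lines observed, while variant B runs it only in the branch whose predicate matched. Variant B requires predicates that work on un-mapped messages; in the pipeline above the predicates read message.type, so at least the JSON parsing would still have to happen before the branch call.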


saleel commented Jul 25, 2020

I have created a PR (#148) that prevents mapJSONConvenience from mutating the data.

With that change, even though all the map functions are still called again for the same message, they now receive the original message rather than the modified one (no more [object Object]).

Note: the original issue, where every map function is called extra times (once per item in the branch array), is still there.
