Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

no .mapWrapKafkaKey() & .wrapAsKafkaKey() #152

Open
cnwme opened this issue Aug 7, 2020 · 4 comments
Open

no .mapWrapKafkaKey() & .wrapAsKafkaKey() #152

cnwme opened this issue Aug 7, 2020 · 4 comments
Assignees
Labels

Comments

@cnwme
Copy link

cnwme commented Aug 7, 2020

Hi,
To add Key on produce messages after .filter() was not possible for me, always null even if the consuming message got a value.
Is There another way to push it to the producer with the : .to() ?
Regards
Martin

@rob3000
Copy link
Member

rob3000 commented Aug 11, 2020

Hi @cnwme ,
Are you able to provide more context and some code examples of what you are trying to achieve?

@cnwme
Copy link
Author

cnwme commented Aug 17, 2020

Hi,
At first i'm sorry about delay.
i'm trying to test a kafka stream to read topic A and with some filtering if it's true, put data to topic B as is in topic A (Key and Value event).
When using .mapWrapKafkaValue & .wrapAsKafkaValue we got the message value (on topic B) but the Key still null even if it contain values on topic A.

here is my code.

const { Kafka, logLevel, CompressionCodecs, CompressionTypes } = require('kafkajs')
const dotenv = require('dotenv');
const {KEY} = require('uuid');
const { debug, info, Console } = require('console');
dotenv.config({ path: '.env' });
const { pid, cpuUsage } = require('process');
const idGroup = process.env.KAFKA_C_GROUPID;
const idProducer = process.env.PRODUCERID;
const mainTopics = process.env.KAFKA_C_TOPIC ? process.env.KAFKA_C_TOPIC.split(',') : ['TEST_KAFKA','TEST_KAFKA2']
const subTopics = process.env.KAFKA_P_TOPIC ? process.env.KAFKA_P_TOPIC.split(',') : ['TEST_KAFKA','TEST_KAFKA2']
const brokerString = process.env.KAFKA_BROKERS_STRING ? process.env.KAFKA_BROKERS_STRING.split(',') : ['kafka:9092']
const { KafkaStreams } = require('kafka-streams');
const config = dotenv.config({path: 'config'});
var now = new Date().toISOString();
const { nativeConfig: configure } = require("./config.js")
/** INIT Filtering */
const FilteringStreamMonoKafka = new KafkaStreams(configure);
/** INIT consumer */
const kafkaMainTopic = mainTopics;
/** INIT Producers */
const kafkaSubtopicBAL = process.env.KAFKA_P_STREAM_BAL;
const stream = FilteringStreamMonoKafka.getKStream()
FilteringStreamMonoKafka.on("error", (error) => console.error(error));
    /** Start stream  */
    stream
    /** from topic consumer */
    .from(kafkaMainTopic)
    /** condition */
    stream.filter( (message) => { 
        console.log(message.value.toString())
        /** condition push message to producer */
        /*
        return ( 
            JSON.parse(message.value.toString()).values.brand[0].data === `${process.env.FILTER_BRAND_BAL}`
            )
        */
       if (JSON.parse(message.value.toString()).values.brand[0].data === `${process.env.FILTER_BRAND_BAL}`){
           return true
       }
    })
    .mapJSONConvenience(kafkaSubtopicBAL)
    .mapWrapKafkaValue(kafkaSubtopicBAL)
    .wrapAsKafkaValue(kafkaSubtopicBAL)
    .to(kafkaSubtopicBAL, "auto", "buffer");
stream.start()

Regards
Martin

@rob3000 rob3000 self-assigned this Aug 18, 2020
@rob3000
Copy link
Member

rob3000 commented Aug 18, 2020

Thanks for the info @cnwme i'll take a look into this shortly.

@cnwme
Copy link
Author

cnwme commented Sep 7, 2020

Hi @rob3000 ,
any update ?

Regards
Martin

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

2 participants