no .mapWrapKafkaKey() & .wrapAsKafkaKey() #152
Hi @cnwme,
Hi, here is my code.

const { Kafka, logLevel, CompressionCodecs, CompressionTypes } = require('kafkajs')
const dotenv = require('dotenv');
const { v4: uuidv4 } = require('uuid'); // uuid exports v4 (among others); there is no KEY export
const { debug, info, Console } = require('console');
dotenv.config({ path: '.env' });
const { pid, cpuUsage } = require('process');
const idGroup = process.env.KAFKA_C_GROUPID;
const idProducer = process.env.PRODUCERID;
const mainTopics = process.env.KAFKA_C_TOPIC ? process.env.KAFKA_C_TOPIC.split(',') : ['TEST_KAFKA','TEST_KAFKA2']
const subTopics = process.env.KAFKA_P_TOPIC ? process.env.KAFKA_P_TOPIC.split(',') : ['TEST_KAFKA','TEST_KAFKA2']
const brokerString = process.env.KAFKA_BROKERS_STRING ? process.env.KAFKA_BROKERS_STRING.split(',') : ['kafka:9092']
const { KafkaStreams } = require('kafka-streams');
const config = dotenv.config({path: 'config'});
var now = new Date().toISOString();
const { nativeConfig: configure } = require("./config.js")
/** INIT Filtering */
const FilteringStreamMonoKafka = new KafkaStreams(configure);
/** INIT consumer */
const kafkaMainTopic = mainTopics;
/** INIT Producers */
const kafkaSubtopicBAL = process.env.KAFKA_P_STREAM_BAL;
const stream = FilteringStreamMonoKafka.getKStream()
FilteringStreamMonoKafka.on("error", (error) => console.error(error));
/** Start stream: consume from the main topic */
stream.from(kafkaMainTopic);

stream
  /** condition: forward the message to the producer only when the brand matches */
  .filter((message) => {
    console.log(message.value.toString())
    return JSON.parse(message.value.toString()).values.brand[0].data === `${process.env.FILTER_BRAND_BAL}`
  })
  .mapJSONConvenience(kafkaSubtopicBAL)
  .mapWrapKafkaValue(kafkaSubtopicBAL)
  .wrapAsKafkaValue(kafkaSubtopicBAL)
  .to(kafkaSubtopicBAL, "auto", "buffer");
stream.start()

Regards
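For reference, a minimal sketch of one way the key could be kept next to the value inside the stream itself, in place of the map/wrap/to chain above, reusing stream and kafkaSubtopicBAL from that snippet. Whether a { key, value } object keeps its key through .to(topic, "auto", "buffer") depends on the library's produce format, so treat this purely as an unverified assumption, not a confirmed answer:

/** Sketch only: carry the consumed key alongside the value so it can reach the producer. */
/** The { key, value } shape surviving the "buffer" produce type is an unverified assumption. */
stream
  .filter((message) =>
    JSON.parse(message.value.toString()).values.brand[0].data === `${process.env.FILTER_BRAND_BAL}`)
  .map((message) => ({
    key: message.key ? message.key.toString() : null, // preserve the original key if present
    value: message.value.toString(),
  }))
  .to(kafkaSubtopicBAL, "auto", "buffer");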
Thanks for the info @cnwme, I'll take a look into this shortly.
Hi @rob3000, Regards
Hi,
Adding a key to produced messages after .filter() was not possible for me; the key is always null, even when the consumed message has one.
Is there another way to push it to the producer with .to()?
Regards
Martin
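A possible workaround outside the kafka-streams DSL, since kafkajs is already required in the snippet above: a plain kafkajs producer accepts an explicit key per message, so the filtering and the keyed produce can be done with kafkajs directly. The topic, the env-variable names, and the brand condition mirror the snippet above; the client id and the group-id fallback are illustrative placeholders.

/** Sketch: consume, filter on brand, and produce with an explicit key via kafkajs. */
const { Kafka } = require('kafkajs');

const kafka = new Kafka({
  clientId: 'filter-bal-workaround', // illustrative client id
  brokers: process.env.KAFKA_BROKERS_STRING ? process.env.KAFKA_BROKERS_STRING.split(',') : ['kafka:9092'],
});

const consumer = kafka.consumer({ groupId: process.env.KAFKA_C_GROUPID || 'filter-group' });
const producer = kafka.producer();

const run = async () => {
  await consumer.connect();
  await producer.connect();
  await consumer.subscribe({ topic: 'TEST_KAFKA', fromBeginning: false });

  await consumer.run({
    eachMessage: async ({ message }) => {
      // same brand condition as the .filter() above
      const parsed = JSON.parse(message.value.toString());
      if (parsed.values.brand[0].data !== process.env.FILTER_BRAND_BAL) {
        return;
      }
      // produce with an explicit key: reuse the consumed key when it exists
      await producer.send({
        topic: process.env.KAFKA_P_STREAM_BAL,
        messages: [{
          key: message.key ? message.key.toString() : null,
          value: message.value,
        }],
      });
    },
  });
};

run().catch(console.error);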