From 9a931d11657c44ca162e753627c4d14f8866d0ee Mon Sep 17 00:00:00 2001 From: TomShiDi <1341109792@qq.com> Date: Wed, 13 Mar 2024 21:18:51 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20kafka=E7=94=9F=E4=BA=A7=E8=80=85?= =?UTF-8?q?=E7=A4=BA=E4=BE=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- kafka-demo/pom.xml | 8 +++ .../kafka/partitioner/CustomPartitioner.java | 31 +++++++++++ .../producer/CustomCallbackProducer.java | 38 ++++++++++++++ .../kafka/producer/CustomProducer.java | 32 ++++++++++++ .../kafka/producer/CustomSyncProducer.java | 36 +++++++++++++ .../producer/CustomThroughputProducer.java | 51 +++++++++++++++++++ 6 files changed, 196 insertions(+) create mode 100644 kafka-demo/src/main/java/com/tomshidi/kafka/partitioner/CustomPartitioner.java create mode 100644 kafka-demo/src/main/java/com/tomshidi/kafka/producer/CustomCallbackProducer.java create mode 100644 kafka-demo/src/main/java/com/tomshidi/kafka/producer/CustomProducer.java create mode 100644 kafka-demo/src/main/java/com/tomshidi/kafka/producer/CustomSyncProducer.java create mode 100644 kafka-demo/src/main/java/com/tomshidi/kafka/producer/CustomThroughputProducer.java diff --git a/kafka-demo/pom.xml b/kafka-demo/pom.xml index ffb5845..f345cfe 100644 --- a/kafka-demo/pom.xml +++ b/kafka-demo/pom.xml @@ -8,4 +8,12 @@ kafka-demo kafka-demo + + + + org.apache.kafka + kafka-clients + 3.0.0 + + diff --git a/kafka-demo/src/main/java/com/tomshidi/kafka/partitioner/CustomPartitioner.java b/kafka-demo/src/main/java/com/tomshidi/kafka/partitioner/CustomPartitioner.java new file mode 100644 index 0000000..0d07339 --- /dev/null +++ b/kafka-demo/src/main/java/com/tomshidi/kafka/partitioner/CustomPartitioner.java @@ -0,0 +1,31 @@ +package com.tomshidi.kafka.partitioner; + +import org.apache.kafka.clients.producer.Partitioner; +import org.apache.kafka.common.Cluster; + +import java.util.Map; + +/** + * @author TomShiDi + * @since 2024/3/13 14:39 + */ +public class CustomPartitioner implements Partitioner { + @Override + public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { + // 1.计算分区号 + String keyStr = key.toString(); + int keyHash = keyStr.hashCode(); + Integer partitionCount = cluster.partitionCountForTopic(topic); + return keyHash % partitionCount; + } + + @Override + public void close() { + + } + + @Override + public void configure(Map configs) { + + } +} diff --git a/kafka-demo/src/main/java/com/tomshidi/kafka/producer/CustomCallbackProducer.java b/kafka-demo/src/main/java/com/tomshidi/kafka/producer/CustomCallbackProducer.java new file mode 100644 index 0000000..fedb7c6 --- /dev/null +++ b/kafka-demo/src/main/java/com/tomshidi/kafka/producer/CustomCallbackProducer.java @@ -0,0 +1,38 @@ +package com.tomshidi.kafka.producer; + +import org.apache.kafka.clients.producer.*; +import org.apache.kafka.common.serialization.StringSerializer; + +import java.util.Properties; + +/** + * @author TomShiDi + * @since 2024/3/13 10:01 + */ +public class CustomCallbackProducer { + public static void main(String[] args) { + // 1.创建配置对象 + Properties properties = new Properties(); + properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092,node2:9092"); + properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.tomshidi.kafka.partitioner.CustomPartitioner"); + // 2.创建kafka生产者对象 + KafkaProducer kafkaProducer = new KafkaProducer<>(properties); + // 3.发送数据 + for (int i = 1; i < 11; i++) { + String message = String.format("第%d号服务员", i); + ProducerRecord producerRecord = new ProducerRecord<>("first", i + "", message); + kafkaProducer.send(producerRecord, + (recordMetadata, ex) -> { + if (ex == null) { + int partition = recordMetadata.partition(); + long offset = recordMetadata.offset(); + System.out.printf("数据:%s\n分区:%d\n偏移量:%d\n", message, partition, offset); + } + }); + } + // 4.关闭资源 + kafkaProducer.close(); + } +} diff --git a/kafka-demo/src/main/java/com/tomshidi/kafka/producer/CustomProducer.java b/kafka-demo/src/main/java/com/tomshidi/kafka/producer/CustomProducer.java new file mode 100644 index 0000000..f8e1762 --- /dev/null +++ b/kafka-demo/src/main/java/com/tomshidi/kafka/producer/CustomProducer.java @@ -0,0 +1,32 @@ +package com.tomshidi.kafka.producer; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringSerializer; + +import java.util.Properties; + +/** + * @author TomShiDi + * @since 2024/3/13 10:01 + */ +public class CustomProducer { + public static void main(String[] args) { + // 1.创建配置对象 + Properties properties = new Properties(); + properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092,node2:9092"); + properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + // 2.创建kafka生产者对象 + KafkaProducer kafkaProducer = new KafkaProducer<>(properties); + // 3.发送数据 + for (int i = 1; i < 11; i++) { + String message = String.format("第%d号服务员", i); + ProducerRecord producerRecord = new ProducerRecord<>("first", message); + kafkaProducer.send(producerRecord); + } + // 4.关闭资源 + kafkaProducer.close(); + } +} diff --git a/kafka-demo/src/main/java/com/tomshidi/kafka/producer/CustomSyncProducer.java b/kafka-demo/src/main/java/com/tomshidi/kafka/producer/CustomSyncProducer.java new file mode 100644 index 0000000..78281ac --- /dev/null +++ b/kafka-demo/src/main/java/com/tomshidi/kafka/producer/CustomSyncProducer.java @@ -0,0 +1,36 @@ +package com.tomshidi.kafka.producer; + +import org.apache.kafka.clients.producer.*; +import org.apache.kafka.common.serialization.StringSerializer; + +import java.util.Properties; +import java.util.concurrent.ExecutionException; + +/** + * 生产者同步发送 + * @author TomShiDi + * @since 2024/3/13 10:42 + */ +public class CustomSyncProducer { + public static void main(String[] args) throws ExecutionException, InterruptedException { + // 1.创建配置 + Properties properties = new Properties(); + properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092"); + properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + // 2.创建生产者对象 + KafkaProducer producer = new KafkaProducer<>(properties); + // 3. 循环发送 + for (int i = 1; i < 11; i++) { + String message = String.format("第%d号服务员", i); + ProducerRecord record = new ProducerRecord<>("first", message); + producer.send(record, (recordMetadata, ex) -> { + if (ex == null) { + int partition = recordMetadata.partition(); + long offset = recordMetadata.offset(); + System.out.printf("数据:%s\n分区:%d\n偏移量:%d\n", message, partition, offset); + } + }).get(); + } + } +} diff --git a/kafka-demo/src/main/java/com/tomshidi/kafka/producer/CustomThroughputProducer.java b/kafka-demo/src/main/java/com/tomshidi/kafka/producer/CustomThroughputProducer.java new file mode 100644 index 0000000..99b025d --- /dev/null +++ b/kafka-demo/src/main/java/com/tomshidi/kafka/producer/CustomThroughputProducer.java @@ -0,0 +1,51 @@ +package com.tomshidi.kafka.producer; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringSerializer; + +import java.util.Properties; + +/** + * @author TomShiDi + * @since 2024/3/13 10:01 + */ +public class CustomThroughputProducer { + public static void main(String[] args) { + // 1.创建配置对象 + Properties properties = new Properties(); + properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092,node2:9092"); + properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.tomshidi.kafka.partitioner.CustomPartitioner"); + // 性能优化配置 + properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); + properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); + properties.put(ProducerConfig.LINGER_MS_CONFIG, 10); + properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy"); + // ack应答机制配置 + properties.put(ProducerConfig.ACKS_CONFIG, "all"); + properties.put(ProducerConfig.RETRIES_CONFIG, 3); + // 幂等性 + properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); + + // 2.创建kafka生产者对象 + KafkaProducer kafkaProducer = new KafkaProducer<>(properties); + // 3.发送数据 + for (int i = 1; i < 11; i++) { + String message = String.format("第%d号服务员", i); + ProducerRecord producerRecord = new ProducerRecord<>("first", i + "", message); + kafkaProducer.send(producerRecord, + (recordMetadata, ex) -> { + if (ex == null) { + int partition = recordMetadata.partition(); + long offset = recordMetadata.offset(); + System.out.printf("数据:%s\n分区:%d\n偏移量:%d\n", message, partition, offset); + } + }); + } + // 4.关闭资源 + kafkaProducer.close(); + } +}