Skip to content

Commit

Permalink
feat: kafka生产者示例
Browse files Browse the repository at this point in the history
  • Loading branch information
TomShiDi committed Mar 13, 2024
1 parent c42e657 commit 9a931d1
Show file tree
Hide file tree
Showing 6 changed files with 196 additions and 0 deletions.
8 changes: 8 additions & 0 deletions kafka-demo/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,12 @@
</parent>
<artifactId>kafka-demo</artifactId>
<name>kafka-demo</name>

<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.0.0</version>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package com.tomshidi.kafka.partitioner;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

/**
* @author TomShiDi
* @since 2024/3/13 14:39
*/
public class CustomPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// 1.计算分区号
String keyStr = key.toString();
int keyHash = keyStr.hashCode();
Integer partitionCount = cluster.partitionCountForTopic(topic);
return keyHash % partitionCount;
}

@Override
public void close() {

}

@Override
public void configure(Map<String, ?> configs) {

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package com.tomshidi.kafka.producer;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/**
* @author TomShiDi
* @since 2024/3/13 10:01
*/
public class CustomCallbackProducer {
public static void main(String[] args) {
// 1.创建配置对象
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092,node2:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.tomshidi.kafka.partitioner.CustomPartitioner");
// 2.创建kafka生产者对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
// 3.发送数据
for (int i = 1; i < 11; i++) {
String message = String.format("第%d号服务员", i);
ProducerRecord<String, String> producerRecord = new ProducerRecord<>("first", i + "", message);
kafkaProducer.send(producerRecord,
(recordMetadata, ex) -> {
if (ex == null) {
int partition = recordMetadata.partition();
long offset = recordMetadata.offset();
System.out.printf("数据:%s\n分区:%d\n偏移量:%d\n", message, partition, offset);
}
});
}
// 4.关闭资源
kafkaProducer.close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package com.tomshidi.kafka.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/**
* @author TomShiDi
* @since 2024/3/13 10:01
*/
public class CustomProducer {
public static void main(String[] args) {
// 1.创建配置对象
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092,node2:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 2.创建kafka生产者对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
// 3.发送数据
for (int i = 1; i < 11; i++) {
String message = String.format("第%d号服务员", i);
ProducerRecord<String, String> producerRecord = new ProducerRecord<>("first", message);
kafkaProducer.send(producerRecord);
}
// 4.关闭资源
kafkaProducer.close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package com.tomshidi.kafka.producer;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
* 生产者同步发送
* @author TomShiDi
* @since 2024/3/13 10:42
*/
public class CustomSyncProducer {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 1.创建配置
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 2.创建生产者对象
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
// 3. 循环发送
for (int i = 1; i < 11; i++) {
String message = String.format("第%d号服务员", i);
ProducerRecord<String, String> record = new ProducerRecord<>("first", message);
producer.send(record, (recordMetadata, ex) -> {
if (ex == null) {
int partition = recordMetadata.partition();
long offset = recordMetadata.offset();
System.out.printf("数据:%s\n分区:%d\n偏移量:%d\n", message, partition, offset);
}
}).get();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package com.tomshidi.kafka.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/**
* @author TomShiDi
* @since 2024/3/13 10:01
*/
public class CustomThroughputProducer {
public static void main(String[] args) {
// 1.创建配置对象
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092,node2:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.tomshidi.kafka.partitioner.CustomPartitioner");
// 性能优化配置
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
properties.put(ProducerConfig.LINGER_MS_CONFIG, 10);
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
// ack应答机制配置
properties.put(ProducerConfig.ACKS_CONFIG, "all");
properties.put(ProducerConfig.RETRIES_CONFIG, 3);
// 幂等性
properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

// 2.创建kafka生产者对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
// 3.发送数据
for (int i = 1; i < 11; i++) {
String message = String.format("第%d号服务员", i);
ProducerRecord<String, String> producerRecord = new ProducerRecord<>("first", i + "", message);
kafkaProducer.send(producerRecord,
(recordMetadata, ex) -> {
if (ex == null) {
int partition = recordMetadata.partition();
long offset = recordMetadata.offset();
System.out.printf("数据:%s\n分区:%d\n偏移量:%d\n", message, partition, offset);
}
});
}
// 4.关闭资源
kafkaProducer.close();
}
}

0 comments on commit 9a931d1

Please sign in to comment.