🌤️ CloudEvents Apache Kafka® - Apache Avro™ Serialization

Serialize and deserialize CloudEvents on Apache Kafka®, integrated with the Confluent Schema Registry.

Support:

Getting Started

  1. Dependency
  • Gradle

    repositories {
        // ...
    
        maven {
            url = uri('https://packages.confluent.io/maven/')
        }
    
        maven { url 'https://jitpack.io' }
    
    }
    
    dependencies {
        implementation 'com.github.kattlo:cloudevents-kafka-avro-serializer:v0.11.0'
    }
  • Apache Maven®

    <repositories>
        <repository>
            <id>jitpack.io</id>
            <url>https://jitpack.io</url>
        </repository>
    </repositories>
    
    <dependency>
        <groupId>com.github.kattlo</groupId>
        <artifactId>cloudevents-kafka-avro-serializer</artifactId>
        <version>v0.11.0</version>
    </dependency>
  2. Configure
  • Serializer
    cloudevents.serializer.encoding=BINARY
    schema.registry.url=http://configure.me:8081
    auto.register.schemas=true
    
    value.serializer=io.github.kattlo.cloudevents.KafkaAvroCloudEventSerializer
  • Deserializer
    # false to use GenericRecord data; true to use strongly-typed (SpecificRecord) data
    specific.avro.reader=false
    schema.registry.url=http://configure.me:8081
    
    value.deserializer=io.github.kattlo.cloudevents.KafkaAvroCloudEventDeserializer
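  • Programmatic configuration (optional)
    The same settings can be supplied in code when building the Kafka clients. A minimal sketch, assuming the plain java.util.Properties style of configuration; bootstrap.servers, group.id and the key (de)serializers are illustrative additions, not part of this library:

    import java.util.Properties;

    // producer-side settings (Serializer)
    var producerConfig = new Properties();
    producerConfig.put("bootstrap.servers", "configure.me:9092");
    producerConfig.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    producerConfig.put("value.serializer", "io.github.kattlo.cloudevents.KafkaAvroCloudEventSerializer");
    producerConfig.put("cloudevents.serializer.encoding", "BINARY");
    producerConfig.put("schema.registry.url", "http://configure.me:8081");
    producerConfig.put("auto.register.schemas", "true");

    // consumer-side settings (Deserializer)
    var consumerConfig = new Properties();
    consumerConfig.put("bootstrap.servers", "configure.me:9092");
    consumerConfig.put("group.id", "example-group");
    consumerConfig.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    consumerConfig.put("value.deserializer", "io.github.kattlo.cloudevents.KafkaAvroCloudEventDeserializer");
    consumerConfig.put("specific.avro.reader", "false"); // GenericRecord data
    consumerConfig.put("schema.registry.url", "http://configure.me:8081");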
  3. Use
  • Serialization
    import java.net.URI;
    import java.time.OffsetDateTime;
    import java.util.UUID;
    import io.cloudevents.core.builder.CloudEventBuilder;
    import io.github.kattlo.cloudevents.AvroCloudEventData;
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    // . . .

    // "data" is the Avro-backed event payload, defined elsewhere in your code
    var event = CloudEventBuilder
        .v1()
        .withId(UUID.randomUUID().toString())
        .withSource(URI.create("/example"))
        .withType("type.example")
        .withTime(OffsetDateTime.now())
        .withData(AvroCloudEventData.MIME_TYPE, data)
        .build();
    
    var record = new ProducerRecord<>("my-topic", event);
    
    // --- create KafkaProducer with Serializer configurations --- //
    
    // producer.send(record);
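    
    One way to fill in the two commented steps above; a sketch only, where "producerConfig" is assumed to be the java.util.Properties built in the Configure step:
    
    import io.cloudevents.CloudEvent;
    import org.apache.kafka.clients.producer.KafkaProducer;
    
    try (var producer = new KafkaProducer<String, CloudEvent>(producerConfig)) {
    
        // same topic and event as the "record" built above, with explicit key/value types
        producer.send(new ProducerRecord<>("my-topic", event));
    
        // flush before closing so the event actually leaves the client
        producer.flush();
    }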
  • Deserialization
    import io.github.kattlo.cloudevents.AvroCloudEventData;
    import io.cloudevents.CloudEvent;
    import org.apache.avro.generic.GenericRecord;
    
    // --- create KafkaConsumer with Deserializer configurations --- //
    
    // consumer.subscribe(...)
    
    // var records = consumer.poll(Duration.ofMillis(100))
    
    records.forEach(record -> {
    
      // Get the CloudEvent instance
      CloudEvent event = record.value();
    
      // when specific.avro.reader=false
      GenericRecord data = AvroCloudEventData.dataOf(event);

      // when specific.avro.reader=true (YourType is your generated SpecificRecord class)
      // YourType data = AvroCloudEventData.dataOf(event);
    
    });
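    
    A sketch completing the commented steps above, where "consumerConfig" is assumed to be the java.util.Properties built in the Configure step; the topic name and poll timeout are illustrative:
    
    import java.time.Duration;
    import java.util.List;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    
    try (var consumer = new KafkaConsumer<String, CloudEvent>(consumerConfig)) {
    
        consumer.subscribe(List.of("my-topic"));
    
        while (true) {
            // fetch the next batch of CloudEvents
            var records = consumer.poll(Duration.ofMillis(500));
    
            records.forEach(record -> {
                CloudEvent event = record.value();
    
                // extract the Avro payload as shown above
                GenericRecord data = AvroCloudEventData.dataOf(event);
    
                // ... handle the data
            });
        }
    }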