Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for consuming events from Kafka #325

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
6 changes: 3 additions & 3 deletions docs/documentation/configuration/eventhandlers.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ Eventing in Conductor provides for loose coupling between workflows and support

This includes:

1. Being able to produce an event (message) in an external system like SQS or internal to Conductor.
1. Being able to produce an event (message) in an external system like SQS, Kafka or internal to Conductor.
2. Start a workflow when a specific event occurs that matches the provided criteria.

Conductor provides SUB_WORKFLOW task that can be used to embed a workflow inside parent workflow. Eventing supports provides similar capability without explicitly adding dependencies and provides **fire-and-forget** style integrations.

## Event Task
Event task provides ability to publish an event (message) to either Conductor or an external eventing system like SQS. Event tasks are useful for creating event based dependencies for workflows and tasks.
Event task provides ability to publish an event (message) to either Conductor or an external eventing system like SQS or Kafka. Event tasks are useful for creating event based dependencies for workflows and tasks.

See [Event Task](workflowdef/systemtasks/event-task.md) for documentation.

Expand All @@ -20,7 +20,7 @@ Event handlers are listeners registered that executes an action when a matching
2. Fail a Task
3. Complete a Task

Event Handlers can be configured to listen to Conductor Events or an external event like SQS.
Event Handlers can be configured to listen to Conductor Events or an external event like SQS or Kafka.

## Configuration
Event Handlers are configured via ```/event/``` APIs.
Expand Down
136 changes: 136 additions & 0 deletions kafka-event-queue/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
# Event Queue

## Published Artifacts

Group: `com.netflix.conductor`

| Published Artifact | Description |
| ----------- | ----------- |
| conductor-kafka-event-queue | Support for integration with Kafka and consume events from it. |

## Modules

### Kafka

https://kafka.apache.org/

## kafka-event-queue

Provides ability to consume messages from Kafka.

## Usage

To use it in an event handler prefix the event with `kafka` followed by the topic.

Example:

```json
{
"name": "kafka_test_event_handler",
"event": "kafka:conductor-event",
"actions": [
{
"action": "start_workflow",
"start_workflow": {
"name": "workflow_triggered_by_kafka",
"input": {
"payload": "${payload}"
}
},
"expandInlineJSON": true
}
],
"active": true
}
```

The data from the kafka event has the format:

```json
{
"key": "key-1",
"headers": {
"header-1": "value1"
},
"payload": {
"first": "Marcelo",
"middle": "Billie",
"last": "Mertz"
}
}
```

* `key` is the key field in Kafka message.
* `headers` is the headers in the kafka message.
* `payload` is the message of the Kafka message.

To access them in the event handler use for example `"${payload}"` to access the payload property, which contains the kafka message data.

## Configuration

To enable the queue use set the following to true.

```properties
conductor.event-queues.kafka.enabled=true
```

There are is a set of shared properties these are:

```properties
# If kafka should be used with event queues like SQS or AMPQ
conductor.default-event-queue.type=kafka

# the bootstrap server ot use.
conductor.event-queues.kafka.bootstrap-servers=kafka:29092

# The topic to listen to
conductor.event-queues.kafka.topic=conductor-event

# The dead letter queue to use for events that had some error.
conductor.event-queues.kafka.dlq-topic=conductor-dlq

# topic prefix combined with conductor.default-event-queue.type
conductor.event-queues.kafka.listener-queue-prefix=conductor_

# The polling duration. Start at 500ms and reduce based on how your environment behaves.
conductor.event-queues.kafka.poll-time-duration=500ms
```

There are 3 clients that should be configured, there is the Consumer, responsible to consuming messages, Publisher that publishes messages to Kafka and the Admin which handles admin operations.

The supported properties for the 3 clients are the ones included in `org.apache.kafka:kafka-clients:3.5.1` for each client type.

## Consumer properties

Example of consumer settings.

```properties
conductor.event-queues.kafka.consumer.client.id=consumer-client
conductor.event-queues.kafka.consumer.auto.offset.reset=earliest
conductor.event-queues.kafka.consumer.enable.auto.commit=false
conductor.event-queues.kafka.consumer.fetch.min.bytes=1
conductor.event-queues.kafka.consumer.max.poll.records=500
conductor.event-queues.kafka.consumer.group-id=conductor-group
```

## Producer properties

Example of producer settings.

```properties
conductor.event-queues.kafka.producer.client.id=producer-client
conductor.event-queues.kafka.producer.acks=all
conductor.event-queues.kafka.producer.retries=5
conductor.event-queues.kafka.producer.batch.size=16384
conductor.event-queues.kafka.producer.linger.ms=10
conductor.event-queues.kafka.producer.compression.type=gzip
```

## Admin properties

Example of admin settings.

```properties
conductor.event-queues.kafka.admin.client.id=admin-client
conductor.event-queues.kafka.admin.connections.max.idle.ms=10000
```
44 changes: 44 additions & 0 deletions kafka-event-queue/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
dependencies {
// Core Conductor dependencies
implementation project(':conductor-common')
implementation project(':conductor-core')

// Spring Boot support
implementation 'org.springframework.boot:spring-boot-starter'

// Apache Commons Lang for utility classes
implementation 'org.apache.commons:commons-lang3'

// Reactive programming support with RxJava
implementation "io.reactivex:rxjava:${revRxJava}"

// SBMTODO: Remove Guava dependency if possible
// Guava should only be included if specifically needed
implementation "com.google.guava:guava:${revGuava}"

// Removed AWS SQS SDK as we are transitioning to Kafka
// implementation "com.amazonaws:aws-java-sdk-sqs:${revAwsSdk}"

// Test dependencies
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation project(':conductor-common').sourceSets.test.output


// Add Kafka client dependency
implementation 'org.apache.kafka:kafka-clients:3.5.1'

// Add SLF4J API for logging
implementation 'org.slf4j:slf4j-api:2.0.9'

// Add SLF4J binding for logging with Logback
runtimeOnly 'ch.qos.logback:logback-classic:1.4.11'
}

// test {
// testLogging {
// events "passed", "skipped", "failed"
// showStandardStreams = true // Enable standard output
// }
// }


Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Copyright 2024 Conductor Authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.kafkaeq.config;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.netflix.conductor.core.config.ConductorProperties;
import com.netflix.conductor.core.events.EventQueueProvider;
import com.netflix.conductor.core.events.queue.ObservableQueue;
import com.netflix.conductor.kafkaeq.eventqueue.KafkaObservableQueue.Builder;
import com.netflix.conductor.model.TaskModel.Status;

@Configuration
@EnableConfigurationProperties(KafkaEventQueueProperties.class)
@ConditionalOnProperty(name = "conductor.event-queues.kafka.enabled", havingValue = "true")
public class KafkaEventQueueConfiguration {

@Autowired private KafkaEventQueueProperties kafkaProperties;

private static final Logger LOGGER =
LoggerFactory.getLogger(KafkaEventQueueConfiguration.class);

public KafkaEventQueueConfiguration(KafkaEventQueueProperties kafkaProperties) {
this.kafkaProperties = kafkaProperties;
}

@Bean
public EventQueueProvider kafkaEventQueueProvider() {
return new KafkaEventQueueProvider(kafkaProperties);
}

@ConditionalOnProperty(
name = "conductor.default-event-queue.type",
havingValue = "kafka",
matchIfMissing = false)
@Bean
public Map<Status, ObservableQueue> getQueues(
ConductorProperties conductorProperties, KafkaEventQueueProperties properties) {
try {

LOGGER.debug(
"Starting to create KafkaObservableQueues with properties: {}", properties);

String stack =
Optional.ofNullable(conductorProperties.getStack())
.filter(stackName -> stackName.length() > 0)
.map(stackName -> stackName + "_")
.orElse("");

LOGGER.debug("Using stack: {}", stack);

Status[] statuses = new Status[] {Status.COMPLETED, Status.FAILED};
Map<Status, ObservableQueue> queues = new HashMap<>();

for (Status status : statuses) {
// Log the status being processed
LOGGER.debug("Processing status: {}", status);

String queuePrefix =
StringUtils.isBlank(properties.getListenerQueuePrefix())
? conductorProperties.getAppId() + "_kafka_notify_" + stack
: properties.getListenerQueuePrefix();

LOGGER.debug("queuePrefix: {}", queuePrefix);

String topicName = queuePrefix + status.name();

LOGGER.debug("topicName: {}", topicName);

final ObservableQueue queue = new Builder(properties).build(topicName);
queues.put(status, queue);
}

LOGGER.debug("Successfully created queues: {}", queues);
return queues;
} catch (Exception e) {
LOGGER.error("Failed to create KafkaObservableQueues", e);
throw new RuntimeException("Failed to getQueues on KafkaEventQueueConfiguration", e);
}
}
}
Loading
Loading