Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DLQ support for Error records #34

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.tm.kafka.connect.rest.http.executor.RequestExecutor;
import com.tm.kafka.connect.rest.http.handler.DefaultResponseHandler;
import com.tm.kafka.connect.rest.http.handler.ResponseHandler;
import com.tm.kafka.connect.rest.errors.DLQReporter;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
Expand All @@ -16,7 +17,9 @@
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.Properties;

import static com.tm.kafka.connect.rest.errors.DLQReporter.DLQ_TOPIC_CONFIG;
import static org.apache.kafka.common.config.ConfigDef.NO_DEFAULT_VALUE;


Expand Down Expand Up @@ -68,6 +71,11 @@ public class RestSinkConnectorConfig extends AbstractConfig {
private static final String SINK_DATE_FORMAT_DOC = "Date format for interpolation. The default is MM-dd-yyyy HH:mm:ss.SSS";
private static final String SINK_DATE_FORMAT_DEFAULT = "MM-dd-yyyy HH:mm:ss.SSS";

private static final String SINK_DLQ_KAFKA_ENABLE_CONFIG = "rest.deadletter.kafka.enabled";
private static final String SINK_DLQ_KAFKA_ENABLE_DISPLAY = "HTTP request error to kafka";
private static final String SINK_DLQ_KAFKA_ENABLE_DOC = "Enable deadletter queue support for error records";
private static final Boolean SINK_DLQ_KAFKA_ENABLE_DEFAULT = false;

private static final long SINK_RETRY_BACKOFF_DEFAULT = 5000L;


Expand Down Expand Up @@ -184,6 +192,16 @@ public static ConfigDef conf() {
++orderInGroup,
ConfigDef.Width.NONE,
SINK_DATE_FORMAT_DISPLAY)

.define(SINK_DLQ_KAFKA_ENABLE_CONFIG,
Type.BOOLEAN,
SINK_DLQ_KAFKA_ENABLE_DEFAULT,
Importance.LOW,
SINK_DLQ_KAFKA_ENABLE_DOC,
group,
++orderInGroup,
ConfigDef.Width.NONE,
SINK_DLQ_KAFKA_ENABLE_DISPLAY)
;
}

Expand Down Expand Up @@ -222,6 +240,17 @@ public ResponseHandler getResponseHandler() {
);
}

/** Whether failed records should be forwarded to the Kafka dead-letter queue. */
public Boolean isDlqKafkaEnabled() {
  final Boolean dlqEnabled = this.getBoolean(SINK_DLQ_KAFKA_ENABLE_CONFIG);
  return dlqEnabled;
}

/**
 * Creates a {@link DLQReporter} for this connector instance.
 * <p>
 * The target topic is read from {@code errors.deadletterqueue.topic.name} and the
 * underlying Kafka producer is configured from every connector setting prefixed with
 * {@code producer.} (e.g. {@code producer.bootstrap.servers}).
 *
 * @return a reporter that publishes failed records to the configured DLQ topic
 * @throws IllegalStateException if DLQ support is enabled but no topic name was configured
 */
public DLQReporter getDLQReporter() {
  Object configuredTopic = this.originals().get(DLQ_TOPIC_CONFIG);
  // Fail fast at start-up: a missing/blank topic would otherwise only surface as an
  // obscure producer error when the first failed record is reported.
  if (configuredTopic == null || configuredTopic.toString().trim().isEmpty()) {
    throw new IllegalStateException(
      "Dead letter queue support is enabled but '" + DLQ_TOPIC_CONFIG + "' is not set");
  }
  Properties props = new Properties();
  props.putAll(this.originalsWithPrefix("producer."));
  return new DLQReporter(configuredTopic.toString(), props);
}

private static class MethodRecommender implements ConfigDef.Recommender {
@Override
public List<Object> validValues(String name, Map<String, Object> connectorConfigs) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.tm.kafka.connect.rest;

import com.tm.kafka.connect.rest.errors.DLQReporter;
import com.tm.kafka.connect.rest.http.Request;
import com.tm.kafka.connect.rest.http.Response;
import com.tm.kafka.connect.rest.http.executor.RequestExecutor;
Expand Down Expand Up @@ -29,6 +30,7 @@ public class RestSinkTask extends SinkTask {
private RequestExecutor executor;
private ResponseHandler responseHandler;
private String taskName = "";
private DLQReporter errorReporter;

@Override
public void start(Map<String, String> map) {
Expand All @@ -40,6 +42,9 @@ public void start(Map<String, String> map) {
maxRetries = connectorConfig.getMaxRetries();
responseHandler = connectorConfig.getResponseHandler();
executor = connectorConfig.getRequestExecutor();
if (connectorConfig.isDlqKafkaEnabled())
errorReporter = connectorConfig.getDLQReporter();
else errorReporter = null;
}

@Override
Expand Down Expand Up @@ -74,6 +79,7 @@ public void put(Collection<SinkRecord> records) {
} catch (RetriableException e) {
log.error("HTTP call failed", e);
increaseCounter(RETRIABLE_ERROR_METRIC, ctx);
if (retries == -1 && errorReporter != null) errorReporter.reportError(record, e);
try {
Thread.sleep(retryBackoff);
log.error("Retrying");
Expand All @@ -83,6 +89,7 @@ public void put(Collection<SinkRecord> records) {
} catch (Exception e) {
log.error("HTTP call execution failed " + e.getMessage(), e);
increaseCounter(UNRETRIABLE_ERROR_METRIC, ctx);
if (errorReporter != null) errorReporter.reportError(record, e);
break;
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package com.tm.kafka.connect.rest.errors;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.nio.charset.StandardCharsets;
import java.util.Properties;

/**
 * Publishes records that failed HTTP delivery to a Kafka dead-letter-queue (DLQ) topic.
 * <p>
 * Exception details (class name, message, stack trace) are attached to the DLQ record
 * as headers using the same {@code __connect.errors.} prefix Kafka Connect uses for its
 * built-in DLQ support. Send failures are logged and otherwise ignored, so a broken DLQ
 * never takes down the sink task itself.
 * <p>
 * NOTE(review): this class is not thread-safe with respect to the setters; they exist
 * for test injection only.
 */
public class DLQReporter {
  public static final String DLQ_TOPIC_CONFIG = "errors.deadletterqueue.topic.name";
  // Applied to both key and value when the caller does not configure serializers.
  public static final String DEFAULT_SERIALIZER = "org.apache.kafka.common.serialization.ByteArraySerializer";
  // Header names mirror Kafka Connect's own DLQ error headers.
  public static final String HEADER_PREFIX = "__connect.errors.";
  public static final String ERROR_HEADER_EXCEPTION = HEADER_PREFIX + "exception.class.name";
  public static final String ERROR_HEADER_EXCEPTION_MESSAGE = HEADER_PREFIX + "exception.message";
  public static final String ERROR_HEADER_EXCEPTION_STACK_TRACE = HEADER_PREFIX + "exception.stacktrace";
  private static final Logger log = LoggerFactory.getLogger(DLQReporter.class);
  private String dlqTopic;
  private Producer<byte[], byte[]> dlqProducer;

  /**
   * Creates a reporter backed by a new {@link KafkaProducer}.
   *
   * @param topic      name of the dead-letter topic to publish to
   * @param properties producer configuration; byte-array serializers are filled in
   *                   unless the caller configured their own
   */
  public DLQReporter(String topic, Properties properties) {
    // Default to byte-array serializers if the caller did not specify any.
    if (!properties.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG))
      properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, DEFAULT_SERIALIZER);
    if (!properties.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG))
      properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, DEFAULT_SERIALIZER);
    this.dlqTopic = topic;
    this.dlqProducer = new KafkaProducer<>(properties);
  }

  /** No-arg constructor for tests; inject topic and producer through the setters. */
  public DLQReporter() {
  }

  public void setDlqTopic(String dlqTopic) {
    this.dlqTopic = dlqTopic;
  }

  public void setDlqProducer(Producer<byte[], byte[]> dlqProducer) {
    this.dlqProducer = dlqProducer;
  }

  /**
   * Sends the failed record to the DLQ topic, attaching the exception as headers.
   * The original record timestamp is preserved when present; delivery is asynchronous
   * and send failures are only logged.
   *
   * @param msg the record that could not be delivered
   * @param e   the failure that triggered the report; may be null (no headers added)
   */
  public void reportError(ConnectRecord<?> msg, Exception e) {
    ProducerRecord<byte[], byte[]> producerRecord;
    // ConnectRecord.timestamp() returns a boxed Long that may be null; comparing it
    // directly against RecordBatch.NO_TIMESTAMP would NPE on unboxing.
    Long timestamp = msg.timestamp();
    if (timestamp == null || timestamp == RecordBatch.NO_TIMESTAMP) {
      producerRecord = new ProducerRecord<>(dlqTopic, null, toBytes(msg.key()), toBytes(msg.value()));
    } else {
      producerRecord = new ProducerRecord<>(dlqTopic, null, timestamp, toBytes(msg.key()), toBytes(msg.value()));
    }
    populateErrorHeaders(producerRecord, e);
    this.dlqProducer.send(producerRecord, (metadata, exception) -> {
      if (exception != null) {
        log.error("Could not produce message to dead letter queue. topic=" + dlqTopic, exception);
      }
    });
  }

  /** Closes the underlying producer, flushing any buffered DLQ records. */
  public void close() {
    if (dlqProducer != null) {
      dlqProducer.close();
    }
  }

  /**
   * Converts a record key/value to bytes for the byte-array DLQ producer.
   * Strings are UTF-8 encoded, byte arrays pass through unchanged, and any other
   * type is rendered via {@code toString()} rather than failing with a
   * ClassCastException as a blind String cast would.
   */
  private byte[] toBytes(Object value) {
    if (value == null) {
      return null;
    }
    if (value instanceof byte[]) {
      return (byte[]) value;
    }
    return value.toString().getBytes(StandardCharsets.UTF_8);
  }

  /** Adds exception class, message and stack trace as headers on the DLQ record. */
  private void populateErrorHeaders(ProducerRecord<byte[], byte[]> producerRecord, Exception e) {
    Headers headers = producerRecord.headers();
    if (e != null) {
      headers.add(ERROR_HEADER_EXCEPTION, toBytes(e.getClass().getName()));
      headers.add(ERROR_HEADER_EXCEPTION_MESSAGE, toBytes(e.getMessage()));
      byte[] trace;
      if ((trace = stacktrace(e)) != null) {
        headers.add(ERROR_HEADER_EXCEPTION_STACK_TRACE, trace);
      }
    }
  }

  /** Renders the throwable's stack trace as UTF-8 bytes, or null if serialization fails. */
  private byte[] stacktrace(Throwable error) {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    try {
      PrintStream stream = new PrintStream(bos, true, "UTF-8");
      error.printStackTrace(stream);
      bos.close();
      return bos.toByteArray();
    } catch (IOException e) {
      log.error("Could not serialize stacktrace.", e);
    }
    return null;
  }

}


Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package com.tm.kafka.connect.rest.errors;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.errors.RetriableException;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.verify;

/**
 * Verifies that {@link DLQReporter#reportError} attaches the error headers and hands
 * the record to the producer. Uses the no-arg constructor plus setters so no real
 * {@link org.apache.kafka.clients.producer.KafkaProducer} is created.
 */
@RunWith(MockitoJUnitRunner.class)
public class DLQReporterTest {
  @Mock
  Producer<byte[], byte[]> producer;

  @Mock
  ConnectRecord record;

  DLQReporter reporter;

  @Before
  public void setUp() {
    reporter = new DLQReporter();
    reporter.setDlqTopic("test-topic");
    reporter.setDlqProducer(producer);
  }

  @Test
  public void populateHeadersAndSendToKafka() throws Exception {
    Exception failure = new RetriableException("Test");
    ArgumentCaptor<ProducerRecord> sentRecord = ArgumentCaptor.forClass(ProducerRecord.class);

    reporter.reportError(record, failure);

    // The reporter must publish exactly one record through the injected producer.
    verify(producer).send(sentRecord.capture(), any());

    // Collect the headers of the captured record into a map for easy lookup.
    Map<String, String> headersByKey = new HashMap<>();
    sentRecord.getValue().headers()
      .forEach(header -> headersByKey.put(header.key(), new String(header.value())));

    // All three error headers must be present...
    for (String key : Arrays.asList(
        DLQReporter.ERROR_HEADER_EXCEPTION,
        DLQReporter.ERROR_HEADER_EXCEPTION_MESSAGE,
        DLQReporter.ERROR_HEADER_EXCEPTION_STACK_TRACE)) {
      Assert.assertTrue(headersByKey.containsKey(key));
    }
    // ...and carry the exception's class name and message.
    Assert.assertEquals("org.apache.kafka.connect.errors.RetriableException",
      headersByKey.get(DLQReporter.ERROR_HEADER_EXCEPTION));
    Assert.assertEquals("Test", headersByKey.get(DLQReporter.ERROR_HEADER_EXCEPTION_MESSAGE));
  }
}