Skip to content

Commit

Permalink
Implement routing and tests
Browse files Browse the repository at this point in the history
  • Loading branch information
igorvoltaic committed Oct 17, 2024
1 parent 14ea47f commit 859f20e
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,26 @@ private void save(SinkRecord record) {

if (config.dynamicTablesEnabled()) {
routeRecordDynamically(record);
} else if (config.topicToTableMap().size() > 0) {
routeRecordByMap(record);
} else {
routeRecordStatically(record);
}
}

private void routeRecordByMap(SinkRecord record) {
Map<String, String> topicToTableMap = config.topicToTableMap();
String topicName = record.topic();
String tableName = topicToTableMap.get(topicName);

if (tableName == null) {
routeRecordStatically(record);
return;
}

writerForTable(tableName, record, false).write(record);
}

private void routeRecordStatically(SinkRecord record) {
String routeField = config.tablesRouteField();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public class SinkWriterTest {
optional(2, "data", Types.StringType.get()),
optional(3, "date", Types.StringType.get()));
private static final String ROUTE_FIELD = "fld";
private static final String TOPIC_NAME = "topic";

@BeforeEach
public void before() {
Expand Down Expand Up @@ -167,6 +168,19 @@ public void testDynamicNoRoute() {
assertThat(writerResults.size()).isEqualTo(0);
}

@Test
public void testTopicToTableMapRoute() {
IcebergSinkConfig config = mock(IcebergSinkConfig.class);
when(config.topicToTableMap()).thenReturn(ImmutableMap.of(TOPIC_NAME, TABLE_IDENTIFIER));

Map<String, Object> value = ImmutableMap.of(TOPIC_NAME, TABLE_IDENTIFIER);

List<IcebergWriterResult> writerResults = sinkWriterTest(value, config);
assertThat(writerResults.size()).isEqualTo(1);
IcebergWriterResult writerResult = writerResults.get(0);
assertThat(writerResult.tableIdentifier()).isEqualTo(TABLE_IDENTIFIER);
}

private List<IcebergWriterResult> sinkWriterTest(
Map<String, Object> value, IcebergSinkConfig config) {
IcebergWriterResult writeResult =
Expand All @@ -187,7 +201,7 @@ private List<IcebergWriterResult> sinkWriterTest(
Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS);
SinkRecord rec =
new SinkRecord(
"topic",
TOPIC_NAME,
1,
null,
"key",
Expand All @@ -200,7 +214,7 @@ private List<IcebergWriterResult> sinkWriterTest(

SinkWriterResult result = sinkWriter.completeWrite();

Offset offset = result.sourceOffsets().get(new TopicPartition("topic", 1));
Offset offset = result.sourceOffsets().get(new TopicPartition(TOPIC_NAME, 1));
assertThat(offset).isNotNull();
assertThat(offset.offset()).isEqualTo(101L); // should be 1 more than current offset
assertThat(offset.timestamp()).isEqualTo(now.atOffset(ZoneOffset.UTC));
Expand Down

0 comments on commit 859f20e

Please sign in to comment.