Kafka Connect: Add table to topics mapping property #10422

Open · wants to merge 4 commits into main
docs/docs/kafka-connect.md (41 additions, 0 deletions)
@@ -63,6 +63,7 @@ for exactly-once semantics. This requires Kafka 2.5 or later.
| iceberg.tables | Comma-separated list of destination tables |
| iceberg.tables.dynamic-enabled | Set to `true` to route to a table specified in `routeField` instead of using `routeRegex`, default is `false` |
| iceberg.tables.route-field | For multi-table fan-out, the name of the field used to route records to tables |
| iceberg.tables.topic-to-table-mapping | Comma-separated list of static topic-to-table mappings, each in `topic:tableIdentifier` format, used to route a topic's records to a specific table |
| iceberg.tables.default-commit-branch | Default branch for commits, main is used if not specified |
| iceberg.tables.default-id-columns | Default comma-separated list of columns that identify a row in tables (primary key) |
| iceberg.tables.default-partition-by | Default comma-separated list of partition field names to use when creating tables |
@@ -350,3 +351,43 @@ See above for creating two tables.
}
}
```

### Static routing with a topic-to-table mapping

This example routes records to tables using the static mapping from the connector config: records from the `events_list` topic are written to the `db.event_get_log` table, and records from the `events_create` topic are written to the `db.event_add_log` table.

#### Create two destination tables

```sql
CREATE TABLE db.event_get_log (
id STRING,
type STRING,
ts TIMESTAMP,
payload STRING)
PARTITIONED BY (hours(ts));

CREATE TABLE db.event_add_log (
id STRING,
type STRING,
ts TIMESTAMP,
payload STRING)
PARTITIONED BY (hours(ts));
```

#### Connector config

```json
{
"name": "events-sink",
"config": {
"connector.class": "org.apache.iceberg.connect.IcebergSinkConnector",
"tasks.max": "2",
"topics": "events_list,events_create",
"iceberg.tables.topic-to-table-mapping": "event_list:db.event_get_log,events_create:db.event_add_log",
"iceberg.catalog.type": "rest",
"iceberg.catalog.uri": "https://localhost",
"iceberg.catalog.credential": "<credential>",
"iceberg.catalog.warehouse": "<warehouse name>"
}
}
```
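
Records from a topic that has no entry in the mapping fall back to the static routing configured by `iceberg.tables`.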
IcebergSinkConfig.java
@@ -65,6 +65,8 @@ public class IcebergSinkConfig extends AbstractConfig {
private static final String WRITE_PROP_PREFIX = "iceberg.tables.write-props.";

private static final String CATALOG_NAME_PROP = "iceberg.catalog";
private static final String TOPIC_TO_TABLES_MAPPING_PROP =
"iceberg.tables.topic-to-table-mapping";
private static final String TABLES_PROP = "iceberg.tables";
private static final String TABLES_DYNAMIC_PROP = "iceberg.tables.dynamic-enabled";
private static final String TABLES_ROUTE_FIELD_PROP = "iceberg.tables.route-field";
@@ -108,6 +110,12 @@ public static String version() {

private static ConfigDef newConfigDef() {
ConfigDef configDef = new ConfigDef();
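// optional static topic-to-table routing; each entry takes the form topic:tableIdentifier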
configDef.define(
TOPIC_TO_TABLES_MAPPING_PROP,
ConfigDef.Type.LIST,
"",
Importance.LOW,
"Comma-separated list of topic-to-table mappings in topic:tableIdentifier format");
configDef.define(
TABLES_PROP,
ConfigDef.Type.LIST,
@@ -298,6 +306,18 @@ public String catalogName() {
return getString(CATALOG_NAME_PROP);
}

public Map<String, String> topicToTableMap() {
Map<String, String> topicToTableMap = Maps.newHashMap();
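// each mapping entry takes the form topic:tableIdentifier, e.g. events_list:db.event_get_log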
for (String topicToTable : getList(TOPIC_TO_TABLES_MAPPING_PROP)) {
List<String> parts = Splitter.on(':').splitToList(topicToTable.trim());
if (parts.size() == 2) {
topicToTableMap.put(parts.get(0).trim(), parts.get(1).trim());
} else {
LOG.warn("Ignoring invalid topic-to-table mapping: {}", topicToTable);
}
}
LOG.debug("Config: topicToTableMap: {}", topicToTableMap);
return topicToTableMap;
}

public List<String> tables() {
return getList(TABLES_PROP);
}
SinkWriter.java
@@ -81,11 +81,26 @@ private void save(SinkRecord record) {

if (config.dynamicTablesEnabled()) {
routeRecordDynamically(record);
} else if (!config.topicToTableMap().isEmpty()) {
routeRecordByMap(record);
} else {
routeRecordStatically(record);
}
}

private void routeRecordByMap(SinkRecord record) {
Map<String, String> topicToTableMap = config.topicToTableMap();
String topicName = record.topic();
String tableName = topicToTableMap.get(topicName);

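// no mapping for this topic; fall back to static routing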
if (tableName == null) {
routeRecordStatically(record);
return;
}

writerForTable(tableName, record, false).write(record);
}

private void routeRecordStatically(SinkRecord record) {
String routeField = config.tablesRouteField();

SinkWriterTest.java
@@ -62,6 +62,7 @@ public class SinkWriterTest {
optional(2, "data", Types.StringType.get()),
optional(3, "date", Types.StringType.get()));
private static final String ROUTE_FIELD = "fld";
private static final String TOPIC_NAME = "topic";

@BeforeEach
public void before() {
@@ -167,6 +168,21 @@ public void testDynamicNoRoute() {
assertThat(writerResults.size()).isEqualTo(0);
}

@Test
public void testTopicToTableMapRoute() {
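// a record consumed from TOPIC_NAME should be routed to TABLE_IDENTIFIER via the static mapping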
IcebergSinkConfig config = mock(IcebergSinkConfig.class);
when(config.topicToTableMap())
.thenReturn(ImmutableMap.of(TOPIC_NAME, TABLE_IDENTIFIER.toString()));
when(config.tableConfig(any())).thenReturn(mock(TableSinkConfig.class));

Map<String, Object> value = ImmutableMap.of(TOPIC_NAME, TABLE_IDENTIFIER.toString());

List<IcebergWriterResult> writerResults = sinkWriterTest(value, config);
assertThat(writerResults.size()).isEqualTo(1);
IcebergWriterResult writerResult = writerResults.get(0);
assertThat(writerResult.tableIdentifier()).isEqualTo(TABLE_IDENTIFIER);
}

private List<IcebergWriterResult> sinkWriterTest(
Map<String, Object> value, IcebergSinkConfig config) {
IcebergWriterResult writeResult =
@@ -187,7 +203,7 @@ private List<IcebergWriterResult> sinkWriterTest(
Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS);
SinkRecord rec =
new SinkRecord(
"topic",
TOPIC_NAME,
1,
null,
"key",
@@ -200,7 +216,7 @@

SinkWriterResult result = sinkWriter.completeWrite();

Offset offset = result.sourceOffsets().get(new TopicPartition("topic", 1));
Offset offset = result.sourceOffsets().get(new TopicPartition(TOPIC_NAME, 1));
assertThat(offset).isNotNull();
assertThat(offset.offset()).isEqualTo(101L); // should be 1 more than current offset
assertThat(offset.timestamp()).isEqualTo(now.atOffset(ZoneOffset.UTC));