Skip to content

Commit

Permalink
Kafka Connect: Add table to topics mapping property
Browse files Browse the repository at this point in the history
  • Loading branch information
igorvoltaic committed Oct 17, 2024
1 parent 3def1f4 commit 14ea47f
Showing 1 changed file with 20 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ public class IcebergSinkConfig extends AbstractConfig {
private static final String WRITE_PROP_PREFIX = "iceberg.tables.write-props.";

private static final String CATALOG_NAME_PROP = "iceberg.catalog";
private static final String TOPIC_TO_TABLES_MAPPING_PROP =
"iceberg.tables.topic-to-table-mapping";
private static final String TABLES_PROP = "iceberg.tables";
private static final String TABLES_DYNAMIC_PROP = "iceberg.tables.dynamic-enabled";
private static final String TABLES_ROUTE_FIELD_PROP = "iceberg.tables.route-field";
Expand Down Expand Up @@ -108,6 +110,12 @@ public static String version() {

private static ConfigDef newConfigDef() {
ConfigDef configDef = new ConfigDef();
configDef.define(
TOPIC_TO_TABLES_MAPPING_PROP,
Type.LIST,
null,
Importance.LOW,
"Comma-delimited list of topic to table mappings");
configDef.define(
TABLES_PROP,
ConfigDef.Type.LIST,
Expand Down Expand Up @@ -298,6 +306,18 @@ public String catalogName() {
return getString(CATALOG_NAME_PROP);
}

public Map<String, String> topicToTableMap() {
Map<String, String> topicToTableMap = Maps.newHashMap();
for (String topicToTable : getList(TOPIC_TO_TABLES_MAPPING_PROP)) {
String[] propsplit = topicToTable.trim().split(":");
if (propsplit.length == 2) {
topicToTableMap.put(propsplit[0].trim(), propsplit[1].trim());
}
}
LOG.debug("Config: topicToTableMap: {}", topicToTableMap);
return topicToTableMap;
}

public List<String> tables() {
return getList(TABLES_PROP);
}
Expand Down

0 comments on commit 14ea47f

Please sign in to comment.