From 659c938533bf81e584869a1ddb54a155f99cb209 Mon Sep 17 00:00:00 2001 From: hariprasad-k Date: Tue, 18 Aug 2020 11:03:21 +0200 Subject: [PATCH] Recover the offset from HDFS will use the custom patch we did by adding or removing the topic name from the path. Fix a test that failed after the patch --- .../java/io/confluent/connect/hdfs/DataWriter.java | 7 ++++++- .../java/io/confluent/connect/hdfs/FileUtils.java | 13 +++++++++++++ .../connect/hdfs/TopicPartitionWriter.java | 12 +++++++++--- .../io/confluent/connect/hdfs/FileUtilsTest.java | 6 ++++++ .../hdfs/partitioner/TimeBasedPartitionerTest.java | 1 + 5 files changed, 35 insertions(+), 4 deletions(-) diff --git a/src/main/java/io/confluent/connect/hdfs/DataWriter.java b/src/main/java/io/confluent/connect/hdfs/DataWriter.java index ef12aa50e..106122b41 100644 --- a/src/main/java/io/confluent/connect/hdfs/DataWriter.java +++ b/src/main/java/io/confluent/connect/hdfs/DataWriter.java @@ -399,7 +399,12 @@ public void syncWithHive() throws ConnectException { try { for (String topic : topics) { - String topicDir = FileUtils.topicDirectory(url, topicDirs.get(topic), topic); + String topicDir = FileUtils.topicDirectory( + url, + topicDirs.get(topic), + topic, + connectorConfig.getBoolean(StorageCommonConfig.PATH_INCLUDE_TOPICNAME_CONFIG) + ); CommittedFileFilter filter = new TopicCommittedFileFilter(topic); FileStatus fileStatusWithMaxOffset = FileUtils.fileStatusWithMaxOffset( storage, diff --git a/src/main/java/io/confluent/connect/hdfs/FileUtils.java b/src/main/java/io/confluent/connect/hdfs/FileUtils.java index 184236562..d87131252 100644 --- a/src/main/java/io/confluent/connect/hdfs/FileUtils.java +++ b/src/main/java/io/confluent/connect/hdfs/FileUtils.java @@ -100,10 +100,23 @@ public static String committedFileName( return fileName(url, topicsDir, directory, name); } + public static String topicDirectory(String url, String topicsDir) { + return url + "/" + topicsDir; + } + public static String topicDirectory(String url, String topicsDir, String topic) { return url + "/" + topicsDir + "/" + topic; } + public static String topicDirectory(String url, String topicsDir, String topic, + Boolean includeTopicName) { + if (includeTopicName) { + return topicDirectory(url, topicsDir, topic); + } else { + return topicDirectory(url, topicsDir); + } + } + public static FileStatus fileStatusWithMaxOffset( Storage storage, Path path, diff --git a/src/main/java/io/confluent/connect/hdfs/TopicPartitionWriter.java b/src/main/java/io/confluent/connect/hdfs/TopicPartitionWriter.java index a62c957bf..96e6f703b 100644 --- a/src/main/java/io/confluent/connect/hdfs/TopicPartitionWriter.java +++ b/src/main/java/io/confluent/connect/hdfs/TopicPartitionWriter.java @@ -53,6 +53,7 @@ import io.confluent.connect.hdfs.storage.HdfsStorage; import io.confluent.connect.storage.StorageSinkConnectorConfig; import io.confluent.connect.storage.hive.HiveConfig; +import io.confluent.connect.storage.common.StorageCommonConfig; import io.confluent.connect.storage.partitioner.PartitionerConfig; import io.confluent.connect.storage.partitioner.TimeBasedPartitioner; import io.confluent.connect.storage.partitioner.TimestampExtractor; @@ -338,7 +339,12 @@ public void write() { case WRITE_PARTITION_PAUSED: if (currentSchema == null) { if (compatibility != StorageSchemaCompatibility.NONE && offset != -1) { - String topicDir = FileUtils.topicDirectory(url, topicsDir, tp.topic()); + String topicDir = FileUtils.topicDirectory( + url, + topicsDir, + tp.topic(), + connectorConfig.getBoolean(StorageCommonConfig.PATH_INCLUDE_TOPICNAME_CONFIG) + ); CommittedFileFilter filter = new TopicPartitionCommittedFileFilter(tp); FileStatus fileStatusWithMaxOffset = FileUtils.fileStatusWithMaxOffset( storage, @@ -422,8 +428,8 @@ public void write() { // records available if (recordCounter == 0 || !shouldRotateAndMaybeUpdateTimers(currentRecord, now)) { break; - } - + } + log.info( "committing files after waiting for rotateIntervalMs time but less than " + "flush.size records available." diff --git a/src/test/java/io/confluent/connect/hdfs/FileUtilsTest.java b/src/test/java/io/confluent/connect/hdfs/FileUtilsTest.java index efdf1993d..6d29f69ce 100644 --- a/src/test/java/io/confluent/connect/hdfs/FileUtilsTest.java +++ b/src/test/java/io/confluent/connect/hdfs/FileUtilsTest.java @@ -16,4 +16,10 @@ public void testExtractOffset() { public void testExtractOffsetInvalid() { assertEquals(1001, FileUtils.extractOffset("namespace+topic+1+1000+1001.avro")); } + + @Test + public void testTopicDirectories() { + assertEquals("hdfs:///topicsDir/topic", FileUtils.topicDirectory("hdfs://", "topicsDir", "topic", true)); + assertEquals("hdfs:///topicsDir", FileUtils.topicDirectory("hdfs://", "topicsDir", "topic", false)); + } } diff --git a/src/test/java/io/confluent/connect/hdfs/partitioner/TimeBasedPartitionerTest.java b/src/test/java/io/confluent/connect/hdfs/partitioner/TimeBasedPartitionerTest.java index f26248fed..688876bb1 100644 --- a/src/test/java/io/confluent/connect/hdfs/partitioner/TimeBasedPartitionerTest.java +++ b/src/test/java/io/confluent/connect/hdfs/partitioner/TimeBasedPartitionerTest.java @@ -66,6 +66,7 @@ private static class BiHourlyPartitioner extends TimeBasedPartitioner { @Override public void configure(Map config) { + super.configure(config); init(partitionDurationMs, pathFormat, Locale.FRENCH, DATE_TIME_ZONE, config); }