Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Recover the offset from HDFS, even if topic name is not present in storage path #528

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion src/main/java/io/confluent/connect/hdfs/DataWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,12 @@ public void syncWithHive() throws ConnectException {

try {
for (String topic : topics) {
String topicDir = FileUtils.topicDirectory(url, topicDirs.get(topic), topic);
String topicDir = FileUtils.topicDirectory(
url,
topicDirs.get(topic),
topic,
connectorConfig.getBoolean(StorageCommonConfig.PATH_INCLUDE_TOPICNAME_CONFIG)
);
CommittedFileFilter filter = new TopicCommittedFileFilter(topic);
FileStatus fileStatusWithMaxOffset = FileUtils.fileStatusWithMaxOffset(
storage,
Expand Down
13 changes: 13 additions & 0 deletions src/main/java/io/confluent/connect/hdfs/FileUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,23 @@ public static String committedFileName(
return fileName(url, topicsDir, directory, name);
}

/**
 * Builds the path of the topics directory located directly under the given URL.
 *
 * @param url the URL prefix of the store
 * @param topicsDir the directory name appended to {@code url}
 * @return {@code url} and {@code topicsDir} joined by a single {@code "/"}
 */
public static String topicDirectory(String url, String topicsDir) {
  // String.join renders null elements as "null", exactly like string concatenation does.
  return String.join("/", url, topicsDir);
}

/**
 * Builds the path of a single topic's directory under the topics directory.
 *
 * @param url the URL prefix of the store
 * @param topicsDir the directory name appended to {@code url}
 * @param topic the topic name appended as the last path segment
 * @return {@code url}, {@code topicsDir} and {@code topic} joined by {@code "/"}
 */
public static String topicDirectory(String url, String topicsDir, String topic) {
  // String.join renders null elements as "null", exactly like string concatenation does.
  return String.join("/", url, topicsDir, topic);
}

/**
 * Builds the directory path for a topic, with or without the topic name as the final
 * path segment.
 *
 * @param url the URL prefix of the store
 * @param topicsDir the directory name appended to {@code url}
 * @param topic the topic name; only used when the topic name is part of the path
 * @param includeTopicName {@code true} to return {@code url/topicsDir/topic},
 *     {@code false} to return {@code url/topicsDir}; {@code null} is treated as
 *     {@code true}
 * @return the resulting directory path
 */
public static String topicDirectory(String url, String topicsDir, String topic,
    Boolean includeTopicName) {
  String topicsRoot = url + "/" + topicsDir;
  // The flag is a boxed Boolean: unboxing a null reference would throw a
  // NullPointerException without any message. Fall back to the layout produced by the
  // three-argument overload, which always appends the topic name.
  boolean appendTopic = includeTopicName == null || includeTopicName;
  return appendTopic ? topicsRoot + "/" + topic : topicsRoot;
}

public static FileStatus fileStatusWithMaxOffset(
Storage storage,
Path path,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import io.confluent.connect.hdfs.storage.HdfsStorage;
import io.confluent.connect.storage.StorageSinkConnectorConfig;
import io.confluent.connect.storage.hive.HiveConfig;
import io.confluent.connect.storage.common.StorageCommonConfig;
import io.confluent.connect.storage.partitioner.PartitionerConfig;
import io.confluent.connect.storage.partitioner.TimeBasedPartitioner;
import io.confluent.connect.storage.partitioner.TimestampExtractor;
Expand Down Expand Up @@ -338,7 +339,12 @@ public void write() {
case WRITE_PARTITION_PAUSED:
if (currentSchema == null) {
if (compatibility != StorageSchemaCompatibility.NONE && offset != -1) {
String topicDir = FileUtils.topicDirectory(url, topicsDir, tp.topic());
String topicDir = FileUtils.topicDirectory(
url,
topicsDir,
tp.topic(),
connectorConfig.getBoolean(StorageCommonConfig.PATH_INCLUDE_TOPICNAME_CONFIG)
);
CommittedFileFilter filter = new TopicPartitionCommittedFileFilter(tp);
FileStatus fileStatusWithMaxOffset = FileUtils.fileStatusWithMaxOffset(
storage,
Expand Down Expand Up @@ -422,8 +428,8 @@ public void write() {
// records available
if (recordCounter == 0 || !shouldRotateAndMaybeUpdateTimers(currentRecord, now)) {
break;
}
}

log.info(
"committing files after waiting for rotateIntervalMs time but less than "
+ "flush.size records available."
Expand Down
6 changes: 6 additions & 0 deletions src/test/java/io/confluent/connect/hdfs/FileUtilsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,10 @@ public void testExtractOffset() {
public void testExtractOffsetInvalid() {
  // The file name carries an extra leading "namespace+" segment before the topic.
  // NOTE(review): the test name says "Invalid", yet the assertion expects the trailing
  // number to be extracted successfully - confirm the name matches the intent.
  final int expectedOffset = 1001;
  final String fileName = "namespace+topic+1+1000+1001.avro";

  assertEquals(expectedOffset, FileUtils.extractOffset(fileName));
}

/**
 * Checks both outcomes of the four-argument {@code FileUtils.topicDirectory}: with the
 * flag set the topic name is the last path segment, with the flag cleared it is omitted.
 */
@Test
public void testTopicDirectories() {
  final String url = "hdfs://";
  final String topicsDir = "topicsDir";
  final String topic = "topic";

  String withTopicName = FileUtils.topicDirectory(url, topicsDir, topic, true);
  assertEquals("hdfs:///topicsDir/topic", withTopicName);

  String withoutTopicName = FileUtils.topicDirectory(url, topicsDir, topic, false);
  assertEquals("hdfs:///topicsDir", withoutTopicName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ private static class BiHourlyPartitioner extends TimeBasedPartitioner {

/**
 * Configures this partitioner: first delegates to the superclass with the given config,
 * then calls {@code init} with this class's own duration, path format, locale and
 * time zone.
 *
 * @param config the configuration map passed through to both calls
 */
@Override
public void configure(Map<String, Object> config) {
// Let the superclass process the config before the explicit init below.
// NOTE(review): presumably the parent sets up state that init relies on or overrides;
// confirm against TimeBasedPartitioner.
super.configure(config);
// Initialise with the values fixed by this class (French locale, DATE_TIME_ZONE)
// together with the caller's config.
init(partitionDurationMs, pathFormat, Locale.FRENCH, DATE_TIME_ZONE, config);
}

Expand Down