From 61b3b57c2c0d772de1659665fb4d80721dcf2ac9 Mon Sep 17 00:00:00 2001
From: "sagar.bindal"
Date: Fri, 27 Nov 2020 12:57:54 +0530
Subject: [PATCH 1/2] Adding support for reconnection logic

---
 .../hdfs/avro/AvroRecordWriterProvider.java | 10 +++++++++
 .../hdfs/orc/OrcRecordWriterProvider.java   | 22 +++++++++++++++++++
 2 files changed, 32 insertions(+)

diff --git a/src/main/java/io/confluent/connect/hdfs/avro/AvroRecordWriterProvider.java b/src/main/java/io/confluent/connect/hdfs/avro/AvroRecordWriterProvider.java
index bf812d7ba..d77af8a49 100644
--- a/src/main/java/io/confluent/connect/hdfs/avro/AvroRecordWriterProvider.java
+++ b/src/main/java/io/confluent/connect/hdfs/avro/AvroRecordWriterProvider.java
@@ -18,6 +18,8 @@
 import io.confluent.connect.hdfs.storage.HdfsStorage;
 import io.confluent.connect.storage.format.RecordWriter;
 import java.io.OutputStream;
+
+import org.apache.avro.AvroRuntimeException;
 import org.apache.avro.file.CodecFactory;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericDatumWriter;
@@ -84,6 +86,14 @@ public void write(SinkRecord record) {
           }
         } catch (IOException e) {
           throw new DataException(e);
+        } catch (AvroRuntimeException ex) {
+          // Make the schema null to invalidate the file corrupted by the connection failure.
+          schema = null;
+          /*
+           Wrapped in a ConnectException so that setRetryTimeout() is called in the
+           TopicPartitionWriter class and the records are retried.
+          */
+          throw new ConnectException(ex);
         }
       }
 
diff --git a/src/main/java/io/confluent/connect/hdfs/orc/OrcRecordWriterProvider.java b/src/main/java/io/confluent/connect/hdfs/orc/OrcRecordWriterProvider.java
index aba47b9e5..fcefc1df4 100644
--- a/src/main/java/io/confluent/connect/hdfs/orc/OrcRecordWriterProvider.java
+++ b/src/main/java/io/confluent/connect/hdfs/orc/OrcRecordWriterProvider.java
@@ -19,12 +19,15 @@
 import io.confluent.connect.storage.format.RecordWriter;
 import io.confluent.connect.storage.format.RecordWriterProvider;
 import io.confluent.connect.storage.hive.HiveSchemaConverter;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
 import org.apache.hadoop.hive.ql.io.orc.OrcFile;
 import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
 import org.apache.hadoop.hive.ql.io.orc.Writer;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.ipc.RemoteException;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.errors.ConnectException;
@@ -33,6 +36,7 @@
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.net.URI;
 
 public class OrcRecordWriterProvider implements RecordWriterProvider {
 
@@ -99,6 +103,24 @@ public void preFooterWrite(OrcFile.WriterContext writerContext) {
           );
         }
       } catch (IOException e) {
+        if (e instanceof RemoteException || e instanceof AlreadyBeingCreatedException) {
+          try {
+            /*
+             The current version of OrcFile.WriterOptions does not support overwriting files,
+             so the tmp file is deleted whenever an `AlreadyBeingCreatedException` is thrown.
+            */
+            FileSystem fileSystem = FileSystem.get(
+                URI.create(conf.getUrl()),
+                conf.getHadoopConfiguration()
+            );
+            fileSystem.delete(path, true);
+          } catch (IOException ex) {
+            // Connection to the HDFS cluster was refused.
+            log.debug("Connection failed while deleting the tmp file from the HDFS cluster.");
+          }
+        }
+        // Make the schema null to invalidate the file corrupted by the connection failure.
+        schema = null;
         throw new ConnectException("Failed to write record: ", e);
       }
     }

From 3bc06c7c429964d240aa6bdcfcd1291c72c2bc3b Mon Sep 17 00:00:00 2001
From: SanchayGupta1197
Date: Mon, 14 Dec 2020 11:59:00 +0530
Subject: [PATCH 2/2] logging exception

---
 .../io/confluent/connect/hdfs/orc/OrcRecordWriterProvider.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/main/java/io/confluent/connect/hdfs/orc/OrcRecordWriterProvider.java b/src/main/java/io/confluent/connect/hdfs/orc/OrcRecordWriterProvider.java
index fcefc1df4..1f71ba814 100644
--- a/src/main/java/io/confluent/connect/hdfs/orc/OrcRecordWriterProvider.java
+++ b/src/main/java/io/confluent/connect/hdfs/orc/OrcRecordWriterProvider.java
@@ -116,7 +116,7 @@ public void preFooterWrite(OrcFile.WriterContext writerContext) {
             fileSystem.delete(path, true);
           } catch (IOException ex) {
             // Connection to the HDFS cluster was refused.
-            log.debug("Connection failed while deleting the tmp file from the HDFS cluster.");