diff --git a/src/main/java/io/confluent/connect/hdfs/avro/AvroRecordWriterProvider.java b/src/main/java/io/confluent/connect/hdfs/avro/AvroRecordWriterProvider.java
index bf812d7ba..d77af8a49 100644
--- a/src/main/java/io/confluent/connect/hdfs/avro/AvroRecordWriterProvider.java
+++ b/src/main/java/io/confluent/connect/hdfs/avro/AvroRecordWriterProvider.java
@@ -18,6 +18,8 @@
 import io.confluent.connect.hdfs.storage.HdfsStorage;
 import io.confluent.connect.storage.format.RecordWriter;
 import java.io.OutputStream;
+
+import org.apache.avro.AvroRuntimeException;
 import org.apache.avro.file.CodecFactory;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericDatumWriter;
@@ -84,6 +86,14 @@ public void write(SinkRecord record) {
           }
         } catch (IOException e) {
           throw new DataException(e);
+        } catch (AvroRuntimeException ex) {
+          // Set schema to null to invalidate the file corrupted by the connection failure
+          schema = null;
+          /*
+           Wrapped in a ConnectException so that setRetryTimeout() is called in the
+           TopicPartitionWriter class to retry the records.
+          */
+          throw new ConnectException(ex);
         }
       }
 
diff --git a/src/main/java/io/confluent/connect/hdfs/orc/OrcRecordWriterProvider.java b/src/main/java/io/confluent/connect/hdfs/orc/OrcRecordWriterProvider.java
index aba47b9e5..1f71ba814 100644
--- a/src/main/java/io/confluent/connect/hdfs/orc/OrcRecordWriterProvider.java
+++ b/src/main/java/io/confluent/connect/hdfs/orc/OrcRecordWriterProvider.java
@@ -19,12 +19,15 @@
 import io.confluent.connect.storage.format.RecordWriter;
 import io.confluent.connect.storage.format.RecordWriterProvider;
 import io.confluent.connect.storage.hive.HiveSchemaConverter;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
 import org.apache.hadoop.hive.ql.io.orc.OrcFile;
 import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
 import org.apache.hadoop.hive.ql.io.orc.Writer;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.ipc.RemoteException;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.errors.ConnectException;
@@ -33,6 +36,7 @@
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.net.URI;
 
 public class OrcRecordWriterProvider implements RecordWriterProvider {
 
@@ -99,6 +103,24 @@ public void preFooterWrite(OrcFile.WriterContext writerContext) {
             );
           }
         } catch (IOException e) {
+          if (e instanceof RemoteException || e instanceof AlreadyBeingCreatedException) {
+            try {
+              /*
+               The current version of OrcFile.WriterOptions does not support overwriting
+               files, so the tmp file is deleted whenever `AlreadyBeingCreatedException` is thrown.
+              */
+              FileSystem fileSystem = FileSystem.get(
+                  URI.create(conf.getUrl()),
+                  conf.getHadoopConfiguration()
+              );
+              fileSystem.delete(path, true);
+            } catch (IOException ex) {
+              // The connection to the HDFS cluster was refused.
+              log.warn("Connection failed while deleting the tmp file from the HDFS cluster.", ex);
+            }
+          }
+          // Set schema to null to invalidate the file corrupted by the connection failure
+          schema = null;
           throw new ConnectException("Failed to write record: ", e);
         }
       }
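
For reviewers unfamiliar with the retry path the Avro comment refers to, below is a minimal sketch, not the connector's actual code: the names TopicPartitionWriter and setRetryTimeout() come from the patch comment, while the class shape, field names, and backoff value are assumptions made for illustration.

    // Sketch only: simplified stand-in for kafka-connect-hdfs's TopicPartitionWriter.
    class TopicPartitionWriterSketch {
      private long failureTime = -1L;               // when the last write failed
      private final long retryBackoffMs = 5000L;    // assumed backoff; configurable in the real connector

      void writeRecord(io.confluent.connect.storage.format.RecordWriter writer,
                       org.apache.kafka.connect.sink.SinkRecord record) {
        try {
          writer.write(record);
        } catch (org.apache.kafka.connect.errors.ConnectException e) {
          // A ConnectException leaves the record buffered and arms the retry timer;
          // the next attempt reopens the writer because the provider reset schema to null.
          setRetryTimeout(retryBackoffMs);
        }
      }

      void setRetryTimeout(long retryTimeoutMs) {
        // Remember the failure time; writes are skipped until
        // System.currentTimeMillis() >= failureTime + retryTimeoutMs.
        failureTime = System.currentTimeMillis();
      }
    }

Per the patch comment, only a ConnectException triggers setRetryTimeout(), which is why the AvroRuntimeException is rethrown wrapped in a ConnectException rather than the DataException used for IOException: the records stay eligible for retry instead of failing outright.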
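Similarly, the ORC cleanup relies on two hadoop-common calls, FileSystem.get(URI, Configuration) and FileSystem.delete(Path, boolean). A standalone sketch follows; the namenode URI and tmp-file path are hypothetical placeholders, and in the patch the real values come from conf.getUrl() and conf.getHadoopConfiguration().

    import java.io.IOException;
    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class TmpFileCleanupSketch {
      public static void main(String[] args) throws IOException {
        Configuration hadoopConf = new Configuration();
        // Resolve an HDFS client for the cluster named in the URI.
        FileSystem fs = FileSystem.get(URI.create("hdfs://namenode:8020"), hadoopConf);
        // Placeholder path, not a value from the connector.
        Path tmpFile = new Path("/topics/+tmp/example-topic/partition=0/example.orc");
        // delete(path, recursive): recursive=true mirrors the patch; for a single
        // file, recursive=false would behave the same.
        boolean deleted = fs.delete(tmpFile, true);
        System.out.println("deleted: " + deleted);
      }
    }

Deleting the tmp file matters because a retried OrcFile.createWriter() fails with AlreadyBeingCreatedException while the half-written file is still open on the namenode, and OrcFile.WriterOptions offers no overwrite flag; removing the file lets the retried write start clean.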