Skip to content

Commit

Permalink
Closes #1426: Run IIS experiments by relying on spark 3.4 version
Browse files Browse the repository at this point in the history
WIP.

Commenting out avro related methods from scala sources which were relying on avro deserializer class which was made private in spark3.
Aligning sources with those changes by changing the way dataframes are constructed from collections of avro records and other refactoring required to compile IIS sources successsfully. This does not mean the code is already operational, some tests fail and still need to be fixed.

Upgrading logging system dependencies to match sharelib log4j dependencies version.
Upgrading maven-plugin-plugin version to solve build bug induced by upgraded log4j version.
  • Loading branch information
marekhorst committed Sep 27, 2023
1 parent 1449c01 commit 6f1a1d9
Show file tree
Hide file tree
Showing 18 changed files with 297 additions and 382 deletions.
2 changes: 1 addition & 1 deletion iis-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@
</build>

<properties>
<scala.version>2.11.12</scala.version>
<scala.version>2.12.14</scala.version>
</properties>

</project>

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ class AvroDataFrameSupport(val spark: SparkSession) extends Serializable {
* @tparam T Type of elements.
* @return DataFrame containing data from the given list.
*/
def createDataFrame[T](data: java.util.List[T], avroSchema: Schema): DataFrame = {
createDataFrame(data.asScala, avroSchema)
}
// def createDataFrame[T](data: java.util.List[T], avroSchema: Schema): DataFrame = {
// createDataFrame(data.asScala, avroSchema)
// }

/**
* Creates a dataframe from a given collection.
Expand All @@ -38,13 +38,13 @@ class AvroDataFrameSupport(val spark: SparkSession) extends Serializable {
* @tparam T Type of elements.
* @return DataFrame containing data from the given seq.
*/
def createDataFrame[T](data: Seq[T], avroSchema: Schema): DataFrame = {
val rowSchema = SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
val encoder = RowEncoder.apply(rowSchema).resolveAndBind()
val deserializer = new AvroDeserializer(avroSchema, rowSchema)
val rows = data.map(record => encoder.fromRow(deserializer.deserialize(record).asInstanceOf[InternalRow]))
spark.createDataFrame(spark.sparkContext.parallelize(rows), rowSchema)
}
// def createDataFrame[T](data: Seq[T], avroSchema: Schema): DataFrame = {
// val rowSchema = SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
// val encoder = RowEncoder.apply(rowSchema).resolveAndBind()
// val deserializer = new AvroDeserializer(avroSchema, rowSchema)
// val rows = data.map(record => encoder.fromRow(deserializer.deserialize(record).asInstanceOf[InternalRow]))
// spark.createDataFrame(spark.sparkContext.parallelize(rows), rowSchema)
// }

/**
* Creates a dataset from given dataframe using kryo encoder.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,20 @@ class AvroDatasetSupport(val spark: SparkSession) extends Serializable {
* @tparam T Type of objects in the dataset.
* @return DataFrame of objects corresponding to records in the given dataset.
*/
def toDF[T <: SpecificRecordBase](ds: Dataset[T], avroSchema: Schema): DataFrame = {
val avroSchemaStr = avroSchema.toString
val rowSchema = SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
val encoder = RowEncoder(rowSchema).resolveAndBind()

object SerializationSupport extends Serializable {
@transient private lazy val deserializer = new AvroDeserializer(new Schema.Parser().parse(avroSchemaStr), rowSchema)
private val rows = ds.rdd.map(record => encoder.fromRow(deserializer.deserialize(record).asInstanceOf[InternalRow]))

def doToDF(): DataFrame = {
spark.createDataFrame(rows, rowSchema)
}
}

SerializationSupport.doToDF()
}
// def toDF[T <: SpecificRecordBase](ds: Dataset[T], avroSchema: Schema): DataFrame = {
// val avroSchemaStr = avroSchema.toString
// val rowSchema = SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
// val encoder = RowEncoder(rowSchema).resolveAndBind()
//
// object SerializationSupport extends Serializable {
// @transient private lazy val deserializer = new AvroDeserializer(new Schema.Parser().parse(avroSchemaStr), rowSchema)
// private val rows = ds.rdd.map(record => encoder.fromRow(deserializer.deserialize(record).asInstanceOf[InternalRow]))
//
// def doToDF(): DataFrame = {
// spark.createDataFrame(rows, rowSchema)
// }
// }
//
// SerializationSupport.doToDF()
// }
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ class AvroDatasetWriter[T <: SpecificRecordBase](ds: Dataset[T]) extends Seriali
* @param path Path to the data store.
* @param avroSchema Avro schema of the records.
*/
def write(path: String, avroSchema: Schema): Unit = {
new AvroDataFrameWriter(new AvroDatasetSupport(ds.sparkSession).toDF(ds, avroSchema))
.write(path, avroSchema)
}
// def write(path: String, avroSchema: Schema): Unit = {
// new AvroDataFrameWriter(new AvroDatasetSupport(ds.sparkSession).toDF(ds, avroSchema))
// .write(path, avroSchema)
// }
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -27,22 +27,22 @@ public void beforeEach() {
support = new AvroDataFrameSupport(spark());
}

@Test
@DisplayName("Avro dataframe support creates dataframe from collection of avro type")
public void givenACollectionOfAvroType_whenConvertedToDataFrame_thenProperDataFrameIsReturned() {
Person person = Person.newBuilder().setId(1).setName("name").setAge(2).build();
List<Person> data = Collections.singletonList(person);

Dataset<Row> result = support.createDataFrame(data, Person.SCHEMA$);

assertSchemasEqualIgnoringNullability(Person.SCHEMA$, result.schema());
List<Row> rows = result.collectAsList();
assertEquals(1, rows.size());
Row row = rows.get(0);
assertEquals(person.getId(), row.getAs("id"));
assertEquals(person.getName(), row.getAs("name"));
assertEquals(person.getAge(), row.getAs("age"));
}
// @Test
// @DisplayName("Avro dataframe support creates dataframe from collection of avro type")
// public void givenACollectionOfAvroType_whenConvertedToDataFrame_thenProperDataFrameIsReturned() {
// Person person = Person.newBuilder().setId(1).setName("name").setAge(2).build();
// List<Person> data = Collections.singletonList(person);
//
// Dataset<Row> result = support.createDataFrame(data, Person.SCHEMA$);
//
// assertSchemasEqualIgnoringNullability(Person.SCHEMA$, result.schema());
// List<Row> rows = result.collectAsList();
// assertEquals(1, rows.size());
// Row row = rows.get(0);
// assertEquals(person.getId(), row.getAs("id"));
// assertEquals(person.getName(), row.getAs("name"));
// assertEquals(person.getAge(), row.getAs("age"));
// }

@Test
@DisplayName("Avro dataframe support converts dataframe of avro type to dataset of avro type")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,25 +24,25 @@ public void beforeEach() {
super.beforeEach();
}

@Test
@DisplayName("Avro dataset support converts dataset of avro type to dataframe of avro type")
public void givenDatasetOfAvroType_whenConvertedToDataFrame_thenProperDataFrameIsReturned() {
Person person = Person.newBuilder().setId(1).setName("name").setAge(2).build();
Dataset<Person> ds = spark().createDataset(
Collections.singletonList(person),
Encoders.kryo(Person.class)
);

Dataset<Row> result = new AvroDatasetSupport(spark()).toDF(ds, Person.SCHEMA$);

assertSchemasEqualIgnoringNullability(Person.SCHEMA$, result.schema());
List<Row> rows = result.collectAsList();
assertEquals(1, rows.size());
Row row = rows.get(0);
assertEquals(person.getId(), row.getAs("id"));
assertEquals(person.getName(), row.getAs("name"));
assertEquals(person.getAge(), row.getAs("age"));
}
// @Test
// @DisplayName("Avro dataset support converts dataset of avro type to dataframe of avro type")
// public void givenDatasetOfAvroType_whenConvertedToDataFrame_thenProperDataFrameIsReturned() {
// Person person = Person.newBuilder().setId(1).setName("name").setAge(2).build();
// Dataset<Person> ds = spark().createDataset(
// Collections.singletonList(person),
// Encoders.kryo(Person.class)
// );
//
// Dataset<Row> result = new AvroDatasetSupport(spark()).toDF(ds, Person.SCHEMA$);
//
// assertSchemasEqualIgnoringNullability(Person.SCHEMA$, result.schema());
// List<Row> rows = result.collectAsList();
// assertEquals(1, rows.size());
// Row row = rows.get(0);
// assertEquals(person.getId(), row.getAs("id"));
// assertEquals(person.getName(), row.getAs("name"));
// assertEquals(person.getAge(), row.getAs("age"));
// }

public static void assertSchemasEqualIgnoringNullability(Schema avroSchema, StructType sqlSchema) {
assertEquals(SchemaConverters.toSqlType(avroSchema).dataType().asNullable(), sqlSchema.asNullable());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,21 @@ public void beforeEach() {
super.beforeEach();
}

@Test
@DisplayName("Avro dataset writer writes dataset of avro type")
public void givenDatasetOfAvroType_whenWrittenToOutput_thenWriteSucceeds(@TempDir Path workingDir) throws IOException {
Path outputDir = workingDir.resolve("output");
Person person = Person.newBuilder().setId(1).setName("name").setAge(2).build();
Dataset<Person> ds = spark().createDataset(
Collections.singletonList(person),
Encoders.kryo(Person.class)
);

new AvroDatasetWriter<>(ds).write(outputDir.toString(), Person.SCHEMA$);

List<Person> personList = AvroTestUtils.readLocalAvroDataStore(outputDir.toString());
assertEquals(1, personList.size());
Person personRead = personList.get(0);
assertEquals(person, personRead);
}
// @Test
// @DisplayName("Avro dataset writer writes dataset of avro type")
// public void givenDatasetOfAvroType_whenWrittenToOutput_thenWriteSucceeds(@TempDir Path workingDir) throws IOException {
// Path outputDir = workingDir.resolve("output");
// Person person = Person.newBuilder().setId(1).setName("name").setAge(2).build();
// Dataset<Person> ds = spark().createDataset(
// Collections.singletonList(person),
// Encoders.kryo(Person.class)
// );
//
// new AvroDatasetWriter<>(ds).write(outputDir.toString(), Person.SCHEMA$);
//
// List<Person> personList = AvroTestUtils.readLocalAvroDataStore(outputDir.toString());
// assertEquals(1, personList.size());
// Person personRead = personList.get(0);
// assertEquals(person, personRead);
// }
}
Loading

0 comments on commit 6f1a1d9

Please sign in to comment.