Skip to content

Commit

Permalink
Closes #1426: Run IIS experiments by relying on spark 3.4 version
Browse files Browse the repository at this point in the history
WIP.

Fixing task serialization issue by upgrading avro dependency from 1.8.10 to 1.11.1 which is already a part of sharelib342. This induced the requirement to align JsonConverter with the new code and one of the requirements to move it to a different package due to limited visibility of one of the crucial methods.

Further logging system dependency alignment to make unit tests output produced on console visible.
  • Loading branch information
marekhorst committed Sep 29, 2023
1 parent 2e84ec9 commit 37b5364
Show file tree
Hide file tree
Showing 8 changed files with 67 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import java.io.IOException;

import org.apache.avro.JsonConverter;
import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileConstants;
Expand All @@ -32,7 +33,6 @@
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.util.Progressable;

import com.cloudera.science.avro.common.JsonConverter;
import com.cloudera.science.avro.common.SchemaLoader;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,13 @@

import java.io.IOException;

import org.apache.avro.JsonConverter;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;

import com.cloudera.science.avro.common.JsonConverter;

public class AvroAsJSONRecordWriter implements RecordWriter<Text, Text> {

private final DataFileWriter<GenericRecord> writer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
* the specific language governing permissions and limitations under the
* License.
*/
package com.cloudera.science.avro.common;
package org.apache.avro;

import java.io.IOException;
import java.util.ArrayList;
Expand All @@ -21,14 +21,13 @@
import java.util.Map;
import java.util.Set;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.map.ObjectMapper;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
Expand Down Expand Up @@ -119,26 +118,26 @@ private GenericRecord convert(Map<String, Object> raw, Schema schema)
Object value = null;
switch (fieldSchema.getType()) {
case BOOLEAN:
value = defaultValue.getValueAsBoolean();
value = defaultValue.asBoolean();
break;
case DOUBLE:
value = defaultValue.getValueAsDouble();
value = defaultValue.asDouble();
break;
case FLOAT:
value = (float) defaultValue.getValueAsDouble();
value = (float) defaultValue.asDouble();
break;
case INT:
value = defaultValue.getValueAsInt();
value = defaultValue.asInt();
break;
case LONG:
value = defaultValue.getValueAsLong();
value = defaultValue.asLong();
break;
case STRING:
value = defaultValue.getValueAsText();
value = defaultValue.asText();
break;
case MAP:
Map<String, Object> fieldMap = mapper.readValue(
defaultValue.getValueAsText(), Map.class);
defaultValue.asText(), Map.class);
Map<String, Object> mvalue = Maps.newHashMap();
for (Map.Entry<String, Object> e : fieldMap.entrySet()) {
mvalue.put(e.getKey(),
Expand All @@ -148,7 +147,7 @@ private GenericRecord convert(Map<String, Object> raw, Schema schema)
break;
case ARRAY:
List fieldArray = mapper.readValue(
defaultValue.getValueAsText(), List.class);
defaultValue.asText(), List.class);
List lvalue = Lists.newArrayList();
for (Object elem : fieldArray) {
lvalue.add(typeConvert(elem, name, fieldSchema.getElementType()));
Expand All @@ -157,7 +156,7 @@ private GenericRecord convert(Map<String, Object> raw, Schema schema)
break;
case RECORD:
Map<String, Object> fieldRec = mapper.readValue(
defaultValue.getValueAsText(), Map.class);
defaultValue.asText(), Map.class);
value = convert(fieldRec, fieldSchema);
break;
default:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package com.cloudera.science.avro.common;

import com.google.common.collect.ImmutableList;

import org.apache.avro.JsonConverter;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Type;
import org.apache.avro.generic.GenericRecord;
Expand Down
4 changes: 4 additions & 0 deletions iis-3rdparty-avrojsoncoders/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.BitSet;

import org.apache.avro.AvroTypeException;
Expand Down Expand Up @@ -196,8 +197,10 @@ public void writeBytes(byte[] bytes, int start, int len) throws IOException {

private void writeByteArray(byte[] bytes, int start, int len)
throws IOException {
out.writeString(
new String(bytes, start, len, JsonDecoder.CHARSET));
out.writeString(
// mh: after upgrading avro version JsonDecoder.CHARSET got missing but the value used in that class was set to StandardCharsets.ISO_8859_1
// new String(bytes, start, len, JsonDecoder.CHARSET));
new String(bytes, start, len, StandardCharsets.ISO_8859_1));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import eu.dnetlib.iis.common.java.io.HdfsUtils;
import eu.dnetlib.iis.common.schemas.ReportEntry;
import eu.dnetlib.iis.common.spark.avro.AvroDataFrameReader;
import eu.dnetlib.iis.common.spark.avro.AvroDataFrameWriter;
import eu.dnetlib.iis.common.utils.RDDUtils;
import eu.dnetlib.iis.export.schemas.Citations;
import pl.edu.icm.sparkutils.avro.SparkAvroSaver;
Expand Down Expand Up @@ -86,10 +85,8 @@ public static void storeReportEntries(SparkAvroSaver avroSaver,
Dataset<ReportEntry> reportEntries,
String outputReportPath) {
storeReportEntries(avroSaver, reportEntries, outputReportPath, (ds, path) ->
// FIXME avoiding relying on writing Dataset<ReportEntry> which apparently is not easily achievable in spark3
// due to AvroDatasetSupport scala functions referring to classes which were made private in spark3
avroSaver.saveJavaRDD(ds.javaRDD(), ReportEntry.SCHEMA$, path));
// new AvroDataFrameWriter(ds).write(path, ReportEntry.SCHEMA$));
//mh: due to changes in avro serialization model in spark3 relying on AvroSaver instead of writer storing Datasets
avroSaver.saveJavaRDD(ds.toJavaRDD(), ReportEntry.SCHEMA$, path));
}

public static void storeReportEntries(SparkAvroSaver avroSaver, Dataset<ReportEntry> reportEntries,
Expand Down
44 changes: 41 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,28 @@
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.19.0</version>
</dependency>

<!-- just to make sure all transitive dependencies of an old log4j are excluded -->
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.5</version>
<scope>provided</scope>
</dependency>

<!-- this dep is required to align on slf4j-api version used by spark-core-2.12 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>2.0.6</version>
</dependency>

<!-- old log4j deps -->
<!--
Expand Down Expand Up @@ -378,18 +400,34 @@
<groupId>pl.edu.icm.spark-utils</groupId>
<artifactId>spark-utils_2.12</artifactId>
<version>1.0.2</version>
<!-- FIXME not available in spark3 sharelib folder, commenting out until finishing running tests -->
<!-- FIXME not available in spark3 sharelib folder yet, commenting out until finishing the testing phase -->
<!--
<scope>provided</scope>
-->
</dependency>

<!-- Avro dependencies -->
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>${iis.avro.version}</version>
<version>1.11.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.14.2</version>
<scope>provided</scope>
</dependency>

<!-- updated avro does not include dependency required by iis-3rdparty-avrojsoncoders -->
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
<!-- this was the version used by older avro, we might want to upgrade it -->
<version>1.8.10</version>
</dependency>


<dependency>
<groupId>org.apache.avro</groupId>
Expand Down

0 comments on commit 37b5364

Please sign in to comment.