[StagingQuery] allow java serializer for staging queries (#694)
* [StagingQuery] allow java serializer for staging queries

* Add argument to javaFetcher
cristianfr authored Mar 1, 2024
1 parent 5ae48b1 commit 1e091c1
Showing 3 changed files with 13 additions and 7 deletions.
15 changes: 10 additions & 5 deletions spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala
@@ -35,7 +35,8 @@ object SparkSessionBuilder {
   def build(name: String,
             local: Boolean = false,
             localWarehouseLocation: Option[String] = None,
-            additionalConfig: Option[Map[String, String]] = None): SparkSession = {
+            additionalConfig: Option[Map[String, String]] = None,
+            enforceKryoSerializer: Boolean = true): SparkSession = {
     if (local) {
       //required to run spark locally with hive support enabled - for sbt test
       System.setSecurityManager(null)
@@ -49,16 +50,20 @@ object SparkSessionBuilder {
       .config("spark.sql.session.timeZone", "UTC")
       //otherwise overwrite will delete ALL partitions, not just the ones it touches
       .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
-      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
-      .config("spark.kryo.registrator", "ai.chronon.spark.ChrononKryoRegistrator")
-      .config("spark.kryoserializer.buffer.max", "2000m")
-      .config("spark.kryo.referenceTracking", "false")
       .config("hive.exec.dynamic.partition", "true")
       .config("hive.exec.dynamic.partition.mode", "nonstrict")
       .config("spark.sql.catalogImplementation", "hive")
       .config("spark.hadoop.hive.exec.max.dynamic.partitions", 30000)
       .config("spark.sql.legacy.timeParserPolicy", "LEGACY")
 
+    // Staging queries don't benefit from the KryoSerializer and in fact may fail with buffer underflow in some cases.
+    if (enforceKryoSerializer) {
+      baseBuilder
+        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+        .config("spark.kryo.registrator", "ai.chronon.spark.ChrononKryoRegistrator")
+        .config("spark.kryoserializer.buffer.max", "2000m")
+        .config("spark.kryo.referenceTracking", "false")
+    }
     additionalConfig.foreach { configMap =>
       configMap.foreach { config => baseBuilder = baseBuilder.config(config._1, config._2) }
     }
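With this change, Kryo becomes opt-out rather than unconditional in SparkSessionBuilder. A minimal sketch of the two call styles (the session names below are illustrative, not taken from this diff):

    // Default behavior is unchanged: Kryo serialization with Chronon's registrator.
    val batchSession = SparkSessionBuilder.build("group_by_backfill")

    // Opting out leaves spark.serializer unset, so Spark falls back to its
    // built-in default, org.apache.spark.serializer.JavaSerializer.
    val stagingSession = SparkSessionBuilder.build("staging_query_export", enforceKryoSerializer = false)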
3 changes: 2 additions & 1 deletion spark/src/main/scala/ai/chronon/spark/StagingQuery.scala
@@ -128,7 +128,8 @@ object StagingQuery {
     val stagingQueryJob = new StagingQuery(
       stagingQueryConf,
       parsedArgs.endDate(),
-      TableUtils(SparkSessionBuilder.build(s"staging_query_${stagingQueryConf.metaData.name}"))
+      TableUtils(
+        SparkSessionBuilder.build(s"staging_query_${stagingQueryConf.metaData.name}", enforceKryoSerializer = false))
     )
     stagingQueryJob.computeStagingQuery(parsedArgs.stepDays.toOption)
   }
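The comment in SparkSessionBuilder carries the rationale: staging queries run arbitrary user SQL whose rows don't benefit from Kryo and can trigger buffer-underflow failures, so the staging-query driver now builds its session with the flag off. One way to confirm the effect on a built session (hypothetical session name; a sketch, not part of the diff):

    val session = SparkSessionBuilder.build("staging_query_check", local = true, enforceKryoSerializer = false)
    // With the flag off, nothing sets spark.serializer, so Spark uses its default JavaSerializer.
    println(session.sparkContext.getConf.getOption("spark.serializer")) // prints None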
2 changes: 1 addition & 1 deletion JavaFetcherTest.java
@@ -39,7 +39,7 @@
 
 public class JavaFetcherTest {
   String namespace = "java_fetcher_test";
-  SparkSession session = SparkSessionBuilder.build(namespace, true, scala.Option.apply(null), scala.Option.apply(null));
+  SparkSession session = SparkSessionBuilder.build(namespace, true, scala.Option.apply(null), scala.Option.apply(null), true);
   TableUtils tu = new TableUtils(session);
   InMemoryKvStore kvStore = new InMemoryKvStore(func(() -> tu));
   MockApi mockApi = new MockApi(func(() -> kvStore), "java_fetcher_test");
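Java call sites cannot rely on Scala default arguments, so the test now passes the new trailing flag explicitly, keeping Kryo on as before. In Scala the two forms below would be equivalent; a sketch, not part of the diff:

    // Relies on the default enforceKryoSerializer = true.
    val s1 = SparkSessionBuilder.build("java_fetcher_test", true, None, None)
    // Explicit flag, mirroring the updated Java call site.
    val s2 = SparkSessionBuilder.build("java_fetcher_test", true, None, None, true)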
