From 1e091c1bfc77e0bd81c2722b36cec4419b90c9c0 Mon Sep 17 00:00:00 2001
From: Cristian Figueroa
Date: Fri, 1 Mar 2024 10:05:35 -0800
Subject: [PATCH] [StagingQuery] allow java serializer for staging queries (#694)

* [StagingQuery] allow java serializer for staging queries

* Add argument to javaFetcher
---
 .../ai/chronon/spark/SparkSessionBuilder.scala  | 15 ++++++++++-----
 .../scala/ai/chronon/spark/StagingQuery.scala   |  3 ++-
 .../ai/chronon/spark/test/JavaFetcherTest.java  |  2 +-
 3 files changed, 13 insertions(+), 7 deletions(-)

diff --git a/spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala b/spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala
index 64ce6e654..be15616c5 100644
--- a/spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala
+++ b/spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala
@@ -35,7 +35,8 @@ object SparkSessionBuilder {
   def build(name: String,
             local: Boolean = false,
             localWarehouseLocation: Option[String] = None,
-            additionalConfig: Option[Map[String, String]] = None): SparkSession = {
+            additionalConfig: Option[Map[String, String]] = None,
+            enforceKryoSerializer: Boolean = true): SparkSession = {
     if (local) {
       //required to run spark locally with hive support enabled - for sbt test
       System.setSecurityManager(null)
@@ -49,16 +50,20 @@ object SparkSessionBuilder {
       .config("spark.sql.session.timeZone", "UTC")
       //otherwise overwrite will delete ALL partitions, not just the ones it touches
       .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
-      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
-      .config("spark.kryo.registrator", "ai.chronon.spark.ChrononKryoRegistrator")
-      .config("spark.kryoserializer.buffer.max", "2000m")
-      .config("spark.kryo.referenceTracking", "false")
       .config("hive.exec.dynamic.partition", "true")
       .config("hive.exec.dynamic.partition.mode", "nonstrict")
      .config("spark.sql.catalogImplementation", "hive")
       .config("spark.hadoop.hive.exec.max.dynamic.partitions", 30000)
       .config("spark.sql.legacy.timeParserPolicy", "LEGACY")

+    // Staging queries don't benefit from the KryoSerializer and in fact may fail with buffer underflow in some cases.
+    if (enforceKryoSerializer) {
+      baseBuilder
+        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+        .config("spark.kryo.registrator", "ai.chronon.spark.ChrononKryoRegistrator")
+        .config("spark.kryoserializer.buffer.max", "2000m")
+        .config("spark.kryo.referenceTracking", "false")
+    }
     additionalConfig.foreach { configMap =>
       configMap.foreach { config => baseBuilder = baseBuilder.config(config._1, config._2) }
     }
diff --git a/spark/src/main/scala/ai/chronon/spark/StagingQuery.scala b/spark/src/main/scala/ai/chronon/spark/StagingQuery.scala
index c25c7b0d7..9570ed945 100644
--- a/spark/src/main/scala/ai/chronon/spark/StagingQuery.scala
+++ b/spark/src/main/scala/ai/chronon/spark/StagingQuery.scala
@@ -128,7 +128,8 @@ object StagingQuery {
       val stagingQueryJob = new StagingQuery(
         stagingQueryConf,
         parsedArgs.endDate(),
-        TableUtils(SparkSessionBuilder.build(s"staging_query_${stagingQueryConf.metaData.name}"))
+        TableUtils(
+          SparkSessionBuilder.build(s"staging_query_${stagingQueryConf.metaData.name}", enforceKryoSerializer = false))
       )
       stagingQueryJob.computeStagingQuery(parsedArgs.stepDays.toOption)
     }
diff --git a/spark/src/test/scala/ai/chronon/spark/test/JavaFetcherTest.java b/spark/src/test/scala/ai/chronon/spark/test/JavaFetcherTest.java
index 611c7c927..2bbefaede 100644
--- a/spark/src/test/scala/ai/chronon/spark/test/JavaFetcherTest.java
+++ b/spark/src/test/scala/ai/chronon/spark/test/JavaFetcherTest.java
@@ -39,7 +39,7 @@ public class JavaFetcherTest {
   String namespace = "java_fetcher_test";
-  SparkSession session = SparkSessionBuilder.build(namespace, true, scala.Option.apply(null), scala.Option.apply(null));
+  SparkSession session = SparkSessionBuilder.build(namespace, true, scala.Option.apply(null), scala.Option.apply(null), true);
   TableUtils tu = new TableUtils(session);
   InMemoryKvStore kvStore = new InMemoryKvStore(func(() -> tu));
   MockApi mockApi = new MockApi(func(() -> kvStore), "java_fetcher_test");
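
Editor's usage sketch (not part of the patch): the snippet below illustrates how the new enforceKryoSerializer flag is intended to be called, based only on the signature and call sites in the diff above. The session names and the local = true setting are illustrative assumptions.

    import ai.chronon.spark.{SparkSessionBuilder, TableUtils}

    // Default path: enforceKryoSerializer defaults to true, so existing callers
    // keep the Kryo serializer and the ChrononKryoRegistrator exactly as before.
    val batchSession = SparkSessionBuilder.build("example_batch_job", local = true)

    // Staging-query path: opt out of Kryo so Spark falls back to its default
    // JavaSerializer, avoiding the buffer-underflow failures noted in the added comment.
    val stagingSession =
      SparkSessionBuilder.build("example_staging_query", local = true, enforceKryoSerializer = false)

    val tableUtils = TableUtils(stagingSession)

The flag is opt-out by design: it defaults to true, and only the StagingQuery entry point passes enforceKryoSerializer = false, so other callers of SparkSessionBuilder.build keep their current serialization behavior.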