From 12b38ce675a300163f93aaabb1e61da43e6d4cd4 Mon Sep 17 00:00:00 2001 From: Jeffrey Brooks Date: Sat, 15 Jun 2024 15:05:27 -0700 Subject: [PATCH] Fix thread safety issue in consistency pipeline --- online/src/main/scala/ai/chronon/online/Fetcher.scala | 10 +++++++--- .../src/main/scala/ai/chronon/online/JoinCodec.scala | 7 +++---- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/online/src/main/scala/ai/chronon/online/Fetcher.scala b/online/src/main/scala/ai/chronon/online/Fetcher.scala index e7e490d0a..d8d77f16f 100644 --- a/online/src/main/scala/ai/chronon/online/Fetcher.scala +++ b/online/src/main/scala/ai/chronon/online/Fetcher.scala @@ -160,10 +160,8 @@ class Fetcher(val kvStore: KVStore, val joinName = joinConf.metaData.nameToFilePath val keySchema = StructType(s"${joinName.sanitize}_key", keyFields.toArray) - val keyCodec = AvroCodec.of(AvroConversions.fromChrononSchema(keySchema).toString) val baseValueSchema = StructType(s"${joinName.sanitize}_value", valueFields.toArray) - val baseValueCodec = AvroCodec.of(AvroConversions.fromChrononSchema(baseValueSchema).toString) - val joinCodec = JoinCodec(joinConf, keySchema, baseValueSchema, keyCodec, baseValueCodec) + val joinCodec = JoinCodec(joinConf, keySchema, baseValueSchema) logControlEvent(joinCodec) joinCodec } @@ -310,6 +308,12 @@ class Fetcher(val kvStore: KVStore, val loggingTry: Try[Unit] = joinCodecTry.map(codec => { val metaData = codec.conf.join.metaData val samplePercent = if (metaData.isSetSamplePercent) metaData.getSamplePercent else 0 + + // Exit early if sample percent is 0 + if (samplePercent == 0) { + return Response(resp.request, Success(resp.derivedValues)) + } + val keyBytes = encode(codec.keySchema, codec.keyCodec, resp.request.keys, cast = true) val hash = if (samplePercent > 0) { diff --git a/online/src/main/scala/ai/chronon/online/JoinCodec.scala b/online/src/main/scala/ai/chronon/online/JoinCodec.scala index 9cf3ea2fd..1277e687c 100644 --- a/online/src/main/scala/ai/chronon/online/JoinCodec.scala +++ b/online/src/main/scala/ai/chronon/online/JoinCodec.scala @@ -32,9 +32,7 @@ import ai.chronon.online.OnlineDerivationUtil.{ case class JoinCodec(conf: JoinOps, keySchema: StructType, - baseValueSchema: StructType, - keyCodec: AvroCodec, - baseValueCodec: AvroCodec) + baseValueSchema: StructType) extends Serializable { @transient lazy val valueSchema: StructType = { @@ -65,7 +63,8 @@ case class JoinCodec(conf: JoinOps, @transient lazy val renameOnlyDeriveFunc: (Map[String, Any], Map[String, Any]) => Map[String, Any] = buildRenameOnlyDerivationFunction(conf.derivationsScala) - @transient lazy val valueCodec: AvroCodec = AvroCodec.of(AvroConversions.fromChrononSchema(valueSchema).toString) + def valueCodec: AvroCodec = AvroCodec.of(AvroConversions.fromChrononSchema(valueSchema).toString) + def keyCodec: AvroCodec = AvroCodec.of(AvroConversions.fromChrononSchema(keySchema).toString) /* * Get the serialized string repr. of the logging schema.