diff --git a/online/src/main/scala/ai/chronon/online/Fetcher.scala b/online/src/main/scala/ai/chronon/online/Fetcher.scala index a04f15127..61595c2ee 100644 --- a/online/src/main/scala/ai/chronon/online/Fetcher.scala +++ b/online/src/main/scala/ai/chronon/online/Fetcher.scala @@ -313,12 +313,16 @@ class Fetcher(val kvStore: KVStore, val loggingTry: Try[Unit] = joinCodecTry.map(codec => { val metaData = codec.conf.join.metaData - val keyCodec = AvroCodec.of(codec.keySchemaStr) - val valueCodec = AvroCodec.of(codec.valueSchemaStr) val samplePercent = if (metaData.isSetSamplePercent) metaData.getSamplePercent else 0 + + // Exit early if sample percent is 0 + if (samplePercent == 0) { + return Response(resp.request, Success(resp.derivedValues)) + } + val keyBytesTry: Try[Array[Byte]] = encode(loggingContext.map(_.withSuffix("encode_key")), codec.keySchema, - keyCodec, + codec.keyCodec, resp.request.keys, cast = true) if (keyBytesTry.isFailure) { @@ -350,7 +354,7 @@ class Fetcher(val kvStore: KVStore, } val valueBytesTry: Try[Array[Byte]] = - encode(loggingContext.map(_.withSuffix("encode_value")), codec.valueSchema, valueCodec, values) + encode(loggingContext.map(_.withSuffix("encode_value")), codec.valueSchema, codec.valueCodec, values) if (valueBytesTry.isFailure) { loggingContext.foreach(_.withSuffix("encode_value").incrementException(valueBytesTry.failed.get)) throw valueBytesTry.failed.get diff --git a/online/src/main/scala/ai/chronon/online/JoinCodec.scala b/online/src/main/scala/ai/chronon/online/JoinCodec.scala index bcb6c25be..be94f9d57 100644 --- a/online/src/main/scala/ai/chronon/online/JoinCodec.scala +++ b/online/src/main/scala/ai/chronon/online/JoinCodec.scala @@ -61,6 +61,8 @@ case class JoinCodec(conf: JoinOps, keySchema: StructType, baseValueSchema: Stru @transient lazy val keySchemaStr: String = AvroConversions.fromChrononSchema(keySchema).toString @transient lazy val valueSchemaStr: String = AvroConversions.fromChrononSchema(valueSchema).toString + def keyCodec: AvroCodec = AvroCodec.of(keySchemaStr) + def valueCodec: AvroCodec = AvroCodec.of(valueSchemaStr) /* * Get the serialized string repr. of the logging schema.