Skip to content

Commit

Permalink
cherry-pick #775
Browse files Browse the repository at this point in the history
  • Loading branch information
Haozhen Ding committed Oct 5, 2024
1 parent 28b0f3d commit c3ff378
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 4 deletions.
12 changes: 8 additions & 4 deletions online/src/main/scala/ai/chronon/online/Fetcher.scala
Original file line number Diff line number Diff line change
Expand Up @@ -313,12 +313,16 @@ class Fetcher(val kvStore: KVStore,

val loggingTry: Try[Unit] = joinCodecTry.map(codec => {
val metaData = codec.conf.join.metaData
val keyCodec = AvroCodec.of(codec.keySchemaStr)
val valueCodec = AvroCodec.of(codec.valueSchemaStr)
val samplePercent = if (metaData.isSetSamplePercent) metaData.getSamplePercent else 0

// Exit early if sample percent is 0
if (samplePercent == 0) {
return Response(resp.request, Success(resp.derivedValues))
}

val keyBytesTry: Try[Array[Byte]] = encode(loggingContext.map(_.withSuffix("encode_key")),
codec.keySchema,
keyCodec,
codec.keyCodec,
resp.request.keys,
cast = true)
if (keyBytesTry.isFailure) {
Expand Down Expand Up @@ -350,7 +354,7 @@ class Fetcher(val kvStore: KVStore,
}

val valueBytesTry: Try[Array[Byte]] =
encode(loggingContext.map(_.withSuffix("encode_value")), codec.valueSchema, valueCodec, values)
encode(loggingContext.map(_.withSuffix("encode_value")), codec.valueSchema, codec.valueCodec, values)
if (valueBytesTry.isFailure) {
loggingContext.foreach(_.withSuffix("encode_value").incrementException(valueBytesTry.failed.get))
throw valueBytesTry.failed.get
Expand Down
2 changes: 2 additions & 0 deletions online/src/main/scala/ai/chronon/online/JoinCodec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ case class JoinCodec(conf: JoinOps, keySchema: StructType, baseValueSchema: Stru

@transient lazy val keySchemaStr: String = AvroConversions.fromChrononSchema(keySchema).toString
@transient lazy val valueSchemaStr: String = AvroConversions.fromChrononSchema(valueSchema).toString
def keyCodec: AvroCodec = AvroCodec.of(keySchemaStr)
def valueCodec: AvroCodec = AvroCodec.of(valueSchemaStr)

/*
* Get the serialized string repr. of the logging schema.
Expand Down

0 comments on commit c3ff378

Please sign in to comment.