Skip to content

Commit

Permalink
Fix thread safety issue in consistency pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
jbrooks-stripe committed Jun 15, 2024
1 parent 68a8b4d commit 12b38ce
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 7 deletions.
10 changes: 7 additions & 3 deletions online/src/main/scala/ai/chronon/online/Fetcher.scala
Original file line number Diff line number Diff line change
Expand Up @@ -160,10 +160,8 @@ class Fetcher(val kvStore: KVStore,

val joinName = joinConf.metaData.nameToFilePath
val keySchema = StructType(s"${joinName.sanitize}_key", keyFields.toArray)
val keyCodec = AvroCodec.of(AvroConversions.fromChrononSchema(keySchema).toString)
val baseValueSchema = StructType(s"${joinName.sanitize}_value", valueFields.toArray)
val baseValueCodec = AvroCodec.of(AvroConversions.fromChrononSchema(baseValueSchema).toString)
val joinCodec = JoinCodec(joinConf, keySchema, baseValueSchema, keyCodec, baseValueCodec)
val joinCodec = JoinCodec(joinConf, keySchema, baseValueSchema)
logControlEvent(joinCodec)
joinCodec
}
Expand Down Expand Up @@ -310,6 +308,12 @@ class Fetcher(val kvStore: KVStore,
val loggingTry: Try[Unit] = joinCodecTry.map(codec => {
val metaData = codec.conf.join.metaData
val samplePercent = if (metaData.isSetSamplePercent) metaData.getSamplePercent else 0

// Exit early if sample percent is 0
if (samplePercent == 0) {
return Response(resp.request, Success(resp.derivedValues))
}

val keyBytes = encode(codec.keySchema, codec.keyCodec, resp.request.keys, cast = true)

val hash = if (samplePercent > 0) {
Expand Down
7 changes: 3 additions & 4 deletions online/src/main/scala/ai/chronon/online/JoinCodec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,7 @@ import ai.chronon.online.OnlineDerivationUtil.{

case class JoinCodec(conf: JoinOps,
keySchema: StructType,
baseValueSchema: StructType,
keyCodec: AvroCodec,
baseValueCodec: AvroCodec)
baseValueSchema: StructType)
extends Serializable {

@transient lazy val valueSchema: StructType = {
Expand Down Expand Up @@ -65,7 +63,8 @@ case class JoinCodec(conf: JoinOps,
@transient lazy val renameOnlyDeriveFunc: (Map[String, Any], Map[String, Any]) => Map[String, Any] =
buildRenameOnlyDerivationFunction(conf.derivationsScala)

@transient lazy val valueCodec: AvroCodec = AvroCodec.of(AvroConversions.fromChrononSchema(valueSchema).toString)
def valueCodec: AvroCodec = AvroCodec.of(AvroConversions.fromChrononSchema(valueSchema).toString)
def keyCodec: AvroCodec = AvroCodec.of(AvroConversions.fromChrononSchema(keySchema).toString)

/*
* Get the serialized string repr. of the logging schema.
Expand Down

0 comments on commit 12b38ce

Please sign in to comment.