From 39331379ad28f83afa5d5cfb23f75f9fd0b329b7 Mon Sep 17 00:00:00 2001 From: Caio Camatta Date: Mon, 12 Feb 2024 15:23:04 -0500 Subject: [PATCH 01/21] Add LRU Cache --- build.sbt | 5 +- .../scala/ai/chronon/online/LRUCache.scala | 57 +++++++++++++++++++ .../ai/chronon/online/LRUCacheTest.scala | 37 ++++++++++++ 3 files changed, 97 insertions(+), 2 deletions(-) create mode 100644 online/src/main/scala/ai/chronon/online/LRUCache.scala create mode 100644 online/src/test/scala/ai/chronon/online/LRUCacheTest.scala diff --git a/build.sbt b/build.sbt index ee6fa2428..ed489613f 100644 --- a/build.sbt +++ b/build.sbt @@ -293,7 +293,7 @@ lazy val online = project // statsd 3.0 has local aggregation - TODO: upgrade "com.datadoghq" % "java-dogstatsd-client" % "2.7", "org.rogach" %% "scallop" % "4.0.1", - "net.jodah" % "typetools" % "0.4.1" + "com.github.ben-manes.caffeine" % "caffeine" % "2.9.3" ), libraryDependencies ++= fromMatrix(scalaVersion.value, "spark-all", "scala-parallel-collections", "netty-buffer") ) @@ -308,7 +308,8 @@ lazy val online_unshaded = (project in file("online")) // statsd 3.0 has local aggregation - TODO: upgrade "com.datadoghq" % "java-dogstatsd-client" % "2.7", "org.rogach" %% "scallop" % "4.0.1", - "net.jodah" % "typetools" % "0.4.1" + "net.jodah" % "typetools" % "0.4.1", + "com.github.ben-manes.caffeine" % "caffeine" % "2.9.3" ), libraryDependencies ++= fromMatrix(scalaVersion.value, "jackson", diff --git a/online/src/main/scala/ai/chronon/online/LRUCache.scala b/online/src/main/scala/ai/chronon/online/LRUCache.scala new file mode 100644 index 000000000..0caa2f7e6 --- /dev/null +++ b/online/src/main/scala/ai/chronon/online/LRUCache.scala @@ -0,0 +1,57 @@ +package ai.chronon.online + +import com.github.benmanes.caffeine.cache.{Caffeine, Cache => CaffeineCache} + +/** + * Utility to create a cache with LRU semantics. + * + * The original purpose of having an LRU cache in Chronon is to cache KVStore calls and decoded IRs + * in the Fetcher. This helps decrease to feature serving latency. + */ +object LRUCache { + + /** + * Build a bounded, thread-safe Caffeine cache that stores KEY-VALUE pairs. + * + * @param cacheName Name of the cache + * @param maximumSize Maximum number of entries in the cache + * @tparam KEY The type of the key used to access the cache + * @tparam VALUE The type of the value stored in the cache + * @return Caffeine cache + */ + def apply[KEY <: Object, VALUE <: Object](cacheName: String, maximumSize: Int = 10000): CaffeineCache[KEY, VALUE] = { + buildCaffeineCache[KEY, VALUE](cacheName, maximumSize) + } + + private def buildCaffeineCache[KEY <: Object, VALUE <: Object]( + cacheName: String, + maximumSize: Int = 10000): CaffeineCache[KEY, VALUE] = { + println(s"Chronon Cache build started. cacheName=$cacheName") + val cache: CaffeineCache[KEY, VALUE] = Caffeine + .newBuilder() + .maximumSize(maximumSize) + .recordStats() + .build[KEY, VALUE]() + println(s"Chronon Cache build finished. cacheName=$cacheName") + cache + } + + /** + * Report metrics for a Caffeine cache. The "cache" tag is added to all metrics. + * + * @param metricsContext Metrics.Context for recording metrics + * @param cache Caffeine cache to get metrics from + * @param cacheName Cache name for tagging + */ + def collectCaffeineCacheMetrics(metricsContext: Metrics.Context, + cache: CaffeineCache[_, _], + cacheName: String): Unit = { + val stats = cache.stats() + metricsContext.gauge(s"$cacheName.hits", stats.hitCount()) + metricsContext.gauge(s"$cacheName.misses", stats.missCount()) + metricsContext.gauge(s"$cacheName.evictions", stats.evictionCount()) + metricsContext.gauge(s"$cacheName.loads", stats.loadCount()) + metricsContext.gauge(s"$cacheName.hit_rate", stats.hitRate()) + metricsContext.gauge(s"$cacheName.average_load_penalty", stats.averageLoadPenalty()) + } +} diff --git a/online/src/test/scala/ai/chronon/online/LRUCacheTest.scala b/online/src/test/scala/ai/chronon/online/LRUCacheTest.scala new file mode 100644 index 000000000..bcb5060a6 --- /dev/null +++ b/online/src/test/scala/ai/chronon/online/LRUCacheTest.scala @@ -0,0 +1,37 @@ +package ai.chronon.online + +import org.junit.Test +import com.github.benmanes.caffeine.cache.{Cache => CaffeineCache} +import ai.chronon.online.LRUCache + +import scala.collection.JavaConverters._ + +class LRUCacheTest { + val testCache: CaffeineCache[String, String] = LRUCache[String, String]("testCache") + + @Test + def testGetsNothingWhenThereIsNothing(): Unit = { + assert(testCache.getIfPresent("key") == null) + assert(testCache.estimatedSize() == 0) + } + + @Test + def testGetsSomethingWhenThereIsSomething(): Unit = { + assert(testCache.getIfPresent("key") == null) + testCache.put("key", "value") + assert(testCache.getIfPresent("key") == "value") + assert(testCache.estimatedSize() == 1) + } + + @Test + def testEvictsWhenSomethingIsSet(): Unit = { + assert(testCache.estimatedSize() == 0) + assert(testCache.getIfPresent("key") == null) + testCache.put("key", "value") + assert(testCache.estimatedSize() == 1) + assert(testCache.getIfPresent("key") == "value") + testCache.invalidate("key") + assert(testCache.estimatedSize() == 0) + assert(testCache.getIfPresent("key") == null) + } +} From ab1d2b3c43ca507ddd4403c371babeeeb363684b Mon Sep 17 00:00:00 2001 From: Caio Camatta Date: Mon, 12 Feb 2024 15:31:45 -0500 Subject: [PATCH 02/21] Double gauge --- online/src/main/scala/ai/chronon/online/Metrics.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/online/src/main/scala/ai/chronon/online/Metrics.scala b/online/src/main/scala/ai/chronon/online/Metrics.scala index 612619e27..48318b705 100644 --- a/online/src/main/scala/ai/chronon/online/Metrics.scala +++ b/online/src/main/scala/ai/chronon/online/Metrics.scala @@ -196,6 +196,8 @@ object Metrics { def gauge(metric: String, value: Long): Unit = stats.gauge(prefix(metric), value, tags) + def gauge(metric: String, value: Double): Unit = stats.gauge(prefix(metric), value, tags) + def toTags: Array[String] = { val joinNames: Array[String] = Option(join).map(_.split(",")).getOrElse(Array.empty[String]).map(_.sanitize) assert( From 355f1019ef80ac91fa2274368bd67bc83309509e Mon Sep 17 00:00:00 2001 From: Caio Camatta Date: Mon, 12 Feb 2024 15:49:29 -0500 Subject: [PATCH 03/21] Revert accidental build sbt change --- build.sbt | 1 + 1 file changed, 1 insertion(+) diff --git a/build.sbt b/build.sbt index ed489613f..8f4e3cd7b 100644 --- a/build.sbt +++ b/build.sbt @@ -293,6 +293,7 @@ lazy val online = project // statsd 3.0 has local aggregation - TODO: upgrade "com.datadoghq" % "java-dogstatsd-client" % "2.7", "org.rogach" %% "scallop" % "4.0.1", + "net.jodah" % "typetools" % "0.4.1", "com.github.ben-manes.caffeine" % "caffeine" % "2.9.3" ), libraryDependencies ++= fromMatrix(scalaVersion.value, "spark-all", "scala-parallel-collections", "netty-buffer") From b52d98880f543d5ff654921939169700bbafc1c1 Mon Sep 17 00:00:00 2001 From: Caio Camatta Date: Mon, 12 Feb 2024 16:07:01 -0500 Subject: [PATCH 04/21] Move GroupByRequestMeta to object --- .../scala/ai/chronon/online/FetcherBase.scala | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/online/src/main/scala/ai/chronon/online/FetcherBase.scala b/online/src/main/scala/ai/chronon/online/FetcherBase.scala index 345303112..9a4640b49 100644 --- a/online/src/main/scala/ai/chronon/online/FetcherBase.scala +++ b/online/src/main/scala/ai/chronon/online/FetcherBase.scala @@ -50,14 +50,7 @@ class FetcherBase(kvStore: KVStore, timeoutMillis: Long = 10000, debug: Boolean = false) extends MetadataStore(kvStore, metaDataSet, timeoutMillis) { - - private case class GroupByRequestMeta( - groupByServingInfoParsed: GroupByServingInfoParsed, - batchRequest: GetRequest, - streamingRequestOpt: Option[GetRequest], - endTs: Option[Long], - context: Metrics.Context - ) + import FetcherBase._ // a groupBy request is split into batchRequest and optionally a streamingRequest // this method decodes bytes (of the appropriate avro schema) into chronon rows aggregates further if necessary @@ -535,3 +528,13 @@ class FetcherBase(kvStore: KVStore, } } } + +object FetcherBase { + private[online] case class GroupByRequestMeta( + groupByServingInfoParsed: GroupByServingInfoParsed, + batchRequest: GetRequest, + streamingRequestOpt: Option[GetRequest], + endTs: Option[Long], + context: Metrics.Context + ) +} From a28ce288b8c78ef52fbada50fa3b01ba01f546a7 Mon Sep 17 00:00:00 2001 From: Caio Camatta Date: Mon, 12 Feb 2024 16:18:12 -0500 Subject: [PATCH 05/21] Add FetcherCache and tests --- .../ai/chronon/online/FetcherCache.scala | 214 ++++++++++ .../ai/chronon/online/FetcherCacheTest.scala | 381 ++++++++++++++++++ 2 files changed, 595 insertions(+) create mode 100644 online/src/main/scala/ai/chronon/online/FetcherCache.scala create mode 100644 online/src/test/scala/ai/chronon/online/FetcherCacheTest.scala diff --git a/online/src/main/scala/ai/chronon/online/FetcherCache.scala b/online/src/main/scala/ai/chronon/online/FetcherCache.scala new file mode 100644 index 000000000..b67fd57d9 --- /dev/null +++ b/online/src/main/scala/ai/chronon/online/FetcherCache.scala @@ -0,0 +1,214 @@ +package ai.chronon.online + +import ai.chronon.aggregator.windowing.FinalBatchIr +import ai.chronon.api.Extensions.MetadataOps +import ai.chronon.api.GroupBy +import ai.chronon.online.FetcherBase.GroupByRequestMeta +import ai.chronon.online.Fetcher.Request +import ai.chronon.online.FetcherCache.{ + BatchIrCache, + BatchResponses, + CachedBatchResponse, + CachedFinalIrBatchResponse, + CachedMapBatchResponse, + KvStoreBatchResponse +} +import ai.chronon.online.KVStore.{GetRequest, TimedValue} +import com.github.benmanes.caffeine.cache.{Cache => CaffeineCache} + +import scala.util.{Success, Try} +import java.util.concurrent.ConcurrentHashMap +import scala.collection.JavaConverters.mapAsScalaConcurrentMapConverter + +/* + * FetcherCache is an extension to FetcherBase that provides caching functionality. It caches KV store + * requests to decrease feature serving latency. + * */ +trait FetcherCache { + val batchIrCacheName = "batch_cache" + val maybeBatchIrCache: Option[BatchIrCache] = + Option(System.getProperty("ai.chronon.fetcher.batch_ir_cache_size")) + .map(size => new BatchIrCache(batchIrCacheName, size.toInt)) + .orElse(None) + + def isCacheSizeConfigured: Boolean = maybeBatchIrCache.isDefined + + // Memoize which GroupBys have caching enabled + private[online] val isCachingEnabledForGroupBy: collection.concurrent.Map[String, Boolean] = + new ConcurrentHashMap[String, Boolean]().asScala + + def isCachingEnabled(groupBy: GroupBy): Boolean = { + if (!isCacheSizeConfigured || groupBy.getMetaData == null || groupBy.getMetaData.getName == null) return false + + val groupByName = groupBy.getMetaData.getName + isCachingEnabledForGroupBy.getOrElse( + groupByName, { + groupBy.getMetaData.customJsonLookUp("enable_caching") match { + case b: Boolean => + println(s"Caching is ${if (b) "enabled" else "disabled"} for $groupByName") + isCachingEnabledForGroupBy.putIfAbsent(groupByName, b) + b + case null => + println(s"Caching is disabled for $groupByName, enable_caching is not set.") + isCachingEnabledForGroupBy.putIfAbsent(groupByName, false) + false + case _ => false + } + } + ) + } + + protected val caffeineMetricsContext: Metrics.Context = Metrics.Context(Metrics.Environment.JoinFetching) + + /** + * Obtain the Map[String, AnyRef] response from a batch response. + * + * If batch IR caching is enabled, this method will try to fetch the IR from the cache. If it's not in the cache, + * it will decode it from the batch bytes and store it. + * + * @param batchResponses the batch responses + * @param batchBytes the batch bytes corresponding to the batchResponses. Can be `null`. + * @param servingInfo the GroupByServingInfoParsed that contains the info to decode the bytes + * @param decodingFunction the function to decode bytes into Map[String, AnyRef] + * @param keys the keys used to fetch this particular batch response, for caching purposes + */ + private[online] def getMapResponseFromBatchResponse(batchResponses: BatchResponses, + batchBytes: Array[Byte], + decodingFunction: Array[Byte] => Map[String, AnyRef], + servingInfo: GroupByServingInfoParsed, + keys: Map[String, Any]): Map[String, AnyRef] = { + if (!isCachingEnabled(servingInfo.groupBy)) return decodingFunction(batchBytes) + + batchResponses match { + case _: KvStoreBatchResponse => + val batchRequestCacheKey = + BatchIrCache.Key(servingInfo.groupByOps.batchDataset, keys, servingInfo.batchEndTsMillis) + val decodedBytes = decodingFunction(batchBytes) + if (decodedBytes != null) + maybeBatchIrCache.get.cache.put(batchRequestCacheKey, CachedMapBatchResponse(decodedBytes)) + decodedBytes + case cachedResponse: CachedBatchResponse => + cachedResponse match { + case CachedFinalIrBatchResponse(_: FinalBatchIr) => decodingFunction(batchBytes) + case CachedMapBatchResponse(mapResponse: Map[String, AnyRef]) => mapResponse + } + } + } + + /** + * Obtain the FinalBatchIr from a batch response. + * + * If batch IR caching is enabled, this method will try to fetch the IR from the cache. If it's not in the cache, + * it will decode it from the batch bytes and store it. + * + * @param batchResponses the batch responses + * @param batchBytes the batch bytes corresponding to the batchResponses. Can be `null`. + * @param servingInfo the GroupByServingInfoParsed that contains the info to decode the bytes + * @param decodingFunction the function to decode bytes into FinalBatchIr + * @param keys the keys used to fetch this particular batch response, for caching purposes + */ + private[online] def getBatchIrFromBatchResponse( + batchResponses: BatchResponses, + batchBytes: Array[Byte], + servingInfo: GroupByServingInfoParsed, + decodingFunction: (Array[Byte], GroupByServingInfoParsed) => FinalBatchIr, + keys: Map[String, Any]): FinalBatchIr = { + if (!isCachingEnabled(servingInfo.groupBy)) return decodingFunction(batchBytes, servingInfo) + + batchResponses match { + case _: KvStoreBatchResponse => + val batchRequestCacheKey = + BatchIrCache.Key(servingInfo.groupByOps.batchDataset, keys, servingInfo.batchEndTsMillis) + val decodedBytes = decodingFunction(batchBytes, servingInfo) + if (decodedBytes != null) + maybeBatchIrCache.get.cache.put(batchRequestCacheKey, CachedFinalIrBatchResponse(decodedBytes)) + decodedBytes + case cachedResponse: CachedBatchResponse => + cachedResponse match { + case CachedFinalIrBatchResponse(finalBatchIr: FinalBatchIr) => finalBatchIr + case CachedMapBatchResponse(_: Map[String, AnyRef]) => decodingFunction(batchBytes, servingInfo) + } + } + } + + /** + * Given a list of GetRequests, return a map of GetRequests to cached FinalBatchIrs. + */ + def getCachedRequests( + groupByRequestToKvRequest: Seq[(Request, Try[GroupByRequestMeta])]): Map[GetRequest, CachedBatchResponse] = { + if (!isCacheSizeConfigured) return Map.empty + + groupByRequestToKvRequest + .map { + case (request, Success(GroupByRequestMeta(servingInfo, batchRequest, _, _, _))) => + if (!isCachingEnabled(servingInfo.groupBy)) { Map.empty } + else { + val batchRequestCacheKey = + BatchIrCache.Key(batchRequest.dataset, request.keys, servingInfo.batchEndTsMillis) + + // Metrics so we can get per-groupby cache metrics + val metricsContext = + request.context.getOrElse(Metrics.Context(Metrics.Environment.JoinFetching, servingInfo.groupBy)) + + maybeBatchIrCache.get.cache.getIfPresent(batchRequestCacheKey) match { + case null => + metricsContext.increment(s"${batchIrCacheName}_gb_misses") + val emptyMap: Map[GetRequest, CachedBatchResponse] = Map.empty + emptyMap + case cachedIr: CachedBatchResponse => + metricsContext.increment(s"${batchIrCacheName}_gb_hits") + Map(batchRequest -> cachedIr) + } + } + case _ => + val emptyMap: Map[GetRequest, CachedBatchResponse] = Map.empty + emptyMap + } + .foldLeft(Map.empty[GetRequest, CachedBatchResponse])(_ ++ _) + } +} + +object FetcherCache { + private[online] class BatchIrCache(val cacheName: String, val maximumSize: Int = 10000) { + import BatchIrCache._ + + val cache: CaffeineCache[Key, Value] = + LRUCache[Key, Value](cacheName = cacheName, maximumSize = maximumSize) + } + + private[online] object BatchIrCache { + // We use the dataset, keys, and batchEndTsMillis to identify a batch request. + // There's one edge case to be aware of: if a batch job is re-run in the same day, the batchEndTsMillis will + // be the same but the underlying data may have have changed. If that new batch data is needed immediately, the + // Fetcher service should be restarted. + case class Key(dataset: String, keys: Map[String, Any], batchEndTsMillis: Long) + + // FinalBatchIr is for GroupBys using temporally accurate aggregation. + // Map[String, Any] is for GroupBys using snapshot accurate aggregation or no aggregation. + type Value = BatchResponses + } + + // BatchResponses encapsulates either a batch response from kv store or a cached batch response. + sealed abstract class BatchResponses { + def getBatchBytes(batchEndTsMillis: Long): Array[Byte] + } + object BatchResponses { + def apply(kvStoreResponse: Try[Seq[TimedValue]]): KvStoreBatchResponse = KvStoreBatchResponse(kvStoreResponse) + def apply(cachedResponse: FinalBatchIr): CachedFinalIrBatchResponse = CachedFinalIrBatchResponse(cachedResponse) + def apply(cachedResponse: Map[String, AnyRef]): CachedMapBatchResponse = CachedMapBatchResponse(cachedResponse) + } + case class KvStoreBatchResponse(response: Try[Seq[TimedValue]]) extends BatchResponses { + def getBatchBytes(batchEndTsMillis: Long): Array[Byte] = + response + .map(_.maxBy(_.millis)) + .filter(_.millis >= batchEndTsMillis) + .map(_.bytes) + .getOrElse(null) + } + sealed abstract class CachedBatchResponse extends BatchResponses { + // This is the case where we don't have bytes because the decoded IR was cached so we didn't hit the KV store again. + def getBatchBytes(batchEndTsMillis: Long): Null = null + } + case class CachedFinalIrBatchResponse(response: FinalBatchIr) extends CachedBatchResponse + case class CachedMapBatchResponse(response: Map[String, AnyRef]) extends CachedBatchResponse +} diff --git a/online/src/test/scala/ai/chronon/online/FetcherCacheTest.scala b/online/src/test/scala/ai/chronon/online/FetcherCacheTest.scala new file mode 100644 index 000000000..bb476b2f0 --- /dev/null +++ b/online/src/test/scala/ai/chronon/online/FetcherCacheTest.scala @@ -0,0 +1,381 @@ +package ai.chronon.online + +import ai.chronon.aggregator.windowing.FinalBatchIr +import ai.chronon.api.Extensions.GroupByOps +import ai.chronon.api.{Builders, GroupBy} +import ai.chronon.online.FetcherBase._ +import ai.chronon.online.{AvroCodec, FetcherCache, GroupByServingInfoParsed, KVStore, Metrics} +import ai.chronon.online.Fetcher.Request +import ai.chronon.online.FetcherCache.{BatchIrCache, BatchResponses, CachedMapBatchResponse} +import ai.chronon.online.KVStore.TimedValue +import ai.chronon.online.Metrics.Context +import org.junit.Assert.{assertArrayEquals, assertEquals, assertFalse, assertNull, assertTrue, fail} +import org.junit.Test +import org.mockito.Mockito._ +import org.mockito.ArgumentMatchers.any +import org.mockito.Mockito +import org.mockito.stubbing.Stubber +import org.scalatestplus.mockito.MockitoSugar + +import scala.collection.JavaConverters._ +import scala.util.{Failure, Success, Try} + +trait MockitoHelper extends MockitoSugar { + // Overriding doReturn to fix a known Java/Scala interoperability issue when using Mockito. ("doReturn: ambiguous + // reference to overloaded definition"). An alternative would be to use the 'mockito-scala' library. + def doReturn(toBeReturned: Any): Stubber = { + Mockito.doReturn(toBeReturned, Nil: _*) + } +} + +class FetcherCacheTest extends MockitoHelper { + class TestableFetcherCache(cache: Option[BatchIrCache]) extends FetcherCache { + override val maybeBatchIrCache: Option[BatchIrCache] = cache + } + val batchIrCacheMaximumSize = 50 + + @Test + def test_BatchIrCache_CorrectlyCachesBatchIrs(): Unit = { + val cacheName = "test" + val batchIrCache = new BatchIrCache(cacheName, batchIrCacheMaximumSize) + val dataset = "TEST_GROUPBY_BATCH" + val batchEndTsMillis = 1000L + + def createBatchir(i: Int) = + BatchResponses(FinalBatchIr(collapsed = Array(i), tailHops = Array(Array(Array(i)), Array(Array(i))))) + def createCacheKey(i: Int) = BatchIrCache.Key(dataset, Map("key" -> i), batchEndTsMillis) + + // Create a bunch of test batchIrs and store them in cache + val batchIrs: Map[BatchIrCache.Key, BatchIrCache.Value] = + (0 until batchIrCacheMaximumSize).map(i => createCacheKey(i) -> createBatchir(i)).toMap + batchIrCache.cache.putAll(batchIrs.asJava) + + // Check that the cache contains all the batchIrs we created + batchIrs.foreach(entry => { + val cachedBatchIr = batchIrCache.cache.getIfPresent(entry._1) + assertEquals(cachedBatchIr, entry._2) + }) + } + + @Test + def test_BatchIrCache_CorrectlyCachesMapResponse(): Unit = { + val cacheName = "test" + val batchIrCache = new BatchIrCache(cacheName, batchIrCacheMaximumSize) + val dataset = "TEST_GROUPBY_BATCH" + val batchEndTsMillis = 1000L + + def createMapResponse(i: Int) = + BatchResponses(Map("group_by_key" -> i.asInstanceOf[AnyRef])) + def createCacheKey(i: Int) = BatchIrCache.Key(dataset, Map("key" -> i), batchEndTsMillis) + + // Create a bunch of test mapResponses and store them in cache + val mapResponses: Map[BatchIrCache.Key, BatchIrCache.Value] = + (0 until batchIrCacheMaximumSize).map(i => createCacheKey(i) -> createMapResponse(i)).toMap + batchIrCache.cache.putAll(mapResponses.asJava) + + // Check that the cache contains all the mapResponses we created + mapResponses.foreach(entry => { + val cachedBatchIr = batchIrCache.cache.getIfPresent(entry._1) + assertEquals(cachedBatchIr, entry._2) + }) + } + + // Test that the cache keys are compared by equality, not by reference. In practice, this means that if two keys + // have the same (dataset, keys, batchEndTsMillis), they will only be stored once in the cache. + @Test + def test_BatchIrCache_KeysAreComparedByEquality(): Unit = { + val cacheName = "test" + val batchIrCache = new BatchIrCache(cacheName, batchIrCacheMaximumSize) + + val dataset = "TEST_GROUPBY_BATCH" + val batchEndTsMillis = 1000L + + def createCacheValue(i: Int) = + BatchResponses(FinalBatchIr(collapsed = Array(i), tailHops = Array(Array(Array(i)), Array(Array(i))))) + def createCacheKey(i: Int) = BatchIrCache.Key(dataset, Map("key" -> i), batchEndTsMillis) + + assert(batchIrCache.cache.estimatedSize() == 0) + batchIrCache.cache.put(createCacheKey(1), createCacheValue(1)) + assert(batchIrCache.cache.estimatedSize() == 1) + // Create a second key object with the same values as the first key, make sure it's not stored separately + batchIrCache.cache.put(createCacheKey(1), createCacheValue(1)) + assert(batchIrCache.cache.estimatedSize() == 1) + } + + @Test + def test_getCachedRequests_ReturnsCorrectCachedDataWhenCacheIsEnabled(): Unit = { + val cacheName = "test" + val testCache = Some(new BatchIrCache(cacheName, batchIrCacheMaximumSize)) + val fetcherCache = new TestableFetcherCache(testCache) { + override def isCachingEnabled(groupBy: GroupBy) = true + } + + // Prepare groupByRequestToKvRequest + val batchEndTsMillis = 0L + val keys = Map("key" -> "value") + val eventTs = 1000L + val dataset = "TEST_GROUPBY_BATCH" + val mockGroupByServingInfoParsed = mock[GroupByServingInfoParsed] + val mockContext = mock[Metrics.Context] + val request = Request("req_name", keys, Some(eventTs), Some(mock[Context])) + val getRequest = KVStore.GetRequest("key".getBytes, dataset, Some(eventTs)) + val requestMeta = + GroupByRequestMeta(mockGroupByServingInfoParsed, getRequest, Some(getRequest), Some(eventTs), mockContext) + val groupByRequestToKvRequest: Seq[(Request, Try[GroupByRequestMeta])] = Seq((request, Success(requestMeta))) + + // getCachedRequests should return an empty list when the cache is empty + val cachedRequestBeforePopulating = fetcherCache.getCachedRequests(groupByRequestToKvRequest) + assert(cachedRequestBeforePopulating.isEmpty) + + // Add a GetRequest and a FinalBatchIr + val key = BatchIrCache.Key(getRequest.dataset, keys, batchEndTsMillis) + val finalBatchIr = BatchResponses(FinalBatchIr(Array(1), Array(Array(Array(1)), Array(Array(1))))) + testCache.get.cache.put(key, finalBatchIr) + + // getCachedRequests should return the GetRequest and FinalBatchIr we cached + val cachedRequestsAfterAddingItem = fetcherCache.getCachedRequests(groupByRequestToKvRequest) + assert(cachedRequestsAfterAddingItem.head._1 == getRequest) + assert(cachedRequestsAfterAddingItem.head._2 == finalBatchIr) + } + + @Test + def test_getCachedRequests_DoesNotCacheWhenCacheIsDisabledForGroupBy(): Unit = { + val testCache = new BatchIrCache("test", batchIrCacheMaximumSize) + val spiedTestCache = spy(testCache) + val fetcherCache = new TestableFetcherCache(Some(testCache)) { + // Cache is enabled globally, but disabled for a specific groupBy + override def isCachingEnabled(groupBy: GroupBy) = false + } + + // Prepare groupByRequestToKvRequest + val keys = Map("key" -> "value") + val eventTs = 1000L + val dataset = "TEST_GROUPBY_BATCH" + val mockGroupByServingInfoParsed = mock[GroupByServingInfoParsed] + val mockContext = mock[Metrics.Context] + val request = Request("req_name", keys, Some(eventTs)) + val getRequest = KVStore.GetRequest("key".getBytes, dataset, Some(eventTs)) + val requestMeta = + GroupByRequestMeta(mockGroupByServingInfoParsed, getRequest, Some(getRequest), Some(eventTs), mockContext) + val groupByRequestToKvRequest: Seq[(Request, Try[GroupByRequestMeta])] = Seq((request, Success(requestMeta))) + + val cachedRequests = fetcherCache.getCachedRequests(groupByRequestToKvRequest) + assert(cachedRequests.isEmpty) + // Cache was never called + verify(spiedTestCache, never()).cache + } + + @Test + def test_getBatchBytes_ReturnsLatestTimedValueBytesIfGreaterThanBatchEnd(): Unit = { + val kvStoreResponse = Success( + Seq(TimedValue(Array(1.toByte), 1000L), TimedValue(Array(2.toByte), 2000L)) + ) + val batchResponses = BatchResponses(kvStoreResponse) + val batchBytes = batchResponses.getBatchBytes(1500L) + assertArrayEquals(Array(2.toByte), batchBytes) + } + + @Test + def test_getBatchBytes_ReturnsNullIfLatestTimedValueTimestampIsLessThanBatchEnd(): Unit = { + val kvStoreResponse = Success( + Seq(TimedValue(Array(1.toByte), 1000L), TimedValue(Array(2.toByte), 1500L)) + ) + val batchResponses = BatchResponses(kvStoreResponse) + val batchBytes = batchResponses.getBatchBytes(2000L) + assertNull(batchBytes) + } + + @Test + def test_getBatchBytes_ReturnsNullWhenCachedBatchResponse(): Unit = { + val finalBatchIr = mock[FinalBatchIr] + val batchResponses = BatchResponses(finalBatchIr) + val batchBytes = batchResponses.getBatchBytes(1000L) + assertNull(batchBytes) + } + + @Test + def test_getBatchBytes_ReturnsNullWhenKvStoreBatchResponseFails(): Unit = { + val kvStoreResponse = Failure(new RuntimeException("KV Store error")) + val batchResponses = BatchResponses(kvStoreResponse) + val batchBytes = batchResponses.getBatchBytes(1000L) + assertNull(batchBytes) + } + + @Test + def test_getBatchIrFromBatchResponse_ReturnsCorrectIRsWithCacheEnabled(): Unit = { + // Use a real cache + val batchIrCache = new BatchIrCache("test_cache", batchIrCacheMaximumSize) + + // Create all necessary mocks + val servingInfo = mock[GroupByServingInfoParsed] + val groupByOps = mock[GroupByOps] + val toBatchIr = mock[(Array[Byte], GroupByServingInfoParsed) => FinalBatchIr] + when(servingInfo.groupByOps).thenReturn(groupByOps) + when(groupByOps.batchDataset).thenReturn("test_dataset") + when(servingInfo.groupByOps.batchDataset).thenReturn("test_dataset") + when(servingInfo.batchEndTsMillis).thenReturn(1000L) + + // Dummy data + val batchBytes = Array[Byte](1, 1) + val keys = Map("key" -> "value") + val cacheKey = BatchIrCache.Key(servingInfo.groupByOps.batchDataset, keys, servingInfo.batchEndTsMillis) + + val fetcherCache = new TestableFetcherCache(Some(batchIrCache)) + val spiedFetcherCache = Mockito.spy(fetcherCache) + doReturn(true).when(spiedFetcherCache).isCachingEnabled(any()) + + // 1. Cached BatchResponse returns the same IRs passed in + val finalBatchIr1 = mock[FinalBatchIr] + val cachedBatchResponse = BatchResponses(finalBatchIr1) + val cachedIr = + spiedFetcherCache.getBatchIrFromBatchResponse(cachedBatchResponse, batchBytes, servingInfo, toBatchIr, keys) + assertEquals(finalBatchIr1, cachedIr) + verify(toBatchIr, never())(any(classOf[Array[Byte]]), any()) // no decoding needed + + // 2. Un-cached BatchResponse has IRs added to cache + val finalBatchIr2 = mock[FinalBatchIr] + val kvStoreBatchResponses = BatchResponses(Success(Seq(TimedValue(batchBytes, 1000L)))) + when(toBatchIr(any(), any())).thenReturn(finalBatchIr2) + val uncachedIr = + spiedFetcherCache.getBatchIrFromBatchResponse(kvStoreBatchResponses, batchBytes, servingInfo, toBatchIr, keys) + assertEquals(finalBatchIr2, uncachedIr) + assertEquals(batchIrCache.cache.getIfPresent(cacheKey), BatchResponses(finalBatchIr2)) // key was added + verify(toBatchIr, times(1))(any(), any()) // decoding did happen + } + + @Test + def test_getBatchIrFromBatchResponse_DecodesBatchBytesIfCacheDisabled(): Unit = { + // Set up mocks and dummy data + val servingInfo = mock[GroupByServingInfoParsed] + val batchBytes = Array[Byte](1, 2, 3) + val keys = Map("key" -> "value") + val finalBatchIr = mock[FinalBatchIr] + val toBatchIr = mock[(Array[Byte], GroupByServingInfoParsed) => FinalBatchIr] + val kvStoreBatchResponses = BatchResponses(Success(Seq(TimedValue(batchBytes, 1000L)))) + + val spiedFetcherCache = Mockito.spy(new TestableFetcherCache(None)) + when(toBatchIr(any(), any())).thenReturn(finalBatchIr) + + // When getBatchIrFromBatchResponse is called, it decodes the bytes and doesn't hit the cache + val ir = + spiedFetcherCache.getBatchIrFromBatchResponse(kvStoreBatchResponses, batchBytes, servingInfo, toBatchIr, keys) + verify(toBatchIr, times(1))(batchBytes, servingInfo) // decoding did happen + assertEquals(finalBatchIr, ir) + } + + @Test + def test_getBatchIrFromBatchResponse_ReturnsCorrectMapResponseWithCacheEnabled(): Unit = { + // Use a real cache + val batchIrCache = new BatchIrCache("test_cache", batchIrCacheMaximumSize) + // Set up mocks and dummy data + val servingInfo = mock[GroupByServingInfoParsed] + val groupByOps = mock[GroupByOps] + val outputCodec = mock[AvroCodec] + when(servingInfo.groupByOps).thenReturn(groupByOps) + when(groupByOps.batchDataset).thenReturn("test_dataset") + when(servingInfo.groupByOps.batchDataset).thenReturn("test_dataset") + when(servingInfo.batchEndTsMillis).thenReturn(1000L) + val batchBytes = Array[Byte](1, 2, 3) + val keys = Map("key" -> "value") + val cacheKey = BatchIrCache.Key(servingInfo.groupByOps.batchDataset, keys, servingInfo.batchEndTsMillis) + + val spiedFetcherCache = Mockito.spy(new TestableFetcherCache(Some(batchIrCache))) + doReturn(true).when(spiedFetcherCache).isCachingEnabled(any()) + + // 1. Cached BatchResponse returns the same Map responses passed in + val mapResponse1 = mock[Map[String, AnyRef]] + val cachedBatchResponse = BatchResponses(mapResponse1) + val decodingFunction1 = (bytes: Array[Byte]) => { + fail("Decoding function should not be called when batch response is cached") + mapResponse1 + } + val cachedMapResponse = spiedFetcherCache.getMapResponseFromBatchResponse(cachedBatchResponse, + batchBytes, + decodingFunction1, + servingInfo, + keys) + assertEquals(mapResponse1, cachedMapResponse) + + // 2. Un-cached BatchResponse has Map responses added to cache + val mapResponse2 = mock[Map[String, AnyRef]] + val kvStoreBatchResponses = BatchResponses(Success(Seq(TimedValue(batchBytes, 1000L)))) + def decodingFunction2 = (bytes: Array[Byte]) => mapResponse2 + val decodedMapResponse = spiedFetcherCache.getMapResponseFromBatchResponse(kvStoreBatchResponses, + batchBytes, + decodingFunction2, + servingInfo, + keys) + assertEquals(mapResponse2, decodedMapResponse) + assertEquals(batchIrCache.cache.getIfPresent(cacheKey), CachedMapBatchResponse(mapResponse2)) // key was added + } + + @Test + def test_getMapResponseFromBatchResponse_DecodesBatchBytesIfCacheDisabled(): Unit = { + // Set up mocks and dummy data + val servingInfo = mock[GroupByServingInfoParsed] + val batchBytes = Array[Byte](1, 2, 3) + val keys = Map("key" -> "value") + val mapResponse = mock[Map[String, AnyRef]] + val outputCodec = mock[AvroCodec] + val kvStoreBatchResponses = BatchResponses(Success(Seq(TimedValue(batchBytes, 1000L)))) + when(servingInfo.outputCodec).thenReturn(outputCodec) + when(outputCodec.decodeMap(any())).thenReturn(mapResponse) + + val spiedFetcherCache = Mockito.spy(new TestableFetcherCache(None)) + + // When getMapResponseFromBatchResponse is called, it decodes the bytes and doesn't hit the cache + val decodedMapResponse = spiedFetcherCache.getMapResponseFromBatchResponse(kvStoreBatchResponses, + batchBytes, + servingInfo.outputCodec.decodeMap, + servingInfo, + keys) + verify(servingInfo.outputCodec.decodeMap(any()), times(1)) // decoding did happen + assertEquals(mapResponse, decodedMapResponse) + } + + @Test + def test_isCachingEnabled_CorrectlyDetermineIfCacheIsEnabled(): Unit = { + val baseFetcher = new TestableFetcherCache(Some(new BatchIrCache("test", batchIrCacheMaximumSize))) + def buildGroupByWithCustomJson(name: String, customJson: String = null): GroupBy = + Builders.GroupBy( + metaData = Builders.MetaData(name = name, customJson = customJson) + ) + + assertFalse(baseFetcher.isCachingEnabled(buildGroupByWithCustomJson("test_groupby_1"))) + assertFalse(baseFetcher.isCachingEnabled(buildGroupByWithCustomJson("test_groupby_2", "{}"))) + assertTrue( + baseFetcher + .isCachingEnabled(buildGroupByWithCustomJson("test_groupby_3", "{\"enable_caching\": true}")) + ) + assertFalse( + baseFetcher + .isCachingEnabled(buildGroupByWithCustomJson("test_groupby_4", "{\"enable_caching\": false}")) + ) + assertFalse( + baseFetcher + .isCachingEnabled( + buildGroupByWithCustomJson("test_groupby_5", "{\"enable_caching\": \"string instead of bool\"}") + ) + ) + } + + @Test + def test_isCachingEnabled_Memoizes(): Unit = { + val baseFetcher = new TestableFetcherCache(Some(new BatchIrCache("test", batchIrCacheMaximumSize))) + def buildGroupByWithCustomJson(name: String, customJson: String = null): GroupBy = + Builders.GroupBy( + metaData = Builders.MetaData(name = name, customJson = customJson) + ) + + // the map is clean at the start + assertFalse(baseFetcher.isCachingEnabledForGroupBy.contains("test_memo_gb_1")) + assertFalse(baseFetcher.isCachingEnabledForGroupBy.contains("test_memo_gb_2")) + + // memoization works for both true and false + baseFetcher.isCachingEnabled(buildGroupByWithCustomJson("test_memo_gb_1")) + assertFalse(baseFetcher.isCachingEnabledForGroupBy("test_memo_gb_1")) + + baseFetcher.isCachingEnabled(buildGroupByWithCustomJson("test_memo_gb_2", "{\"enable_caching\": true}")) + assertTrue(baseFetcher.isCachingEnabledForGroupBy("test_memo_gb_2")) + } +} From d7801bbff3e595ff1ad3361eaba62a3128d5bbad Mon Sep 17 00:00:00 2001 From: Caio Camatta Date: Wed, 21 Feb 2024 11:44:09 -0500 Subject: [PATCH 06/21] Update FetcherBase to use cache --- .../scala/ai/chronon/online/FetcherBase.scala | 227 ++++++++++++------ 1 file changed, 159 insertions(+), 68 deletions(-) diff --git a/online/src/main/scala/ai/chronon/online/FetcherBase.scala b/online/src/main/scala/ai/chronon/online/FetcherBase.scala index 9a4640b49..a8297593b 100644 --- a/online/src/main/scala/ai/chronon/online/FetcherBase.scala +++ b/online/src/main/scala/ai/chronon/online/FetcherBase.scala @@ -22,6 +22,7 @@ import ai.chronon.aggregator.windowing.{FinalBatchIr, SawtoothOnlineAggregator, import ai.chronon.api.Constants.ChrononMetadataKey import ai.chronon.api._ import ai.chronon.online.Fetcher.{ColumnSpec, PrefixedRequest, Request, Response} +import ai.chronon.online.FetcherCache.{BatchResponses, CachedBatchResponse, KvStoreBatchResponse} import ai.chronon.online.KVStore.{GetRequest, GetResponse, TimedValue} import ai.chronon.online.Metrics.Name import ai.chronon.api.Extensions.{DerivationOps, GroupByOps, JoinOps, ThrowableOps} @@ -49,36 +50,57 @@ class FetcherBase(kvStore: KVStore, metaDataSet: String = ChrononMetadataKey, timeoutMillis: Long = 10000, debug: Boolean = false) - extends MetadataStore(kvStore, metaDataSet, timeoutMillis) { + extends MetadataStore(kvStore, metaDataSet, timeoutMillis) + with FetcherCache { import FetcherBase._ - // a groupBy request is split into batchRequest and optionally a streamingRequest - // this method decodes bytes (of the appropriate avro schema) into chronon rows aggregates further if necessary - private def constructGroupByResponse(batchResponsesTry: Try[Seq[TimedValue]], + /** + * A groupBy request is split into batchRequest and optionally a streamingRequest. This method decodes bytes + * (of the appropriate avro schema) into chronon rows aggregates further if necessary. + * + * @param batchResponses a BatchResponses, which encapsulates either a response from kv store or a cached batch IR. + * @param streamingResponsesOpt a response from kv store, if the GroupBy was streaming data. + * @param oldServingInfo the GroupByServingInfo used to fetch the GroupBys. + * @param queryTimeMs the Request timestamp + * @param startTimeMs time when we started fetching the KV store + * @param overallLatency the time it took to get the values from the KV store + * @param context the Metrics.Context to use for recording metrics + * @param totalResponseValueBytes the total size of the response from the KV store + * @param keys the keys used to fetch the GroupBy + * @return + */ + private def constructGroupByResponse(batchResponses: BatchResponses, streamingResponsesOpt: Option[Seq[TimedValue]], oldServingInfo: GroupByServingInfoParsed, queryTimeMs: Long, startTimeMs: Long, overallLatency: Long, context: Metrics.Context, - totalResponseValueBytes: Int): Map[String, AnyRef] = { - val latestBatchValue = batchResponsesTry.map(_.maxBy(_.millis)) - val servingInfo = - latestBatchValue.map(timedVal => updateServingInfo(timedVal.millis, oldServingInfo)).getOrElse(oldServingInfo) - batchResponsesTry.map { - reportKvResponse(context.withSuffix("batch"), _, queryTimeMs, overallLatency, totalResponseValueBytes) + totalResponseValueBytes: Int, + keys: Map[String, Any] // The keys are used only for caching + ): Map[String, AnyRef] = { + val servingInfo = getServingInfo(oldServingInfo, batchResponses) + + // Batch metrics + batchResponses match { + case kvStoreResponse: KvStoreBatchResponse => + kvStoreResponse.response.map( + reportKvResponse(context.withSuffix("batch"), _, queryTimeMs, overallLatency, totalResponseValueBytes) + ) + case _: CachedBatchResponse => // no-op; } - // bulk upload didn't remove an older batch value - so we manually discard - val batchBytes: Array[Byte] = batchResponsesTry - .map(_.maxBy(_.millis)) - .filter(_.millis >= servingInfo.batchEndTsMillis) - .map(_.bytes) - .getOrElse(null) + // The bulk upload may not have removed an older batch values. We manually discard all but the latest one. + val batchBytes: Array[Byte] = batchResponses.getBatchBytes(servingInfo.batchEndTsMillis) + val responseMap: Map[String, AnyRef] = if (servingInfo.groupBy.aggregations == null) { // no-agg - servingInfo.selectedCodec.decodeMap(batchBytes) + getMapResponseFromBatchResponse(batchResponses, + batchBytes, + servingInfo.selectedCodec.decodeMap, + servingInfo, + keys) } else if (streamingResponsesOpt.isEmpty) { // snapshot accurate - servingInfo.outputCodec.decodeMap(batchBytes) + getMapResponseFromBatchResponse(batchResponses, batchBytes, servingInfo.outputCodec.decodeMap, servingInfo, keys) } else { // temporal accurate val streamingResponses = streamingResponsesOpt.get val mutations: Boolean = servingInfo.groupByOps.dataModel == DataModel.Entities @@ -91,60 +113,65 @@ class FetcherBase(kvStore: KVStore, null } else if (batchBytes == null && (streamingResponses == null || streamingResponses.isEmpty)) { if (debug) logger.info("Both batch and streaming data are null") - null - } else { - reportKvResponse(context.withSuffix("streaming"), - streamingResponses, - queryTimeMs, - overallLatency, - totalResponseValueBytes) - - val batchIr = toBatchIr(batchBytes, servingInfo) - val output: Array[Any] = if (servingInfo.isTilingEnabled) { - val streamingIrs: Iterator[TiledIr] = streamingResponses.iterator - .filter(tVal => tVal.millis >= servingInfo.batchEndTsMillis) - .map { tVal => - val (tile, _) = servingInfo.tiledCodec.decodeTileIr(tVal.bytes) - TiledIr(tVal.millis, tile) - } + return null + } - if (debug) { - val gson = new Gson() - logger.info(s""" + // Streaming metrics + reportKvResponse(context.withSuffix("streaming"), + streamingResponses, + queryTimeMs, + overallLatency, + totalResponseValueBytes) + + // If caching is enabled, we try to fetch the batch IR from the cache so we avoid the work of decoding it. + val batchIr: FinalBatchIr = + getBatchIrFromBatchResponse(batchResponses, batchBytes, servingInfo, toBatchIr, keys) + + val output: Array[Any] = if (servingInfo.isTilingEnabled) { + val streamingIrs: Iterator[TiledIr] = streamingResponses.iterator + .filter(tVal => tVal.millis >= servingInfo.batchEndTsMillis) + .map { tVal => + val (tile, _) = servingInfo.tiledCodec.decodeTileIr(tVal.bytes) + TiledIr(tVal.millis, tile) + } + + if (debug) { + val gson = new Gson() + logger.info(s""" |batch ir: ${gson.toJson(batchIr)} |streamingIrs: ${gson.toJson(streamingIrs)} |batchEnd in millis: ${servingInfo.batchEndTsMillis} |queryTime in millis: $queryTimeMs |""".stripMargin) - } + } - aggregator.lambdaAggregateFinalizedTiled(batchIr, streamingIrs, queryTimeMs) - } else { - val selectedCodec = servingInfo.groupByOps.dataModel match { - case DataModel.Events => servingInfo.valueAvroCodec - case DataModel.Entities => servingInfo.mutationValueAvroCodec - } + aggregator.lambdaAggregateFinalizedTiled(batchIr, streamingIrs, queryTimeMs) + } else { + val selectedCodec = servingInfo.groupByOps.dataModel match { + case DataModel.Events => servingInfo.valueAvroCodec + case DataModel.Entities => servingInfo.mutationValueAvroCodec + } - val streamingRows: Array[Row] = streamingResponses.iterator - .filter(tVal => tVal.millis >= servingInfo.batchEndTsMillis) - .map(tVal => selectedCodec.decodeRow(tVal.bytes, tVal.millis, mutations)) - .toArray + val streamingRows: Array[Row] = streamingResponses.iterator + .filter(tVal => tVal.millis >= servingInfo.batchEndTsMillis) + .map(tVal => selectedCodec.decodeRow(tVal.bytes, tVal.millis, mutations)) + .toArray - if (debug) { - val gson = new Gson() - logger.info(s""" + if (debug) { + val gson = new Gson() + logger.info(s""" |batch ir: ${gson.toJson(batchIr)} |streamingRows: ${gson.toJson(streamingRows)} |batchEnd in millis: ${servingInfo.batchEndTsMillis} |queryTime in millis: $queryTimeMs |""".stripMargin) - } - - aggregator.lambdaAggregateFinalized(batchIr, streamingRows.iterator, queryTimeMs, mutations) } - servingInfo.outputCodec.fieldNames.iterator.zip(output.iterator.map(_.asInstanceOf[AnyRef])).toMap + + aggregator.lambdaAggregateFinalized(batchIr, streamingRows.iterator, queryTimeMs, mutations) } + servingInfo.outputCodec.fieldNames.iterator.zip(output.iterator.map(_.asInstanceOf[AnyRef])).toMap } + context.distribution("group_by.latency.millis", System.currentTimeMillis() - startTimeMs) responseMap } @@ -167,6 +194,43 @@ class FetcherBase(kvStore: KVStore, ((responseBytes.toDouble / totalResponseBytes.toDouble) * latencyMillis).toLong) } + /** + * Get the latest serving information based on a batch response. + * + * The underlying metadata store used to store the latest GroupByServingInfoParsed will be updated if needed. + * + * @param oldServingInfo The previous serving information before fetching the latest KV store data. + * @param batchResponses the latest batch responses (either a fresh KV store response or a cached batch ir). + * @return the GroupByServingInfoParsed containing the latest serving information. + */ + private[online] def getServingInfo(oldServingInfo: GroupByServingInfoParsed, + batchResponses: BatchResponses): GroupByServingInfoParsed = { + batchResponses match { + case batchTimedValuesTry: KvStoreBatchResponse => { + val latestBatchValue: Try[TimedValue] = batchTimedValuesTry.response.map(_.maxBy(_.millis)) + latestBatchValue.map(timedVal => updateServingInfo(timedVal.millis, oldServingInfo)).getOrElse(oldServingInfo) + } + case _: CachedBatchResponse => { + // If there was cached batch data, there's no point try to update the serving info; it would be the same. + // However, there's one edge case to be handled. If all batch requests are cached and we never hit the kv store, + // we will never try to update the serving info. In that case, if new batch data were to land, we would never + // know of it. So, we force a refresh here to ensure that we are still periodically asynchronously hitting the + // KV store to update the serving info. (See CHIP-1) + getGroupByServingInfo.refresh(oldServingInfo.groupByOps.metaData.name) + + oldServingInfo + } + } + } + + /** + * If `batchEndTs` is ahead of `groupByServingInfo.batchEndTsMillis`, update the MetadataStore with the new + * timestamp. In practice, this means that new batch data has landed, so future kvstore requests should fetch + * streaming data after the new batchEndTsMillis. + * + * @param batchEndTs the new batchEndTs from the latest batch data + * @param groupByServingInfo the current GroupByServingInfo + */ private def updateServingInfo(batchEndTs: Long, groupByServingInfo: GroupByServingInfoParsed): GroupByServingInfoParsed = { val name = groupByServingInfo.groupBy.metaData.name @@ -245,15 +309,25 @@ class FetcherBase(kvStore: KVStore, } request -> groupByRequestMetaTry }.toSeq - val allRequests: Seq[GetRequest] = groupByRequestToKvRequest.flatMap { - case (_, Success(GroupByRequestMeta(_, batchRequest, streamingRequestOpt, _, _))) => - Some(batchRequest) ++ streamingRequestOpt + + // If caching is enabled, we check if any of the GetRequests are already cached. If so, we store them in a Map + // and avoid the work of re-fetching them. + val cachedRequests: Map[GetRequest, CachedBatchResponse] = getCachedRequests(groupByRequestToKvRequest) + // Collect cache metrics once per fetchGroupBys call; Caffeine metrics aren't tagged by groupBy + maybeBatchIrCache.foreach(cache => + LRUCache.collectCaffeineCacheMetrics(caffeineMetricsContext, cache.cache, cache.cacheName)) + + val allRequestsToFetch: Seq[GetRequest] = groupByRequestToKvRequest.flatMap { + case (_, Success(GroupByRequestMeta(_, batchRequest, streamingRequestOpt, _, _))) => { + // If a request is cached, don't include it in the list of requests to fetch. + if (cachedRequests.contains(batchRequest)) streamingRequestOpt else Some(batchRequest) ++ streamingRequestOpt + } case _ => Seq.empty } val startTimeMs = System.currentTimeMillis() - val kvResponseFuture: Future[Seq[GetResponse]] = if (allRequests.nonEmpty) { - kvStore.multiGet(allRequests) + val kvResponseFuture: Future[Seq[GetResponse]] = if (allRequestsToFetch.nonEmpty) { + kvStore.multiGet(allRequestsToFetch) } else { Future(Seq.empty[GetResponse]) } @@ -270,20 +344,33 @@ class FetcherBase(kvStore: KVStore, .filter(_.isSuccess) .flatMap(_.get.map(v => Option(v.bytes).map(_.length).getOrElse(0))) .sum + val responses: Seq[Response] = groupByRequestToKvRequest.iterator.map { case (request, requestMetaTry) => val responseMapTry: Try[Map[String, AnyRef]] = requestMetaTry.map { requestMeta => val GroupByRequestMeta(groupByServingInfo, batchRequest, streamingRequestOpt, _, context) = requestMeta - context.count("multi_get.batch.size", allRequests.length) + + context.count("multi_get.batch.size", allRequestsToFetch.length) context.distribution("multi_get.bytes", totalResponseValueBytes) context.distribution("multi_get.response.length", kvResponses.length) context.distribution("multi_get.latency.millis", multiGetMillis) + // pick the batch version with highest timestamp - val batchResponseTryAll = responsesMap - .getOrElse(batchRequest, - Failure( - new IllegalStateException( - s"Couldn't find corresponding response for $batchRequest in responseMap"))) + val batchResponses: BatchResponses = + // Check if the get request was cached. If so, use the cache. Otherwise, try to get it from response. + cachedRequests.get(batchRequest) match { + case None => + BatchResponses( + responsesMap + .getOrElse( + batchRequest, + // Fail if response is neither in responsesMap nor in cache + Failure(new IllegalStateException( + s"Couldn't find corresponding response for $batchRequest in responseMap or cache")) + )) + case Some(cachedResponse: CachedBatchResponse) => cachedResponse + } + val streamingResponsesOpt = streamingRequestOpt.map(responsesMap.getOrElse(_, Success(Seq.empty)).getOrElse(Seq.empty)) val queryTs = request.atMillis.getOrElse(System.currentTimeMillis()) @@ -293,14 +380,15 @@ class FetcherBase(kvStore: KVStore, logger.info( s"Constructing response for groupBy: ${groupByServingInfo.groupByOps.metaData.getName} " + s"for keys: ${request.keys}") - constructGroupByResponse(batchResponseTryAll, + constructGroupByResponse(batchResponses, streamingResponsesOpt, groupByServingInfo, queryTs, startTimeMs, multiGetMillis, context, - totalResponseValueBytes) + totalResponseValueBytes, + request.keys) } catch { case ex: Exception => // not all exceptions are due to stale schema, so we want to control how often we hit kv store @@ -350,6 +438,9 @@ class FetcherBase(kvStore: KVStore, } } + /** + * Convert an array of bytes to a FinalBatchIr. + */ def toBatchIr(bytes: Array[Byte], gbInfo: GroupByServingInfoParsed): FinalBatchIr = { if (bytes == null) return null val batchRecord = From 9f711d553989cc3404a6ec1740dd24847627ef15 Mon Sep 17 00:00:00 2001 From: Caio Camatta Date: Thu, 22 Feb 2024 17:04:09 -0500 Subject: [PATCH 07/21] Scala 2.13 support? --- online/src/main/scala/ai/chronon/online/FetcherCache.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/online/src/main/scala/ai/chronon/online/FetcherCache.scala b/online/src/main/scala/ai/chronon/online/FetcherCache.scala index b67fd57d9..14ee823c8 100644 --- a/online/src/main/scala/ai/chronon/online/FetcherCache.scala +++ b/online/src/main/scala/ai/chronon/online/FetcherCache.scala @@ -19,6 +19,7 @@ import com.github.benmanes.caffeine.cache.{Cache => CaffeineCache} import scala.util.{Success, Try} import java.util.concurrent.ConcurrentHashMap import scala.collection.JavaConverters.mapAsScalaConcurrentMapConverter +import scala.collection.Seq /* * FetcherCache is an extension to FetcherBase that provides caching functionality. It caches KV store From df3242cddcec6f6bc6ad1dd51e46ed006fafa39e Mon Sep 17 00:00:00 2001 From: Caio Camatta Date: Mon, 26 Feb 2024 17:05:30 -0500 Subject: [PATCH 08/21] Add getServingInfo unit tests --- .../scala/ai/chronon/online/FetcherBase.scala | 4 +- .../ai/chronon/online/FetcherBaseTest.scala | 51 +++++++++++++++++-- 2 files changed, 50 insertions(+), 5 deletions(-) diff --git a/online/src/main/scala/ai/chronon/online/FetcherBase.scala b/online/src/main/scala/ai/chronon/online/FetcherBase.scala index a8297593b..6892ec93a 100644 --- a/online/src/main/scala/ai/chronon/online/FetcherBase.scala +++ b/online/src/main/scala/ai/chronon/online/FetcherBase.scala @@ -231,8 +231,8 @@ class FetcherBase(kvStore: KVStore, * @param batchEndTs the new batchEndTs from the latest batch data * @param groupByServingInfo the current GroupByServingInfo */ - private def updateServingInfo(batchEndTs: Long, - groupByServingInfo: GroupByServingInfoParsed): GroupByServingInfoParsed = { + private[online] def updateServingInfo(batchEndTs: Long, + groupByServingInfo: GroupByServingInfoParsed): GroupByServingInfoParsed = { val name = groupByServingInfo.groupBy.metaData.name if (batchEndTs > groupByServingInfo.batchEndTsMillis) { logger.info(s"""$name's value's batch timestamp of $batchEndTs is diff --git a/online/src/test/scala/ai/chronon/online/FetcherBaseTest.scala b/online/src/test/scala/ai/chronon/online/FetcherBaseTest.scala index 51b599ce7..e033c4888 100644 --- a/online/src/test/scala/ai/chronon/online/FetcherBaseTest.scala +++ b/online/src/test/scala/ai/chronon/online/FetcherBaseTest.scala @@ -16,7 +16,12 @@ package ai.chronon.online +import ai.chronon.aggregator.windowing.FinalBatchIr +import ai.chronon.api.Extensions.GroupByOps +import ai.chronon.api.MetaData import ai.chronon.online.Fetcher.{ColumnSpec, Request, Response} +import ai.chronon.online.FetcherCache.BatchResponses +import ai.chronon.online.KVStore.TimedValue import org.junit.{Before, Test} import org.mockito.ArgumentMatchers.any import org.mockito.Mockito._ @@ -25,12 +30,14 @@ import org.mockito.stubbing.Answer import org.mockito.{Answers, ArgumentCaptor} import org.scalatest.matchers.should.Matchers import org.scalatestplus.mockito.MockitoSugar +import org.junit.Assert.assertSame import scala.concurrent.duration.DurationInt import scala.concurrent.{Await, ExecutionContext, Future} import scala.util.{Failure, Success} +import scala.util.Try -class FetcherBaseTest extends MockitoSugar with Matchers { +class FetcherBaseTest extends MockitoSugar with Matchers with MockitoHelper { val GroupBy = "relevance.short_term_user_features" val Column = "pdp_view_count_14d" val GuestKey = "guest" @@ -118,7 +125,7 @@ class FetcherBaseTest extends MockitoSugar with Matchers { // Fetch a single query val keyMap = Map(GuestKey -> GuestId) val query = ColumnSpec(GroupBy, Column, None, Some(keyMap)) - + doAnswer(new Answer[Future[Seq[Fetcher.Response]]] { def answer(invocation: InvocationOnMock): Future[Seq[Response]] = { Future.successful(Seq()) @@ -130,7 +137,7 @@ class FetcherBaseTest extends MockitoSugar with Matchers { queryResults.contains(query) shouldBe true queryResults.get(query).map(_.values) match { case Some(Failure(ex: IllegalStateException)) => succeed - case _ => fail() + case _ => fail() } // GroupBy request sent to KV store for the query @@ -141,4 +148,42 @@ class FetcherBaseTest extends MockitoSugar with Matchers { actualRequest.get.name shouldBe query.groupByName + "." + query.columnName actualRequest.get.keys shouldBe query.keyMapping.get } + + @Test + def test_getServingInfo_ShouldCallUpdateServingInfoIfBatchResponseIsFromKvStore(): Unit = { + val baseFetcher = new FetcherBase(mock[KVStore]) + val spiedFetcherBase = spy(baseFetcher) + val oldServingInfo = mock[GroupByServingInfoParsed] + val updatedServingInfo = mock[GroupByServingInfoParsed] + val batchTimedValuesSuccess = Success(Seq(TimedValue(Array(1.toByte), 2000L))) + val kvStoreBatchResponses = BatchResponses(batchTimedValuesSuccess) + doReturn(updatedServingInfo).when(spiedFetcherBase).updateServingInfo(any(), any()) + + // updateServingInfo is called + val result = spiedFetcherBase.getServingInfo(oldServingInfo, kvStoreBatchResponses) + assertSame(result, updatedServingInfo) + verify(spiedFetcherBase).updateServingInfo(any(), any()) + } + + @Test + def test_getServingInfo_ShouldRefreshServingInfoIfBatchResponseIsCached(): Unit = { + val baseFetcher = new FetcherBase(mock[KVStore]) + val spiedFetcherBase = spy(baseFetcher) + val oldServingInfo = mock[GroupByServingInfoParsed] + val metaData = mock[MetaData] + val groupByOpsMock = mock[GroupByOps] + val cachedBatchResponses = BatchResponses(mock[FinalBatchIr]) + val ttlCache = mock[TTLCache[String, Try[GroupByServingInfoParsed]]] + doReturn(ttlCache).when(spiedFetcherBase).getGroupByServingInfo + doReturn(Success(oldServingInfo)).when(ttlCache).refresh(any[String]) + metaData.name = "test" + groupByOpsMock.metaData = metaData + when(oldServingInfo.groupByOps).thenReturn(groupByOpsMock) + + // FetcherBase.updateServingInfo is not called, but getGroupByServingInfo.refresh() is. + val result = spiedFetcherBase.getServingInfo(oldServingInfo, cachedBatchResponses) + assertSame(result, oldServingInfo) + verify(ttlCache).refresh(any()) + verify(spiedFetcherBase, never()).updateServingInfo(any(), any()) + } } From 89d68bb3eaa66c68d6bea36b19ff9ad88d14a54a Mon Sep 17 00:00:00 2001 From: Caio Camatta Date: Wed, 28 Feb 2024 09:39:41 -0500 Subject: [PATCH 09/21] Refactor getServingInfo tests --- .../ai/chronon/online/FetcherBaseTest.scala | 41 ++++++++++--------- 1 file changed, 22 insertions(+), 19 deletions(-) diff --git a/online/src/test/scala/ai/chronon/online/FetcherBaseTest.scala b/online/src/test/scala/ai/chronon/online/FetcherBaseTest.scala index e033c4888..bc7b60d1b 100644 --- a/online/src/test/scala/ai/chronon/online/FetcherBaseTest.scala +++ b/online/src/test/scala/ai/chronon/online/FetcherBaseTest.scala @@ -30,7 +30,6 @@ import org.mockito.stubbing.Answer import org.mockito.{Answers, ArgumentCaptor} import org.scalatest.matchers.should.Matchers import org.scalatestplus.mockito.MockitoSugar -import org.junit.Assert.assertSame import scala.concurrent.duration.DurationInt import scala.concurrent.{Await, ExecutionContext, Future} @@ -149,41 +148,45 @@ class FetcherBaseTest extends MockitoSugar with Matchers with MockitoHelper { actualRequest.get.keys shouldBe query.keyMapping.get } + // updateServingInfo() is called when the batch response is from the KV store. @Test def test_getServingInfo_ShouldCallUpdateServingInfoIfBatchResponseIsFromKvStore(): Unit = { - val baseFetcher = new FetcherBase(mock[KVStore]) - val spiedFetcherBase = spy(baseFetcher) val oldServingInfo = mock[GroupByServingInfoParsed] val updatedServingInfo = mock[GroupByServingInfoParsed] + doReturn(updatedServingInfo).when(fetcherBase).updateServingInfo(any(), any()) + val batchTimedValuesSuccess = Success(Seq(TimedValue(Array(1.toByte), 2000L))) val kvStoreBatchResponses = BatchResponses(batchTimedValuesSuccess) - doReturn(updatedServingInfo).when(spiedFetcherBase).updateServingInfo(any(), any()) + + val result = fetcherBase.getServingInfo(oldServingInfo, kvStoreBatchResponses) // updateServingInfo is called - val result = spiedFetcherBase.getServingInfo(oldServingInfo, kvStoreBatchResponses) - assertSame(result, updatedServingInfo) - verify(spiedFetcherBase).updateServingInfo(any(), any()) + result shouldEqual updatedServingInfo + verify(fetcherBase).updateServingInfo(any(), any()) } + // If a batch response is cached, the serving info should be refreshed. This is needed to prevent + // the serving info from becoming stale if all the requests are cached. @Test def test_getServingInfo_ShouldRefreshServingInfoIfBatchResponseIsCached(): Unit = { - val baseFetcher = new FetcherBase(mock[KVStore]) - val spiedFetcherBase = spy(baseFetcher) + val ttlCache = mock[TTLCache[String, Try[GroupByServingInfoParsed]]] + doReturn(ttlCache).when(fetcherBase).getGroupByServingInfo + val oldServingInfo = mock[GroupByServingInfoParsed] - val metaData = mock[MetaData] + doReturn(Success(oldServingInfo)).when(ttlCache).refresh(any[String]) + + val metaDataMock = mock[MetaData] val groupByOpsMock = mock[GroupByOps] + doReturn("test").when(metaDataMock).name + doReturn(metaDataMock).when(groupByOpsMock).metaData + doReturn(groupByOpsMock).when(oldServingInfo).groupByOps + val cachedBatchResponses = BatchResponses(mock[FinalBatchIr]) - val ttlCache = mock[TTLCache[String, Try[GroupByServingInfoParsed]]] - doReturn(ttlCache).when(spiedFetcherBase).getGroupByServingInfo - doReturn(Success(oldServingInfo)).when(ttlCache).refresh(any[String]) - metaData.name = "test" - groupByOpsMock.metaData = metaData - when(oldServingInfo.groupByOps).thenReturn(groupByOpsMock) + val result = fetcherBase.getServingInfo(oldServingInfo, cachedBatchResponses) // FetcherBase.updateServingInfo is not called, but getGroupByServingInfo.refresh() is. - val result = spiedFetcherBase.getServingInfo(oldServingInfo, cachedBatchResponses) - assertSame(result, oldServingInfo) + result shouldEqual oldServingInfo verify(ttlCache).refresh(any()) - verify(spiedFetcherBase, never()).updateServingInfo(any(), any()) + verify(fetcherBase, never()).updateServingInfo(any(), any()) } } From fd0df49008e15060a20c964a3563965068afae17 Mon Sep 17 00:00:00 2001 From: Caio Camatta Date: Wed, 28 Feb 2024 15:25:30 -0500 Subject: [PATCH 10/21] Fix stub --- .../src/test/scala/ai/chronon/online/FetcherBaseTest.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/online/src/test/scala/ai/chronon/online/FetcherBaseTest.scala b/online/src/test/scala/ai/chronon/online/FetcherBaseTest.scala index bc7b60d1b..6eda6c8ac 100644 --- a/online/src/test/scala/ai/chronon/online/FetcherBaseTest.scala +++ b/online/src/test/scala/ai/chronon/online/FetcherBaseTest.scala @@ -36,7 +36,7 @@ import scala.concurrent.{Await, ExecutionContext, Future} import scala.util.{Failure, Success} import scala.util.Try -class FetcherBaseTest extends MockitoSugar with Matchers with MockitoHelper { +class FetcherBaseTest extends MockitoSugar with Matchers { val GroupBy = "relevance.short_term_user_features" val Column = "pdp_view_count_14d" val GuestKey = "guest" @@ -177,8 +177,8 @@ class FetcherBaseTest extends MockitoSugar with Matchers with MockitoHelper { val metaDataMock = mock[MetaData] val groupByOpsMock = mock[GroupByOps] - doReturn("test").when(metaDataMock).name - doReturn(metaDataMock).when(groupByOpsMock).metaData + metaDataMock.name = "test" + groupByOpsMock.metaData = metaDataMock doReturn(groupByOpsMock).when(oldServingInfo).groupByOps val cachedBatchResponses = BatchResponses(mock[FinalBatchIr]) From 77c711a91fcb0e33a1847a190682cff34d2d55cf Mon Sep 17 00:00:00 2001 From: Caio Camatta Date: Tue, 2 Apr 2024 11:56:14 -0400 Subject: [PATCH 11/21] Fix Mockito "ambiguous reference to overloaded definition" error --- online/src/test/scala/ai/chronon/online/FetcherBaseTest.scala | 2 +- .../src/test/scala/ai/chronon/online/FetcherCacheTest.scala | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/online/src/test/scala/ai/chronon/online/FetcherBaseTest.scala b/online/src/test/scala/ai/chronon/online/FetcherBaseTest.scala index 6eda6c8ac..3b89b6889 100644 --- a/online/src/test/scala/ai/chronon/online/FetcherBaseTest.scala +++ b/online/src/test/scala/ai/chronon/online/FetcherBaseTest.scala @@ -36,7 +36,7 @@ import scala.concurrent.{Await, ExecutionContext, Future} import scala.util.{Failure, Success} import scala.util.Try -class FetcherBaseTest extends MockitoSugar with Matchers { +class FetcherBaseTest extends MockitoSugar with Matchers with MockitoHelper { val GroupBy = "relevance.short_term_user_features" val Column = "pdp_view_count_14d" val GuestKey = "guest" diff --git a/online/src/test/scala/ai/chronon/online/FetcherCacheTest.scala b/online/src/test/scala/ai/chronon/online/FetcherCacheTest.scala index bb476b2f0..45673825a 100644 --- a/online/src/test/scala/ai/chronon/online/FetcherCacheTest.scala +++ b/online/src/test/scala/ai/chronon/online/FetcherCacheTest.scala @@ -21,8 +21,8 @@ import scala.collection.JavaConverters._ import scala.util.{Failure, Success, Try} trait MockitoHelper extends MockitoSugar { - // Overriding doReturn to fix a known Java/Scala interoperability issue when using Mockito. ("doReturn: ambiguous - // reference to overloaded definition"). An alternative would be to use the 'mockito-scala' library. + // We override doReturn to fix a known Java/Scala interoperability issue. Without this fix, we see "doReturn: + // ambiguous reference to overloaded definition" errors. An alternative would be to use the 'mockito-scala' library. def doReturn(toBeReturned: Any): Stubber = { Mockito.doReturn(toBeReturned, Nil: _*) } From 0789bea24dd4da475d63e250a8ceaff1562fca27 Mon Sep 17 00:00:00 2001 From: Caio Camatta Date: Tue, 2 Apr 2024 12:39:41 -0400 Subject: [PATCH 12/21] Fix "Both batch and streaming data are null" check --- .../src/main/scala/ai/chronon/online/FetcherBase.scala | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/online/src/main/scala/ai/chronon/online/FetcherBase.scala b/online/src/main/scala/ai/chronon/online/FetcherBase.scala index 6892ec93a..531f3eef2 100644 --- a/online/src/main/scala/ai/chronon/online/FetcherBase.scala +++ b/online/src/main/scala/ai/chronon/online/FetcherBase.scala @@ -111,7 +111,14 @@ class FetcherBase(kvStore: KVStore, s"Request time of $queryTimeMs is less than batch time ${aggregator.batchEndTs}" + s" for groupBy ${servingInfo.groupByOps.metaData.getName}")) null - } else if (batchBytes == null && (streamingResponses == null || streamingResponses.isEmpty)) { + } else if ( + // Check if there's no streaming data. + (streamingResponses == null || streamingResponses.isEmpty) && + // Check if there's no batch data. This is only possible if the batch response is from a KV Store request + // (KvStoreBatchResponse) that returned null bytes. It's not possible to have null batch data with cached batch + // responses as we only cache non-null data. + (batchResponses.isInstanceOf[KvStoreBatchResponse] && batchBytes == null) + ) { if (debug) logger.info("Both batch and streaming data are null") return null } From acbe30aeb623a191a0dd2559be84dce8cd64471e Mon Sep 17 00:00:00 2001 From: Caio Camatta Date: Wed, 3 Apr 2024 17:05:07 -0400 Subject: [PATCH 13/21] Address PR review: add comments and use logger --- .../ai/chronon/online/FetcherCache.scala | 22 ++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/online/src/main/scala/ai/chronon/online/FetcherCache.scala b/online/src/main/scala/ai/chronon/online/FetcherCache.scala index 14ee823c8..c847de307 100644 --- a/online/src/main/scala/ai/chronon/online/FetcherCache.scala +++ b/online/src/main/scala/ai/chronon/online/FetcherCache.scala @@ -20,12 +20,15 @@ import scala.util.{Success, Try} import java.util.concurrent.ConcurrentHashMap import scala.collection.JavaConverters.mapAsScalaConcurrentMapConverter import scala.collection.Seq +import org.slf4j.{Logger, LoggerFactory} /* * FetcherCache is an extension to FetcherBase that provides caching functionality. It caches KV store * requests to decrease feature serving latency. * */ trait FetcherCache { + @transient private lazy val logger: Logger = LoggerFactory.getLogger(getClass) + val batchIrCacheName = "batch_cache" val maybeBatchIrCache: Option[BatchIrCache] = Option(System.getProperty("ai.chronon.fetcher.batch_ir_cache_size")) @@ -46,11 +49,11 @@ trait FetcherCache { groupByName, { groupBy.getMetaData.customJsonLookUp("enable_caching") match { case b: Boolean => - println(s"Caching is ${if (b) "enabled" else "disabled"} for $groupByName") + logger.info(s"Caching is ${if (b) "enabled" else "disabled"} for $groupByName") isCachingEnabledForGroupBy.putIfAbsent(groupByName, b) b case null => - println(s"Caching is disabled for $groupByName, enable_caching is not set.") + logger.info(s"Caching is disabled for $groupByName, enable_caching is not set.") isCachingEnabledForGroupBy.putIfAbsent(groupByName, false) false case _ => false @@ -189,7 +192,12 @@ object FetcherCache { type Value = BatchResponses } - // BatchResponses encapsulates either a batch response from kv store or a cached batch response. + /** + * Encapsulates the response for a GetRequest for batch data. This response could be the values received from + * a KV Store request, or cached values. + * + * (The fetcher uses these batch values to construct the response for a request for feature values.) + * */ sealed abstract class BatchResponses { def getBatchBytes(batchEndTsMillis: Long): Array[Byte] } @@ -198,6 +206,8 @@ object FetcherCache { def apply(cachedResponse: FinalBatchIr): CachedFinalIrBatchResponse = CachedFinalIrBatchResponse(cachedResponse) def apply(cachedResponse: Map[String, AnyRef]): CachedMapBatchResponse = CachedMapBatchResponse(cachedResponse) } + + /** Encapsulates batch response values received from a KV Store request. */ case class KvStoreBatchResponse(response: Try[Seq[TimedValue]]) extends BatchResponses { def getBatchBytes(batchEndTsMillis: Long): Array[Byte] = response @@ -206,10 +216,16 @@ object FetcherCache { .map(_.bytes) .getOrElse(null) } + + /** Encapsulates a batch response that was found in the Fetcher's internal IR cache. */ sealed abstract class CachedBatchResponse extends BatchResponses { // This is the case where we don't have bytes because the decoded IR was cached so we didn't hit the KV store again. def getBatchBytes(batchEndTsMillis: Long): Null = null } + + /** Encapsulates a decoded batch response that was found in the Fetcher's internal IR cache. */ case class CachedFinalIrBatchResponse(response: FinalBatchIr) extends CachedBatchResponse + + /** Encapsulates a decoded batch response that was found in the Fetcher's internal IR cache */ case class CachedMapBatchResponse(response: Map[String, AnyRef]) extends CachedBatchResponse } From 80bd7cf8870c4910a194f3d83d5de9d8c917a4f9 Mon Sep 17 00:00:00 2001 From: Caio Camatta Date: Thu, 9 May 2024 13:55:57 -0400 Subject: [PATCH 14/21] Fewer comments for constructGroupByResponse --- .../scala/ai/chronon/online/FetcherBase.scala | 17 +++-------------- 1 file changed, 3 insertions(+), 14 deletions(-) diff --git a/online/src/main/scala/ai/chronon/online/FetcherBase.scala b/online/src/main/scala/ai/chronon/online/FetcherBase.scala index f248d5878..434aed0c7 100644 --- a/online/src/main/scala/ai/chronon/online/FetcherBase.scala +++ b/online/src/main/scala/ai/chronon/online/FetcherBase.scala @@ -58,24 +58,13 @@ class FetcherBase(kvStore: KVStore, /** * A groupBy request is split into batchRequest and optionally a streamingRequest. This method decodes bytes * (of the appropriate avro schema) into chronon rows aggregates further if necessary. - * - * @param batchResponses a BatchResponses, which encapsulates either a response from kv store or a cached batch IR. - * @param streamingResponsesOpt a response from kv store, if the GroupBy was streaming data. - * @param oldServingInfo the GroupByServingInfo used to fetch the GroupBys. - * @param queryTimeMs the Request timestamp - * @param startTimeMs time when we started fetching the KV store - * @param overallLatency the time it took to get the values from the KV store - * @param context the Metrics.Context to use for recording metrics - * @param totalResponseValueBytes the total size of the response from the KV store - * @param keys the keys used to fetch the GroupBy - * @return */ private def constructGroupByResponse(batchResponses: BatchResponses, streamingResponsesOpt: Option[Seq[TimedValue]], oldServingInfo: GroupByServingInfoParsed, - queryTimeMs: Long, - startTimeMs: Long, - overallLatency: Long, + queryTimeMs: Long, // the timestamp of the Request being served. + startTimeMs: Long, // timestamp right before the KV store fetch. + overallLatency: Long, // the time it took to get the values from the KV store context: Metrics.Context, totalResponseValueBytes: Int, keys: Map[String, Any] // The keys are used only for caching From 2873a67e33867cb281aabdab35d2e7a4f23464e0 Mon Sep 17 00:00:00 2001 From: Caio Camatta Date: Thu, 9 May 2024 14:54:15 -0400 Subject: [PATCH 15/21] Use the FlagStore to determine if caching is enabled --- .../scala/ai/chronon/online/FetcherBase.scala | 13 +++++ .../ai/chronon/online/FetcherCache.scala | 28 ++-------- .../ai/chronon/online/FetcherBaseTest.scala | 33 +++++++++++- .../ai/chronon/online/FetcherCacheTest.scala | 51 +------------------ 4 files changed, 50 insertions(+), 75 deletions(-) diff --git a/online/src/main/scala/ai/chronon/online/FetcherBase.scala b/online/src/main/scala/ai/chronon/online/FetcherBase.scala index 434aed0c7..839e0629e 100644 --- a/online/src/main/scala/ai/chronon/online/FetcherBase.scala +++ b/online/src/main/scala/ai/chronon/online/FetcherBase.scala @@ -249,6 +249,19 @@ class FetcherBase(kvStore: KVStore, } } + override def isCachingEnabled(groupBy: GroupBy): Boolean = { + if (!isCacheSizeConfigured || groupBy.getMetaData == null || groupBy.getMetaData.getName == null) return false + + val isCachingFlagEnabled = flagStore.isSet("enable_fetcher_batch_ir_cache", + Map("groupby_streaming_dataset" -> groupBy.getMetaData.getName).asJava) + + if (debug) + println( + s"Online IR caching is ${if (isCachingFlagEnabled) "enabled" else "disabled"} for ${groupBy.getMetaData.getName}") + + isCachingFlagEnabled + } + // 1. fetches GroupByServingInfo // 2. encodes keys as keyAvroSchema // 3. Based on accuracy, fetches streaming + batch data and aggregates further. diff --git a/online/src/main/scala/ai/chronon/online/FetcherCache.scala b/online/src/main/scala/ai/chronon/online/FetcherCache.scala index c847de307..c9364a36d 100644 --- a/online/src/main/scala/ai/chronon/online/FetcherCache.scala +++ b/online/src/main/scala/ai/chronon/online/FetcherCache.scala @@ -35,32 +35,10 @@ trait FetcherCache { .map(size => new BatchIrCache(batchIrCacheName, size.toInt)) .orElse(None) + // Caching needs to be configured globally def isCacheSizeConfigured: Boolean = maybeBatchIrCache.isDefined - - // Memoize which GroupBys have caching enabled - private[online] val isCachingEnabledForGroupBy: collection.concurrent.Map[String, Boolean] = - new ConcurrentHashMap[String, Boolean]().asScala - - def isCachingEnabled(groupBy: GroupBy): Boolean = { - if (!isCacheSizeConfigured || groupBy.getMetaData == null || groupBy.getMetaData.getName == null) return false - - val groupByName = groupBy.getMetaData.getName - isCachingEnabledForGroupBy.getOrElse( - groupByName, { - groupBy.getMetaData.customJsonLookUp("enable_caching") match { - case b: Boolean => - logger.info(s"Caching is ${if (b) "enabled" else "disabled"} for $groupByName") - isCachingEnabledForGroupBy.putIfAbsent(groupByName, b) - b - case null => - logger.info(s"Caching is disabled for $groupByName, enable_caching is not set.") - isCachingEnabledForGroupBy.putIfAbsent(groupByName, false) - false - case _ => false - } - } - ) - } + // Caching needs to be enabled for the specific groupBy + def isCachingEnabled(groupBy: GroupBy): Boolean = false protected val caffeineMetricsContext: Metrics.Context = Metrics.Context(Metrics.Environment.JoinFetching) diff --git a/online/src/test/scala/ai/chronon/online/FetcherBaseTest.scala b/online/src/test/scala/ai/chronon/online/FetcherBaseTest.scala index 3b89b6889..7f3e635f9 100644 --- a/online/src/test/scala/ai/chronon/online/FetcherBaseTest.scala +++ b/online/src/test/scala/ai/chronon/online/FetcherBaseTest.scala @@ -18,10 +18,11 @@ package ai.chronon.online import ai.chronon.aggregator.windowing.FinalBatchIr import ai.chronon.api.Extensions.GroupByOps -import ai.chronon.api.MetaData +import ai.chronon.api.{Builders, GroupBy, MetaData} import ai.chronon.online.Fetcher.{ColumnSpec, Request, Response} import ai.chronon.online.FetcherCache.BatchResponses import ai.chronon.online.KVStore.TimedValue +import org.junit.Assert.{assertFalse, assertTrue, fail} import org.junit.{Before, Test} import org.mockito.ArgumentMatchers.any import org.mockito.Mockito._ @@ -189,4 +190,34 @@ class FetcherBaseTest extends MockitoSugar with Matchers with MockitoHelper { verify(ttlCache).refresh(any()) verify(fetcherBase, never()).updateServingInfo(any(), any()) } + + @Test + def test_isCachingEnabled_CorrectlyDetermineIfCacheIsEnabled(): Unit = { + val flagStore: FlagStore = (flagName: String, attributes: java.util.Map[String, String]) => { + flagName match { + case "enable_fetcher_batch_ir_cache" => + attributes.get("groupby_streaming_dataset") match { + case "test_groupby_2" => false + case "test_groupby_3" => true + case other @ _ => + fail(s"Unexpected groupby_streaming_dataset: $other") + false + } + case _ => false + } + } + + kvStore = mock[KVStore](Answers.RETURNS_DEEP_STUBS) + when(kvStore.executionContext).thenReturn(ExecutionContext.global) + val fetcherBaseWithFlagStore = spy(new FetcherBase(kvStore, flagStore = flagStore)) + when(fetcherBaseWithFlagStore.isCacheSizeConfigured).thenReturn(true) + + def buildGroupByWithCustomJson(name: String): GroupBy = Builders.GroupBy(metaData = Builders.MetaData(name = name)) + + // no name set + assertFalse(fetcherBaseWithFlagStore.isCachingEnabled(Builders.GroupBy())) + + assertFalse(fetcherBaseWithFlagStore.isCachingEnabled(buildGroupByWithCustomJson("test_groupby_2"))) + assertTrue(fetcherBaseWithFlagStore.isCachingEnabled(buildGroupByWithCustomJson("test_groupby_3"))) + } } diff --git a/online/src/test/scala/ai/chronon/online/FetcherCacheTest.scala b/online/src/test/scala/ai/chronon/online/FetcherCacheTest.scala index 45673825a..1168261c8 100644 --- a/online/src/test/scala/ai/chronon/online/FetcherCacheTest.scala +++ b/online/src/test/scala/ai/chronon/online/FetcherCacheTest.scala @@ -2,14 +2,13 @@ package ai.chronon.online import ai.chronon.aggregator.windowing.FinalBatchIr import ai.chronon.api.Extensions.GroupByOps -import ai.chronon.api.{Builders, GroupBy} +import ai.chronon.api.GroupBy import ai.chronon.online.FetcherBase._ -import ai.chronon.online.{AvroCodec, FetcherCache, GroupByServingInfoParsed, KVStore, Metrics} import ai.chronon.online.Fetcher.Request import ai.chronon.online.FetcherCache.{BatchIrCache, BatchResponses, CachedMapBatchResponse} import ai.chronon.online.KVStore.TimedValue import ai.chronon.online.Metrics.Context -import org.junit.Assert.{assertArrayEquals, assertEquals, assertFalse, assertNull, assertTrue, fail} +import org.junit.Assert.{assertArrayEquals, assertEquals, assertNull, fail} import org.junit.Test import org.mockito.Mockito._ import org.mockito.ArgumentMatchers.any @@ -332,50 +331,4 @@ class FetcherCacheTest extends MockitoHelper { verify(servingInfo.outputCodec.decodeMap(any()), times(1)) // decoding did happen assertEquals(mapResponse, decodedMapResponse) } - - @Test - def test_isCachingEnabled_CorrectlyDetermineIfCacheIsEnabled(): Unit = { - val baseFetcher = new TestableFetcherCache(Some(new BatchIrCache("test", batchIrCacheMaximumSize))) - def buildGroupByWithCustomJson(name: String, customJson: String = null): GroupBy = - Builders.GroupBy( - metaData = Builders.MetaData(name = name, customJson = customJson) - ) - - assertFalse(baseFetcher.isCachingEnabled(buildGroupByWithCustomJson("test_groupby_1"))) - assertFalse(baseFetcher.isCachingEnabled(buildGroupByWithCustomJson("test_groupby_2", "{}"))) - assertTrue( - baseFetcher - .isCachingEnabled(buildGroupByWithCustomJson("test_groupby_3", "{\"enable_caching\": true}")) - ) - assertFalse( - baseFetcher - .isCachingEnabled(buildGroupByWithCustomJson("test_groupby_4", "{\"enable_caching\": false}")) - ) - assertFalse( - baseFetcher - .isCachingEnabled( - buildGroupByWithCustomJson("test_groupby_5", "{\"enable_caching\": \"string instead of bool\"}") - ) - ) - } - - @Test - def test_isCachingEnabled_Memoizes(): Unit = { - val baseFetcher = new TestableFetcherCache(Some(new BatchIrCache("test", batchIrCacheMaximumSize))) - def buildGroupByWithCustomJson(name: String, customJson: String = null): GroupBy = - Builders.GroupBy( - metaData = Builders.MetaData(name = name, customJson = customJson) - ) - - // the map is clean at the start - assertFalse(baseFetcher.isCachingEnabledForGroupBy.contains("test_memo_gb_1")) - assertFalse(baseFetcher.isCachingEnabledForGroupBy.contains("test_memo_gb_2")) - - // memoization works for both true and false - baseFetcher.isCachingEnabled(buildGroupByWithCustomJson("test_memo_gb_1")) - assertFalse(baseFetcher.isCachingEnabledForGroupBy("test_memo_gb_1")) - - baseFetcher.isCachingEnabled(buildGroupByWithCustomJson("test_memo_gb_2", "{\"enable_caching\": true}")) - assertTrue(baseFetcher.isCachingEnabledForGroupBy("test_memo_gb_2")) - } } From d02474698af56af8434fe98134583c9ba86d8adf Mon Sep 17 00:00:00 2001 From: "Caio Camatta (Stripe)" <108533014+caiocamatta-stripe@users.noreply.github.com> Date: Fri, 21 Jun 2024 16:12:12 -0400 Subject: [PATCH 16/21] Apply suggestions from code review Co-authored-by: Pengyu Hou <3771747+pengyu-hou@users.noreply.github.com> Signed-off-by: Caio Camatta (Stripe) <108533014+caiocamatta-stripe@users.noreply.github.com> --- online/src/main/scala/ai/chronon/online/FetcherBase.scala | 4 ++-- online/src/main/scala/ai/chronon/online/LRUCache.scala | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/online/src/main/scala/ai/chronon/online/FetcherBase.scala b/online/src/main/scala/ai/chronon/online/FetcherBase.scala index 839e0629e..3ce7975b9 100644 --- a/online/src/main/scala/ai/chronon/online/FetcherBase.scala +++ b/online/src/main/scala/ai/chronon/online/FetcherBase.scala @@ -321,7 +321,7 @@ class FetcherBase(kvStore: KVStore, }.toSeq // If caching is enabled, we check if any of the GetRequests are already cached. If so, we store them in a Map - // and avoid the work of re-fetching them. + // and avoid the work of re-fetching them. It is mainly for batch data requests. val cachedRequests: Map[GetRequest, CachedBatchResponse] = getCachedRequests(groupByRequestToKvRequest) // Collect cache metrics once per fetchGroupBys call; Caffeine metrics aren't tagged by groupBy maybeBatchIrCache.foreach(cache => @@ -329,7 +329,7 @@ class FetcherBase(kvStore: KVStore, val allRequestsToFetch: Seq[GetRequest] = groupByRequestToKvRequest.flatMap { case (_, Success(GroupByRequestMeta(_, batchRequest, streamingRequestOpt, _, _))) => { - // If a request is cached, don't include it in the list of requests to fetch. + // If a batch request is cached, don't include it in the list of requests to fetch because the batch IRs already cached if (cachedRequests.contains(batchRequest)) streamingRequestOpt else Some(batchRequest) ++ streamingRequestOpt } case _ => Seq.empty diff --git a/online/src/main/scala/ai/chronon/online/LRUCache.scala b/online/src/main/scala/ai/chronon/online/LRUCache.scala index 0caa2f7e6..5a4d259fe 100644 --- a/online/src/main/scala/ai/chronon/online/LRUCache.scala +++ b/online/src/main/scala/ai/chronon/online/LRUCache.scala @@ -26,13 +26,13 @@ object LRUCache { private def buildCaffeineCache[KEY <: Object, VALUE <: Object]( cacheName: String, maximumSize: Int = 10000): CaffeineCache[KEY, VALUE] = { - println(s"Chronon Cache build started. cacheName=$cacheName") + logger.info(s"Chronon Cache build started. cacheName=$cacheName") val cache: CaffeineCache[KEY, VALUE] = Caffeine .newBuilder() .maximumSize(maximumSize) .recordStats() .build[KEY, VALUE]() - println(s"Chronon Cache build finished. cacheName=$cacheName") + logger.info(s"Chronon Cache build finished. cacheName=$cacheName") cache } From b9b54a0a10bd33e1842f18399950c3907aab2598 Mon Sep 17 00:00:00 2001 From: Caio Camatta Date: Mon, 24 Jun 2024 15:06:54 -0700 Subject: [PATCH 17/21] Address review, add comments, rename tests --- .../java/ai/chronon/online/FlagStore.java | 9 ++++++- .../scala/ai/chronon/online/FetcherBase.scala | 9 ++++--- .../ai/chronon/online/FetcherCache.scala | 9 ++++++- .../scala/ai/chronon/online/LRUCache.scala | 2 ++ .../ai/chronon/online/FetcherCacheTest.scala | 26 +++++++++---------- 5 files changed, 36 insertions(+), 19 deletions(-) diff --git a/online/src/main/java/ai/chronon/online/FlagStore.java b/online/src/main/java/ai/chronon/online/FlagStore.java index cb39668b3..bdc0c1bd3 100644 --- a/online/src/main/java/ai/chronon/online/FlagStore.java +++ b/online/src/main/java/ai/chronon/online/FlagStore.java @@ -3,7 +3,14 @@ import java.io.Serializable; import java.util.Map; -// Interface to allow rolling out features/infrastructure changes in a safe & controlled manner +/** + * Interface to allow rolling out features/infrastructure changes in a safe & controlled manner. + * + * The "Flag"s in FlagStore referes to 'feature flags', a technique that allows enabling or disabling features at + * runtime. + * + * Chronon users can provide their own implementation in the Api. + */ public interface FlagStore extends Serializable { Boolean isSet(String flagName, Map attributes); } diff --git a/online/src/main/scala/ai/chronon/online/FetcherBase.scala b/online/src/main/scala/ai/chronon/online/FetcherBase.scala index 3ce7975b9..b0b7bd44f 100644 --- a/online/src/main/scala/ai/chronon/online/FetcherBase.scala +++ b/online/src/main/scala/ai/chronon/online/FetcherBase.scala @@ -252,11 +252,12 @@ class FetcherBase(kvStore: KVStore, override def isCachingEnabled(groupBy: GroupBy): Boolean = { if (!isCacheSizeConfigured || groupBy.getMetaData == null || groupBy.getMetaData.getName == null) return false - val isCachingFlagEnabled = flagStore.isSet("enable_fetcher_batch_ir_cache", - Map("groupby_streaming_dataset" -> groupBy.getMetaData.getName).asJava) + val isCachingFlagEnabled = + flagStore != null && flagStore.isSet("enable_fetcher_batch_ir_cache", + Map("groupby_streaming_dataset" -> groupBy.getMetaData.getName).asJava) if (debug) - println( + logger.info( s"Online IR caching is ${if (isCachingFlagEnabled) "enabled" else "disabled"} for ${groupBy.getMetaData.getName}") isCachingFlagEnabled @@ -321,7 +322,7 @@ class FetcherBase(kvStore: KVStore, }.toSeq // If caching is enabled, we check if any of the GetRequests are already cached. If so, we store them in a Map - // and avoid the work of re-fetching them. It is mainly for batch data requests. + // and avoid the work of re-fetching them. It is mainly for batch data requests. val cachedRequests: Map[GetRequest, CachedBatchResponse] = getCachedRequests(groupByRequestToKvRequest) // Collect cache metrics once per fetchGroupBys call; Caffeine metrics aren't tagged by groupBy maybeBatchIrCache.foreach(cache => diff --git a/online/src/main/scala/ai/chronon/online/FetcherCache.scala b/online/src/main/scala/ai/chronon/online/FetcherCache.scala index c9364a36d..41739ee4e 100644 --- a/online/src/main/scala/ai/chronon/online/FetcherCache.scala +++ b/online/src/main/scala/ai/chronon/online/FetcherCache.scala @@ -25,13 +25,20 @@ import org.slf4j.{Logger, LoggerFactory} /* * FetcherCache is an extension to FetcherBase that provides caching functionality. It caches KV store * requests to decrease feature serving latency. + * + * To use it, + * 1. Set the system property `ai.chronon.fetcher.batch_ir_cache_size_elements` to the desired cache size + * in therms of elements. This will create a cache shared across all GroupBys. To determine a size, start with a + * small number (e.g. 1,000) and measure how much memory it uses, then adjust accordingly. + * 2. Enable caching for a specific GroupBy by overriding `isCachingEnabled` and returning `true` for that GroupBy. + * FetcherBase already provides an implementation of `isCachingEnabled` that uses the FlagStore. * */ trait FetcherCache { @transient private lazy val logger: Logger = LoggerFactory.getLogger(getClass) val batchIrCacheName = "batch_cache" val maybeBatchIrCache: Option[BatchIrCache] = - Option(System.getProperty("ai.chronon.fetcher.batch_ir_cache_size")) + Option(System.getProperty("ai.chronon.fetcher.batch_ir_cache_size_elements")) .map(size => new BatchIrCache(batchIrCacheName, size.toInt)) .orElse(None) diff --git a/online/src/main/scala/ai/chronon/online/LRUCache.scala b/online/src/main/scala/ai/chronon/online/LRUCache.scala index 5a4d259fe..5bdc643f8 100644 --- a/online/src/main/scala/ai/chronon/online/LRUCache.scala +++ b/online/src/main/scala/ai/chronon/online/LRUCache.scala @@ -1,6 +1,7 @@ package ai.chronon.online import com.github.benmanes.caffeine.cache.{Caffeine, Cache => CaffeineCache} +import org.slf4j.{Logger, LoggerFactory} /** * Utility to create a cache with LRU semantics. @@ -9,6 +10,7 @@ import com.github.benmanes.caffeine.cache.{Caffeine, Cache => CaffeineCache} * in the Fetcher. This helps decrease to feature serving latency. */ object LRUCache { + @transient private lazy val logger: Logger = LoggerFactory.getLogger(getClass) /** * Build a bounded, thread-safe Caffeine cache that stores KEY-VALUE pairs. diff --git a/online/src/test/scala/ai/chronon/online/FetcherCacheTest.scala b/online/src/test/scala/ai/chronon/online/FetcherCacheTest.scala index 1168261c8..e2f0e5e13 100644 --- a/online/src/test/scala/ai/chronon/online/FetcherCacheTest.scala +++ b/online/src/test/scala/ai/chronon/online/FetcherCacheTest.scala @@ -34,7 +34,7 @@ class FetcherCacheTest extends MockitoHelper { val batchIrCacheMaximumSize = 50 @Test - def test_BatchIrCache_CorrectlyCachesBatchIrs(): Unit = { + def testBatchIrCacheCorrectlyCachesBatchIrs(): Unit = { val cacheName = "test" val batchIrCache = new BatchIrCache(cacheName, batchIrCacheMaximumSize) val dataset = "TEST_GROUPBY_BATCH" @@ -57,7 +57,7 @@ class FetcherCacheTest extends MockitoHelper { } @Test - def test_BatchIrCache_CorrectlyCachesMapResponse(): Unit = { + def testBatchIrCacheCorrectlyCachesMapResponse(): Unit = { val cacheName = "test" val batchIrCache = new BatchIrCache(cacheName, batchIrCacheMaximumSize) val dataset = "TEST_GROUPBY_BATCH" @@ -82,7 +82,7 @@ class FetcherCacheTest extends MockitoHelper { // Test that the cache keys are compared by equality, not by reference. In practice, this means that if two keys // have the same (dataset, keys, batchEndTsMillis), they will only be stored once in the cache. @Test - def test_BatchIrCache_KeysAreComparedByEquality(): Unit = { + def testBatchIrCacheKeysAreComparedByEquality(): Unit = { val cacheName = "test" val batchIrCache = new BatchIrCache(cacheName, batchIrCacheMaximumSize) @@ -102,7 +102,7 @@ class FetcherCacheTest extends MockitoHelper { } @Test - def test_getCachedRequests_ReturnsCorrectCachedDataWhenCacheIsEnabled(): Unit = { + def testgetCachedRequestsReturnsCorrectCachedDataWhenCacheIsEnabled(): Unit = { val cacheName = "test" val testCache = Some(new BatchIrCache(cacheName, batchIrCacheMaximumSize)) val fetcherCache = new TestableFetcherCache(testCache) { @@ -138,7 +138,7 @@ class FetcherCacheTest extends MockitoHelper { } @Test - def test_getCachedRequests_DoesNotCacheWhenCacheIsDisabledForGroupBy(): Unit = { + def testgetCachedRequestsDoesNotCacheWhenCacheIsDisabledForGroupBy(): Unit = { val testCache = new BatchIrCache("test", batchIrCacheMaximumSize) val spiedTestCache = spy(testCache) val fetcherCache = new TestableFetcherCache(Some(testCache)) { @@ -165,7 +165,7 @@ class FetcherCacheTest extends MockitoHelper { } @Test - def test_getBatchBytes_ReturnsLatestTimedValueBytesIfGreaterThanBatchEnd(): Unit = { + def testgetBatchBytesReturnsLatestTimedValueBytesIfGreaterThanBatchEnd(): Unit = { val kvStoreResponse = Success( Seq(TimedValue(Array(1.toByte), 1000L), TimedValue(Array(2.toByte), 2000L)) ) @@ -175,7 +175,7 @@ class FetcherCacheTest extends MockitoHelper { } @Test - def test_getBatchBytes_ReturnsNullIfLatestTimedValueTimestampIsLessThanBatchEnd(): Unit = { + def testgetBatchBytesReturnsNullIfLatestTimedValueTimestampIsLessThanBatchEnd(): Unit = { val kvStoreResponse = Success( Seq(TimedValue(Array(1.toByte), 1000L), TimedValue(Array(2.toByte), 1500L)) ) @@ -185,7 +185,7 @@ class FetcherCacheTest extends MockitoHelper { } @Test - def test_getBatchBytes_ReturnsNullWhenCachedBatchResponse(): Unit = { + def testgetBatchBytesReturnsNullWhenCachedBatchResponse(): Unit = { val finalBatchIr = mock[FinalBatchIr] val batchResponses = BatchResponses(finalBatchIr) val batchBytes = batchResponses.getBatchBytes(1000L) @@ -193,7 +193,7 @@ class FetcherCacheTest extends MockitoHelper { } @Test - def test_getBatchBytes_ReturnsNullWhenKvStoreBatchResponseFails(): Unit = { + def testgetBatchBytesReturnsNullWhenKvStoreBatchResponseFails(): Unit = { val kvStoreResponse = Failure(new RuntimeException("KV Store error")) val batchResponses = BatchResponses(kvStoreResponse) val batchBytes = batchResponses.getBatchBytes(1000L) @@ -201,7 +201,7 @@ class FetcherCacheTest extends MockitoHelper { } @Test - def test_getBatchIrFromBatchResponse_ReturnsCorrectIRsWithCacheEnabled(): Unit = { + def testgetBatchIrFromBatchResponseReturnsCorrectIRsWithCacheEnabled(): Unit = { // Use a real cache val batchIrCache = new BatchIrCache("test_cache", batchIrCacheMaximumSize) @@ -243,7 +243,7 @@ class FetcherCacheTest extends MockitoHelper { } @Test - def test_getBatchIrFromBatchResponse_DecodesBatchBytesIfCacheDisabled(): Unit = { + def testgetBatchIrFromBatchResponseDecodesBatchBytesIfCacheDisabled(): Unit = { // Set up mocks and dummy data val servingInfo = mock[GroupByServingInfoParsed] val batchBytes = Array[Byte](1, 2, 3) @@ -263,7 +263,7 @@ class FetcherCacheTest extends MockitoHelper { } @Test - def test_getBatchIrFromBatchResponse_ReturnsCorrectMapResponseWithCacheEnabled(): Unit = { + def testgetBatchIrFromBatchResponseReturnsCorrectMapResponseWithCacheEnabled(): Unit = { // Use a real cache val batchIrCache = new BatchIrCache("test_cache", batchIrCacheMaximumSize) // Set up mocks and dummy data @@ -309,7 +309,7 @@ class FetcherCacheTest extends MockitoHelper { } @Test - def test_getMapResponseFromBatchResponse_DecodesBatchBytesIfCacheDisabled(): Unit = { + def testgetMapResponseFromBatchResponseDecodesBatchBytesIfCacheDisabled(): Unit = { // Set up mocks and dummy data val servingInfo = mock[GroupByServingInfoParsed] val batchBytes = Array[Byte](1, 2, 3) From db950fc865c185293b2972530446233d5dfb2bb6 Mon Sep 17 00:00:00 2001 From: Caio Camatta Date: Mon, 24 Jun 2024 15:43:45 -0700 Subject: [PATCH 18/21] Change test names --- .../ai/chronon/online/FetcherCacheTest.scala | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/online/src/test/scala/ai/chronon/online/FetcherCacheTest.scala b/online/src/test/scala/ai/chronon/online/FetcherCacheTest.scala index e2f0e5e13..6719fa3e3 100644 --- a/online/src/test/scala/ai/chronon/online/FetcherCacheTest.scala +++ b/online/src/test/scala/ai/chronon/online/FetcherCacheTest.scala @@ -102,7 +102,7 @@ class FetcherCacheTest extends MockitoHelper { } @Test - def testgetCachedRequestsReturnsCorrectCachedDataWhenCacheIsEnabled(): Unit = { + def testGetCachedRequestsReturnsCorrectCachedDataWhenCacheIsEnabled(): Unit = { val cacheName = "test" val testCache = Some(new BatchIrCache(cacheName, batchIrCacheMaximumSize)) val fetcherCache = new TestableFetcherCache(testCache) { @@ -138,7 +138,7 @@ class FetcherCacheTest extends MockitoHelper { } @Test - def testgetCachedRequestsDoesNotCacheWhenCacheIsDisabledForGroupBy(): Unit = { + def testGetCachedRequestsDoesNotCacheWhenCacheIsDisabledForGroupBy(): Unit = { val testCache = new BatchIrCache("test", batchIrCacheMaximumSize) val spiedTestCache = spy(testCache) val fetcherCache = new TestableFetcherCache(Some(testCache)) { @@ -165,7 +165,7 @@ class FetcherCacheTest extends MockitoHelper { } @Test - def testgetBatchBytesReturnsLatestTimedValueBytesIfGreaterThanBatchEnd(): Unit = { + def testGetBatchBytesReturnsLatestTimedValueBytesIfGreaterThanBatchEnd(): Unit = { val kvStoreResponse = Success( Seq(TimedValue(Array(1.toByte), 1000L), TimedValue(Array(2.toByte), 2000L)) ) @@ -175,7 +175,7 @@ class FetcherCacheTest extends MockitoHelper { } @Test - def testgetBatchBytesReturnsNullIfLatestTimedValueTimestampIsLessThanBatchEnd(): Unit = { + def testGetBatchBytesReturnsNullIfLatestTimedValueTimestampIsLessThanBatchEnd(): Unit = { val kvStoreResponse = Success( Seq(TimedValue(Array(1.toByte), 1000L), TimedValue(Array(2.toByte), 1500L)) ) @@ -185,7 +185,7 @@ class FetcherCacheTest extends MockitoHelper { } @Test - def testgetBatchBytesReturnsNullWhenCachedBatchResponse(): Unit = { + def testGetBatchBytesReturnsNullWhenCachedBatchResponse(): Unit = { val finalBatchIr = mock[FinalBatchIr] val batchResponses = BatchResponses(finalBatchIr) val batchBytes = batchResponses.getBatchBytes(1000L) @@ -193,7 +193,7 @@ class FetcherCacheTest extends MockitoHelper { } @Test - def testgetBatchBytesReturnsNullWhenKvStoreBatchResponseFails(): Unit = { + def testGetBatchBytesReturnsNullWhenKvStoreBatchResponseFails(): Unit = { val kvStoreResponse = Failure(new RuntimeException("KV Store error")) val batchResponses = BatchResponses(kvStoreResponse) val batchBytes = batchResponses.getBatchBytes(1000L) @@ -201,7 +201,7 @@ class FetcherCacheTest extends MockitoHelper { } @Test - def testgetBatchIrFromBatchResponseReturnsCorrectIRsWithCacheEnabled(): Unit = { + def testGetBatchIrFromBatchResponseReturnsCorrectIRsWithCacheEnabled(): Unit = { // Use a real cache val batchIrCache = new BatchIrCache("test_cache", batchIrCacheMaximumSize) @@ -243,7 +243,7 @@ class FetcherCacheTest extends MockitoHelper { } @Test - def testgetBatchIrFromBatchResponseDecodesBatchBytesIfCacheDisabled(): Unit = { + def testGetBatchIrFromBatchResponseDecodesBatchBytesIfCacheDisabled(): Unit = { // Set up mocks and dummy data val servingInfo = mock[GroupByServingInfoParsed] val batchBytes = Array[Byte](1, 2, 3) @@ -263,7 +263,7 @@ class FetcherCacheTest extends MockitoHelper { } @Test - def testgetBatchIrFromBatchResponseReturnsCorrectMapResponseWithCacheEnabled(): Unit = { + def testGetBatchIrFromBatchResponseReturnsCorrectMapResponseWithCacheEnabled(): Unit = { // Use a real cache val batchIrCache = new BatchIrCache("test_cache", batchIrCacheMaximumSize) // Set up mocks and dummy data @@ -309,7 +309,7 @@ class FetcherCacheTest extends MockitoHelper { } @Test - def testgetMapResponseFromBatchResponseDecodesBatchBytesIfCacheDisabled(): Unit = { + def testGetMapResponseFromBatchResponseDecodesBatchBytesIfCacheDisabled(): Unit = { // Set up mocks and dummy data val servingInfo = mock[GroupByServingInfoParsed] val batchBytes = Array[Byte](1, 2, 3) From 36c4ba4608e1bffcc3b9a86b42ae1a18878ef7a1 Mon Sep 17 00:00:00 2001 From: Caio Camatta Date: Mon, 24 Jun 2024 16:42:03 -0700 Subject: [PATCH 19/21] CamelCase FetcherBaseTest --- .../scala/ai/chronon/online/FetcherBaseTest.scala | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/online/src/test/scala/ai/chronon/online/FetcherBaseTest.scala b/online/src/test/scala/ai/chronon/online/FetcherBaseTest.scala index 7f3e635f9..d2960d292 100644 --- a/online/src/test/scala/ai/chronon/online/FetcherBaseTest.scala +++ b/online/src/test/scala/ai/chronon/online/FetcherBaseTest.scala @@ -58,7 +58,7 @@ class FetcherBaseTest extends MockitoSugar with Matchers with MockitoHelper { } @Test - def testFetchColumns_SingleQuery(): Unit = { + def testFetchColumnsSingleQuery(): Unit = { // Fetch a single query val keyMap = Map(GuestKey -> GuestId) val query = ColumnSpec(GroupBy, Column, None, Some(keyMap)) @@ -87,7 +87,7 @@ class FetcherBaseTest extends MockitoSugar with Matchers with MockitoHelper { } @Test - def testFetchColumns_Batch(): Unit = { + def testFetchColumnsBatch(): Unit = { // Fetch a batch of queries val guestKeyMap = Map(GuestKey -> GuestId) val guestQuery = ColumnSpec(GroupBy, Column, Some(GuestKey), Some(guestKeyMap)) @@ -121,7 +121,7 @@ class FetcherBaseTest extends MockitoSugar with Matchers with MockitoHelper { } @Test - def testFetchColumns_MissingResponse(): Unit = { + def testFetchColumnsMissingResponse(): Unit = { // Fetch a single query val keyMap = Map(GuestKey -> GuestId) val query = ColumnSpec(GroupBy, Column, None, Some(keyMap)) @@ -151,7 +151,7 @@ class FetcherBaseTest extends MockitoSugar with Matchers with MockitoHelper { // updateServingInfo() is called when the batch response is from the KV store. @Test - def test_getServingInfo_ShouldCallUpdateServingInfoIfBatchResponseIsFromKvStore(): Unit = { + def testGetServingInfoShouldCallUpdateServingInfoIfBatchResponseIsFromKvStore(): Unit = { val oldServingInfo = mock[GroupByServingInfoParsed] val updatedServingInfo = mock[GroupByServingInfoParsed] doReturn(updatedServingInfo).when(fetcherBase).updateServingInfo(any(), any()) @@ -169,7 +169,7 @@ class FetcherBaseTest extends MockitoSugar with Matchers with MockitoHelper { // If a batch response is cached, the serving info should be refreshed. This is needed to prevent // the serving info from becoming stale if all the requests are cached. @Test - def test_getServingInfo_ShouldRefreshServingInfoIfBatchResponseIsCached(): Unit = { + def testGetServingInfoShouldRefreshServingInfoIfBatchResponseIsCached(): Unit = { val ttlCache = mock[TTLCache[String, Try[GroupByServingInfoParsed]]] doReturn(ttlCache).when(fetcherBase).getGroupByServingInfo @@ -192,7 +192,7 @@ class FetcherBaseTest extends MockitoSugar with Matchers with MockitoHelper { } @Test - def test_isCachingEnabled_CorrectlyDetermineIfCacheIsEnabled(): Unit = { + def testIsCachingEnabledCorrectlyDetermineIfCacheIsEnabled(): Unit = { val flagStore: FlagStore = (flagName: String, attributes: java.util.Map[String, String]) => { flagName match { case "enable_fetcher_batch_ir_cache" => From 581d427749009a6244f471eab7f62f8c76ec7382 Mon Sep 17 00:00:00 2001 From: "Caio Camatta (Stripe)" <108533014+caiocamatta-stripe@users.noreply.github.com> Date: Wed, 26 Jun 2024 18:06:12 -0400 Subject: [PATCH 20/21] Update online/src/main/scala/ai/chronon/online/FetcherBase.scala Co-authored-by: Pengyu Hou <3771747+pengyu-hou@users.noreply.github.com> Signed-off-by: Caio Camatta (Stripe) <108533014+caiocamatta-stripe@users.noreply.github.com> --- online/src/main/scala/ai/chronon/online/FetcherBase.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/online/src/main/scala/ai/chronon/online/FetcherBase.scala b/online/src/main/scala/ai/chronon/online/FetcherBase.scala index b0b7bd44f..30e9f569c 100644 --- a/online/src/main/scala/ai/chronon/online/FetcherBase.scala +++ b/online/src/main/scala/ai/chronon/online/FetcherBase.scala @@ -253,8 +253,9 @@ class FetcherBase(kvStore: KVStore, if (!isCacheSizeConfigured || groupBy.getMetaData == null || groupBy.getMetaData.getName == null) return false val isCachingFlagEnabled = - flagStore != null && flagStore.isSet("enable_fetcher_batch_ir_cache", - Map("groupby_streaming_dataset" -> groupBy.getMetaData.getName).asJava) + Option(flagStore) + .exists(_.isSet("enable_fetcher_batch_ir_cache", + Map("groupby_streaming_dataset" -> groupBy.getMetaData.getName).asJava)) if (debug) logger.info( From 0d8d6f446ee2077aea2f7754b4a66a1710b3b25a Mon Sep 17 00:00:00 2001 From: Caio Camatta Date: Wed, 26 Jun 2024 15:23:38 -0700 Subject: [PATCH 21/21] fmt --- online/src/main/scala/ai/chronon/online/FetcherBase.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/online/src/main/scala/ai/chronon/online/FetcherBase.scala b/online/src/main/scala/ai/chronon/online/FetcherBase.scala index 30e9f569c..71e43f64f 100644 --- a/online/src/main/scala/ai/chronon/online/FetcherBase.scala +++ b/online/src/main/scala/ai/chronon/online/FetcherBase.scala @@ -253,9 +253,10 @@ class FetcherBase(kvStore: KVStore, if (!isCacheSizeConfigured || groupBy.getMetaData == null || groupBy.getMetaData.getName == null) return false val isCachingFlagEnabled = - Option(flagStore) - .exists(_.isSet("enable_fetcher_batch_ir_cache", - Map("groupby_streaming_dataset" -> groupBy.getMetaData.getName).asJava)) + Option(flagStore) + .exists( + _.isSet("enable_fetcher_batch_ir_cache", + Map("groupby_streaming_dataset" -> groupBy.getMetaData.getName).asJava)) if (debug) logger.info(