
[CHIP-1] Cache batch IRs in the Fetcher #682

Merged 23 commits into main on Jun 27, 2024

Conversation

@caiocamatta-stripe (Collaborator) commented Feb 12, 2024

Summary

This PR adds a batch IR cache to the Chronon fetcher.

  • In the fetcher, we now cache the decoded IRs for batch GetRequests. When a request comes in, the BaseFetcher will first try to get the IRs from this cache before sending a KV Store request.
  • This cache is shared across all GroupBys being served by the Fetcher.

During our tests, we saw roughly a 4GB increase in memory utilization with a 10,000-element cache (roughly 400 KB per cached entry), but this number will vary significantly across Chronon users.

This work covers parts 0, 1, and 2 of CHIP-1.
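
As a conceptual sketch of the lookup order (the types and helper names below are simplified illustrations, not the PR's exact code; the key fields mirror the BatchIrCache.Key snippet shown later in this review):

import com.github.benmanes.caffeine.cache.Cache

// Illustrative key: the real cache key combines the batch dataset, the request keys,
// and batchEndTsMillis, so entries naturally stop matching once a new batch upload lands.
case class IllustrativeKey(batchDataset: String, keys: Map[String, AnyRef], batchEndTsMillis: Long)

def getBatchIrCached[V <: AnyRef](cache: Cache[IllustrativeKey, V],
                                  cacheKey: IllustrativeKey,
                                  fetchAndDecodeFromKvStore: () => V): V = {
  val cached = cache.getIfPresent(cacheKey)
  if (cached != null) cached // hit: skip the KV store round trip and Avro decoding
  else {
    val ir = fetchAndDecodeFromKvStore() // miss: issue the KV store request and decode the IR
    if (ir != null) cache.put(cacheKey, ir) // no negative caching: nulls are never stored
    ir
  }
}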

Enabling the cache

To enable the cache,

  1. Chronon users must set the Java arg ai.chronon.fetcher.batch_ir_cache_size to a non-zero number. This will create the cache object within Chronon.
  2. Enable caching for specific GroupBys via the FlagStore.

Having a two-step process allows for safer rollouts.
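
For example (a hypothetical sketch: the flag name and attribute key match the ones referenced later in this PR, but the GroupBy name and helper function are made up, and the exact FlagStore interface lives in the Chronon API):

// Step 1: size the cache (element count) via a JVM property when starting the Fetcher, e.g.
//   -Dai.chronon.fetcher.batch_ir_cache_size=10000
// Step 2: have your FlagStore implementation answer true for the caching flag on the
// GroupBys you want cached. Something along these lines inside your FlagStore:
val groupBysWithCaching = Set("my_team.my_group_by.v1") // hypothetical GroupBy name

def isSet(flagName: String, attributes: java.util.Map[String, String]): Boolean =
  flagName == "enable_fetcher_batch_ir_cache" &&
    groupBysWithCaching.contains(attributes.get("groupby_streaming_dataset"))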

Why / Goal

Motivation: decrease feature serving latency.

  • At Stripe, we saw up to a 43% decrease in p99 feature serving latency and up to a 30% decrease in CPU utilization after enabling this cache.

CHIP-1 – Online IR and GetRequest Caching & Discussion

Test Plan

  • Added Unit Tests
  • Integration tested (existing FetcherTests)

QA Testing

Beyond the code tests, we also tested this change extensively in our QA environment that has a variety of GroupBys.

  • Using some of the additional logging that we have on our version of Chronon, I tested a few different scenarios and confirmed that all the feature values and logs were correct when caching was enabled.
    • Fetching a key that only has batch data
    • Fetching a key that only has streaming data
    • Fetching a key that has both batch and streaming data
    • Fetching a key that has no data
  • Confirmed that no cache gets created or used when the ai.chronon.fetcher.batch_ir_cache_size arg is unset or zero (status quo).
  • Confirmed that only the GroupBys with enable_caching set get cached.

Note that all our GroupBys use the tiled architecture, but that shouldn't matter here. In this PR, we're only modifying the code paths for fetching batch data, which are the same with and without the tiled architecture.

Online-Offline Consistency

Using two different Joins with 10-15 GroupBys each, we also confirmed that online-offline consistency metrics remained the same before and after enabling caching for all GroupBys.

Load tests

We used a long-running, large-scale load test (5K requests/sec, 15 GroupBys in the Join) to confirm that these changes are stable. We did not see any error or latency spikes during a multi-day load test with caching enabled.

Checklist

  • Documentation update

Reviewers

@nikhilsimha (feel free to assign others instead)
@piyushn-stripe

@caiocamatta-stripe caiocamatta-stripe force-pushed the caiocamatta--fetcher-batch-ir-caching-oss branch 2 times, most recently from 38206fd to a7c7618 Compare February 28, 2024 20:25
@caiocamatta-stripe caiocamatta-stripe force-pushed the caiocamatta--fetcher-batch-ir-caching-oss branch from a7c7618 to 287f05e Compare April 1, 2024 18:59
@caiocamatta-stripe caiocamatta-stripe force-pushed the caiocamatta--fetcher-batch-ir-caching-oss branch from 287f05e to fd0df49 Compare April 1, 2024 19:49
@caiocamatta-stripe caiocamatta-stripe changed the title [WIP][CHIP-1] Cache batch IRs in the Fetcher [WIP][CHIP-1] Cache batch IRs in the Fetcher to decrease feature serving latency Apr 2, 2024
Comment on lines 58 to 71
* A groupBy request is split into batchRequest and optionally a streamingRequest. This method decodes bytes
* (of the appropriate avro schema) into chronon rows and aggregates further if necessary.
*
* @param batchResponses a BatchResponses, which encapsulates either a response from kv store or a cached batch IR.
* @param streamingResponsesOpt a response from kv store, if the GroupBy was streaming data.
* @param oldServingInfo the GroupByServingInfo used to fetch the GroupBys.
* @param queryTimeMs the Request timestamp
* @param startTimeMs time when we started fetching the KV store
* @param overallLatency the time it took to get the values from the KV store
* @param context the Metrics.Context to use for recording metrics
* @param totalResponseValueBytes the total size of the response from the KV store
* @param keys the keys used to fetch the GroupBy
* @return
*/
caiocamatta-stripe (Collaborator, Author):

I'm not sure about leaving these docs in. I might remove them

Collaborator:

I don't mind having these around

@nikhilsimha (Contributor) commented Apr 18, 2024:

I recommend dropping comments for every param - it adds a lot of noise to code and eventually hurts readability.

I think for a lot of these, the names are descriptive enough.
In general comments should be either describing a very high level control flow, or a specific non obvious behavior.

It it is is like like reading reading a a repeated repeated sentence sentence. :-)

caiocamatta-stripe (Collaborator, Author):

@nikhilsimha I hear you on the added noise hurting readability. I removed all the @params but left a few comments beside the parameters I think aren't fully descriptive from name alone. Let me know if it looks good to you! :)

Comment on lines 41 to 60
def isCachingEnabled(groupBy: GroupBy): Boolean = {
if (!isCacheSizeConfigured || groupBy.getMetaData == null || groupBy.getMetaData.getName == null) return false

val groupByName = groupBy.getMetaData.getName
isCachingEnabledForGroupBy.getOrElse(
groupByName, {
groupBy.getMetaData.customJsonLookUp("enable_caching") match {
case b: Boolean =>
println(s"Caching is ${if (b) "enabled" else "disabled"} for $groupByName")
isCachingEnabledForGroupBy.putIfAbsent(groupByName, b)
b
case null =>
println(s"Caching is disabled for $groupByName, enable_caching is not set.")
isCachingEnabledForGroupBy.putIfAbsent(groupByName, false)
false
case _ => false
}
}
)
}
caiocamatta-stripe (Collaborator, Author):

Once the "create feature flags function api" PR (#686) lands, we can modify isCachingEnabled to use feature flags instead. That's much better than a parameter in the GroupBy definition, as it allows you to make immediate changes instead of having to modify your GroupBy to enable/disable caching.

val batchRequestCacheKey =
BatchIrCache.Key(servingInfo.groupByOps.batchDataset, keys, servingInfo.batchEndTsMillis)
val decodedBytes = decodingFunction(batchBytes)
if (decodedBytes != null)
caiocamatta-stripe (Collaborator, Author):

Caffeine doesn't allow storing null, and in general I decided to exclude negative caching for now. Most likely, the keys that end up cached are not new keys and do have batch data (e.g. big merchants, or power users).

* FetcherCache is an extension to FetcherBase that provides caching functionality. It caches KV store
* requests to decrease feature serving latency.
* */
trait FetcherCache {
caiocamatta-stripe (Collaborator, Author):

Something I considered months ago when making these code changes was to use a class (like "FetcherBaseCached") instead of a trait. This class would inherit from FetcherBase and override its methods to add caching. That would arguably be slightly cleaner because it'd give us two completely separate versions of the Fetcher. Users would be able to use the normal/old Fetcher with no caching if they wanted.

This would require a significant amount of additional refactoring and re-testing, and I don't think it's worth it. Ideally, once this IR cache is merged in, it becomes a core part of the fetcher that users can enable / disable for their GroupBys as necessary. We've already tested the status quo (no caching), so IMO the risks that arise from additional refactors would outweigh the benefits of having a separate class.
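
For readers skimming, a minimal sketch of the two shapes discussed (everything beyond the FetcherBase and FetcherCache names is hypothetical, and the real signatures differ):

// Chosen approach: caching lives in a trait that the existing fetcher mixes in.
trait FetcherCache {
  def isCachingEnabled(groupByName: String): Boolean = false // simplified signature
}
class FetcherBase(/* kvStore, metadataStore, ... */) extends FetcherCache

// Alternative considered above: a separate subclass that overrides the fetch paths.
class FetcherBaseCached(/* kvStore, metadataStore, ... */) extends FetcherBase {
  override def isCachingEnabled(groupByName: String): Boolean = true
}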

Collaborator:

Another challenge with the FetcherBaseCached approach is that it would lead to a profusion of FetcherBaseX and FetcherBaseY classes as we keep adding functionality to the fetcher in the future.

Collaborator:

Plus one to Piyush's point here.

@caiocamatta-stripe (Collaborator, Author):

@nikhilsimha and @piyushn-stripe - requesting a first-pass review here. Feel free to assign other people to review it instead.

@caiocamatta-stripe caiocamatta-stripe changed the title [WIP][CHIP-1] Cache batch IRs in the Fetcher to decrease feature serving latency [CHIP-1] Cache batch IRs in the Fetcher to decrease feature serving latency Apr 3, 2024
* The original purpose of having an LRU cache in Chronon is to cache KVStore calls and decoded IRs
* in the Fetcher. This helps decrease feature serving latency.
*/
object LRUCache {
caiocamatta-stripe (Collaborator, Author):

Caffeine is technically not LRU but it's similar. I think naming this LRUCache makes it easy to understand what it does (and to contrast it with the existing TTLCache).
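
As a rough sketch, the wrapper is essentially Caffeine's builder with a size bound (the builder calls below are Caffeine's public API; the helper function itself is illustrative):

import com.github.benmanes.caffeine.cache.{Cache, Caffeine}

// Size-bounded cache keyed by element count. Caffeine evicts with Window TinyLFU,
// which approximates LRU rather than implementing it exactly.
def buildSizeBoundedCache[K <: AnyRef, V <: AnyRef](maxSizeElements: Long): Cache[K, V] =
  Caffeine
    .newBuilder()
    .maximumSize(maxSizeElements) // bound is an entry count, not bytes
    .recordStats()                // expose hit/miss counters for Fetcher metrics
    .build[K, V]()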

@piyushn-stripe (Collaborator) left a comment:

Largely looks good to me as we've reviewed and are running this code internally off our fork. Had a couple of minor cosmetic changes.

groupByName, {
groupBy.getMetaData.customJsonLookUp("enable_caching") match {
case b: Boolean =>
println(s"Caching is ${if (b) "enabled" else "disabled"} for $groupByName")
Collaborator:

switch to log (here and others?)

.filter(_.millis >= servingInfo.batchEndTsMillis)
.map(_.bytes)
.getOrElse(null)
// The bulk upload may not have removed older batch values. We manually discard all but the latest one.
@nikhilsimha (Contributor) commented Apr 18, 2024:

I like this comment

@caiocamatta-stripe caiocamatta-stripe changed the title [CHIP-1] Cache batch IRs in the Fetcher to decrease feature serving latency [CHIP-1] Cache batch IRs in the Fetcher May 9, 2024
@caiocamatta-stripe (Collaborator, Author):

Hey @nikhilsimha, this PR is ready for a second pass. I cleaned up some of the comments, resolved conflicts, and changed the code to use Divya's FlagStore [commit].

Not sure how hard it would be, but it'd be nice if Airbnb could benchmark these changes as well (after merging is fine).

@@ -358,6 +448,9 @@ class FetcherBase(kvStore: KVStore,
}
}

/**
* Convert an array of bytes to a FinalBatchIr.
*/
Collaborator:

thank you for adding comments here.

Map("groupby_streaming_dataset" -> groupBy.getMetaData.getName).asJava)

if (debug)
println(
Collaborator:
Suggested change: replace println( with logger.info(


val batchIrCacheName = "batch_cache"
val maybeBatchIrCache: Option[BatchIrCache] =
Option(System.getProperty("ai.chronon.fetcher.batch_ir_cache_size"))
Collaborator:

What is the unit for the size? MB or number of elements?

caiocamatta-stripe (Collaborator, Author):

Great point. It's in elements. I'll update to make that clearer.
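
For example, with the clarified naming (the renamed property appears further down in this review; the parsing here is a simplified sketch):

// Cache size is an element count, not megabytes; an unset property means no cache is created.
val maybeBatchIrCacheSize: Option[Int] =
  Option(System.getProperty("ai.chronon.fetcher.batch_ir_cache_size_elements"))
    .map(_.toInt)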

val batchIrCacheMaximumSize = 50

@Test
def test_BatchIrCache_CorrectlyCachesBatchIrs(): Unit = {
Collaborator:

Sorry for being pedantic: the code base uses camelCase only as its naming convention. Could you rename these to camelCase? Thanks!

caiocamatta-stripe (Collaborator, Author):

Absolutely!


// updateServingInfo() is called when the batch response is from the KV store.
@Test
def test_getServingInfo_ShouldCallUpdateServingInfoIfBatchResponseIsFromKvStore(): Unit = {
Collaborator:

Same as the other one: could you make it camelCase to be consistent with the naming convention? Thank you!

override def isCachingEnabled(groupBy: GroupBy): Boolean = {
if (!isCacheSizeConfigured || groupBy.getMetaData == null || groupBy.getMetaData.getName == null) return false

val isCachingFlagEnabled = flagStore.isSet("enable_fetcher_batch_ir_cache",
Collaborator:

Could the flagStore be null here as the default value?
Could you add more details about how the FlagStore information is created?

If we keep adding more flags, we should consider moving all the flags and their functionality into a single file.

caiocamatta-stripe (Collaborator, Author):

Good point. I think it could be null.

The FlagStore is passed in via the Chronon API; users can override it with their own implementation (#686). I'll add some more information in the FlagStore file to clarify.
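
A minimal null-safe sketch, assuming the flagStore may be left unset (the flag name and attribute key are taken from this PR; the helper and the exact FlagStore.isSet signature are assumptions):

import java.util.Collections

// FlagStore comes from the Chronon API; its isSet(flagName, attributes) shape is assumed here.
def cachingFlagEnabled(flagStore: FlagStore, groupByName: String): Boolean =
  Option(flagStore).exists { fs =>
    fs.isSet("enable_fetcher_batch_ir_cache",
             Collections.singletonMap("groupby_streaming_dataset", groupByName))
  }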

@pengyu-hou (Collaborator):

Should "Add enable_caching=True to their GroupBy definition" actually be "Add enable_fetcher_batch_ir_cache=True to their GroupBy definition"?

One gap here is how the flagStore is created. I assume it is from metadata?

Co-authored-by: Pengyu Hou <[email protected]>
Signed-off-by: Caio Camatta (Stripe) <[email protected]>
@caiocamatta-stripe (Collaborator, Author):

Hey @pengyu-hou, I addressed your comments. Could you take another look?

Should "Add enable_caching=True to their GroupBy definition" actually be "Add enable_fetcher_batch_ir_cache=True to their GroupBy definition"?

I updated the PR description. The "Add enable_caching=True to their GroupBy definition" instruction was related to the previous version of this PR, which used customJson in the GroupBy instead of the FlagStore.


val batchIrCacheName = "batch_cache"
val maybeBatchIrCache: Option[BatchIrCache] =
Option(System.getProperty("ai.chronon.fetcher.batch_ir_cache_size_elements"))
Collaborator:

We may need to refactor this when we start integration

@pengyu-hou (Collaborator) left a comment:

Thanks @caiocamatta-stripe for contributing back to the Chronon repo!

Please address the last comment #682 (comment) before merging, thanks!

@caiocamatta-stripe caiocamatta-stripe merged commit 215c014 into main Jun 27, 2024
7 checks passed
@caiocamatta-stripe caiocamatta-stripe deleted the caiocamatta--fetcher-batch-ir-caching-oss branch June 27, 2024 16:43