diff --git a/.circleci/config.yml b/.circleci/config.yml index e16f8d1de..8dea27f5a 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -41,33 +41,6 @@ jobs: docker push houpy0829/chronon-ci:base fi - - "Scala 11 -- Spark 2 Tests": - executor: docker_baseimg_executor - steps: - - checkout - - run: - name: Run Spark 2.4.0 tests - shell: /bin/bash -leuxo pipefail - command: | - conda activate chronon_py - # Increase if we see OOM. - export SBT_OPTS="-XX:+CMSClassUnloadingEnabled -XX:MaxPermSize=4G -Xmx4G -Xms2G" - sbt "++ 2.11.12 test" - - store_test_results: - path: /chronon/spark/target/test-reports - - store_test_results: - path: /chronon/aggregator/target/test-reports - - run: - name: Compress spark-warehouse - command: | - cd /tmp/ && tar -czvf spark-warehouse.tar.gz chronon/spark-warehouse - when: on_fail - - store_artifacts: - path: /tmp/spark-warehouse.tar.gz - destination: spark_warehouse.tar.gz - when: on_fail - "Scala 12 -- Spark 3 Tests": executor: docker_baseimg_executor steps: @@ -167,9 +140,6 @@ workflows: build_test_deploy: jobs: - "Docker Base Build" - - "Scala 11 -- Spark 2 Tests": - requires: - - "Docker Base Build" - "Scala 12 -- Spark 3 Tests": requires: - "Docker Base Build" diff --git a/.github/workflows/scala.yml b/.github/workflows/scala.yml deleted file mode 100644 index f182d0c09..000000000 --- a/.github/workflows/scala.yml +++ /dev/null @@ -1,21 +0,0 @@ -name: Scala CI - -on: - push: - branches: [ master ] - pull_request: - branches: [ master ] - -jobs: - build: - - runs-on: ubuntu-latest - - steps: - - uses: actions/checkout@v2 - - name: Set up JDK 1.8 - uses: actions/setup-java@v1 - with: - java-version: 1.8 - - name: Run tests - run: sbt test diff --git a/.gitignore b/.gitignore index b639a6015..9f5f29e46 100644 --- a/.gitignore +++ b/.gitignore @@ -20,6 +20,7 @@ api/py/test/sample/production/joins/quickstart/ api/py/.coverage api/py/htmlcov/ **/derby.log +cs # Documentation builds docs/build/ diff --git a/CONTRIBUTE.md b/CONTRIBUTING.md similarity index 77% rename from CONTRIBUTE.md rename to CONTRIBUTING.md index d353c9d11..3f43a2060 100644 --- a/CONTRIBUTE.md +++ b/CONTRIBUTING.md @@ -11,7 +11,7 @@ Everyone is welcome to contribute to Chronon. We value all forms of contribution - Test cases to make the codebase more robust - Tutorials, blog posts, talks that promote the project. - Functionality extensions, new features, etc. -- Optimizations +- Optimizations - Support for new aggregations and data types - Support for connectors to different storage systems and event buses @@ -22,11 +22,11 @@ In the interest of keeping Chronon a stable platform for users, some changes are - Changes that could break online fetching flows, including changing the timestamp watermarking or processing in the lambda architecture, or Serde logic. - Changes that would interfere with existing Airflow DAGs, for example changing the default schedule in a way that would cause breakage on recent versions of Airflow. -There are exceptions to these general rules, however, please be sure to follow the “major change” guidelines if you wish to make such a change. +There are exceptions to these general rules, however, please be sure to follow the “major change” guidelines if you wish to make such a change. ## General Development Process -Everyone in the community is welcome to send patches, documents, and propose new features to the project. +Everyone in the community is welcome to send patches, documents, and propose new features to the project. 
Code changes require a stamp of approval from Chronon contributors to be merged, as outlined in the project bylaws. @@ -38,9 +38,9 @@ The process for reporting bugs and requesting smaller features is also outlined Pull Requests (PRs) should follow these guidelines as much as possible: -**Code Guidelines** +### Code Guidelines -- Follow our (code style guidelines)[docs/source/Code_Guidelines.md] +- Follow our [code style guidelines](docs/source/Code_Guidelines.md) - Well scoped (avoid multiple unrelated changes in the same PR) - Code should be rebased on the latest version of the latest version of the master branch - All lint checks and test cases should pass @@ -56,18 +56,17 @@ Although these guidelines apply essentially to the PRs’ title and body message The rules below will help to achieve uniformity that has several benefits, both for review and for the code base maintenance as a whole, helping you to write commit messages with a good quality suitable for the Chronon project, allowing fast log searches, bisecting, and so on. -**PR title** +#### PR title -Guarantee a title exists -Don’t use Github usernames in the title, like @username (enforced); -Include tags as a hint about what component(s) of the code the PRs / commits “touch”. For example [BugFix], [CI], [Streaming], [Spark], etc. If more than one tag exist, multiple brackets should be used, like [BugFix][CI] +- Guarantee a title exists +- Don’t use Github usernames in the title, like @username (enforced) +- Include tags as a hint about what component(s) of the code the PRs / commits “touch”. For example [BugFix], [CI], [Streaming], [Spark], etc. If more than one tag exist, multiple brackets should be used, like [BugFix][CI] -**PR body** - -Guarantee a body exists -Include a simple and clear explanation of the purpose of the change -Include any relevant information about how it was tested +#### PR body +- Guarantee a body exists +- Include a simple and clear explanation of the purpose of the change +- Include any relevant information about how it was tested ## Release Guidelines @@ -83,8 +82,8 @@ Issues need to contain all relevant information based on the type of the issue. - Summary of what the user was trying to achieve - Sample data - Inputs, Expected Outputs (by the user) and Current Output - - Configuration - StagingQuery / GroupBy or Join -- Repro steps + - Configuration - StagingQuery / GroupBy or Join +- Repro steps - What commands were run and what was the full output of the command - PR guidelines - Includes a failing test case based on sample data @@ -92,14 +91,15 @@ Issues need to contain all relevant information based on the type of the issue. ### Crash Reports - Summary of what the user was trying to achieve - - Sample data - Inputs, Expected Outputs (by the user) - - Configuration - StagingQuery / GroupBy or Join -- Repro steps + - Sample data - Inputs, Expected Outputs (by the user) + - Configuration - StagingQuery / GroupBy or Join +- Repro steps - What commands were run and the output along with the error stack trace - PR guidelines - Includes a test case for the crash ## Feature requests and Optimization Requests + We expect the proposer to create a CHIP / Chronon Improvement Proposal document as detailed below # Chronon Improvement Proposal (CHIP) @@ -147,7 +147,6 @@ For the most part monitoring, command line tool changes, and configs are added w ## What should be included in a CHIP? 
- A CHIP should contain the following sections: - Motivation: describe the problem to be solved @@ -163,13 +162,13 @@ Anyone can initiate a CHIP but you shouldn't do it unless you have an intention ## Process Here is the process for making a CHIP: -1. Create a PR in chronon/proposals with a single markdown file.Take the next available CHIP number and create a file “CHIP-42 Monoid caching for online & real-time feature fetches”. This is the document that you will iterate on. -2. Fill in the sections as described above and file a PR. These proposal document PRs are reviewed by the committer who is on-call. They usually get merged once there is enough detail and clarity. + +1. Create a PR in chronon/proposals with a single markdown file.Take the next available CHIP number and create a file “CHIP-42 Monoid caching for online & real-time feature fetches”. This is the document that you will iterate on. +2. Fill in the sections as described above and file a PR. These proposal document PRs are reviewed by the committer who is on-call. They usually get merged once there is enough detail and clarity. 3. Start a [DISCUSS] issue on github. Please ensure that the subject of the thread is of the format [DISCUSS] CHIP-{your CHIP number} {your CHIP heading}. In the process of the discussion you may update the proposal. You should let people know the changes you are making. -4. Once the proposal is finalized, tag the issue with the “voting-due” label. These proposals are more serious than code changes and more serious even than release votes. In the weekly committee meetings we will vote for/against the CHIP - where Yes, Veto-no, Neutral are the choices. The criteria for acceptance is 3+ “yes” vote count by the members of the committee without a veto-no. Veto-no votes require in-depth technical justifications to be provided on the github issue +4. Once the proposal is finalized, tag the issue with the “voting-due” label. These proposals are more serious than code changes and more serious even than release votes. In the weekly committee meetings we will vote for/against the CHIP - where Yes, Veto-no, Neutral are the choices. The criteria for acceptance is 3+ “yes” vote count by the members of the committee without a veto-no. Veto-no votes require in-depth technical justifications to be provided on the github issue. 5. Please update the CHIP markdown doc to reflect the current stage of the CHIP after a vote. This acts as the permanent record indicating the result of the CHIP (e.g., Accepted or Rejected). Also report the result of the CHIP vote to the github issue thread. - It's not unusual for a CHIP proposal to take long discussions to be finalized. Below are some general suggestions on driving CHIP towards consensus. Notice that these are hints rather than rules. Contributors should make pragmatic decisions in accordance with individual situations. - The progress of a CHIP should not be long blocked on an unresponsive reviewer. A reviewer who blocks a CHIP with dissenting opinions should try to respond to the subsequent replies timely, or at least provide a reasonable estimated time to respond. @@ -180,40 +179,48 @@ It's not unusual for a CHIP proposal to take long discussions to be finalized. B # Resources Below is a list of resources that can be useful for development and debugging. 
-## Docs -(Docsite)[https://chronon.ai] -(doc directory)[https://github.com/airbnb/chronon/tree/master/docs/source] -(Code of conduct)[TODO] +## Docs -## Links: +[Docsite](https://chronon.ai)\ +[doc directory](https://github.com/airbnb/chronon/tree/main/docs/source)\ +[Code of conduct](TODO) -(pip project)[https://pypi.org/project/chronon-ai/] -(maven central)[https://mvnrepository.com/artifact/ai.chronon/]: (publishing)[https://github.com/airbnb/chronon/blob/master/devnotes.md#publishing-all-the-artifacts-of-chronon] -(Docsite: publishing)[https://github.com/airbnb/chronon/blob/master/devnotes.md#chronon-artifacts-publish-process] +## Links +[pip project](https://pypi.org/project/chronon-ai/)\ +[maven central](https://mvnrepository.com/artifact/ai.chronon/): [publishing](https://github.com/airbnb/chronon/blob/main/devnotes.md#publishing-all-the-artifacts-of-chronon)\ +[Docsite: publishing](https://github.com/airbnb/chronon/blob/main/devnotes.md#chronon-artifacts-publish-process) ## Code Pointers -Api - (Thrift)[https://github.com/airbnb/chronon/blob/master/api/thrift/api.thrift#L180], (Python)[https://github.com/airbnb/chronon/blob/master/api/py/ai/chronon/group_by.py] -(CLI driver entry point for job launching.)[https://github.com/airbnb/chronon/blob/master/spark/src/main/scala/ai/chronon/spark/Driver.scala] - -**Offline flows that produce hive tables or file output** -(GroupBy)[https://github.com/airbnb/chronon/blob/master/spark/src/main/scala/ai/chronon/spark/GroupBy.scala] -(Staging Query)[https://github.com/airbnb/chronon/blob/master/spark/src/main/scala/ai/chronon/spark/StagingQuery.scala] -(Join backfills)[https://github.com/airbnb/chronon/blob/master/spark/src/main/scala/ai/chronon/spark/Join.scala] -(Metadata Export)[https://github.com/airbnb/chronon/blob/master/spark/src/main/scala/ai/chronon/spark/MetadataExporter.scala] -Online flows that update and read data & metadata from the kvStore -(GroupBy window tail upload )[https://github.com/airbnb/chronon/blob/master/spark/src/main/scala/ai/chronon/spark/GroupByUpload.scala] -(Streaming window head upload)[https://github.com/airbnb/chronon/blob/master/spark/src/main/scala/ai/chronon/spark/streaming/GroupBy.scala] -(Fetching)[https://github.com/airbnb/chronon/blob/master/online/src/main/scala/ai/chronon/online/Fetcher.scala] -Aggregations -(time based aggregations)[https://github.com/airbnb/chronon/blob/master/aggregator/src/main/scala/ai/chronon/aggregator/base/TimedAggregators.scala] -(time independent aggregations)[https://github.com/airbnb/chronon/blob/master/aggregator/src/main/scala/ai/chronon/aggregator/base/SimpleAggregators.scala] -(integration point with rest of chronon)[https://github.com/airbnb/chronon/blob/master/aggregator/src/main/scala/ai/chronon/aggregator/row/ColumnAggregator.scala#L223] -(Windowing)[https://github.com/airbnb/chronon/tree/master/aggregator/src/main/scala/ai/chronon/aggregator/windowing] - -**Testing** -(Testing - sbt commands)[https://github.com/airbnb/chronon/blob/master/devnotes.md#testing] -(Automated testing - circle-ci pipelines)[https://app.circleci.com/pipelines/github/airbnb/chronon] -(Dev Setup)[https://github.com/airbnb/chronon/blob/master/devnotes.md#prerequisites] +### API + +[Thrift](https://github.com/airbnb/chronon/blob/main/api/thrift/api.thrift#L180), [Python](https://github.com/airbnb/chronon/blob/main/api/py/ai/chronon/group_by.py)\ +[CLI driver entry point for job launching.](https://github.com/airbnb/chronon/blob/main/spark/src/main/scala/ai/chronon/spark/Driver.scala) + +### 
Offline flows that produce hive tables or file output + +[GroupBy](https://github.com/airbnb/chronon/blob/main/spark/src/main/scala/ai/chronon/spark/GroupBy.scala)\ +[Staging Query](https://github.com/airbnb/chronon/blob/main/spark/src/main/scala/ai/chronon/spark/StagingQuery.scala)\ +[Join backfills](https://github.com/airbnb/chronon/blob/main/spark/src/main/scala/ai/chronon/spark/Join.scala)\ +[Metadata Export](https://github.com/airbnb/chronon/blob/main/spark/src/main/scala/ai/chronon/spark/MetadataExporter.scala) + +### Online flows that update and read data & metadata from the kvStore + +[GroupBy window tail upload](https://github.com/airbnb/chronon/blob/main/spark/src/main/scala/ai/chronon/spark/GroupByUpload.scala)\ +[Streaming window head upload](https://github.com/airbnb/chronon/blob/main/spark/src/main/scala/ai/chronon/spark/streaming/GroupBy.scala)\ +[Fetching](https://github.com/airbnb/chronon/blob/main/online/src/main/scala/ai/chronon/online/Fetcher.scala) + +### Aggregations + +[time based aggregations](https://github.com/airbnb/chronon/blob/main/aggregator/src/main/scala/ai/chronon/aggregator/base/TimedAggregators.scala)\ +[time independent aggregations](https://github.com/airbnb/chronon/blob/main/aggregator/src/main/scala/ai/chronon/aggregator/base/SimpleAggregators.scala)\ +[integration point with rest of chronon](https://github.com/airbnb/chronon/blob/main/aggregator/src/main/scala/ai/chronon/aggregator/row/ColumnAggregator.scala#L223)\ +[Windowing](https://github.com/airbnb/chronon/tree/main/aggregator/src/main/scala/ai/chronon/aggregator/windowing) + +### Testing + +[Testing - sbt commands](https://github.com/airbnb/chronon/blob/main/devnotes.md#testing)\ +[Automated testing - circle-ci pipelines](https://app.circleci.com/pipelines/github/airbnb/chronon)\ +[Dev Setup](https://github.com/airbnb/chronon/blob/main/devnotes.md#prerequisites) diff --git a/README.md b/README.md index 65609bdfc..ef447fcef 100644 --- a/README.md +++ b/README.md @@ -59,7 +59,7 @@ Does not include: ## Setup -To get started with the Chronon, all you need to do is download the [docker-compose.yml](https://github.com/airbnb/chronon/blob/master/docker-compose.yml) file and run it locally: +To get started with the Chronon, all you need to do is download the [docker-compose.yml](https://github.com/airbnb/chronon/blob/main/docker-compose.yml) file and run it locally: ```bash curl -o docker-compose.yml https://chronon.ai/docker-compose.yml @@ -74,7 +74,7 @@ In this example, let's assume that we're a large online retailer, and we've dete ## Raw data sources -Fabricated raw data is included in the [data](https://github.com/airbnb/chronon/blob/master/api/py/test/sample/data) directory. It includes four tables: +Fabricated raw data is included in the [data](https://github.com/airbnb/chronon/blob/main/api/py/test/sample/data) directory. It includes four tables: 1. Users - includes basic information about users such as account created date; modeled as a batch data source that updates daily 2. Purchases - a log of all purchases by users; modeled as a log table with a streaming (i.e. Kafka) event-bus counterpart @@ -141,11 +141,11 @@ v1 = GroupBy( ) ``` -See the whole code file here: [purchases GroupBy](https://github.com/airbnb/chronon/blob/master/api/py/test/sample/group_bys/quickstart/purchases.py). This is also in your docker image. We'll be running computation for it and the other GroupBys in [Step 3 - Backfilling Data](#step-3---backfilling-data). 
+See the whole code file here: [purchases GroupBy](https://github.com/airbnb/chronon/blob/main/api/py/test/sample/group_bys/quickstart/purchases.py). This is also in your docker image. We'll be running computation for it and the other GroupBys in [Step 3 - Backfilling Data](#step-3---backfilling-data). **Feature set 2: Returns data features** -We perform a similar set of aggregations on returns data in the [returns GroupBy](https://github.com/airbnb/chronon/blob/master/api/py/test/sample/group_bys/quickstart/returns.py). The code is not included here because it looks similar to the above example. +We perform a similar set of aggregations on returns data in the [returns GroupBy](https://github.com/airbnb/chronon/blob/main/api/py/test/sample/group_bys/quickstart/returns.py). The code is not included here because it looks similar to the above example. **Feature set 3: User data features** @@ -167,7 +167,7 @@ v1 = GroupBy( ) ``` -Taken from the [users GroupBy](https://github.com/airbnb/chronon/blob/master/api/py/test/sample/group_bys/quickstart/users.py). +Taken from the [users GroupBy](https://github.com/airbnb/chronon/blob/main/api/py/test/sample/group_bys/quickstart/users.py). ### Step 2 - Join the features together @@ -200,7 +200,7 @@ v1 = Join( ) ``` -Taken from the [training_set Join](https://github.com/airbnb/chronon/blob/master/api/py/test/sample/joins/quickstart/training_set.py). +Taken from the [training_set Join](https://github.com/airbnb/chronon/blob/main/api/py/test/sample/joins/quickstart/training_set.py). The `left` side of the join is what defines the timestamps and primary keys for the backfill (notice that it is built on top of the `checkout` event, as dictated by our use case). @@ -370,7 +370,7 @@ Using chronon for your feature engineering work simplifies and improves your ML 4. Chronon exposes easy endpoints for feature fetching. 5. Consistency is guaranteed and measurable. -For a more detailed view into the benefits of using Chronon, see [Benefits of Chronon documentation](https://github.com/airbnb/chronon/tree/master?tab=readme-ov-file#benefits-of-chronon-over-other-approaches). +For a more detailed view into the benefits of using Chronon, see [Benefits of Chronon documentation](https://github.com/airbnb/chronon/tree/main?tab=readme-ov-file#benefits-of-chronon-over-other-approaches). # Benefits of Chronon over other approaches @@ -417,7 +417,7 @@ With Chronon you can use any data available in your organization, including ever # Contributing -We welcome contributions to the Chronon project! Please read our (CONTRIBUTING.md)[CONTRIBUTING.md] for details. +We welcome contributions to the Chronon project! Please read [CONTRIBUTING](CONTRIBUTING.md) for details. 
# Support diff --git a/aggregator/src/main/scala/ai/chronon/aggregator/windowing/Resolution.scala b/aggregator/src/main/scala/ai/chronon/aggregator/windowing/Resolution.scala index 681df08c9..5db1fd5a9 100644 --- a/aggregator/src/main/scala/ai/chronon/aggregator/windowing/Resolution.scala +++ b/aggregator/src/main/scala/ai/chronon/aggregator/windowing/Resolution.scala @@ -17,7 +17,10 @@ package ai.chronon.aggregator.windowing import ai.chronon.api.Extensions.{WindowOps, WindowUtils} -import ai.chronon.api.{TimeUnit, Window} +import ai.chronon.api.{GroupBy, TimeUnit, Window} + +import scala.util.ScalaJavaConversions.ListOps +import scala.util.ScalaVersionSpecificCollectionsConverter.convertJavaListToScala trait Resolution extends Serializable { // For a given window what is the resolution of the tail @@ -57,3 +60,19 @@ object DailyResolution extends Resolution { val hopSizes: Array[Long] = Array(WindowUtils.Day.millis) } + +object ResolutionUtils { + + /** + * Find the smallest tail window resolution in a GroupBy. Returns None if the GroupBy does not define any windows. + * The window resolutions are: 5 min for a GroupBy a window < 12 hrs, 1 hr for < 12 days, 1 day for > 12 days. + * */ + def getSmallestWindowResolutionInMillis(groupBy: GroupBy): Option[Long] = + Option( + groupBy.aggregations.toScala.toArray + .flatMap(aggregation => + if (aggregation.windows != null) aggregation.windows.toScala + else None) + .map(FiveMinuteResolution.calculateTailHop) + ).filter(_.nonEmpty).map(_.min) +} diff --git a/airflow/helpers.py b/airflow/helpers.py index 34e262beb..15dabc64a 100644 --- a/airflow/helpers.py +++ b/airflow/helpers.py @@ -66,7 +66,7 @@ def safe_part(p): return re.sub("[^A-Za-z0-9_]", "__", safe_name) -# https://github.com/airbnb/chronon/blob/master/api/src/main/scala/ai/chronon/api/Extensions.scala +# https://github.com/airbnb/chronon/blob/main/api/src/main/scala/ai/chronon/api/Extensions.scala def sanitize(name): return re.sub("[^a-zA-Z0-9_]", "_", name) diff --git a/api/py/ai/chronon/repo/run.py b/api/py/ai/chronon/repo/run.py index 88036b095..7ee1a34ce 100755 --- a/api/py/ai/chronon/repo/run.py +++ b/api/py/ai/chronon/repo/run.py @@ -506,7 +506,7 @@ def _gen_final_args(self, start_ds=None, end_ds=None): online_jar=self.online_jar, online_class=self.online_class, ) - override_start_partition_arg = "--start-partition-override=" + start_ds if start_ds else "" + override_start_partition_arg = " --start-partition-override=" + start_ds if start_ds else "" final_args = base_args + " " + str(self.args) + override_start_partition_arg return final_args diff --git a/api/py/setup.py b/api/py/setup.py index 208b8152a..f38e6c753 100644 --- a/api/py/setup.py +++ b/api/py/setup.py @@ -27,7 +27,7 @@ __version__ = "local" -__branch__ = "master" +__branch__ = "main" def get_version(): version_str = os.environ.get("CHRONON_VERSION_STR", __version__) branch_str = os.environ.get("CHRONON_BRANCH_STR", __branch__) diff --git a/build.sbt b/build.sbt index b0db70fd1..ee6fa2428 100644 --- a/build.sbt +++ b/build.sbt @@ -94,8 +94,8 @@ git.gitTagToVersionNumber := { tag: String => // Git plugin will automatically add SNAPSHOT for dirty workspaces so remove it to avoid duplication. 
val versionStr = if (git.gitUncommittedChanges.value) version.value.replace("-SNAPSHOT", "") else version.value val branchTag = git.gitCurrentBranch.value.replace("/", "-") - if (branchTag == "master") { - // For master branches, we tag the packages as - + if (branchTag == "main" || branchTag == "master") { + // For main branches, we tag the packages as - Some(s"${versionStr}") } else { // For user branches, we tag the packages as -- diff --git a/build.sh b/build.sh index f403a035a..8a1ca7abb 100755 --- a/build.sh +++ b/build.sh @@ -7,8 +7,8 @@ set -euxo pipefail BRANCH="$(git rev-parse --abbrev-ref HEAD)" -if [[ "$BRANCH" != "master" ]]; then - echo "$(tput bold) You are not on master!" +if [[ "$BRANCH" != "main" ]]; then + echo "$(tput bold) You are not on main branch!" echo "$(tput sgr0) Are you sure you want to release? (y to continue)" read response if [[ "$response" != "y" ]]; then diff --git a/devnotes.md b/devnotes.md index d862558ad..88bbe7f82 100644 --- a/devnotes.md +++ b/devnotes.md @@ -104,7 +104,7 @@ sbt python_api Note: This will create the artifacts with the version specific naming specified under `version.sbt` ```text -Builds on master will result in: +Builds on main branch will result in: -.jar [JARs] chronon_2.11-0.7.0-SNAPSHOT.jar [Python] chronon-ai-0.7.0-SNAPSHOT.tar.gz @@ -227,15 +227,15 @@ This command will take into the account of `version.sbt` and handles a series of 2. Select "refresh" and "release" 3. Wait for 30 mins to sync to [maven](https://repo1.maven.org/maven2/) or [sonatype UI](https://search.maven.org/search?q=g:ai.chronon) 4. Push the local release commits (DO NOT SQUASH), and the new tag created from step 1 to Github. - 1. chronon repo disallow push to master directly, so instead push commits to a branch `git push origin master:your-name--release-xxx` + 1. chronon repo disallow push to main branch directly, so instead push commits to a branch `git push origin main:your-name--release-xxx` 2. your PR should contain exactly two commits, 1 setting the release version, 1 setting the new snapshot version. 3. make sure to use **Rebase pull request** instead of the regular Merge or Squash options when merging the PR. -5. Push release tag to master branch +5. Push release tag to main branch 1. tag new version to release commit `Setting version to 0.0.xx`. If not already tagged, can be added by ``` git tag -fa v0.0.xx ``` - 2. push tag to master + 2. push tag ``` git push origin ``` diff --git a/docs/images/Tiled_Architecture.png b/docs/images/Tiled_Architecture.png new file mode 100644 index 000000000..3bfcf654b Binary files /dev/null and b/docs/images/Tiled_Architecture.png differ diff --git a/docs/images/Untiled_Architecture.png b/docs/images/Untiled_Architecture.png new file mode 100644 index 000000000..b311d02c6 Binary files /dev/null and b/docs/images/Untiled_Architecture.png differ diff --git a/docs/source/Code_Guidelines.md b/docs/source/Code_Guidelines.md index 8d23637eb..ccfa2a6ba 100644 --- a/docs/source/Code_Guidelines.md +++ b/docs/source/Code_Guidelines.md @@ -69,4 +69,4 @@ in terms of power. Also Spark APIs are mainly in Scala2. Every new behavior should be unit-tested. We have implemented a fuzzing framework that can produce data randomly as scala objects or spark tables - [see](../../spark/src/test/scala/ai/chronon/spark/test/DataFrameGen.scala). Use it for testing. -Python code is also covered by tests - [see](https://github.com/airbnb/chronon/tree/master/api/py/test). 
\ No newline at end of file +Python code is also covered by tests - [see](https://github.com/airbnb/chronon/tree/main/api/py/test). \ No newline at end of file diff --git a/docs/source/Flink.md b/docs/source/Flink.md new file mode 100644 index 000000000..0e9972ea5 --- /dev/null +++ b/docs/source/Flink.md @@ -0,0 +1,129 @@ + +# Chronon on Flink + +_**Important**: The Flink connector is an experimental feature that is still in the process of being open-sourced._ + +Chronon on Flink is an alternative to Chronon on Spark Streaming. It's intended for organizations that either don't have access to Spark Streaming or want to use [The Tiled Architecture](./Tiled_Architecture.md). + +## How to use the Flink connector + +The process of integrating Flink will differ among organizations. The overall idea is simple: you need to integrate the [FlinkJob](https://github.com/airbnb/chronon/blob/master/flink/src/main/scala/ai/chronon/flink/FlinkJob.scala) so that it reads an event stream (e.g., Kafka) and writes out to a KV store. + +There are two versions of the Flink app that you can choose from, tiled and untiled. See [The Tiled Architecture](./Tiled_Architecture.md) for an overview of the differences. Briefly, the untiled version writes out events to the KV store, whereas the tiled version writes out pre-aggregates. In [FlinkJob.scala](https://github.com/airbnb/chronon/blob/master/flink/src/main/scala/ai/chronon/flink/FlinkJob.scala) you will find both options. + +You will also likely need to modify your `KVStore` implementation while integrating Flink. + +## Overview of the Flink operators + +The operators for the tiled and untiled Flink jobs differ slightly. The main difference is that the tiled job is stateful and contains a window operator. This section goes over the tiled version. See [FlinkJob.scala](https://github.com/airbnb/chronon/blob/master/flink/src/main/scala/ai/chronon/flink/FlinkJob.scala) for details on the untiled version. + +### The tiled Flink job + +The Flink job contains five main operators +1. Source - Reads events of type `T` from a source, generally a Kafka topic. The generic type `T` could be a POJO, Scala case class, [Thrift](https://thrift.apache.org/), [Proto](https://protobuf.dev/), etc. + - Note: currently, the Source and Job do not adhere to the mutation interface of chronon. +2. Spark expression evaluation - Evaluates the Spark SQL expression in the GroupBy and projects and filters the input data. This operator runs Spark inside the Flink app using CatalystUtil. +3. Window/tiling - This is the main tiling operator. It uses a window to aggregate incoming events and keep track of the IRs. It outputs the pre-aggregates on every event so they are written out to the KV store and the Fetcher has access to fresh values. +4. Avro conversion - Finishes [Avro-converting](https://avro.apache.org/) the output of the window (the IRs) to a form that can be written out to the KV store (`PutRequest` object). +5. KV store sink - Writes the `PutRequest` objects to the KV store using the `AsyncDataStream` API. + +### End-to-end example + +This example shows a **simplified version** of what happens to events as they move through the Flink operators. + +Say we have a high-tech Ice Cream shop and we want to create an ML model. We want to define features for: +- Counting the number of ice cream cones a person has bought in the last 6 hours. +- Keeping track of the last ice cream flavor a person had in the last 6 hours. 
+ +A GroupBy might look like this: +```python +ice_cream_group_by = GroupBy( + sources=Source( + events=ttypes.EventSource( + query=Query( + selects=select( + customer_id="customer_id", + flavor="ice_cream_flavor", + ), + time_column="created", + … + ) + ) + ), + keys=["customer_id"], + aggregations=[ + Aggregation( + input_column="customer_id", + operation=Operation.COUNT, + windows=[Window(length=6, timeUnit=TimeUnit.HOURS)], + ), + Aggregation( + input_column="flavor", + operation=Operation.LAST, + windows=[Window(length=6, timeUnit=TimeUnit.HOURS)], + ), + ], + accuracy=Accuracy.TEMPORAL, + online=True, + … +) +``` + + +#### 1. Source + +The Source operator consumes events from Kafka and deserializes them into typed objects. For our Ice Cream shop example, we have this Proto: + +```Scala +IceCreamEventProto( + customer_id: _root_.scala.Predef.String = "", + created: _root_.scala.Long = 0L, + ice_cream_flavor: _root_.scala.Predef.String = "", + ice_cream_cone_size: _root_.scala.Predef.String = "", +) +``` + +#### 2. Spark expression evaluation + +This operator transforms the object of type `T` based on the GroupBy's defined selects and filters, outputting a Map[String, Any]. + +```scala +// Input +IceCreamEventProto( + customer_id = "Alice", + created = 1000L, + ice_cream_flavor = "chocolate", + ice_cream_cone_size = "large" // Not used in the GroupBy definition +) +// Output +Map( + "customer_id" -> "Alice", + "created" -> 1000L, + "flavor" -> "chocolate" +) +``` + +#### 3. Window operator + +This window operator pre-aggregates incoming `Map(String -> Any)` and produces an array of IRs. Example: + +Event 1 `Map("Alice", 1000L, "chocolate")`. +- Pre-aggregates for key "Alice": `[count: 1, last_flavor: "chocolate"]` + +Event 2 for "Bob": `Map("Bob", 1200L, "strawberry")` +- Pre-aggregates for key "Bob": `[count: 1, last_flavor: "strawberry"]` + +Event 3 for "Alice": `Map("Alice", 1500L, "olive oil")` +- Pre-aggregates for key "Alice": `[count: 2, last_flavor: "olive oil"]` + +#### 4. Avro conversion + +This operator uses Avro to finish encoding the array of IRs into bytes and creates a `PutRequest`. + +Input: `[2, "olive oil"]` + +Output: `PutRequest(keyBytes: a927dcc=, valueBytes: d823eaa82==, ...)` (varies depending on your specific `KVStore` implementation). + +#### 5. KVStore Sink + +The final operator asynchronously writes the `PutRequests` to the KV store. These tiles are later decoded by the Fetcher and merged to calculate the final feature values. \ No newline at end of file diff --git a/docs/source/Tiled_Architecture.md b/docs/source/Tiled_Architecture.md new file mode 100644 index 000000000..1ac618a95 --- /dev/null +++ b/docs/source/Tiled_Architecture.md @@ -0,0 +1,61 @@ + +# The Tiled Architecture + +**Important**: Tiling is a new feature that is still in the process of being open-sourced. + +## What is tiling? + +Tiling, or the tiled architecture, is a modification to Chronon's online architecture to store pre-aggregates (also known as "IRs" or Intermediate Representations) in the Key-Value store instead of individual events. + +The primary purpose of tiling is to improve the handling of hot keys, increase scalability, and decrease feature serving latency. + +Tiling requires [Flink](https://flink.apache.org/). + +### Chronon without tiling +The regular, untiled version works as pictured in Figure 1. +- The "write" path: reads an event stream, processes the events in Spark, then writes them out to a datastore. 
+- The "read" path: reads O(events) events from the store, aggregates them, and returns the feature values to the user. + +![Architecture](../images/Untiled_Architecture.png) +_Figure 1: The untiled architecture_ + +At scale, aggregating O(n) events each time there is a request can become costly. For example, if you have an event stream producing 10 events/sec for a certain key, a request for a feature with a 12-hour window will have to fetch and aggregate 432,000 events every single time. For a simple GroupBy that counts the number of events for a key, Chronon would iterate over 432,000 items and count the total. + +### Chronon with tiling +The tiled architecture, depicted in Figure 2, works differently: +- The "write" path: reads an event stream, processes and pre-aggregates the events in a stateful Flink app, then writes out the pre-aggregates to "tiles" in the store. +- The "read" path: reads O(tiles) tiles from the store, merges the pre-aggregates, and returns the feature values to the user. + +![Architecture](../images/Tiled_Architecture.png) +_Figure 2: The tiled architecture_ + +Tiling shifts a significant part of the aggregation work to the write path, which allows for faster feature serving. + +Using the same example as above (an event stream producing 10 events/sec for a certain key, and a GroupBy with a 12-hour window), a request for feature values would fetch and merge 12 or 13 1-hour tiles. For a simple GroupBy that counts the number of events for a key, Chronon would iterate over 13 numbers and add them together. That's significantly less work. + +#### Example: Fetching and serving tiled features +Suppose you have a GroupBy with two aggregations, `COUNT` and `LAST`, both using 3-hour windows, and you are storing 1-hour tiles in KV Store. To serve them, the Chronon Fetcher would fetch three tiles: +``` +[0:00, 1:00) -> [2, "B"] +[1:00, 2:00) -> [9, "A"] +[2:00, 3:00) -> [3, "C"] +``` +Then, it would combine the IRs to get the final feature values: `[14, "C"]`. + +## When to use tiling + +In general, tiling improves scalability and decreases feature serving latency. Some use cases are: +- You want to decrease feature serving latency. At Stripe, migrating to tiling decreased serving latency by 33% at 4K rps. +- You don't have access to Spark Streaming +- You don't have access to a datastore with range queries +- You want to reduce fanout to your datastore. +- You need to support aggregating over hot key entities + +In particular, organizations operating at significant scale with many hot-key entities should consider using the tiled architecture. If the number of events per entity key is at most a few thousand, the untiled approach would still perform well. + + +## How to enable tiling + +To enable tiling, you first need to start using Flink on the write path. See the [Chronon on Flink documentation](./Flink.md) for instructions. As part of this process, you may also need to modify your KV store implementation to know how to write and fetch tiles. + +Once the Flink app is set up and writing tiles to your datastore, the final step is to enable tiled reads in the Fetcher. Just add `enable_tiling=true` to the [customJson](https://github.com/airbnb/chronon/blob/48b789dd2c216c62bbf1d74fbf4e779f23db541f/api/py/ai/chronon/group_by.py#L561) of any GroupBy definition. 
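
To make the tile-merging step on the read path concrete, here is an illustrative sketch (not the Fetcher's actual code) of how the per-tile IRs from the example above, a `COUNT` and a `LAST` aggregation, combine into final feature values. The IR layout and the event timestamps are simplified stand-ins.

```scala
// Illustrative only: a simplified stand-in for a tile's IR, holding
// (count, (lastValue, lastEventTimeMillis)) for a COUNT and a LAST aggregation.
case class TileIr(count: Long, last: (String, Long))

// COUNT merges by summing; LAST merges by keeping the value with the latest event time.
def mergeIrs(tiles: Seq[TileIr]): TileIr =
  tiles.reduce { (a, b) =>
    TileIr(
      count = a.count + b.count,
      last = if (b.last._2 >= a.last._2) b.last else a.last
    )
  }

// The three 1-hour tiles from the example, with illustrative event times:
// [0:00, 1:00) -> [2, "B"], [1:00, 2:00) -> [9, "A"], [2:00, 3:00) -> [3, "C"]
val tiles = Seq(
  TileIr(2, ("B", 1800000L)),
  TileIr(9, ("A", 5400000L)),
  TileIr(3, ("C", 9000000L))
)

val merged = mergeIrs(tiles) // TileIr(14, ("C", 9000000)) -> final feature values [14, "C"]
```

Note that the read path only touches a handful of small IRs (O(tiles)) instead of every raw event (O(events)), which is where the latency and fanout savings come from.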
diff --git a/docs/source/authoring_features/ChainingFeatures.md b/docs/source/authoring_features/ChainingFeatures.md index 54bdc69e5..8abac087d 100644 --- a/docs/source/authoring_features/ChainingFeatures.md +++ b/docs/source/authoring_features/ChainingFeatures.md @@ -79,9 +79,9 @@ enriched_listings = Join( ``` ### Configuration Example -[Chaining GroupBy](https://github.com/airbnb/chronon/blob/master/api/py/test/sample/group_bys/sample_team/sample_chaining_group_by.py) +[Chaining GroupBy](https://github.com/airbnb/chronon/blob/main/api/py/test/sample/group_bys/sample_team/sample_chaining_group_by.py) -[Chaining Join](https://github.com/airbnb/chronon/blob/master/api/py/test/sample/joins/sample_team/sample_chaining_join.py) +[Chaining Join](https://github.com/airbnb/chronon/blob/main/api/py/test/sample/joins/sample_team/sample_chaining_join.py) ## Clarifications - The goal of chaining is to use output of a Join as input to downstream computations like GroupBy or a Join. As of today we support the case 1 and case 2 in future plan diff --git a/docs/source/authoring_features/GroupBy.md b/docs/source/authoring_features/GroupBy.md index 07f724f94..6c39480fb 100644 --- a/docs/source/authoring_features/GroupBy.md +++ b/docs/source/authoring_features/GroupBy.md @@ -27,7 +27,7 @@ This can be achieved by using the output of one `GroupBy` as the input to the ne ## Supported aggregations -All supported aggregations are defined [here](https://github.com/airbnb/chronon/blob/master/api/thrift/api.thrift#L51). +All supported aggregations are defined [here](https://github.com/airbnb/chronon/blob/main/api/thrift/api.thrift#L51). Chronon supports powerful aggregation patterns and the section below goes into detail of the properties and behaviors of aggregations. @@ -181,7 +181,7 @@ If you look at the parameters column in the above table - you will see `k`. For approx_unique_count and approx_percentile - k stands for the size of the `sketch` - the larger this is, the more accurate and expensive to compute the results will be. Mapping between k and size for approx_unique_count is -[here](https://github.com/apache/incubator-datasketches-java/blob/master/src/main/java/org/apache/datasketches/cpc/CpcSketch.java#L180) +[here](https://github.com/apache/incubator-datasketches-java/blob/main/src/main/java/org/apache/datasketches/cpc/CpcSketch.java#L180) for approx_percentile is the first table in [here](https://datasketches.apache.org/docs/KLL/KLLAccuracyAndSize.html). `percentiles` for `approx_percentile` is an array of doubles between 0 and 1, where you want percentiles at. (Ex: "[0.25, 0.5, 0.75]") @@ -193,7 +193,7 @@ The following examples are broken down by source type. We strongly suggest makin ## Realtime Event GroupBy examples -This example is based on the [returns](https://github.com/airbnb/chronon/blob/master/api/py/test/sample/group_bys/quickstart/returns.py) GroupBy from the quickstart guide that performs various aggregations over the `refund_amt` column over various windows. +This example is based on the [returns](https://github.com/airbnb/chronon/blob/main/api/py/test/sample/group_bys/quickstart/returns.py) GroupBy from the quickstart guide that performs various aggregations over the `refund_amt` column over various windows. 
```python source = Source( @@ -236,7 +236,7 @@ v1 = GroupBy( ## Bucketed GroupBy Example -In this example we take the [Purchases GroupBy](https://github.com/airbnb/chronon/blob/master/api/py/test/sample/group_bys/quickstart/purchases.py) from the Quickstart tutorial and modify it to include buckets based on a hypothetical `"credit_card_type"` column. +In this example we take the [Purchases GroupBy](https://github.com/airbnb/chronon/blob/main/api/py/test/sample/group_bys/quickstart/purchases.py) from the Quickstart tutorial and modify it to include buckets based on a hypothetical `"credit_card_type"` column. ```python source = Source( @@ -283,7 +283,7 @@ v1 = GroupBy( ## Simple Batch Event GroupBy examples -Example GroupBy with windowed aggregations. Taken from [purchases.py](https://github.com/airbnb/chronon/blob/master/api/py/test/sample/group_bys/quickstart/purchases.py). +Example GroupBy with windowed aggregations. Taken from [purchases.py](https://github.com/airbnb/chronon/blob/main/api/py/test/sample/group_bys/quickstart/purchases.py). Important things to note about this case relative to the streaming GroupBy: * The default accuracy here is `SNAPSHOT` meaning that updates to the online KV store only happen in batch, and also backfills will be midnight accurate rather than intra day accurate. @@ -329,7 +329,7 @@ v1 = GroupBy( ### Batch Entity GroupBy examples -This is taken from the [Users GroupBy](https://github.com/airbnb/chronon/blob/master/api/py/test/sample/group_bys/quickstart/users.py) from the quickstart tutorial. +This is taken from the [Users GroupBy](https://github.com/airbnb/chronon/blob/main/api/py/test/sample/group_bys/quickstart/users.py) from the quickstart tutorial. ```python diff --git a/docs/source/authoring_features/Join.md b/docs/source/authoring_features/Join.md index f86181463..57e2aaa06 100644 --- a/docs/source/authoring_features/Join.md +++ b/docs/source/authoring_features/Join.md @@ -6,7 +6,7 @@ Let's use an example to explain this further. In the [Quickstart](../getting_sta This is important because it means that when we serve the model online, inference will be made at checkout time, and therefore backfilled features for training data should correspond to a historical checkout event, with features computed as of those checkout times. In other words, every row of training data for the model has identical feature values to what the model would have seen had it made a production inference request at that time. -To see how we do this, let's take a look at the left side of the join definition (taken from [Quickstart Training Set Join](https://github.com/airbnb/chronon/blob/master/api/py/test/sample/joins/quickstart/training_set.py)). +To see how we do this, let's take a look at the left side of the join definition (taken from [Quickstart Training Set Join](https://github.com/airbnb/chronon/blob/main/api/py/test/sample/joins/quickstart/training_set.py)). ```python source = Source( diff --git a/docs/source/authoring_features/Source.md b/docs/source/authoring_features/Source.md index cc1f95146..49a6041e8 100644 --- a/docs/source/authoring_features/Source.md +++ b/docs/source/authoring_features/Source.md @@ -18,7 +18,7 @@ All sources are basically composed of the following pieces*: ## Streaming EventSource -Taken from the [returns.py](https://github.com/airbnb/chronon/blob/master/api/py/test/sample/group_bys/quickstart/returns.py) example GroupBy in the quickstart tutorial. 
+Taken from the [returns.py](https://github.com/airbnb/chronon/blob/main/api/py/test/sample/group_bys/quickstart/returns.py) example GroupBy in the quickstart tutorial. ```python source = Source( @@ -84,7 +84,7 @@ As you can see, a pre-requisite to using the streaming `EntitySource` is a chang ## Batch EntitySource -Taken from the [users.py](https://github.com/airbnb/chronon/blob/master/api/py/test/sample/group_bys/quickstart/users.py) example GroupBy in the quickstart tutorial. +Taken from the [users.py](https://github.com/airbnb/chronon/blob/main/api/py/test/sample/group_bys/quickstart/users.py) example GroupBy in the quickstart tutorial. ```python source = Source( diff --git a/docs/source/authoring_features/StagingQuery.md b/docs/source/authoring_features/StagingQuery.md index ded0b6b51..a3e853064 100644 --- a/docs/source/authoring_features/StagingQuery.md +++ b/docs/source/authoring_features/StagingQuery.md @@ -57,9 +57,9 @@ v1 = Join( ``` Note: The output namespace of the staging query is dependent on the metaData value for output_namespace. By default, the -metadata is extracted from [teams.json](https://github.com/airbnb/chronon/blob/master/api/py/test/sample/teams.json) (or default team if one is not set). +metadata is extracted from [teams.json](https://github.com/airbnb/chronon/blob/main/api/py/test/sample/teams.json) (or default team if one is not set). -**[See more configuration examples here](https://github.com/airbnb/chronon/blob/master/api/py/test/sample/staging_queries)** +**[See more configuration examples here](https://github.com/airbnb/chronon/blob/main/api/py/test/sample/staging_queries)** ## Date Logic and Template Parameters diff --git a/docs/source/getting_started/Tutorial.md b/docs/source/getting_started/Tutorial.md index 1ddfa9b32..485e73f89 100644 --- a/docs/source/getting_started/Tutorial.md +++ b/docs/source/getting_started/Tutorial.md @@ -19,7 +19,7 @@ Does not include: ## Setup -To get started with the Chronon, all you need to do is download the [docker-compose.yml](https://github.com/airbnb/chronon/blob/master/docker-compose.yml) file and run it locally: +To get started with the Chronon, all you need to do is download the [docker-compose.yml](https://github.com/airbnb/chronon/blob/main/docker-compose.yml) file and run it locally: ```bash curl -o docker-compose.yml https://chronon.ai/docker-compose.yml @@ -34,7 +34,7 @@ In this example, let's assume that we're a large online retailer, and we've dete ## Raw data sources -Fabricated raw data is included in the [data](https://github.com/airbnb/chronon/blob/master/api/py/test/sample/data) directory. It includes four tables: +Fabricated raw data is included in the [data](https://github.com/airbnb/chronon/blob/main/api/py/test/sample/data) directory. It includes four tables: 1. Users - includes basic information about users such as account created date; modeled as a batch data source that updates daily 2. Purchases - a log of all purchases by users; modeled as a log table with a streaming (i.e. Kafka) event-bus counterpart @@ -101,11 +101,11 @@ v1 = GroupBy( ) ``` -See the whole code file here: [purchases GroupBy](https://github.com/airbnb/chronon/blob/master/api/py/test/sample/group_bys/quickstart/purchases.py). This is also in your docker image. We'll be running computation for it and the other GroupBys in [Step 3 - Backfilling Data](#step-3---backfilling-data). +See the whole code file here: [purchases GroupBy](https://github.com/airbnb/chronon/blob/main/api/py/test/sample/group_bys/quickstart/purchases.py). 
This is also in your docker image. We'll be running computation for it and the other GroupBys in [Step 3 - Backfilling Data](#step-3---backfilling-data). **Feature set 2: Returns data features** -We perform a similar set of aggregations on returns data in the [returns GroupBy](https://github.com/airbnb/chronon/blob/master/api/py/test/sample/group_bys/quickstart/returns.py). The code is not included here because it looks similar to the above example. +We perform a similar set of aggregations on returns data in the [returns GroupBy](https://github.com/airbnb/chronon/blob/main/api/py/test/sample/group_bys/quickstart/returns.py). The code is not included here because it looks similar to the above example. **Feature set 3: User data features** @@ -127,7 +127,7 @@ v1 = GroupBy( ) ``` -Taken from the [users GroupBy](https://github.com/airbnb/chronon/blob/master/api/py/test/sample/group_bys/quickstart/users.py). +Taken from the [users GroupBy](https://github.com/airbnb/chronon/blob/main/api/py/test/sample/group_bys/quickstart/users.py). ### Step 2 - Join the features together @@ -160,7 +160,7 @@ v1 = Join( ) ``` -Taken from the [training_set Join](https://github.com/airbnb/chronon/blob/master/api/py/test/sample/joins/quickstart/training_set.py). +Taken from the [training_set Join](https://github.com/airbnb/chronon/blob/main/api/py/test/sample/joins/quickstart/training_set.py). The `left` side of the join is what defines the timestamps and primary keys for the backfill (notice that it is built on top of the `checkout` event, as dictated by our use case). @@ -330,4 +330,4 @@ Using chronon for your feature engineering work simplifies and improves your ML 4. Chronon exposes easy endpoints for feature fetching. 5. Consistency is guaranteed and measurable. -For a more detailed view into the benefits of using Chronon, see [Benefits of Chronon documentation](https://github.com/airbnb/chronon/tree/master?tab=readme-ov-file#benefits-of-chronon-over-other-approaches). +For a more detailed view into the benefits of using Chronon, see [Benefits of Chronon documentation](https://github.com/airbnb/chronon/tree/main?tab=readme-ov-file#benefits-of-chronon-over-other-approaches). diff --git a/docs/source/setup/Data_Integration.md b/docs/source/setup/Data_Integration.md index 6d0ff7985..ce1052320 100644 --- a/docs/source/setup/Data_Integration.md +++ b/docs/source/setup/Data_Integration.md @@ -10,11 +10,11 @@ Chronon jobs require Spark to run. If you already have a spark environment up an ## Configuring Spark -To configure Chronon to run on spark, you just need a `spark_submit.sh` script that can be used in Chronon's [`run.py`](https://github.com/airbnb/chronon/blob/master/api/py/ai/chronon/repo/run.py) Python script (this is the python-based CLI entry point for all jobs). +To configure Chronon to run on spark, you just need a `spark_submit.sh` script that can be used in Chronon's [`run.py`](https://github.com/airbnb/chronon/blob/main/api/py/ai/chronon/repo/run.py) Python script (this is the python-based CLI entry point for all jobs). -We recommend putting your `spark_submit.sh` within a `scripts/` subdirectory of your main `chronon` directory (see [Developer Setup docs](./Developer_Setup.md) for how to setup the main `chronon` directory.). If you do that, then you can use `run.py` as-is, as that is the [default location](https://github.com/airbnb/chronon/blob/master/api/py/ai/chronon/repo/run.py#L483) for `spark_submit.sh`. 
+We recommend putting your `spark_submit.sh` within a `scripts/` subdirectory of your main `chronon` directory (see [Developer Setup docs](./Developer_Setup.md) for how to setup the main `chronon` directory.). If you do that, then you can use `run.py` as-is, as that is the [default location](https://github.com/airbnb/chronon/blob/main/api/py/ai/chronon/repo/run.py#L483) for `spark_submit.sh`. -You can see an example `spark_submit.sh` script used by the quickstart guide here: [Quickstart example spark_submit.sh](https://github.com/airbnb/chronon/blob/master/api/py/test/sample/scripts/spark_submit.sh). +You can see an example `spark_submit.sh` script used by the quickstart guide here: [Quickstart example spark_submit.sh](https://github.com/airbnb/chronon/blob/main/api/py/test/sample/scripts/spark_submit.sh). Note that this replies on an environment variable set in the `docker-compose.yml` which basically just points `$SPARK_SUBMIT` variable to the system level `spark-submit` binary. diff --git a/docs/source/setup/Developer_Setup.md b/docs/source/setup/Developer_Setup.md index 26ec17d8e..4311bd85a 100644 --- a/docs/source/setup/Developer_Setup.md +++ b/docs/source/setup/Developer_Setup.md @@ -34,7 +34,7 @@ Key points: 2. There are `group_bys` and `joins` subdirectories inside the root directory, under which there are team directories. Note that the team directory names must match what is within `teams.json` 3. Within each of these team directories are the actual user-written chronon files. Note that there can be sub-directories within each team directory for organization if desired. -For an example setup of this directory, see the [Sample](https://github.com/airbnb/chronon/tree/master/api/py/test/sample) that is also mounted to the docker image that is used in the Quickstart guide. +For an example setup of this directory, see the [Sample](https://github.com/airbnb/chronon/tree/main/api/py/test/sample) that is also mounted to the docker image that is used in the Quickstart guide. You can also use the following command to create a scratch directory from your `cwd`: diff --git a/docs/source/setup/Online_Integration.md b/docs/source/setup/Online_Integration.md index 51b3b36cd..dfb015368 100644 --- a/docs/source/setup/Online_Integration.md +++ b/docs/source/setup/Online_Integration.md @@ -10,11 +10,11 @@ This integration gives Chronon the ability to: ## Example -If you'd to start with an example, please refer to the [MongoDB Implementation in the Quickstart Guide](https://github.com/airbnb/chronon/tree/master/quickstart/mongo-online-impl/src/main/scala/ai/chronon/quickstart/online). This provides a complete working example of how to integrate Chronon with MongoDB. +If you'd to start with an example, please refer to the [MongoDB Implementation in the Quickstart Guide](https://github.com/airbnb/chronon/tree/main/quickstart/mongo-online-impl/src/main/scala/ai/chronon/quickstart/online). This provides a complete working example of how to integrate Chronon with MongoDB. ## Components -**KVStore**: The biggest part of the API implementation is the [KVStore](https://github.com/airbnb/chronon/blob/master/online/src/main/scala/ai/chronon/online/Api.scala#L43). +**KVStore**: The biggest part of the API implementation is the [KVStore](https://github.com/airbnb/chronon/blob/main/online/src/main/scala/ai/chronon/online/Api.scala#L43). ```scala object KVStore { @@ -47,11 +47,11 @@ trait KVStore { There are three functions to implement as part of this integration: 1. 
`create`: which takes a string and creates a new database/dataset with that name. -2. `multiGet`: which takes a `Seq` of [`GetRequest`](https://github.com/airbnb/chronon/blob/master/online/src/main/scala/ai/chronon/online/Api.scala#L33) and converts them into a `Future[Seq[GetResponse]]` by querying the underlying KVStore. -3. `multiPut`: which takes a `Seq` of [`PutRequest`](https://github.com/airbnb/chronon/blob/master/online/src/main/scala/ai/chronon/online/Api.scala#L38) and converts them into `Future[Seq[Boolean]]` (success/fail) by attempting to insert them into the underlying KVStore. +2. `multiGet`: which takes a `Seq` of [`GetRequest`](https://github.com/airbnb/chronon/blob/main/online/src/main/scala/ai/chronon/online/Api.scala#L33) and converts them into a `Future[Seq[GetResponse]]` by querying the underlying KVStore. +3. `multiPut`: which takes a `Seq` of [`PutRequest`](https://github.com/airbnb/chronon/blob/main/online/src/main/scala/ai/chronon/online/Api.scala#L38) and converts them into `Future[Seq[Boolean]]` (success/fail) by attempting to insert them into the underlying KVStore. 4. `bulkPut`: to upload a hive table into your kv store. It takes the table name and partitions as `String`s as well as the dataset as a `String`. If you have another mechanism (like an airflow upload operator) to upload data from hive into your kv stores you don't need to implement this method. -See the [MongoDB example here](https://github.com/airbnb/chronon/blob/master/quickstart/mongo-online-impl/src/main/scala/ai/chronon/quickstart/online/MongoKvStore.scala). +See the [MongoDB example here](https://github.com/airbnb/chronon/blob/main/quickstart/mongo-online-impl/src/main/scala/ai/chronon/quickstart/online/MongoKvStore.scala). **StreamDecoder**: This is responsible for "decoding" or converting the raw values that Chronon streaming jobs will read into events that it knows how to process. @@ -98,12 +98,12 @@ Chronon has a type system that can map to Spark's or Avro's type system. Schema | StructType | Array[Any] | -See the [Quickstart example here](https://github.com/airbnb/chronon/blob/master/quickstart/mongo-online-impl/src/main/scala/ai/chronon/quickstart/online/QuickstartMutationDecoder.scala). +See the [Quickstart example here](https://github.com/airbnb/chronon/blob/main/quickstart/mongo-online-impl/src/main/scala/ai/chronon/quickstart/online/QuickstartMutationDecoder.scala). -**API:** The main API that requires implementation is [API](https://github.com/airbnb/chronon/blob/master/online/src/main/scala/ai/chronon/online/Api.scala#L151). This combines the above implementations with other client and logging configuration. +**API:** The main API that requires implementation is [API](https://github.com/airbnb/chronon/blob/main/online/src/main/scala/ai/chronon/online/Api.scala#L151). This combines the above implementations with other client and logging configuration. -[ChrononMongoOnlineImpl](https://github.com/airbnb/chronon/blob/master/quickstart/mongo-online-impl/src/main/scala/ai/chronon/quickstart/online/ChrononMongoOnlineImpl.scala) Is an example implemenation of the API. +[ChrononMongoOnlineImpl](https://github.com/airbnb/chronon/blob/main/quickstart/mongo-online-impl/src/main/scala/ai/chronon/quickstart/online/ChrononMongoOnlineImpl.scala) Is an example implemenation of the API. 
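
Before moving on to the fetcher, here is a minimal in-memory sketch of the `multiGet` / `multiPut` semantics described above. It deliberately uses simplified stand-in request types and return values rather than the real `GetRequest` / `PutRequest` / `GetResponse` case classes in `Api.scala`, so treat it as an illustration of the contract (a store keyed by dataset and key bytes), not a drop-in implementation; see the MongoDB example linked above for a real one.

```scala
import scala.collection.concurrent.TrieMap
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// Simplified stand-ins for illustration; the real request/response types live in ai.chronon.online.Api.
case class SimpleGetRequest(keyBytes: Array[Byte], dataset: String)
case class SimplePutRequest(keyBytes: Array[Byte], valueBytes: Array[Byte], dataset: String)

class InMemoryKvStore {
  // dataset -> (key -> value); keys are wrapped as Seq[Byte] so they compare by content, not by reference.
  private val data = TrieMap.empty[String, TrieMap[Seq[Byte], Array[Byte]]]

  // create: make a new, empty dataset if it does not already exist.
  def create(dataset: String): Unit =
    data.putIfAbsent(dataset, TrieMap.empty)

  // multiGet: look up each request and return the stored bytes, if any, in request order.
  def multiGet(requests: Seq[SimpleGetRequest]): Future[Seq[Option[Array[Byte]]]] =
    Future {
      requests.map(req => data.get(req.dataset).flatMap(_.get(req.keyBytes.toSeq)))
    }

  // multiPut: write each key/value pair and report per-request success.
  def multiPut(requests: Seq[SimplePutRequest]): Future[Seq[Boolean]] =
    Future {
      requests.map { req =>
        data.getOrElseUpdate(req.dataset, TrieMap.empty).put(req.keyBytes.toSeq, req.valueBytes)
        true
      }
    }
}
```

A production integration would additionally need `bulkPut` for batch uploads and a durable backing store, but the shape of the calls stays the same.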
Once you have the api object you can build a fetcher class using the api object like so diff --git a/docs/source/setup/Orchestration.md b/docs/source/setup/Orchestration.md index 908b6de58..1446e073e 100644 --- a/docs/source/setup/Orchestration.md +++ b/docs/source/setup/Orchestration.md @@ -6,29 +6,29 @@ Airflow is currently the best supported method for orchestration, however, other ## Airflow Integration -See the [Airflow Directory](https://github.com/airbnb/chronon/tree/master/airflow) for initial boilerplate code. +See the [Airflow Directory](https://github.com/airbnb/chronon/tree/main/airflow) for initial boilerplate code. The files in this directory can be used to create the following Chronon Airflow DAGs. -1. GroupBy DAGs, created by [group_by_dag_constructor.py](https://github.com/airbnb/chronon/tree/master/airflow/group_by_dag_constructor.py): +1. GroupBy DAGs, created by [group_by_dag_constructor.py](https://github.com/airbnb/chronon/tree/main/airflow/group_by_dag_constructor.py): 1. `chronon_batch_dag_{team_name}`: One DAG per team that uploads snapshots of computed features to the KV store for online group_bys, and frontfills daily snapshots for group_bys. 2. `chronon_streaming_dag_{team_name}`: One DAG per team that runs Streaming jobs for `online=True, realtime=True` GroupBys. These tasks run every 15 minutes and are configured to "keep alive" streaming jobs (i.e. do nothing if running, else attempt restart if dead/not started). -2. Join DAGs, created by [join_dag_constructor.py](https://github.com/airbnb/chronon/tree/master/airflow/join_dag_constructor.py): +2. Join DAGs, created by [join_dag_constructor.py](https://github.com/airbnb/chronon/tree/main/airflow/join_dag_constructor.py): 1. `chronon_join_{join_name}`: One DAG per join that performs backfill and daily frontfill of join data to the offline Hive table. -3. Staging Query DAGs, created by [staging_query_dag_constructor.py](https://github.com/airbnb/chronon/tree/master/airflow/staging_query_dag_constructor.py): +3. Staging Query DAGs, created by [staging_query_dag_constructor.py](https://github.com/airbnb/chronon/tree/main/airflow/staging_query_dag_constructor.py): 1. `chronon_staging_query_{team_name}`: One DAG per team that creates daily jobs for each Staging Query for the team. -4. Online/Offline Consistency Check DAGs, created by [online_offline_consistency_dag_constructor.py](https://github.com/airbnb/chronon/tree/master/airflow/online_offline_consistency_dag_constructor.py): +4. Online/Offline Consistency Check DAGs, created by [online_offline_consistency_dag_constructor.py](https://github.com/airbnb/chronon/tree/main/airflow/online_offline_consistency_dag_constructor.py): 1. `chronon_online_offline_comparison_{join_name}`: One DAG per join that computes the consistency of online serving data vs offline data for that join, and outputs the measurements to a stats table for each join that is configured. Note that logging must be enabled for this pipeline to work. To deploy this to your airflow environment, first copy everything in this directory over to your Airflow directory (where your other DAG files live), then set the following configurations: -1. Set your configuration variables in [constants.py](https://github.com/airbnb/chronon/tree/master/airflow/constants.py). -2. Implement the `get_kv_store_upload_operator` function in [helpers.py](https://github.com/airbnb/chronon/tree/master/airflow/helpers.py). **This is only required if you want to use Chronon online serving**. +1. 
Set your configuration variables in [constants.py](https://github.com/airbnb/chronon/tree/main/airflow/constants.py). +2. Implement the `get_kv_store_upload_operator` function in [helpers.py](https://github.com/airbnb/chronon/tree/main/airflow/helpers.py). **This is only required if you want to use Chronon online serving**. ## Alternate Integrations -While Airflow is currently the most well-supported integration, there is no reason why you couldn't choose a different orchestration engine to power the above flows. If you're interested in such an integration and you think that the community might benefit from your work, please consider [contributing](https://github.com/airbnb/chronon/blob/master/CONTRIBUTE.md) back to the project. +While Airflow is currently the most well-supported integration, there is no reason why you couldn't choose a different orchestration engine to power the above flows. If you're interested in such an integration and you think that the community might benefit from your work, please consider [contributing](https://github.com/airbnb/chronon/blob/main/CONTRIBUTING.md) back to the project. If you have questions about how to approach a different integration, feel free to ask for help in the [community Discord channel](https://discord.gg/GbmGATNqqP). diff --git a/docs/source/test_deploy_serve/Serve.md b/docs/source/test_deploy_serve/Serve.md index 001f04ce4..e967447c8 100644 --- a/docs/source/test_deploy_serve/Serve.md +++ b/docs/source/test_deploy_serve/Serve.md @@ -4,15 +4,15 @@ The main way to serve production Chronon data online is with the Java or Scala F The main online Java Fetcher libraries can be found here: -1. [`JavaFetcher`](https://github.com/airbnb/chronon/blob/master/online/src/main/java/ai/chronon/online/JavaFetcher.java) -2. [`JavaRequest`](https://github.com/airbnb/chronon/blob/master/online/src/main/java/ai/chronon/online/JavaRequest.java) -3. [`JavaResponse`](https://github.com/airbnb/chronon/blob/master/online/src/main/java/ai/chronon/online/JavaResponse.java) +1. [`JavaFetcher`](https://github.com/airbnb/chronon/blob/main/online/src/main/java/ai/chronon/online/JavaFetcher.java) +2. [`JavaRequest`](https://github.com/airbnb/chronon/blob/main/online/src/main/java/ai/chronon/online/JavaRequest.java) +3. [`JavaResponse`](https://github.com/airbnb/chronon/blob/main/online/src/main/java/ai/chronon/online/JavaResponse.java) And their scala counterparts: -1. [`Fetcher`](https://github.com/airbnb/chronon/blob/master/online/src/main/scala/ai/chronon/online/Fetcher.scala) -2. [`Request`](https://github.com/airbnb/chronon/blob/master/online/src/main/scala/ai/chronon/online/Fetcher.scala#L39) -3. [`Response`](https://github.com/airbnb/chronon/blob/master/online/src/main/scala/ai/chronon/online/Fetcher.scala#L48) +1. [`Fetcher`](https://github.com/airbnb/chronon/blob/main/online/src/main/scala/ai/chronon/online/Fetcher.scala) +2. [`Request`](https://github.com/airbnb/chronon/blob/main/online/src/main/scala/ai/chronon/online/Fetcher.scala#L39) +3. 
[`Response`](https://github.com/airbnb/chronon/blob/main/online/src/main/scala/ai/chronon/online/Fetcher.scala#L48) Example Implementation diff --git a/flink/src/main/scala/ai/chronon/flink/AsyncKVStoreWriter.scala b/flink/src/main/scala/ai/chronon/flink/AsyncKVStoreWriter.scala index 0912a2e94..6d3fecae3 100644 --- a/flink/src/main/scala/ai/chronon/flink/AsyncKVStoreWriter.scala +++ b/flink/src/main/scala/ai/chronon/flink/AsyncKVStoreWriter.scala @@ -75,6 +75,8 @@ class AsyncKVStoreWriter(onlineImpl: Api, featureGroupName: String) // The context used for the future callbacks implicit lazy val executor: ExecutionContext = AsyncKVStoreWriter.ExecutionContextInstance + // One may want to use different KV stores depending on whether tiling is on. + // The untiled version of Chronon works on "append" store semantics, and the tiled version works on "overwrite". protected def getKVStore: KVStore = { onlineImpl.genKvStore } diff --git a/flink/src/main/scala/ai/chronon/flink/AvroCodecFn.scala b/flink/src/main/scala/ai/chronon/flink/AvroCodecFn.scala index a88ea0aa7..16eb8dbb2 100644 --- a/flink/src/main/scala/ai/chronon/flink/AvroCodecFn.scala +++ b/flink/src/main/scala/ai/chronon/flink/AvroCodecFn.scala @@ -3,6 +3,7 @@ package ai.chronon.flink import org.slf4j.LoggerFactory import ai.chronon.api.Extensions.GroupByOps import ai.chronon.api.{Constants, DataModel, Query, StructType => ChrononStructType} +import ai.chronon.flink.window.TimestampedTile import ai.chronon.online.{AvroConversions, GroupByServingInfoParsed} import ai.chronon.online.KVStore.PutRequest import org.apache.flink.api.common.functions.RichFlatMapFunction @@ -13,28 +14,32 @@ import org.apache.flink.util.Collector import scala.jdk.CollectionConverters._ /** - * A Flink function that is responsible for converting the Spark expr eval output and converting that to a form - * that can be written out to the KV store (PutRequest object) - * @param groupByServingInfoParsed The GroupBy we are working with - * @tparam T The input data type + * Base class for the Avro conversion Flink operator. + * + * Subclasses should override the RichFlatMapFunction methods (flatMap) and groupByServingInfoParsed. + * + * @tparam IN The input data type which contains the data to be avro-converted to bytes. + * @tparam OUT The output data type (generally a PutRequest). */ -case class AvroCodecFn[T](groupByServingInfoParsed: GroupByServingInfoParsed) - extends RichFlatMapFunction[Map[String, Any], PutRequest] { - @transient lazy val logger = LoggerFactory.getLogger(getClass) +sealed abstract class BaseAvroCodecFn[IN, OUT] extends RichFlatMapFunction[IN, OUT] { + def groupByServingInfoParsed: GroupByServingInfoParsed + @transient lazy val logger = LoggerFactory.getLogger(getClass) @transient protected var avroConversionErrorCounter: Counter = _ + @transient protected var eventProcessingErrorCounter: Counter = + _ // Shared metric for errors across the entire Flink app. 
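+  // Note: the members below are lazy because groupByServingInfoParsed is only supplied by the concrete
+  // subclass; evaluating them eagerly in this base class would run before that value is available.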
- protected val query: Query = groupByServingInfoParsed.groupBy.streamingSource.get.getEvents.query - protected val streamingDataset: String = groupByServingInfoParsed.groupBy.streamingDataset + protected lazy val query: Query = groupByServingInfoParsed.groupBy.streamingSource.get.getEvents.query + protected lazy val streamingDataset: String = groupByServingInfoParsed.groupBy.streamingDataset // TODO: update to use constant names that are company specific - protected val timeColumnAlias: String = Constants.TimeColumn - protected val timeColumn: String = Option(query.timeColumn).getOrElse(timeColumnAlias) + protected lazy val timeColumnAlias: String = Constants.TimeColumn + protected lazy val timeColumn: String = Option(query.timeColumn).getOrElse(timeColumnAlias) - protected val (keyToBytes, valueToBytes): (Any => Array[Byte], Any => Array[Byte]) = + protected lazy val (keyToBytes, valueToBytes): (Any => Array[Byte], Any => Array[Byte]) = getKVSerializers(groupByServingInfoParsed) - protected val (keyColumns, valueColumns): (Array[String], Array[String]) = getKVColumns - protected val extraneousRecord: Any => Array[Any] = { + protected lazy val (keyColumns, valueColumns): (Array[String], Array[String]) = getKVColumns + protected lazy val extraneousRecord: Any => Array[Any] = { case x: Map[_, _] if x.keys.forall(_.isInstanceOf[String]) => x.flatMap { case (key, value) => Array(key, value) }.toArray } @@ -70,6 +75,16 @@ case class AvroCodecFn[T](groupByServingInfoParsed: GroupByServingInfoParsed) val valueColumns = groupByServingInfoParsed.groupBy.aggregationInputs ++ additionalColumns (keyColumns, valueColumns) } +} + +/** + * A Flink function that is responsible for converting the Spark expr eval output and converting that to a form + * that can be written out to the KV store (PutRequest object) + * @param groupByServingInfoParsed The GroupBy we are working with + * @tparam T The input data type + */ +case class AvroCodecFn[T](groupByServingInfoParsed: GroupByServingInfoParsed) + extends BaseAvroCodecFn[Map[String, Any], PutRequest] { override def open(configuration: Configuration): Unit = { super.open(configuration) @@ -87,16 +102,69 @@ case class AvroCodecFn[T](groupByServingInfoParsed: GroupByServingInfoParsed) } catch { case e: Exception => // To improve availability, we don't rethrow the exception. We just drop the event - // and track the errors in a metric. If there are too many errors we'll get alerted/paged. + // and track the errors in a metric. Alerts should be set up on this metric. logger.error(s"Error converting to Avro bytes - $e") + eventProcessingErrorCounter.inc() avroConversionErrorCounter.inc() } def avroConvertMapToPutRequest(in: Map[String, Any]): PutRequest = { val tsMills = in(timeColumnAlias).asInstanceOf[Long] - val keyBytes = keyToBytes(keyColumns.map(in.get(_).get)) - val valueBytes = valueToBytes(valueColumns.map(in.get(_).get)) + val keyBytes = keyToBytes(keyColumns.map(in(_))) + val valueBytes = valueToBytes(valueColumns.map(in(_))) PutRequest(keyBytes, valueBytes, streamingDataset, Some(tsMills)) } +} +/** + * A Flink function that is responsible for converting an array of pre-aggregates (aka a tile) to a form + * that can be written out to the KV store (PutRequest object). 
+ * + * @param groupByServingInfoParsed The GroupBy we are working with + * @tparam T The input data type + */ +case class TiledAvroCodecFn[T](groupByServingInfoParsed: GroupByServingInfoParsed) + extends BaseAvroCodecFn[TimestampedTile, PutRequest] { + override def open(configuration: Configuration): Unit = { + super.open(configuration) + val metricsGroup = getRuntimeContext.getMetricGroup + .addGroup("chronon") + .addGroup("feature_group", groupByServingInfoParsed.groupBy.getMetaData.getName) + avroConversionErrorCounter = metricsGroup.counter("avro_conversion_errors") + eventProcessingErrorCounter = metricsGroup.counter("event_processing_error") + } + override def close(): Unit = super.close() + + override def flatMap(value: TimestampedTile, out: Collector[PutRequest]): Unit = + try { + out.collect(avroConvertTileToPutRequest(value)) + } catch { + case e: Exception => + // To improve availability, we don't rethrow the exception. We just drop the event + // and track the errors in a metric. Alerts should be set up on this metric. + logger.error(s"Error converting to Avro bytes - ", e) + eventProcessingErrorCounter.inc() + avroConversionErrorCounter.inc() + } + + def avroConvertTileToPutRequest(in: TimestampedTile): PutRequest = { + val tsMills = in.latestTsMillis + + // 'keys' is a map of (key name in schema -> key value), e.g. Map("card_number" -> "4242-4242-4242-4242") + // We convert to AnyRef because Chronon expects an AnyRef (for scala <> java interoperability reasons). + val keys: Map[String, AnyRef] = keyColumns.zip(in.keys.map(_.asInstanceOf[AnyRef])).toMap + val keyBytes = keyToBytes(in.keys.toArray) + val valueBytes = in.tileBytes + + logger.debug( + s""" + |Avro converting tile to PutRequest - tile=${in} + |groupBy=${groupByServingInfoParsed.groupBy.getMetaData.getName} tsMills=$tsMills keys=$keys + |keyBytes=${java.util.Base64.getEncoder.encodeToString(keyBytes)} + |valueBytes=${java.util.Base64.getEncoder.encodeToString(valueBytes)} + |streamingDataset=$streamingDataset""".stripMargin + ) + + PutRequest(keyBytes, valueBytes, streamingDataset, Some(tsMills)) + } } diff --git a/flink/src/main/scala/ai/chronon/flink/FlinkJob.scala b/flink/src/main/scala/ai/chronon/flink/FlinkJob.scala index 1a275e950..25b7f0039 100644 --- a/flink/src/main/scala/ai/chronon/flink/FlinkJob.scala +++ b/flink/src/main/scala/ai/chronon/flink/FlinkJob.scala @@ -1,25 +1,31 @@ package ai.chronon.flink +import ai.chronon.aggregator.windowing.ResolutionUtils +import ai.chronon.api.{DataType} import ai.chronon.api.Extensions.{GroupByOps, SourceOps} -import ai.chronon.online.GroupByServingInfoParsed +import ai.chronon.flink.window.{ + AlwaysFireOnElementTrigger, + FlinkRowAggProcessFunction, + FlinkRowAggregationFunction, + KeySelector, + TimestampedTile +} +import ai.chronon.online.{GroupByServingInfoParsed, SparkConversions} import ai.chronon.online.KVStore.PutRequest -import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} +import org.apache.flink.streaming.api.scala.{DataStream, OutputTag, StreamExecutionEnvironment} import org.apache.spark.sql.Encoder import org.apache.flink.api.scala._ import org.apache.flink.streaming.api.functions.async.RichAsyncFunction +import org.apache.flink.streaming.api.windowing.assigners.{TumblingEventTimeWindows, WindowAssigner} +import org.apache.flink.streaming.api.windowing.time.Time +import org.apache.flink.streaming.api.windowing.windows.TimeWindow +import org.slf4j.LoggerFactory /** - * Flink job that processes a single streaming GroupBy 
and writes out the results - * (raw events in untiled, pre-aggregates in case of tiled) to the KV store. - * At a high level, the operators are structured as follows: - * Kafka source -> Spark expression eval -> Avro conversion -> KV store writer - * Kafka source - Reads objects of type T (specific case class, Thrift / Proto) from a Kafka topic - * Spark expression eval - Evaluates the Spark SQL expression in the GroupBy and projects and filters the input data - * Avro conversion - Converts the Spark expr eval output to a form that can be written out to the KV store (PutRequest object) - * KV store writer - Writes the PutRequest objects to the KV store using the AsyncDataStream API + * Flink job that processes a single streaming GroupBy and writes out the results to the KV store. * - * In the untiled version there are no-shuffles and thus this ends up being a single node in the Flink DAG - * (with the above 4 operators and parallelism as injected by the user) + * There are two versions of the job, tiled and untiled. The untiled version writes out raw events while the tiled + * version writes out pre-aggregates. See the `runGroupByJob` and `runTiledGroupByJob` methods for more details. * * @param eventSrc - Provider of a Flink Datastream[T] for the given topic and feature group * @param sinkFn - Async Flink writer function to help us write to the KV store @@ -33,10 +39,13 @@ class FlinkJob[T](eventSrc: FlinkSource[T], groupByServingInfoParsed: GroupByServingInfoParsed, encoder: Encoder[T], parallelism: Int) { + private[this] val logger = LoggerFactory.getLogger(getClass) + + val featureGroupName: String = groupByServingInfoParsed.groupBy.getMetaData.getName + logger.info(f"Creating Flink job. featureGroupName=${featureGroupName}") protected val exprEval: SparkExpressionEvalFn[T] = new SparkExpressionEvalFn[T](encoder, groupByServingInfoParsed.groupBy) - val featureGroupName: String = groupByServingInfoParsed.groupBy.getMetaData.getName if (groupByServingInfoParsed.groupBy.streamingSource.isEmpty) { throw new IllegalArgumentException( @@ -47,7 +56,25 @@ class FlinkJob[T](eventSrc: FlinkSource[T], // The source of our Flink application is a Kafka topic val kafkaTopic: String = groupByServingInfoParsed.groupBy.streamingSource.get.topic + /** + * The "untiled" version of the Flink app. + * + * At a high level, the operators are structured as follows: + * Kafka source -> Spark expression eval -> Avro conversion -> KV store writer + * Kafka source - Reads objects of type T (specific case class, Thrift / Proto) from a Kafka topic + * Spark expression eval - Evaluates the Spark SQL expression in the GroupBy and projects and filters the input data + * Avro conversion - Converts the Spark expr eval output to a form that can be written out to the KV store + * (PutRequest object) + * KV store writer - Writes the PutRequest objects to the KV store using the AsyncDataStream API + * + * In this untiled version, there are no shuffles and thus this ends up being a single node in the Flink DAG + * (with the above 4 operators and parallelism as injected by the user). + */ def runGroupByJob(env: StreamExecutionEnvironment): DataStream[WriteResponse] = { + logger.info( + f"Running Flink job for featureGroupName=${featureGroupName}, kafkaTopic=${kafkaTopic}. 
" + + f"Tiling is disabled.") + val sourceStream: DataStream[T] = eventSrc .getDataStream(kafkaTopic, featureGroupName)(env, parallelism) @@ -70,4 +97,100 @@ class FlinkJob[T](eventSrc: FlinkSource[T], featureGroupName ) } + + /** + * The "tiled" version of the Flink app. + * + * The operators are structured as follows: + * 1. Kafka source - Reads objects of type T (specific case class, Thrift / Proto) from a Kafka topic + * 2. Spark expression eval - Evaluates the Spark SQL expression in the GroupBy and projects and filters the input + * data + * 3. Window/tiling - This window aggregates incoming events, keeps track of the IRs, and sends them forward so + * they are written out to the KV store + * 4. Avro conversion - Finishes converting the output of the window (the IRs) to a form that can be written out + * to the KV store (PutRequest object) + * 5. KV store writer - Writes the PutRequest objects to the KV store using the AsyncDataStream API + * + * The window causes a split in the Flink DAG, so there are two nodes, (1+2) and (3+4+5). + */ + def runTiledGroupByJob(env: StreamExecutionEnvironment): DataStream[WriteResponse] = { + logger.info( + f"Running Flink job for featureGroupName=${featureGroupName}, kafkaTopic=${kafkaTopic}. " + + f"Tiling is enabled.") + + val tilingWindowSizeInMillis: Option[Long] = + ResolutionUtils.getSmallestWindowResolutionInMillis(groupByServingInfoParsed.groupBy) + + val sourceStream: DataStream[T] = + eventSrc + .getDataStream(kafkaTopic, featureGroupName)(env, parallelism) + + val sparkExprEvalDS: DataStream[Map[String, Any]] = sourceStream + .flatMap(exprEval) + .uid(s"spark-expr-eval-flatmap-$featureGroupName") + .name(s"Spark expression eval for $featureGroupName") + .setParallelism(sourceStream.parallelism) // Use same parallelism as previous operator + + val inputSchema: Seq[(String, DataType)] = + exprEval.getOutputSchema.fields + .map(field => (field.name, SparkConversions.toChrononType(field.name, field.dataType))) + .toSeq + + val window = TumblingEventTimeWindows + .of(Time.milliseconds(tilingWindowSizeInMillis.get)) + .asInstanceOf[WindowAssigner[Map[String, Any], TimeWindow]] + + // An alternative to AlwaysFireOnElementTrigger can be used: BufferedProcessingTimeTrigger. + // The latter will buffer writes so they happen at most every X milliseconds per GroupBy & key. + val trigger = new AlwaysFireOnElementTrigger() + + // We use Flink "Side Outputs" to track any late events that aren't computed. + val tilingLateEventsTag = OutputTag[Map[String, Any]]("tiling-late-events") + + // The tiling operator works the following way: + // 1. Input: Spark expression eval (previous operator) + // 2. Key by the entity key(s) defined in the groupby + // 3. Window by a tumbling window + // 4. Use our custom trigger that will "FIRE" on every element + // 5. the AggregationFunction merges each incoming element with the current IRs which are kept in state + // - Each time a "FIRE" is triggered (i.e. on every event), getResult() is called and the current IRs are emitted + // 6. A process window function does additional processing each time the AggregationFunction emits results + // - The only purpose of this window function is to mark tiles as closed so we can do client-side caching in SFS + // 7. 
Output: TimestampedTile, containing the current IRs (Avro encoded) and the timestamp of the current element + val tilingDS: DataStream[TimestampedTile] = + sparkExprEvalDS + .keyBy(KeySelector.getKeySelectionFunction(groupByServingInfoParsed.groupBy)) + .window(window) + .trigger(trigger) + .sideOutputLateData(tilingLateEventsTag) + .aggregate( + // See Flink's "ProcessWindowFunction with Incremental Aggregation" + preAggregator = new FlinkRowAggregationFunction(groupByServingInfoParsed.groupBy, inputSchema), + windowFunction = new FlinkRowAggProcessFunction(groupByServingInfoParsed.groupBy, inputSchema) + ) + .uid(s"tiling-01-$featureGroupName") + .name(s"Tiling for $featureGroupName") + .setParallelism(sourceStream.parallelism) + + // Track late events + val sideOutputStream: DataStream[Map[String, Any]] = + tilingDS + .getSideOutput(tilingLateEventsTag) + .flatMap(new LateEventCounter(featureGroupName)) + .uid(s"tiling-side-output-01-$featureGroupName") + .name(s"Tiling Side Output Late Data for $featureGroupName") + .setParallelism(sourceStream.parallelism) + + val putRecordDS: DataStream[PutRequest] = tilingDS + .flatMap(new TiledAvroCodecFn[T](groupByServingInfoParsed)) + .uid(s"avro-conversion-01-$featureGroupName") + .name(s"Avro conversion for $featureGroupName") + .setParallelism(sourceStream.parallelism) + + AsyncKVStoreWriter.withUnorderedWaits( + putRecordDS, + sinkFn, + featureGroupName + ) + } } diff --git a/flink/src/main/scala/ai/chronon/flink/FlinkSource.scala b/flink/src/main/scala/ai/chronon/flink/FlinkSource.scala index ceeb0d9c6..336525556 100644 --- a/flink/src/main/scala/ai/chronon/flink/FlinkSource.scala +++ b/flink/src/main/scala/ai/chronon/flink/FlinkSource.scala @@ -6,6 +6,8 @@ abstract class FlinkSource[T] extends Serializable { /** * Return a Flink DataStream for the given topic and feature group. + * + * When implementing a source, you should also make a conscious decision about your allowed lateness strategy. */ def getDataStream(topic: String, groupName: String)( env: StreamExecutionEnvironment, diff --git a/flink/src/main/scala/ai/chronon/flink/RichMetricsOperators.scala b/flink/src/main/scala/ai/chronon/flink/RichMetricsOperators.scala new file mode 100644 index 000000000..086ecc865 --- /dev/null +++ b/flink/src/main/scala/ai/chronon/flink/RichMetricsOperators.scala @@ -0,0 +1,27 @@ +package ai.chronon.flink + +import org.apache.flink.api.common.functions.RichFlatMapFunction +import org.apache.flink.configuration.Configuration +import org.apache.flink.metrics.Counter +import org.apache.flink.util.Collector + +/** + * Function to count late events. + * + * This function should consume the Side Output of the main tiling window. 
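+  * Events end up in that side output when they arrive after the watermark has passed the end of their
+  * window (plus any allowed lateness), so this counter makes otherwise silently dropped data visible.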
+ * */ +class LateEventCounter(featureGroupName: String) extends RichFlatMapFunction[Map[String, Any], Map[String, Any]] { + @transient private var lateEventCounter: Counter = _ + + override def open(parameters: Configuration): Unit = { + val metricsGroup = getRuntimeContext.getMetricGroup + .addGroup("chronon") + .addGroup("feature_group", featureGroupName) + lateEventCounter = metricsGroup.counter("tiling.late_events") + } + + override def flatMap(in: Map[String, Any], out: Collector[Map[String, Any]]): Unit = { + lateEventCounter.inc() + out.collect(in); + } +} diff --git a/flink/src/main/scala/ai/chronon/flink/SparkExpressionEvalFn.scala b/flink/src/main/scala/ai/chronon/flink/SparkExpressionEvalFn.scala index 78e44540b..793517cbc 100644 --- a/flink/src/main/scala/ai/chronon/flink/SparkExpressionEvalFn.scala +++ b/flink/src/main/scala/ai/chronon/flink/SparkExpressionEvalFn.scala @@ -101,7 +101,7 @@ class SparkExpressionEvalFn[T](encoder: Encoder[T], groupBy: GroupBy) extends Ri } catch { case e: Exception => // To improve availability, we don't rethrow the exception. We just drop the event - // and track the errors in a metric. If there are too many errors we'll get alerted/paged. + // and track the errors in a metric. Alerts should be set up on this metric. logger.error(s"Error evaluating Spark expression - $e") exprEvalErrorCounter.inc() } diff --git a/flink/src/main/scala/ai/chronon/flink/window/FlinkRowAggregators.scala b/flink/src/main/scala/ai/chronon/flink/window/FlinkRowAggregators.scala new file mode 100644 index 000000000..090772362 --- /dev/null +++ b/flink/src/main/scala/ai/chronon/flink/window/FlinkRowAggregators.scala @@ -0,0 +1,206 @@ +package ai.chronon.flink.window + +import ai.chronon.aggregator.row.RowAggregator +import ai.chronon.api.Extensions.GroupByOps +import ai.chronon.api.{Constants, DataType, GroupBy, Row} +import ai.chronon.online.{ArrayRow, TileCodec} +import org.apache.flink.api.common.functions.AggregateFunction +import org.apache.flink.configuration.Configuration +import org.apache.flink.metrics.Counter +import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction +import org.apache.flink.streaming.api.windowing.windows.TimeWindow +import org.apache.flink.util.Collector +import org.slf4j.LoggerFactory + +import scala.util.{Failure, Success, Try} + +/** + * TimestampedIR combines the current Intermediate Result with the timestamp of the event being processed. + * We need to keep track of the timestamp of the event processed so we can calculate processing lag down the line. + * + * Example: for a GroupBy with 2 windows, we'd have TimestampedTile( [IR for window 1, IR for window 2], timestamp ). + * + * @param ir the array of partial aggregates + * @param latestTsMillis timestamp of the current event being processed + */ +case class TimestampedIR( + ir: Array[Any], + latestTsMillis: Option[Long] +) + +/** + * Wrapper Flink aggregator around Chronon's RowAggregator. Relies on Flink to pass in + * the correct set of events for the tile. As the aggregates produced by this function + * are used on the serving side along with other pre-aggregates, we don't 'finalize' the + * Chronon RowAggregator and instead return the intermediate representation. + * + * (This cannot be a RichAggregateFunction because Flink does not support Rich functions in windows.) 
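+  * As a consequence, the underlying RowAggregator below is (re)built lazily via initializeRowAggregator()
+  * rather than in an open() method.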
+ */ +class FlinkRowAggregationFunction( + groupBy: GroupBy, + inputSchema: Seq[(String, DataType)] +) extends AggregateFunction[Map[String, Any], TimestampedIR, TimestampedIR] { + @transient private[flink] var rowAggregator: RowAggregator = _ + @transient lazy val logger = LoggerFactory.getLogger(getClass) + + private val valueColumns: Array[String] = inputSchema.map(_._1).toArray // column order matters + private val timeColumnAlias: String = Constants.TimeColumn + + /* + * Initialize the transient rowAggregator. + * Running this method is an idempotent operation: + * 1. The initialized RowAggregator is always the same given a `groupBy` and `inputSchema`. + * 2. The RowAggregator itself doens't hold state; Flink keeps track of the state of the IRs. + */ + private def initializeRowAggregator(): Unit = + rowAggregator = TileCodec.buildRowAggregator(groupBy, inputSchema) + + override def createAccumulator(): TimestampedIR = { + initializeRowAggregator() + TimestampedIR(rowAggregator.init, None) + } + + override def add( + element: Map[String, Any], + accumulatorIr: TimestampedIR + ): TimestampedIR = { + // Most times, the time column is a Long, but it could be a Double. + val tsMills = Try(element(timeColumnAlias).asInstanceOf[Long]) + .getOrElse(element(timeColumnAlias).asInstanceOf[Double].toLong) + val row = toChrononRow(element, tsMills) + + // Given that the rowAggregator is transient, it may be null when a job is restored from a checkpoint + if (rowAggregator == null) { + logger.debug( + f"The Flink RowAggregator was null for groupBy=${groupBy.getMetaData.getName} tsMills=$tsMills" + ) + initializeRowAggregator() + } + + logger.debug( + f"Flink pre-aggregates BEFORE adding new element: accumulatorIr=[${accumulatorIr.ir + .mkString(", ")}] groupBy=${groupBy.getMetaData.getName} tsMills=$tsMills element=$element" + ) + + val partialAggregates = Try { + rowAggregator.update(accumulatorIr.ir, row) + } + + partialAggregates match { + case Success(v) => { + logger.debug( + f"Flink pre-aggregates AFTER adding new element [${v.mkString(", ")}] " + + f"groupBy=${groupBy.getMetaData.getName} tsMills=$tsMills element=$element" + ) + TimestampedIR(v, Some(tsMills)) + } + case Failure(e) => + logger.error( + s"Flink error calculating partial row aggregate. " + + s"groupBy=${groupBy.getMetaData.getName} tsMills=$tsMills element=$element", + e + ) + throw e + } + } + + // Note we return intermediate results here as the results of this + // aggregator are used on the serving side along with other pre-aggregates + override def getResult(accumulatorIr: TimestampedIR): TimestampedIR = + accumulatorIr + + override def merge(aIr: TimestampedIR, bIr: TimestampedIR): TimestampedIR = + TimestampedIR( + rowAggregator.merge(aIr.ir, bIr.ir), + aIr.latestTsMillis + .flatMap(aL => bIr.latestTsMillis.map(bL => Math.max(aL, bL))) + .orElse(aIr.latestTsMillis.orElse(bIr.latestTsMillis)) + ) + + def toChrononRow(value: Map[String, Any], tsMills: Long): Row = { + // The row values need to be in the same order as the input schema columns + // The reason they are out of order in the first place is because the CatalystUtil does not return values in the + // same order as the schema + val values: Array[Any] = valueColumns.map(value(_)) + new ArrayRow(values, tsMills) + } +} + +/** + * TimestampedTile combines the entity keys, the encoded Intermediate Result, and the timestamp of the event being processed. + * + * We need the timestamp of the event processed so we can calculate processing lag down the line. 
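+  * Example: for a GroupBy keyed on "card_number", this could be
+  * TimestampedTile(List("4242-4242-4242-4242"), <Avro-encoded tile IR bytes>, <event timestamp in millis>).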
+ * + * @param keys the GroupBy entity keys + * @param tileBytes encoded tile IR + * @param latestTsMillis timestamp of the current event being processed + */ +case class TimestampedTile( + keys: List[Any], + tileBytes: Array[Byte], + latestTsMillis: Long +) + +// This process function is only meant to be used downstream of the ChrononFlinkAggregationFunction +class FlinkRowAggProcessFunction( + groupBy: GroupBy, + inputSchema: Seq[(String, DataType)] +) extends ProcessWindowFunction[TimestampedIR, TimestampedTile, List[Any], TimeWindow] { + + @transient private[flink] var tileCodec: TileCodec = _ + @transient lazy val logger = LoggerFactory.getLogger(getClass) + + @transient private var rowProcessingErrorCounter: Counter = _ + @transient private var eventProcessingErrorCounter: Counter = + _ // Shared metric for errors across the entire Flink app. + + override def open(parameters: Configuration): Unit = { + super.open(parameters) + tileCodec = new TileCodec(groupBy, inputSchema) + + val metricsGroup = getRuntimeContext.getMetricGroup + .addGroup("chronon") + .addGroup("feature_group", groupBy.getMetaData.getName) + rowProcessingErrorCounter = metricsGroup.counter("tiling_process_function_error") + eventProcessingErrorCounter = metricsGroup.counter("event_processing_error") + } + + /** + * Process events emitted from the aggregate function. + * Output format: (keys, encoded tile IR, timestamp of the event being processed) + * */ + override def process( + keys: List[Any], + context: Context, + elements: Iterable[TimestampedIR], + out: Collector[TimestampedTile] + ): Unit = { + val windowEnd = context.window.getEnd + val irEntry = elements.head + val isComplete = context.currentWatermark >= windowEnd + + val tileBytes = Try { + tileCodec.makeTileIr(irEntry.ir, isComplete) + } + + tileBytes match { + case Success(v) => { + logger.debug( + s""" + |Flink aggregator processed element irEntry=$irEntry + |tileBytes=${java.util.Base64.getEncoder.encodeToString(v)} + |windowEnd=$windowEnd groupBy=${groupBy.getMetaData.getName} + |keys=$keys isComplete=$isComplete tileAvroSchema=${tileCodec.tileAvroSchema}""" + ) + // The timestamp should never be None here. + out.collect(TimestampedTile(keys, v, irEntry.latestTsMillis.get)) + } + case Failure(e) => + // To improve availability, we don't rethrow the exception. We just drop the event + // and track the errors in a metric. Alerts should be set up on this metric. + logger.error(s"Flink process error making tile IR", e) + eventProcessingErrorCounter.inc() + rowProcessingErrorCounter.inc() + } + } +} diff --git a/flink/src/main/scala/ai/chronon/flink/window/KeySelector.scala b/flink/src/main/scala/ai/chronon/flink/window/KeySelector.scala new file mode 100644 index 000000000..900d8bebd --- /dev/null +++ b/flink/src/main/scala/ai/chronon/flink/window/KeySelector.scala @@ -0,0 +1,33 @@ +package ai.chronon.flink.window + +import ai.chronon.api.GroupBy + +import scala.jdk.CollectionConverters._ +import org.slf4j.LoggerFactory + +/** + * A KeySelector is what Flink uses to determine how to partition a DataStream. In a distributed environment, the + * KeySelector guarantees that events with the same key always end up in the same machine. + * If invoked multiple times on the same object, the returned key must be the same. + */ +object KeySelector { + private[this] lazy val logger = LoggerFactory.getLogger(getClass) + + /** + * Given a GroupBy, create a function to key the output of a SparkExprEval operator by the entities defined in the + * GroupBy. 
The function returns a List of size equal to the number of keys in the GroupBy. + * + * For example, if a GroupBy is defined as "GroupBy(..., keys=["color", "size"], ...), the function will key the + * Flink SparkExprEval DataStream by color and size, so all events with the same (color, size) are sent to the same + * operator. + */ + def getKeySelectionFunction(groupBy: GroupBy): Map[String, Any] => List[Any] = { + // List uses MurmurHash.seqHash for its .hashCode(), which gives us hashing based on content. + // (instead of based on the instance, which is the case for Array). + val groupByKeys: List[String] = groupBy.keyColumns.asScala.toList + logger.info( + f"Creating key selection function for Flink app. groupByKeys=$groupByKeys" + ) + (sparkEvalOutput: Map[String, Any]) => groupByKeys.collect(sparkEvalOutput) + } +} diff --git a/flink/src/main/scala/ai/chronon/flink/window/Trigger.scala b/flink/src/main/scala/ai/chronon/flink/window/Trigger.scala new file mode 100644 index 000000000..f72dddbe6 --- /dev/null +++ b/flink/src/main/scala/ai/chronon/flink/window/Trigger.scala @@ -0,0 +1,180 @@ +package ai.chronon.flink.window + +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} +import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult} +import org.apache.flink.streaming.api.windowing.windows.TimeWindow + +/** + * Custom Flink Trigger that fires on every event received. + * */ +class AlwaysFireOnElementTrigger extends Trigger[Map[String, Any], TimeWindow] { + override def onElement( + element: Map[String, Any], + timestamp: Long, + window: TimeWindow, + ctx: Trigger.TriggerContext + ): TriggerResult = + TriggerResult.FIRE + + override def onProcessingTime( + time: Long, + window: TimeWindow, + ctx: Trigger.TriggerContext + ): TriggerResult = + TriggerResult.CONTINUE + + override def onEventTime( + time: Long, + window: TimeWindow, + ctx: Trigger.TriggerContext + ): TriggerResult = + // We don't need to PURGE here since we don't have explicit state. + // Flink's "Window Lifecycle" doc: "The window is completely removed when the time (event or processing time) + // passes its end timestamp plus the user-specified allowed lateness" + TriggerResult.CONTINUE + + // This Trigger doesn't hold state, so we don't need to do anything when the window is purged. + override def clear( + window: TimeWindow, + ctx: Trigger.TriggerContext + ): Unit = {} + + override def canMerge: Boolean = true + + override def onMerge( + window: TimeWindow, + mergeContext: Trigger.OnMergeContext + ): Unit = {} +} + +/** + * BufferedProcessingTimeTrigger is a custom Trigger that fires at most every 'bufferSizeMillis' within a window. + * It is intended for incremental window aggregations using event-time semantics. + * + * Purpose: This trigger exists as an optimization to reduce the number of writes to our online store and better handle + * contention that arises from having hot keys. + * + * Details: + * - The buffer timers are NOT aligned with the UNIX Epoch, they can fire at any timestamp. e.g., if the first + * event arrives at 14ms, and the buffer size is 100ms, the timer will fire at 114ms. + * - Buffer timers are only scheduled when events come in. If there's a gap in events, this trigger won't fire. + * + * Edge cases handled: + * - If the (event-time) window closes before the last (processing-time) buffer fires, this trigger will fire + * the remaining buffered elements before closing. 
+ * + * Example: + * Window size = 300,000 ms (5 minutes) + * BufferSizeMillis = 100 ms. + * Assume we are using this trigger on a GroupBy that counts the number unique IDs see. + * For simplicity, assume event time and processing time are synchronized (although in practice this is never true) + * + * Event 1: ts = 14 ms, ID = A. + * preAggregate (a Set that keeps track of all unique IDs seen) = [A] + * this causes a timer to be set for timestamp = 114 ms. + * Event 2: ts = 38 ms, ID = B. + * preAggregate = [A, B] + * Event 3: ts = 77 ms, ID = B. + * preAggregate = [A, B] + * Timer set for 114ms fires. + * we emit the preAggregate [A, B]. + * Event 4: ts = 400ms, ID = C. + * preAggregate = [A,B,C] (we don't purge the previous events when the time fires!) + * this causes a timer to be set for timestamp = 500 ms + * Timer set for 500ms fires. + * we emit the preAggregate [A, B, C]. + * */ +class BufferedProcessingTimeTrigger(bufferSizeMillis: Long) extends Trigger[Map[String, Any], TimeWindow] { + // Each pane has its own state. A Flink pane is an actual instance of a defined window for a given key. + private val nextTimerTimestampStateDescriptor = + new ValueStateDescriptor[java.lang.Long]("nextTimerTimestampState", classOf[java.lang.Long]) + + /** + * When an element arrives, set up a processing time trigger to fire after `bufferSizeMillis`. + * If a timer is already set, we don't want to create a new one. + * + * Late events are treated the same way as regular events; they will still get buffered. + */ + override def onElement( + element: Map[String, Any], + timestamp: Long, + window: TimeWindow, + ctx: Trigger.TriggerContext + ): TriggerResult = { + val nextTimerTimestampState: ValueState[java.lang.Long] = ctx.getPartitionedState( + nextTimerTimestampStateDescriptor + ) + + // Set timer if one doesn't already exist + if (nextTimerTimestampState.value() == null) { + val nextFireTimestampMillis = ctx.getCurrentProcessingTime + bufferSizeMillis + ctx.registerProcessingTimeTimer(nextFireTimestampMillis) + nextTimerTimestampState.update(nextFireTimestampMillis) + } + + TriggerResult.CONTINUE + } + + /** + * When the processing-time timer set up in `onElement` fires, we emit the results without purging the window. + * i.e., we keep the current pre-aggregates/IRs in the window so we can continue aggregating. + * + * Note: We don't need to PURGE the window anywhere. Flink will do that automatically when a window expires. + * Flink Docs: "[...] Flink keeps the state of windows until their allowed lateness expires. Once this happens, Flink + * removes the window and deletes its state [...]". + * + * Note: In case the app crashes after a processing-time timer is set, but before it fires, it will fire immediately + * after recovery. + */ + override def onProcessingTime( + timestamp: Long, + window: TimeWindow, + ctx: Trigger.TriggerContext + ): TriggerResult = { + val nextTimerTimestampState = ctx.getPartitionedState(nextTimerTimestampStateDescriptor) + nextTimerTimestampState.update(null) + TriggerResult.FIRE + } + + /** + * Fire any elements left in the buffer if the window ends before the last processing-time timer is fired. + * This can happen because we are using event-time semantics for the window, and processing-time for the buffer timer. + * + * Flink automatically sets up an event timer for the end of the window (+ allowed lateness) as soon as it + * sees the first element in it. See 'registerCleanupTimer' in Flink's 'WindowOperator.java'. 
+ */ + override def onEventTime( + timestamp: Long, + window: TimeWindow, + ctx: Trigger.TriggerContext + ): TriggerResult = { + val nextTimerTimestampState: ValueState[java.lang.Long] = ctx.getPartitionedState( + nextTimerTimestampStateDescriptor + ) + if (nextTimerTimestampState.value() != null) { + TriggerResult.FIRE + } else { + TriggerResult.CONTINUE + } + } + + /** + * When a window is being purged (e.g., because it has expired), we delete timers and state. + * + * This function is called immediately after our 'onEventTime' which fires at the end of the window. + * See 'onEventTime' in Flink's 'WindowOperator.java'. + */ + override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = { + // Remove the lingering processing-time timer if it exist. + val nextTimerTimestampState: ValueState[java.lang.Long] = ctx.getPartitionedState( + nextTimerTimestampStateDescriptor + ) + val nextTimerTimestampStateValue = nextTimerTimestampState.value() + if (nextTimerTimestampStateValue != null) { + ctx.deleteProcessingTimeTimer(nextTimerTimestampStateValue) + } + + // Delete state + nextTimerTimestampState.clear() + } +} diff --git a/flink/src/test/scala/ai/chronon/flink/test/FlinkJobIntegrationTest.scala b/flink/src/test/scala/ai/chronon/flink/test/FlinkJobIntegrationTest.scala index 3c04014cb..83f4bd55d 100644 --- a/flink/src/test/scala/ai/chronon/flink/test/FlinkJobIntegrationTest.scala +++ b/flink/src/test/scala/ai/chronon/flink/test/FlinkJobIntegrationTest.scala @@ -1,44 +1,20 @@ package ai.chronon.flink.test -import ai.chronon.api.Extensions.{WindowOps, WindowUtils} -import ai.chronon.api.{GroupBy, GroupByServingInfo, PartitionSpec} -import ai.chronon.flink.{FlinkJob, FlinkSource, SparkExpressionEvalFn, WriteResponse} -import ai.chronon.online.Extensions.StructTypeOps +import ai.chronon.flink.window.{TimestampedIR, TimestampedTile} +import ai.chronon.flink.{FlinkJob, SparkExpressionEvalFn} import ai.chronon.online.{Api, GroupByServingInfoParsed} +import ai.chronon.online.KVStore.PutRequest import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration -import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} -import org.apache.flink.api.scala._ -import org.apache.flink.streaming.api.functions.sink.SinkFunction +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.test.util.MiniClusterWithClientResource import org.apache.spark.sql.Encoders -import org.apache.spark.sql.types.StructType import org.junit.Assert.assertEquals import org.junit.{After, Before, Test} import org.mockito.Mockito.withSettings import org.scalatestplus.mockito.MockitoSugar.mock -import java.util -import java.util.Collections import scala.jdk.CollectionConverters.asScalaBufferConverter -class E2EEventSource(mockEvents: Seq[E2ETestEvent]) extends FlinkSource[E2ETestEvent] { - override def getDataStream(topic: String, groupName: String)(env: StreamExecutionEnvironment, - parallelism: Int): DataStream[E2ETestEvent] = { - env.fromCollection(mockEvents) - } -} - -class CollectSink extends SinkFunction[WriteResponse] { - override def invoke(value: WriteResponse, context: SinkFunction.Context): Unit = { - CollectSink.values.add(value) - } -} - -object CollectSink { - // must be static - val values: util.List[WriteResponse] = Collections.synchronizedList(new util.ArrayList()) -} - class FlinkJobIntegrationTest { val flinkCluster = new MiniClusterWithClientResource( @@ -47,6 +23,30 @@ class FlinkJobIntegrationTest { 
.setNumberTaskManagers(1) .build) + // Decode a PutRequest into a TimestampedTile + def avroConvertPutRequestToTimestampedTile[T]( + in: PutRequest, + groupByServingInfoParsed: GroupByServingInfoParsed + ): TimestampedTile = { + // Decode the key bytes into a GenericRecord + val tileBytes = in.valueBytes + val record = groupByServingInfoParsed.keyCodec.decode(in.keyBytes) + + // Get all keys we expect to be in the GenericRecord + val decodedKeys: List[String] = + groupByServingInfoParsed.groupBy.keyColumns.asScala.map(record.get(_).toString).toList + + val tsMills = in.tsMillis.get + TimestampedTile(decodedKeys, tileBytes, tsMills) + } + + // Decode a TimestampedTile into a TimestampedIR + def avroConvertTimestampedTileToTimestampedIR(timestampedTile: TimestampedTile, + groupByServingInfoParsed: GroupByServingInfoParsed): TimestampedIR = { + val tileIR = groupByServingInfoParsed.tiledCodec.decodeTileIr(timestampedTile.tileBytes) + TimestampedIR(tileIR._1, Some(timestampedTile.latestTsMillis)) + } + @Before def setup(): Unit = { flinkCluster.before() @@ -56,45 +56,7 @@ class FlinkJobIntegrationTest { @After def teardown(): Unit = { flinkCluster.after() - } - - private def makeTestGroupByServingInfoParsed(groupBy: GroupBy, - inputSchema: StructType, - outputSchema: StructType): GroupByServingInfoParsed = { - val groupByServingInfo = new GroupByServingInfo() - groupByServingInfo.setGroupBy(groupBy) - - // Set input avro schema for groupByServingInfo - groupByServingInfo.setInputAvroSchema( - inputSchema.toAvroSchema("Input").toString(true) - ) - - // Set key avro schema for groupByServingInfo - groupByServingInfo.setKeyAvroSchema( - StructType( - groupBy.keyColumns.asScala.map { keyCol => - val keyColStructType = outputSchema.fields.find(field => field.name == keyCol) - keyColStructType match { - case Some(col) => col - case None => - throw new IllegalArgumentException(s"Missing key col from output schema: $keyCol") - } - } - ).toAvroSchema("Key") - .toString(true) - ) - - // Set value avro schema for groupByServingInfo - val aggInputColNames = groupBy.aggregations.asScala.map(_.inputColumn).toList - groupByServingInfo.setSelectedAvroSchema( - StructType(outputSchema.fields.filter(field => aggInputColNames.contains(field.name))) - .toAvroSchema("Value") - .toString(true) - ) - new GroupByServingInfoParsed( - groupByServingInfo, - PartitionSpec(format = "yyyy-MM-dd", spanMillis = WindowUtils.Day.millis) - ) + CollectSink.values.clear() } @Test @@ -113,9 +75,10 @@ class FlinkJobIntegrationTest { val outputSchema = new SparkExpressionEvalFn(encoder, groupBy).getOutputSchema - val groupByServingInfoParsed = makeTestGroupByServingInfoParsed(groupBy, encoder.schema, outputSchema) + val groupByServingInfoParsed = + FlinkTestUtils.makeTestGroupByServingInfoParsed(groupBy, encoder.schema, outputSchema) val mockApi = mock[Api](withSettings().serializable()) - val writerFn = new MockAsyncKVStoreWriter(Seq(true), mockApi, "testFG") + val writerFn = new MockAsyncKVStoreWriter(Seq(true), mockApi, "testFlinkJobEndToEndFG") val job = new FlinkJob[E2ETestEvent](source, writerFn, groupByServingInfoParsed, encoder, 2) job.runGroupByJob(env).addSink(new CollectSink) @@ -132,4 +95,67 @@ class FlinkJobIntegrationTest { // check that all the writes were successful assertEquals(writeEventCreatedDS.map(_.status), Seq(true, true, true)) } + + @Test + def testTiledFlinkJobEndToEnd(): Unit = { + implicit val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment + + // Create some test 
events with multiple different ids so we can check if tiling/pre-aggregation works correctly + // for each of them. + val id1Elements = Seq(E2ETestEvent(id = "id1", int_val = 1, double_val = 1.5, created = 1L), + E2ETestEvent(id = "id1", int_val = 1, double_val = 2.5, created = 2L)) + val id2Elements = Seq(E2ETestEvent(id = "id2", int_val = 1, double_val = 10.0, created = 3L)) + val elements: Seq[E2ETestEvent] = id1Elements ++ id2Elements + val source = new WatermarkedE2EEventSource(elements) + + // Make a GroupBy that SUMs the double_val of the elements. + val groupBy = FlinkTestUtils.makeGroupBy(Seq("id")) + + // Prepare the Flink Job + val encoder = Encoders.product[E2ETestEvent] + val outputSchema = new SparkExpressionEvalFn(encoder, groupBy).getOutputSchema + val groupByServingInfoParsed = + FlinkTestUtils.makeTestGroupByServingInfoParsed(groupBy, encoder.schema, outputSchema) + val mockApi = mock[Api](withSettings().serializable()) + val writerFn = new MockAsyncKVStoreWriter(Seq(true), mockApi, "testTiledFlinkJobEndToEndFG") + val job = new FlinkJob[E2ETestEvent](source, writerFn, groupByServingInfoParsed, encoder, 2) + job.runTiledGroupByJob(env).addSink(new CollectSink) + + env.execute("TiledFlinkJobIntegrationTest") + + // capture the datastream of the 'created' timestamps of all the written out events + val writeEventCreatedDS = CollectSink.values.asScala + + // BASIC ASSERTIONS + // All elements were processed + assert(writeEventCreatedDS.size == elements.size) + // check that the timestamps of the written out events match the input events + // we use a Set as we can have elements out of order given we have multiple tasks + assertEquals(writeEventCreatedDS.map(_.putRequest.tsMillis).map(_.get).toSet, elements.map(_.created).toSet) + // check that all the writes were successful + assertEquals(writeEventCreatedDS.map(_.status), Seq(true, true, true)) + + // Assert that the pre-aggregates/tiles are correct + // Get a list of the final IRs for each key. 
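+    // For each key we decode every PutRequest back into a TimestampedIR and keep the IR with the
+    // largest timestamp, i.e. the most up-to-date pre-aggregate written out for that key.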
+ val finalIRsPerKey: Map[List[Any], List[Any]] = writeEventCreatedDS + .map(writeEvent => { + // First, we work back from the PutRequest decode it to TimestampedTile and then TimestampedIR + val timestampedTile = + avroConvertPutRequestToTimestampedTile(writeEvent.putRequest, groupByServingInfoParsed) + val timestampedIR = avroConvertTimestampedTileToTimestampedIR(timestampedTile, groupByServingInfoParsed) + + // We're interested in the the keys, Intermediate Result, and the timestamp for each processed event + (timestampedTile.keys, timestampedIR.ir.toList, writeEvent.putRequest.tsMillis.get) + }) + .groupBy(_._1) // Group by the keys + .map((keys) => (keys._1, keys._2.maxBy(_._3)._2)) // pick just the events with largest timestamp + + // Looking back at our test events, we expect the following Intermediate Results to be generated: + val expectedFinalIRsPerKey = Map( + List("id1") -> List(4.0), // Add up the double_val of the two 'id1' events + List("id2") -> List(10.0) + ) + + assertEquals(expectedFinalIRsPerKey, finalIRsPerKey) + } } diff --git a/flink/src/test/scala/ai/chronon/flink/test/FlinkTestUtils.scala b/flink/src/test/scala/ai/chronon/flink/test/FlinkTestUtils.scala index 790b06f28..ff6a9ae2f 100644 --- a/flink/src/test/scala/ai/chronon/flink/test/FlinkTestUtils.scala +++ b/flink/src/test/scala/ai/chronon/flink/test/FlinkTestUtils.scala @@ -1,19 +1,50 @@ package ai.chronon.flink.test import ai.chronon.api.{Accuracy, Builders, GroupBy, Operation, TimeUnit, Window} -import ai.chronon.flink.AsyncKVStoreWriter +import ai.chronon.flink.{AsyncKVStoreWriter, FlinkSource, WriteResponse} import ai.chronon.online.{Api, KVStore} -import org.apache.flink.api.java.ExecutionEnvironment -import org.apache.flink.configuration.Configuration -import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import ai.chronon.api.Extensions.{WindowOps, WindowUtils} +import ai.chronon.api.{GroupByServingInfo, PartitionSpec} +import ai.chronon.online.Extensions.StructTypeOps +import ai.chronon.online.GroupByServingInfoParsed +import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy} +import org.apache.flink.api.scala.createTypeInformation +import org.apache.flink.streaming.api.functions.sink.SinkFunction +import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} +import org.apache.spark.sql.types.StructType import org.mockito.ArgumentMatchers import org.mockito.Mockito.{when, withSettings} import org.scalatestplus.mockito.MockitoSugar.mock +import java.time.Duration +import java.util +import java.util.Collections import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, Future} +import scala.jdk.CollectionConverters.asScalaBufferConverter case class E2ETestEvent(id: String, int_val: Int, double_val: Double, created: Long) +class E2EEventSource(mockEvents: Seq[E2ETestEvent]) extends FlinkSource[E2ETestEvent] { + override def getDataStream(topic: String, groupName: String)(env: StreamExecutionEnvironment, + parallelism: Int): DataStream[E2ETestEvent] = { + env.fromCollection(mockEvents) + } +} + +class WatermarkedE2EEventSource(mockEvents: Seq[E2ETestEvent]) extends FlinkSource[E2ETestEvent] { + def watermarkStrategy: WatermarkStrategy[E2ETestEvent] = + WatermarkStrategy + .forBoundedOutOfOrderness[E2ETestEvent](Duration.ofSeconds(5)) + .withTimestampAssigner(new SerializableTimestampAssigner[E2ETestEvent] { + override def extractTimestamp(event: E2ETestEvent, previousElementTimestamp: Long): Long = + event.created + }) 
+ override def getDataStream(topic: String, groupName: String)(env: StreamExecutionEnvironment, + parallelism: Int): DataStream[E2ETestEvent] = { + env.fromCollection(mockEvents).assignTimestampsAndWatermarks(watermarkStrategy) + } +} + class MockAsyncKVStoreWriter(mockResults: Seq[Boolean], onlineImpl: Api, featureGroup: String) extends AsyncKVStoreWriter(onlineImpl, featureGroup) { override def getKVStore: KVStore = { @@ -25,8 +56,56 @@ class MockAsyncKVStoreWriter(mockResults: Seq[Boolean], onlineImpl: Api, feature } } +class CollectSink extends SinkFunction[WriteResponse] { + override def invoke(value: WriteResponse, context: SinkFunction.Context): Unit = { + CollectSink.values.add(value) + } +} + +object CollectSink { + // must be static + val values: util.List[WriteResponse] = Collections.synchronizedList(new util.ArrayList()) +} + object FlinkTestUtils { + def makeTestGroupByServingInfoParsed(groupBy: GroupBy, + inputSchema: StructType, + outputSchema: StructType): GroupByServingInfoParsed = { + val groupByServingInfo = new GroupByServingInfo() + groupByServingInfo.setGroupBy(groupBy) + // Set input avro schema for groupByServingInfo + groupByServingInfo.setInputAvroSchema( + inputSchema.toAvroSchema("Input").toString(true) + ) + + // Set key avro schema for groupByServingInfo + groupByServingInfo.setKeyAvroSchema( + StructType( + groupBy.keyColumns.asScala.map { keyCol => + val keyColStructType = outputSchema.fields.find(field => field.name == keyCol) + keyColStructType match { + case Some(col) => col + case None => + throw new IllegalArgumentException(s"Missing key col from output schema: $keyCol") + } + } + ).toAvroSchema("Key") + .toString(true) + ) + + // Set value avro schema for groupByServingInfo + val aggInputColNames = groupBy.aggregations.asScala.map(_.inputColumn).toList + groupByServingInfo.setSelectedAvroSchema( + StructType(outputSchema.fields.filter(field => aggInputColNames.contains(field.name))) + .toAvroSchema("Value") + .toString(true) + ) + new GroupByServingInfoParsed( + groupByServingInfo, + PartitionSpec(format = "yyyy-MM-dd", spanMillis = WindowUtils.Day.millis) + ) + } def makeGroupBy(keyColumns: Seq[String], filters: Seq[String] = Seq.empty): GroupBy = Builders.GroupBy( sources = Seq( diff --git a/flink/src/test/scala/ai/chronon/flink/test/window/FlinkRowAggregationFunctionTest.scala b/flink/src/test/scala/ai/chronon/flink/test/window/FlinkRowAggregationFunctionTest.scala new file mode 100644 index 000000000..e702a2fa1 --- /dev/null +++ b/flink/src/test/scala/ai/chronon/flink/test/window/FlinkRowAggregationFunctionTest.scala @@ -0,0 +1,218 @@ +package ai.chronon.flink.test.window + +import ai.chronon.api._ +import ai.chronon.flink.window.FlinkRowAggregationFunction +import ai.chronon.online.TileCodec +import org.junit.Assert.fail +import org.junit.Test + +import scala.util.{Failure, Try} + +class FlinkRowAggregationFunctionTest { + private val aggregations: Seq[Aggregation] = Seq( + Builders.Aggregation( + Operation.AVERAGE, + "views", + Seq( + new Window(1, TimeUnit.DAYS), + new Window(1, TimeUnit.HOURS), + new Window(30, TimeUnit.DAYS) + ) + ), + Builders.Aggregation( + Operation.AVERAGE, + "rating", + Seq( + new Window(1, TimeUnit.DAYS), + new Window(1, TimeUnit.HOURS) + ) + ), + Builders.Aggregation( + Operation.MAX, + "title", + Seq( + new Window(1, TimeUnit.DAYS) + ) + ), + Builders.Aggregation( + Operation.LAST, + "title", + Seq( + new Window(1, TimeUnit.DAYS) + ) + ) + ) + + private val schema = List( + Constants.TimeColumn -> LongType, + "views" 
-> IntType, + "rating" -> FloatType, + "title" -> StringType + ) + + @Test + def testFlinkAggregatorProducesCorrectResults(): Unit = { + val groupByMetadata = Builders.MetaData(name = "my_group_by") + val groupBy = Builders.GroupBy(metaData = groupByMetadata, aggregations = aggregations) + val aggregateFunc = new FlinkRowAggregationFunction(groupBy, schema) + + var acc = aggregateFunc.createAccumulator() + val rows = Seq( + createRow(1519862399984L, 4, 4.0f, "A"), + createRow(1519862399984L, 40, 5.0f, "B"), + createRow(1519862399988L, 3, 3.0f, "C"), + createRow(1519862399988L, 5, 4.0f, "D"), + createRow(1519862399994L, 4, 4.0f, "A"), + createRow(1519862399999L, 10, 4.0f, "A") + ) + rows.foreach(row => acc = aggregateFunc.add(row, acc)) + val result = aggregateFunc.getResult(acc) + + // we sanity check the final result of the accumulator + // to do so, we must first expand / decompress the windowed tile IR into a full tile + // then we can finalize the tile and get the final result + val tileCodec = new TileCodec(groupBy, schema) + val expandedIr = tileCodec.expandWindowedTileIr(result.ir) + val finalResult = tileCodec.windowedRowAggregator.finalize(expandedIr) + + // expect 7 columns as we have 3 view avg time windows, 2 rating avg and 1 max title, 1 last title + assert(finalResult.length == 7) + val expectedAvgViews = 11.0f + val expectedAvgRating = 4.0f + val expectedMax = "D" + val expectedLast = "A" + val expectedResult = Array( + expectedAvgViews, + expectedAvgViews, + expectedAvgViews, + expectedAvgRating, + expectedAvgRating, + expectedMax, + expectedLast + ) + assert(finalResult sameElements expectedResult) + } + + @Test + def testFlinkAggregatorResultsCanBeMergedWithOtherPreAggregates(): Unit = { + val groupByMetadata = Builders.MetaData(name = "my_group_by") + val groupBy = Builders.GroupBy(metaData = groupByMetadata, aggregations = aggregations) + val aggregateFunc = new FlinkRowAggregationFunction(groupBy, schema) + + // create partial aggregate 1 + var acc1 = aggregateFunc.createAccumulator() + val rows1 = Seq( + createRow(1519862399984L, 4, 4.0f, "A"), + createRow(1519862399984L, 40, 5.0f, "B") + ) + rows1.foreach(row => acc1 = aggregateFunc.add(row, acc1)) + val partialResult1 = aggregateFunc.getResult(acc1) + + // create partial aggregate 2 + var acc2 = aggregateFunc.createAccumulator() + val rows2 = Seq( + createRow(1519862399988L, 3, 3.0f, "C"), + createRow(1519862399988L, 5, 4.0f, "D") + ) + rows2.foreach(row => acc2 = aggregateFunc.add(row, acc2)) + val partialResult2 = aggregateFunc.getResult(acc2) + + // create partial aggregate 3 + var acc3 = aggregateFunc.createAccumulator() + val rows3 = Seq( + createRow(1519862399994L, 4, 4.0f, "A"), + createRow(1519862399999L, 10, 4.0f, "A") + ) + rows3.foreach(row => acc3 = aggregateFunc.add(row, acc3)) + val partialResult3 = aggregateFunc.getResult(acc3) + + // lets merge the partial results together and check + val mergedPartialAggregates = aggregateFunc.rowAggregator + .merge( + aggregateFunc.rowAggregator.merge(partialResult1.ir, partialResult2.ir), + partialResult3.ir + ) + + // we sanity check the final result of the accumulator + // to do so, we must first expand / decompress the windowed tile IR into a full tile + // then we can finalize the tile and get the final result + val tileCodec = new TileCodec(groupBy, schema) + val expandedIr = tileCodec.expandWindowedTileIr(mergedPartialAggregates) + val finalResult = tileCodec.windowedRowAggregator.finalize(expandedIr) + + // expect 7 columns as we have 3 view avg time 
windows, 2 rating avg and 1 max title, 1 last title + assert(finalResult.length == 7) + val expectedAvgViews = 11.0f + val expectedAvgRating = 4.0f + val expectedMax = "D" + val expectedLast = "A" + val expectedResult = Array( + expectedAvgViews, + expectedAvgViews, + expectedAvgViews, + expectedAvgRating, + expectedAvgRating, + expectedMax, + expectedLast + ) + assert(finalResult sameElements expectedResult) + } + + @Test + def testFlinkAggregatorProducesCorrectResultsIfInputIsInIncorrectOrder(): Unit = { + val groupByMetadata = Builders.MetaData(name = "my_group_by") + val groupBy = Builders.GroupBy(metaData = groupByMetadata, aggregations = aggregations) + val aggregateFunc = new FlinkRowAggregationFunction(groupBy, schema) + + var acc = aggregateFunc.createAccumulator() + + // Create a map where the entries are not in the same order as `schema`. + val outOfOrderRow = Map[String, Any]( + "rating" -> 4.0f, + Constants.TimeColumn -> 1519862399999L, + "title" -> "A", + "views" -> 10 + ) + + // If the aggregator fails to fix the order, we'll get a ClassCastException + Try { + acc = aggregateFunc.add(outOfOrderRow, acc) + } match { + case Failure(e) => { + fail( + s"An exception was thrown by the aggregator when it should not have been. " + + s"The aggregator should fix the order without failing. $e") + } + case _ => + } + + val result = aggregateFunc.getResult(acc) + + // we sanity check the final result of the accumulator + // to do so, we must first expand / decompress the windowed tile IR into a full tile + // then we can finalize the tile and get the final result + val tileCodec = new TileCodec(groupBy, schema) + val expandedIr = tileCodec.expandWindowedTileIr(result.ir) + val finalResult = tileCodec.windowedRowAggregator.finalize(expandedIr) + assert(finalResult.length == 7) + + val expectedResult = Array( + outOfOrderRow("views"), + outOfOrderRow("views"), + outOfOrderRow("views"), + outOfOrderRow("rating"), + outOfOrderRow("rating"), + outOfOrderRow("title"), + outOfOrderRow("title") + ) + assert(finalResult sameElements expectedResult) + } + + def createRow(ts: Long, views: Int, rating: Float, title: String): Map[String, Any] = + Map( + Constants.TimeColumn -> ts, + "views" -> views, + "rating" -> rating, + "title" -> title + ) +} diff --git a/flink/src/test/scala/ai/chronon/flink/test/window/KeySelectorTest.scala b/flink/src/test/scala/ai/chronon/flink/test/window/KeySelectorTest.scala new file mode 100644 index 000000000..b81c39aab --- /dev/null +++ b/flink/src/test/scala/ai/chronon/flink/test/window/KeySelectorTest.scala @@ -0,0 +1,58 @@ +package ai.chronon.flink.test.window + +import ai.chronon.api.Builders +import ai.chronon.flink.window.KeySelector +import org.junit.Test + +class KeySelectorTest { + @Test + def TestChrononFlinkJobCorrectlyKeysByAGroupbysEntityKeys(): Unit = { + // We expect something like this to come out of the SparkExprEval operator + val sampleSparkExprEvalOutput: Map[String, Any] = + Map("number" -> 4242, "ip" -> "192.168.0.1", "user" -> "abc") + + val groupByWithOneEntityKey = Builders.GroupBy(keyColumns = Seq("number")) + val keyFunctionOne = KeySelector.getKeySelectionFunction(groupByWithOneEntityKey) + assert( + keyFunctionOne(sampleSparkExprEvalOutput) == List(4242) + ) + + val groupByWithTwoEntityKey = Builders.GroupBy(keyColumns = Seq("number", "user")) + val keyFunctionTwo = KeySelector.getKeySelectionFunction(groupByWithTwoEntityKey) + assert( + keyFunctionTwo(sampleSparkExprEvalOutput) == List(4242, "abc") + ) + } + + @Test + def 
testKeySelectorFunctionReturnsSameHashesForListsWithTheSameContent(): Unit = { + // This is more of a sanity check. It's not comprehensive. + // SINGLE ENTITY KEY + val map1: Map[String, Any] = + Map("number" -> 4242, "ip" -> "192.168.0.1", "user" -> "abc") + val map2: Map[String, Any] = + Map("number" -> 4242, "ip" -> "10.0.0.1", "user" -> "notabc") + val groupBySingleKey = Builders.GroupBy(keyColumns = Seq("number")) + val keyFunctionOne = KeySelector.getKeySelectionFunction(groupBySingleKey) + assert( + keyFunctionOne(map1).hashCode() == keyFunctionOne(map2).hashCode() + ) + + // TWO ENTITY KEYS + val map3: Map[String, Any] = + Map("number" -> 4242, "ip" -> "192.168.0.1", "user" -> "abc") + val map4: Map[String, Any] = + Map("ip" -> "192.168.0.1", "number" -> 4242, "user" -> "notabc") + val groupByTwoKeys = Builders.GroupBy(keyColumns = Seq("number", "ip")) + val keyFunctionTwo = KeySelector.getKeySelectionFunction(groupByTwoKeys) + assert( + keyFunctionTwo(map3).hashCode() == keyFunctionTwo(map4).hashCode() + ) + + val map5: Map[String, Any] = + Map("ip" -> "192.168.0.1", "number" -> null) + val map6: Map[String, Any] = + Map("ip" -> "192.168.0.1", "number" -> null) + assert(keyFunctionTwo(map5).hashCode() == keyFunctionTwo(map6).hashCode()) + } +} diff --git a/proposals/CHIP-1.md b/proposals/CHIP-1.md index b5a05c369..d2f0de409 100644 --- a/proposals/CHIP-1.md +++ b/proposals/CHIP-1.md @@ -50,7 +50,7 @@ The caches will be configured on a per-GroupBy basis, i.e. two caches per GroupB Caching will be an opt-in feature that can be enabled by Chronon developers. -Most of the code changes are in [FetcherBase.scala](https://github.com/airbnb/chronon/blob/master/online/src/main/scala/ai/chronon/online/FetcherBase.scala). +Most of the code changes are in [FetcherBase.scala](https://github.com/airbnb/chronon/blob/main/online/src/main/scala/ai/chronon/online/FetcherBase.scala). ### Batch Caching Details @@ -144,7 +144,7 @@ The size of the cache should ideally be set in terms of maximum memory usage (e. ### Step 1: BatchIr Caching -We start by caching the conversion from `batchBytes` to `FinalBatchIr` (the [toBatchIr function in FetcherBase](https://github.com/airbnb/chronon/blob/master/online/src/main/scala/ai/chronon/online/FetcherBase.scala#L102)) and `Map[String, AnyRef]`. +We start by caching the conversion from `batchBytes` to `FinalBatchIr` (the [toBatchIr function in FetcherBase](https://github.com/airbnb/chronon/blob/main/online/src/main/scala/ai/chronon/online/FetcherBase.scala#L102)) and `Map[String, AnyRef]`. To make testing easier, we'll disable this feature by default and enable it via Java Args. @@ -166,7 +166,7 @@ Results: will add ### Step 3: `TiledIr` Caching -The second step is caching [tile bytes to TiledIr](https://github.com/airbnb/chronon/blob/master/online/src/main/scala/ai/chronon/online/TileCodec.scala#L77C67-L77C67). This is only possible if the tile bytes contain information about whether a tile is complete (i.e. it won’t be updated anymore). The Flink side marks tiles as complete. +The second step is caching [tile bytes to TiledIr](https://github.com/airbnb/chronon/blob/main/online/src/main/scala/ai/chronon/online/TileCodec.scala#L77C67-L77C67). This is only possible if the tile bytes contain information about whether a tile is complete (i.e. it won’t be updated anymore). The Flink side marks tiles as complete. This cache can be "monoid-aware". 
Instead of storing multiple consecutive tiles for a given time range, we combine the tiles and store a single, larger tile in memory. For example, we combine two tiles, [0, 1) and [1, 2), into one, [0, 2). diff --git a/spark/src/main/scala/ai/chronon/spark/Driver.scala b/spark/src/main/scala/ai/chronon/spark/Driver.scala index 39d2b5aed..a5276ea40 100644 --- a/spark/src/main/scala/ai/chronon/spark/Driver.scala +++ b/spark/src/main/scala/ai/chronon/spark/Driver.scala @@ -16,7 +16,6 @@ package ai.chronon.spark -import org.slf4j.LoggerFactory import ai.chronon.api import ai.chronon.api.Extensions.{GroupByOps, MetadataOps, SourceOps} import ai.chronon.api.ThriftJsonCodec @@ -36,6 +35,7 @@ import org.apache.spark.sql.streaming.StreamingQueryListener.{ import org.apache.spark.sql.{DataFrame, SparkSession, SparkSessionExtensions} import org.apache.thrift.TBase import org.rogach.scallop.{ScallopConf, ScallopOption, Subcommand} +import org.slf4j.LoggerFactory import java.io.File import java.nio.file.{Files, Paths} @@ -232,6 +232,8 @@ object Driver { opt[String](required = false, descr = "Start date to compute join backfill, this start date will override start partition in conf.") + val selectedJoinParts: ScallopOption[List[String]] = + opt[List[String]](required = false, descr = "A list of join parts that require backfilling.") lazy val joinConf: api.Join = parseConf[api.Join](confPath()) override def subcommandName() = s"join_${joinConf.metaData.name}" } @@ -242,8 +244,18 @@ object Driver { args.joinConf, args.endDate(), args.buildTableUtils(), - !args.runFirstHole() + !args.runFirstHole(), + selectedJoinParts = args.selectedJoinParts.toOption ) + + if (args.selectedJoinParts.isDefined) { + join.computeJoinOpt(args.stepDays.toOption, args.startPartitionOverride.toOption) + logger.info( + s"Backfilling selected join parts: ${args.selectedJoinParts()} is complete. Skipping the final join. Exiting." 
+ ) + return + } + val df = join.computeJoin(args.stepDays.toOption, args.startPartitionOverride.toOption) if (args.shouldExport()) { @@ -558,6 +570,11 @@ object Driver { descr = "file path to json of the keys to fetch", short = 'f' ) + val atMillis: ScallopOption[Long] = opt[Long]( + required = false, + descr = "timestamp to fetch the data at", + default = None + ) val interval: ScallopOption[Int] = opt[Int]( required = false, descr = "interval between requests in seconds", @@ -626,7 +643,7 @@ object Driver { fetchStats(args, objectMapper, keyMap, fetcher) } else { val startNs = System.nanoTime - val requests = Seq(Fetcher.Request(args.name(), keyMap)) + val requests = Seq(Fetcher.Request(args.name(), keyMap, args.atMillis.toOption)) val resultFuture = if (args.`type`() == "join") { fetcher.fetchJoin(requests) } else { diff --git a/spark/src/main/scala/ai/chronon/spark/GroupBy.scala b/spark/src/main/scala/ai/chronon/spark/GroupBy.scala index a8224f1cf..fc7e724df 100644 --- a/spark/src/main/scala/ai/chronon/spark/GroupBy.scala +++ b/spark/src/main/scala/ai/chronon/spark/GroupBy.scala @@ -16,22 +16,21 @@ package ai.chronon.spark -import org.slf4j.LoggerFactory import ai.chronon.aggregator.base.TimeTuple import ai.chronon.aggregator.row.RowAggregator import ai.chronon.aggregator.windowing._ import ai.chronon.api -import ai.chronon.api.{Accuracy, Constants, DataModel, ParametricMacro} import ai.chronon.api.DataModel.{Entities, Events} import ai.chronon.api.Extensions._ +import ai.chronon.api.{Accuracy, Constants, DataModel, ParametricMacro} import ai.chronon.online.{RowWrapper, SparkConversions} import ai.chronon.spark.Extensions._ import org.apache.spark.rdd.RDD import org.apache.spark.sql.functions._ -import org.apache.spark.sql import org.apache.spark.sql.types._ import org.apache.spark.sql.{DataFrame, Row, SparkSession} import org.apache.spark.util.sketch.BloomFilter +import org.slf4j.LoggerFactory import java.util import scala.collection.{Seq, mutable} diff --git a/spark/src/main/scala/ai/chronon/spark/Join.scala b/spark/src/main/scala/ai/chronon/spark/Join.scala index c06d54390..054730905 100644 --- a/spark/src/main/scala/ai/chronon/spark/Join.scala +++ b/spark/src/main/scala/ai/chronon/spark/Join.scala @@ -16,7 +16,6 @@ package ai.chronon.spark -import org.slf4j.LoggerFactory import ai.chronon.api import ai.chronon.api.Extensions._ import ai.chronon.api._ @@ -27,13 +26,11 @@ import org.apache.spark.sql import org.apache.spark.sql.DataFrame import org.apache.spark.sql.functions._ -import java.util.concurrent.{Callable, ExecutorCompletionService, ExecutorService, Executors} -import scala.collection.Seq -import scala.collection.mutable -import scala.collection.parallel.ExecutionContextTaskSupport -import scala.concurrent.duration.{Duration, DurationInt} +import java.util.concurrent.Executors +import scala.collection.{Seq, mutable} +import scala.concurrent.duration.Duration import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future} -import scala.util.ScalaJavaConversions.{IterableOps, ListOps, MapOps} +import scala.util.ScalaJavaConversions.{ListOps, MapOps} import scala.util.{Failure, Success} /* @@ -65,8 +62,9 @@ class Join(joinConf: api.Join, tableUtils: TableUtils, skipFirstHole: Boolean = true, mutationScan: Boolean = true, - showDf: Boolean = false) - extends JoinBase(joinConf, endPartition, tableUtils, skipFirstHole, mutationScan, showDf) { + showDf: Boolean = false, + selectedJoinParts: Option[List[String]] = None) + extends JoinBase(joinConf, 
endPartition, tableUtils, skipFirstHole, mutationScan, showDf, selectedJoinParts) { private val bootstrapTable = joinConf.metaData.bootstrapTable @@ -154,8 +152,22 @@ class Join(joinConf: api.Join, }.toSeq } - val coveringSetsPerJoinPart: Seq[(JoinPartMetadata, Seq[CoveringSet])] = bootstrapInfo.joinParts.map { - joinPartMetadata => + val partsToCompute: Seq[JoinPartMetadata] = { + if (selectedJoinParts.isEmpty) { + bootstrapInfo.joinParts + } else { + bootstrapInfo.joinParts.filter(part => selectedJoinParts.get.contains(part.joinPart.fullPrefix)) + } + } + + if (selectedJoinParts.isDefined && partsToCompute.isEmpty) { + throw new IllegalArgumentException( + s"Selected join parts are not found. Available ones are: ${bootstrapInfo.joinParts.map(_.joinPart.fullPrefix).prettyInline}") + } + + val coveringSetsPerJoinPart: Seq[(JoinPartMetadata, Seq[CoveringSet])] = bootstrapInfo.joinParts + .filter(part => selectedJoinParts.isEmpty || partsToCompute.contains(part)) + .map { joinPartMetadata => val coveringSets = distinctBootstrapSets.map { case (hashes, rowCount) => val schema = hashes.toSet.flatMap(bootstrapInfo.hashToSchema.apply) @@ -169,7 +181,7 @@ class Join(joinConf: api.Join, CoveringSet(hashes, rowCount, isCovering) } (joinPartMetadata, coveringSets) - } + } logger.info( s"\n======= CoveringSet for JoinPart ${joinConf.metaData.name} for PartitionRange(${leftRange.start}, ${leftRange.end}) =======\n") @@ -185,7 +197,9 @@ class Join(joinConf: api.Join, coveringSetsPerJoinPart } - override def computeRange(leftDf: DataFrame, leftRange: PartitionRange, bootstrapInfo: BootstrapInfo): DataFrame = { + override def computeRange(leftDf: DataFrame, + leftRange: PartitionRange, + bootstrapInfo: BootstrapInfo): Option[DataFrame] = { val leftTaggedDf = if (leftDf.schema.names.contains(Constants.TimeColumn)) { leftDf.withTimeBasedColumn(Constants.TimePartitionColumn) } else { @@ -259,6 +273,9 @@ class Join(joinConf: api.Join, } val rightResults = Await.result(Future.sequence(rightResultsFuture), Duration.Inf).flatten + // early exit if selectedJoinParts is defined. Otherwise, we combine all join parts + if (selectedJoinParts.isDefined) return None + // combine bootstrap table and join part tables // sequentially join bootstrap table and each join part table. some column may exist both on left and right because // a bootstrap source can cover a partial date range. 
we combine the columns using coalesce-rule @@ -287,7 +304,7 @@ class Join(joinConf: api.Join, bootstrapInfo, leftDf.columns) finalDf.explain() - finalDf + Some(finalDf) } def applyDerivation(baseDf: DataFrame, bootstrapInfo: BootstrapInfo, leftColumns: Seq[String]): DataFrame = { diff --git a/spark/src/main/scala/ai/chronon/spark/JoinBase.scala b/spark/src/main/scala/ai/chronon/spark/JoinBase.scala index 12664ab03..d95fd0edd 100644 --- a/spark/src/main/scala/ai/chronon/spark/JoinBase.scala +++ b/spark/src/main/scala/ai/chronon/spark/JoinBase.scala @@ -16,7 +16,6 @@ package ai.chronon.spark -import org.slf4j.LoggerFactory import ai.chronon.api import ai.chronon.api.DataModel.{Entities, Events} import ai.chronon.api.Extensions._ @@ -28,6 +27,7 @@ import com.google.gson.Gson import org.apache.spark.sql.DataFrame import org.apache.spark.sql.functions._ import org.apache.spark.util.sketch.BloomFilter +import org.slf4j.LoggerFactory import java.time.Instant import scala.collection.JavaConverters._ @@ -38,7 +38,8 @@ abstract class JoinBase(joinConf: api.Join, tableUtils: TableUtils, skipFirstHole: Boolean, mutationScan: Boolean = true, - showDf: Boolean = false) { + showDf: Boolean = false, + selectedJoinParts: Option[Seq[String]] = None) { @transient lazy val logger = LoggerFactory.getLogger(getClass) assert(Option(joinConf.metaData.outputNamespace).nonEmpty, s"output namespace could not be empty or null") val metrics: Metrics.Context = Metrics.Context(Metrics.Environment.JoinOffline, joinConf) @@ -286,9 +287,13 @@ abstract class JoinBase(joinConf: api.Join, Some(rightDfWithDerivations) } - def computeRange(leftDf: DataFrame, leftRange: PartitionRange, bootstrapInfo: BootstrapInfo): DataFrame + def computeRange(leftDf: DataFrame, leftRange: PartitionRange, bootstrapInfo: BootstrapInfo): Option[DataFrame] def computeJoin(stepDays: Option[Int] = None, overrideStartPartition: Option[String] = None): DataFrame = { + computeJoinOpt(stepDays, overrideStartPartition).get + } + + def computeJoinOpt(stepDays: Option[Int] = None, overrideStartPartition: Option[String] = None): Option[DataFrame] = { assert(Option(joinConf.metaData.team).nonEmpty, s"join.metaData.team needs to be set for join ${joinConf.metaData.name}") @@ -337,7 +342,7 @@ abstract class JoinBase(joinConf: api.Join, def finalResult: DataFrame = tableUtils.sql(rangeToFill.genScanQuery(null, outputTable)) if (unfilledRanges.isEmpty) { logger.info(s"\nThere is no data to compute based on end partition of ${rangeToFill.end}.\n\n Exiting..") - return finalResult + return Some(finalResult) } stepDays.foreach(metrics.gauge("step_days", _)) @@ -358,14 +363,23 @@ abstract class JoinBase(joinConf: api.Join, leftDf(joinConf, range, tableUtils).map { leftDfInRange => if (showDf) leftDfInRange.prettyPrint() // set autoExpand = true to ensure backward compatibility due to column ordering changes - computeRange(leftDfInRange, range, bootstrapInfo).save(outputTable, tableProps, autoExpand = true) - val elapsedMins = (System.currentTimeMillis() - startMillis) / (60 * 1000) - metrics.gauge(Metrics.Name.LatencyMinutes, elapsedMins) - metrics.gauge(Metrics.Name.PartitionCount, range.partitions.length) - logger.info(s"Wrote to table $outputTable, into partitions: ${range.toString} $progress in $elapsedMins mins") + val finalDf = computeRange(leftDfInRange, range, bootstrapInfo) + if (selectedJoinParts.isDefined) { + assert(finalDf.isEmpty, + "The arg `selectedJoinParts` is defined, so no final join is required. 
`finalDf` should be empty") + logger.info(s"Skipping writing to the output table for range: ${range.toString} $progress") + return None + } else { + finalDf.get.save(outputTable, tableProps, autoExpand = true) + val elapsedMins = (System.currentTimeMillis() - startMillis) / (60 * 1000) + metrics.gauge(Metrics.Name.LatencyMinutes, elapsedMins) + metrics.gauge(Metrics.Name.PartitionCount, range.partitions.length) + logger.info( + s"Wrote to table $outputTable, into partitions: ${range.toString} $progress in $elapsedMins mins") + } } } logger.info(s"Wrote to table $outputTable, into partitions: $unfilledRanges") - finalResult + Some(finalResult) } } diff --git a/spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala b/spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala index 64ce6e654..be15616c5 100644 --- a/spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala +++ b/spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala @@ -35,7 +35,8 @@ object SparkSessionBuilder { def build(name: String, local: Boolean = false, localWarehouseLocation: Option[String] = None, - additionalConfig: Option[Map[String, String]] = None): SparkSession = { + additionalConfig: Option[Map[String, String]] = None, + enforceKryoSerializer: Boolean = true): SparkSession = { if (local) { //required to run spark locally with hive support enabled - for sbt test System.setSecurityManager(null) @@ -49,16 +50,20 @@ object SparkSessionBuilder { .config("spark.sql.session.timeZone", "UTC") //otherwise overwrite will delete ALL partitions, not just the ones it touches .config("spark.sql.sources.partitionOverwriteMode", "dynamic") - .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") - .config("spark.kryo.registrator", "ai.chronon.spark.ChrononKryoRegistrator") - .config("spark.kryoserializer.buffer.max", "2000m") - .config("spark.kryo.referenceTracking", "false") .config("hive.exec.dynamic.partition", "true") .config("hive.exec.dynamic.partition.mode", "nonstrict") .config("spark.sql.catalogImplementation", "hive") .config("spark.hadoop.hive.exec.max.dynamic.partitions", 30000) .config("spark.sql.legacy.timeParserPolicy", "LEGACY") + // Staging queries don't benefit from the KryoSerializer and in fact may fail with buffer underflow in some cases. 
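+ // Kryo remains the default; callers such as the staging query driver can opt out by passing enforceKryoSerializer = false.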
+ if (enforceKryoSerializer) { + baseBuilder + .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") + .config("spark.kryo.registrator", "ai.chronon.spark.ChrononKryoRegistrator") + .config("spark.kryoserializer.buffer.max", "2000m") + .config("spark.kryo.referenceTracking", "false") + } additionalConfig.foreach { configMap => configMap.foreach { config => baseBuilder = baseBuilder.config(config._1, config._2) } } diff --git a/spark/src/main/scala/ai/chronon/spark/StagingQuery.scala b/spark/src/main/scala/ai/chronon/spark/StagingQuery.scala index c25c7b0d7..9570ed945 100644 --- a/spark/src/main/scala/ai/chronon/spark/StagingQuery.scala +++ b/spark/src/main/scala/ai/chronon/spark/StagingQuery.scala @@ -128,7 +128,8 @@ object StagingQuery { val stagingQueryJob = new StagingQuery( stagingQueryConf, parsedArgs.endDate(), - TableUtils(SparkSessionBuilder.build(s"staging_query_${stagingQueryConf.metaData.name}")) + TableUtils( + SparkSessionBuilder.build(s"staging_query_${stagingQueryConf.metaData.name}", enforceKryoSerializer = false)) ) stagingQueryJob.computeStagingQuery(parsedArgs.stepDays.toOption) } diff --git a/spark/src/main/scala/ai/chronon/spark/TableUtils.scala b/spark/src/main/scala/ai/chronon/spark/TableUtils.scala index 15007dee3..0703c437b 100644 --- a/spark/src/main/scala/ai/chronon/spark/TableUtils.scala +++ b/spark/src/main/scala/ai/chronon/spark/TableUtils.scala @@ -27,7 +27,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.plans.logical.{Filter, Project} import org.apache.spark.sql.functions._ import org.apache.spark.sql.types._ -import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession} +import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SaveMode, SparkSession} import org.apache.spark.storage.StorageLevel import java.time.format.DateTimeFormatter @@ -324,8 +324,18 @@ case class TableUtils(sparkSession: SparkSession) { val partitionCount = sparkSession.sparkContext.getConf.getInt("spark.default.parallelism", 1000) logger.info( s"\n----[Running query coalesced into at most $partitionCount partitions]----\n$query\n----[End of Query]----\n") - val df = sparkSession.sql(query).coalesce(partitionCount) - df + try { + // Run the query + val df = sparkSession.sql(query).coalesce(partitionCount) + df + } catch { + case e: AnalysisException if e.getMessage.contains(" already exists") => + logger.warn(s"Non-Fatal: ${e.getMessage}. 
Query may result in redefinition.") + sparkSession.sql("SHOW USER FUNCTIONS") + case e: Exception => + logger.error("Error running query:", e) + throw e + } } def insertUnPartitioned(df: DataFrame, diff --git a/spark/src/test/scala/ai/chronon/spark/test/JavaFetcherTest.java b/spark/src/test/scala/ai/chronon/spark/test/JavaFetcherTest.java index 611c7c927..2bbefaede 100644 --- a/spark/src/test/scala/ai/chronon/spark/test/JavaFetcherTest.java +++ b/spark/src/test/scala/ai/chronon/spark/test/JavaFetcherTest.java @@ -39,7 +39,7 @@ public class JavaFetcherTest { String namespace = "java_fetcher_test"; - SparkSession session = SparkSessionBuilder.build(namespace, true, scala.Option.apply(null), scala.Option.apply(null)); + SparkSession session = SparkSessionBuilder.build(namespace, true, scala.Option.apply(null), scala.Option.apply(null), true); TableUtils tu = new TableUtils(session); InMemoryKvStore kvStore = new InMemoryKvStore(func(() -> tu)); MockApi mockApi = new MockApi(func(() -> kvStore), "java_fetcher_test"); diff --git a/spark/src/test/scala/ai/chronon/spark/test/JoinTest.scala b/spark/src/test/scala/ai/chronon/spark/test/JoinTest.scala index 5ed03f9d2..95caa0c66 100644 --- a/spark/src/test/scala/ai/chronon/spark/test/JoinTest.scala +++ b/spark/src/test/scala/ai/chronon/spark/test/JoinTest.scala @@ -27,9 +27,10 @@ import ai.chronon.spark.stats.SummaryJob import org.apache.spark.rdd.RDD import org.apache.spark.sql.functions._ import org.apache.spark.sql.types.{StructType, StringType => SparkStringType} -import org.apache.spark.sql.{DataFrame, Row, SparkSession} +import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SparkSession} import org.junit.Assert._ import org.junit.Test +import org.scalatest.Assertions.intercept import scala.collection.JavaConverters._ import scala.util.ScalaJavaConversions.ListOps @@ -1170,4 +1171,122 @@ class JoinTest { val computed = runner.computeJoin(Some(7)) assertFalse(computed.isEmpty) } + + /** + * Create an event table as the left side and 3 GroupBys as the right side. + * Generate data using DataFrameGen and save to the tables. + * Create a join with only one join part selected. + * Run computeJoinOpt(). + * Check that the selected join part is computed and the other join parts are not.
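+ * The unselected join parts should produce no output tables, so querying them is expected to fail.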
+ */ + @Test + def testSelectedJoinParts(): Unit = { + // Left + val itemQueries = List( + Column("item", api.StringType, 100), + Column("value", api.LongType, 100) + ) + val itemQueriesTable = s"$namespace.item_queries_selected_join_parts" + spark.sql(s"DROP TABLE IF EXISTS $itemQueriesTable") + spark.sql(s"DROP TABLE IF EXISTS ${itemQueriesTable}_tmp") + DataFrameGen.events(spark, itemQueries, 10000, partitions = 30).save(s"${itemQueriesTable}_tmp") + val leftDf = tableUtils.sql(s"SELECT item, value, ts, ds FROM ${itemQueriesTable}_tmp") + leftDf.save(itemQueriesTable) + val start = monthAgo + + // Right + val viewsSchema = List( + Column("user", api.StringType, 10000), + Column("item", api.StringType, 100), + Column("value", api.LongType, 100) + ) + val viewsTable = s"$namespace.view_selected_join_parts" + spark.sql(s"DROP TABLE IF EXISTS $viewsTable") + DataFrameGen.events(spark, viewsSchema, count = 10000, partitions = 30).save(viewsTable) + + // Group By + val gb1 = Builders.GroupBy( + sources = Seq( + Builders.Source.events( + table = viewsTable, + query = Builders.Query(startPartition = start) + )), + keyColumns = Seq("item"), + aggregations = Seq( + Builders.Aggregation(operation = Operation.LAST_K, argMap = Map("k" -> "10"), inputColumn = "user"), + Builders.Aggregation(operation = Operation.MAX, argMap = Map("k" -> "2"), inputColumn = "value") + ), + metaData = + Builders.MetaData(name = s"unit_test.item_views_selected_join_parts_1", namespace = namespace, team = "item_team"), + accuracy = Accuracy.SNAPSHOT + ) + + val gb2 = Builders.GroupBy( + sources = Seq( + Builders.Source.events( + table = viewsTable, + query = Builders.Query(startPartition = start) + )), + keyColumns = Seq("item"), + aggregations = Seq( + Builders.Aggregation(operation = Operation.MIN, argMap = Map("k" -> "1"), inputColumn = "value") + ), + metaData = + Builders.MetaData(name = s"unit_test.item_views_selected_join_parts_2", namespace = namespace, team = "item_team"), + accuracy = Accuracy.SNAPSHOT + ) + + val gb3 = Builders.GroupBy( + sources = Seq( + Builders.Source.events( + table = viewsTable, + query = Builders.Query(startPartition = start) + )), + keyColumns = Seq("item"), + aggregations = Seq( + Builders.Aggregation(operation = Operation.AVERAGE, inputColumn = "value") + ), + metaData = + Builders.MetaData(name = s"unit_test.item_views_selected_join_parts_3", namespace = namespace, team = "item_team"), + accuracy = Accuracy.SNAPSHOT + ) + + // Join + val joinConf = Builders.Join( + left = Builders.Source.events(Builders.Query(startPartition = start), table = itemQueriesTable), + joinParts = Seq( + Builders.JoinPart(groupBy = gb1, prefix = "user1"), + Builders.JoinPart(groupBy = gb2, prefix = "user2"), + Builders.JoinPart(groupBy = gb3, prefix = "user3") + ), + metaData = Builders.MetaData(name = s"unit_test.item_temporal_features.selected_join_parts", + namespace = namespace, + team = "item_team", + online = true) + ) + + // Drop Join Part tables if any + val partTable1 = s"${joinConf.metaData.outputTable}_user1_unit_test_item_views_selected_join_parts_1" + val partTable2 = s"${joinConf.metaData.outputTable}_user2_unit_test_item_views_selected_join_parts_2" + val partTable3 = s"${joinConf.metaData.outputTable}_user3_unit_test_item_views_selected_join_parts_3" + spark.sql(s"DROP TABLE IF EXISTS $partTable1") + spark.sql(s"DROP TABLE IF EXISTS $partTable2") + spark.sql(s"DROP TABLE IF EXISTS $partTable3") + + // Compute daily join. 
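+ // Only the first join part is selected, so computeJoinOpt backfills that part's table and skips writing the final join output.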
+ val joinJob = new Join(joinConf, today, tableUtils, selectedJoinParts = Some(List("user1_unit_test_item_views_selected_join_parts_1"))) + + joinJob.computeJoinOpt() + + val part1 = tableUtils.sql(s"SELECT * FROM $partTable1") + assertTrue(part1.count() > 0) + + val thrown2 = intercept[AnalysisException] { + spark.sql(s"SELECT * FROM $partTable2") + } + val thrown3 = intercept[AnalysisException] { + spark.sql(s"SELECT * FROM $partTable3") + } + assert(thrown2.getMessage.contains("Table or view not found") && thrown3.getMessage.contains("Table or view not found")) + } } diff --git a/spark/src/test/scala/ai/chronon/spark/test/TableUtilsTest.scala b/spark/src/test/scala/ai/chronon/spark/test/TableUtilsTest.scala index f15152533..5823cb397 100644 --- a/spark/src/test/scala/ai/chronon/spark/test/TableUtilsTest.scala +++ b/spark/src/test/scala/ai/chronon/spark/test/TableUtilsTest.scala @@ -22,6 +22,7 @@ import ai.chronon.spark.test.TestUtils.makeDf import ai.chronon.api.{StructField, _} import ai.chronon.online.SparkConversions import ai.chronon.spark.{IncompatibleSchemaException, PartitionRange, SparkSessionBuilder, TableUtils} +import org.apache.hadoop.hive.ql.exec.UDF import org.apache.spark.sql.functions.col import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SparkSession, types} import org.junit.Assert.{assertEquals, assertFalse, assertTrue} @@ -29,6 +30,14 @@ import org.junit.Test import scala.util.Try + + +class SimpleAddUDF extends UDF { + def evaluate(value: Int): Int = { + value + 20 + } +} + class TableUtilsTest { lazy val spark: SparkSession = SparkSessionBuilder.build("TableUtilsTest", local = true) private val tableUtils = TableUtils(spark) @@ -409,6 +418,12 @@ class TableUtilsTest { assertTrue(tableUtils.checkTablePermission(tableName)) } + @Test + def testDoubleUDFRegistration(): Unit = { + tableUtils.sql("CREATE TEMPORARY FUNCTION test AS 'ai.chronon.spark.test.SimpleAddUDF'") + tableUtils.sql("CREATE TEMPORARY FUNCTION test AS 'ai.chronon.spark.test.SimpleAddUDF'") + } + @Test def testIfPartitionExistsInTable(): Unit = { val tableName = "db.test_if_partition_exists"