From 7155156ea7bbd29d233a9c02df7fe57059452fd0 Mon Sep 17 00:00:00 2001 From: Cristian Figueroa Date: Fri, 1 Mar 2024 08:48:58 -0800 Subject: [PATCH 1/4] [Driver] Remove exit override for GB streaming --- .../main/scala/ai/chronon/spark/Driver.scala | 32 ++++++++----------- 1 file changed, 13 insertions(+), 19 deletions(-) diff --git a/spark/src/main/scala/ai/chronon/spark/Driver.scala b/spark/src/main/scala/ai/chronon/spark/Driver.scala index 7df794961..39d2b5aed 100644 --- a/spark/src/main/scala/ai/chronon/spark/Driver.scala +++ b/spark/src/main/scala/ai/chronon/spark/Driver.scala @@ -858,7 +858,6 @@ object Driver { def main(baseArgs: Array[String]): Unit = { val args = new Args(baseArgs) - var shouldExit = true args.subcommand match { case Some(x) => x match { @@ -866,26 +865,21 @@ object Driver { case args.GroupByBackfillArgs => GroupByBackfill.run(args.GroupByBackfillArgs) case args.StagingQueryBackfillArgs => StagingQueryBackfill.run(args.StagingQueryBackfillArgs) case args.GroupByUploadArgs => GroupByUploader.run(args.GroupByUploadArgs) - case args.GroupByStreamingArgs => - shouldExit = false - GroupByStreaming.run(args.GroupByStreamingArgs) - - case args.MetadataUploaderArgs => MetadataUploader.run(args.MetadataUploaderArgs) - case args.FetcherCliArgs => FetcherCli.run(args.FetcherCliArgs) - case args.LogFlattenerArgs => LogFlattener.run(args.LogFlattenerArgs) - case args.ConsistencyMetricsArgs => ConsistencyMetricsCompute.run(args.ConsistencyMetricsArgs) - case args.CompareJoinQueryArgs => CompareJoinQuery.run(args.CompareJoinQueryArgs) - case args.AnalyzerArgs => Analyzer.run(args.AnalyzerArgs) - case args.DailyStatsArgs => DailyStats.run(args.DailyStatsArgs) - case args.LogStatsArgs => LogStats.run(args.LogStatsArgs) - case args.MetadataExportArgs => MetadataExport.run(args.MetadataExportArgs) - case args.LabelJoinArgs => LabelJoin.run(args.LabelJoinArgs) - case _ => logger.info(s"Unknown subcommand: $x") + case args.GroupByStreamingArgs => GroupByStreaming.run(args.GroupByStreamingArgs) + case args.MetadataUploaderArgs => MetadataUploader.run(args.MetadataUploaderArgs) + case args.FetcherCliArgs => FetcherCli.run(args.FetcherCliArgs) + case args.LogFlattenerArgs => LogFlattener.run(args.LogFlattenerArgs) + case args.ConsistencyMetricsArgs => ConsistencyMetricsCompute.run(args.ConsistencyMetricsArgs) + case args.CompareJoinQueryArgs => CompareJoinQuery.run(args.CompareJoinQueryArgs) + case args.AnalyzerArgs => Analyzer.run(args.AnalyzerArgs) + case args.DailyStatsArgs => DailyStats.run(args.DailyStatsArgs) + case args.LogStatsArgs => LogStats.run(args.LogStatsArgs) + case args.MetadataExportArgs => MetadataExport.run(args.MetadataExportArgs) + case args.LabelJoinArgs => LabelJoin.run(args.LabelJoinArgs) + case _ => logger.info(s"Unknown subcommand: $x") } case None => logger.info(s"specify a subcommand please") } - if (shouldExit) { - System.exit(0) - } + System.exit(0) } } From 9c9e9b5f3134960f8c0135e67f8a16664c296318 Mon Sep 17 00:00:00 2001 From: Cristian Figueroa Date: Fri, 1 Mar 2024 10:31:49 -0800 Subject: [PATCH 2/4] [Driver] Streaming error handling --- .../ai/chronon/spark/streaming/GroupBy.scala | 20 +++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/spark/src/main/scala/ai/chronon/spark/streaming/GroupBy.scala b/spark/src/main/scala/ai/chronon/spark/streaming/GroupBy.scala index cfa5fe48c..ae21321ac 100644 --- a/spark/src/main/scala/ai/chronon/spark/streaming/GroupBy.scala +++ b/spark/src/main/scala/ai/chronon/spark/streaming/GroupBy.scala @@ -33,7 +33,8 @@ import java.time.format.DateTimeFormatter import java.time.{Instant, ZoneId, ZoneOffset} import java.util.Base64 import scala.collection.JavaConverters._ -import scala.concurrent.duration.{DurationInt} +import scala.concurrent.duration.DurationInt +import scala.util.{Success, Try} class GroupBy(inputStream: DataFrame, session: SparkSession, @@ -75,22 +76,25 @@ class GroupBy(inputStream: DataFrame, } def run(local: Boolean = false): StreamingQuery = { - buildDataStream(local).start() + val fetcher = onlineImpl.buildFetcher(local) + fetcher.getGroupByServingInfo(groupByConf.getMetaData.getName) match { + case scala.util.Failure(exception) => + logger.error(s"Failed to retrieve groupByServingInfo: ${exception.getMessage}") + session.stop() + sys.exit(1) + case scala.util.Success(groupByServingInfo) => + buildDataStream(groupByServingInfo).start() + } } // TODO: Support local by building gbServingInfo based on specified type hints when available. - def buildDataStream(local: Boolean = false): DataStreamWriter[KVStore.PutRequest] = { + def buildDataStream(groupByServingInfo: GroupByServingInfoParsed): DataStreamWriter[KVStore.PutRequest] = { val streamingTable = groupByConf.metaData.cleanName + "_stream" - val fetcher = onlineImpl.buildFetcher(local) - val groupByServingInfo = fetcher.getGroupByServingInfo(groupByConf.getMetaData.getName).get - val streamDecoder = onlineImpl.streamDecoder(groupByServingInfo) assert(groupByConf.streamingSource.isDefined, "No streaming source defined in GroupBy. Please set a topic/mutationTopic.") val streamingSource = groupByConf.streamingSource.get - val streamingQuery = buildStreamingQuery(streamingTable) - val context = Metrics.Context(Metrics.Environment.GroupByStreaming, groupByConf) val ingressContext = context.withSuffix("ingress") import session.implicits._ From c0fb7b37ce0a13879cd425051bdf64f15ff5f58b Mon Sep 17 00:00:00 2001 From: Cristian Figueroa Date: Fri, 1 Mar 2024 10:34:08 -0800 Subject: [PATCH 3/4] Imports --- .../src/main/scala/ai/chronon/spark/streaming/GroupBy.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/spark/src/main/scala/ai/chronon/spark/streaming/GroupBy.scala b/spark/src/main/scala/ai/chronon/spark/streaming/GroupBy.scala index ae21321ac..7c8ac7220 100644 --- a/spark/src/main/scala/ai/chronon/spark/streaming/GroupBy.scala +++ b/spark/src/main/scala/ai/chronon/spark/streaming/GroupBy.scala @@ -34,7 +34,7 @@ import java.time.{Instant, ZoneId, ZoneOffset} import java.util.Base64 import scala.collection.JavaConverters._ import scala.concurrent.duration.DurationInt -import scala.util.{Success, Try} +import scala.util.{Success, Failure} class GroupBy(inputStream: DataFrame, session: SparkSession, @@ -78,11 +78,11 @@ class GroupBy(inputStream: DataFrame, def run(local: Boolean = false): StreamingQuery = { val fetcher = onlineImpl.buildFetcher(local) fetcher.getGroupByServingInfo(groupByConf.getMetaData.getName) match { - case scala.util.Failure(exception) => + case Failure(exception) => logger.error(s"Failed to retrieve groupByServingInfo: ${exception.getMessage}") session.stop() sys.exit(1) - case scala.util.Success(groupByServingInfo) => + case Success(groupByServingInfo) => buildDataStream(groupByServingInfo).start() } } From 4931fe2494d2743ecda89fb41c6e0476ab78c784 Mon Sep 17 00:00:00 2001 From: Cristian Figueroa Date: Fri, 1 Mar 2024 11:46:05 -0800 Subject: [PATCH 4/4] Reduce LOC and signature changes --- .../ai/chronon/spark/streaming/GroupBy.scala | 26 ++++++++++--------- 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/spark/src/main/scala/ai/chronon/spark/streaming/GroupBy.scala b/spark/src/main/scala/ai/chronon/spark/streaming/GroupBy.scala index 7c8ac7220..2775a4216 100644 --- a/spark/src/main/scala/ai/chronon/spark/streaming/GroupBy.scala +++ b/spark/src/main/scala/ai/chronon/spark/streaming/GroupBy.scala @@ -33,8 +33,8 @@ import java.time.format.DateTimeFormatter import java.time.{Instant, ZoneId, ZoneOffset} import java.util.Base64 import scala.collection.JavaConverters._ -import scala.concurrent.duration.DurationInt -import scala.util.{Success, Failure} +import scala.concurrent.duration.{DurationInt} +import scala.util.{Failure, Success} class GroupBy(inputStream: DataFrame, session: SparkSession, @@ -76,25 +76,27 @@ class GroupBy(inputStream: DataFrame, } def run(local: Boolean = false): StreamingQuery = { - val fetcher = onlineImpl.buildFetcher(local) - fetcher.getGroupByServingInfo(groupByConf.getMetaData.getName) match { - case Failure(exception) => - logger.error(s"Failed to retrieve groupByServingInfo: ${exception.getMessage}") - session.stop() - sys.exit(1) - case Success(groupByServingInfo) => - buildDataStream(groupByServingInfo).start() - } + buildDataStream(local).start() } // TODO: Support local by building gbServingInfo based on specified type hints when available. - def buildDataStream(groupByServingInfo: GroupByServingInfoParsed): DataStreamWriter[KVStore.PutRequest] = { + def buildDataStream(local: Boolean = false): DataStreamWriter[KVStore.PutRequest] = { val streamingTable = groupByConf.metaData.cleanName + "_stream" + val fetcher = onlineImpl.buildFetcher(local) + val groupByServingInfoOpt = fetcher.getGroupByServingInfo(groupByConf.getMetaData.getName) + if (groupByServingInfoOpt.isFailure) { + logger.error(s"Failed to retrieve groupByServingInfo: ${groupByServingInfoOpt.failed.get.getMessage}") + session.stop() + sys.exit(1) + } + val groupByServingInfo = groupByServingInfoOpt.get val streamDecoder = onlineImpl.streamDecoder(groupByServingInfo) assert(groupByConf.streamingSource.isDefined, "No streaming source defined in GroupBy. Please set a topic/mutationTopic.") val streamingSource = groupByConf.streamingSource.get + val streamingQuery = buildStreamingQuery(streamingTable) + val context = Metrics.Context(Metrics.Environment.GroupByStreaming, groupByConf) val ingressContext = context.withSuffix("ingress") import session.implicits._