Skip to content

Commit

Permalink
[Driver] Streaming error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
cristianfr committed Mar 1, 2024
1 parent 7155156 commit 9c9e9b5
Showing 1 changed file with 12 additions and 8 deletions.
20 changes: 12 additions & 8 deletions spark/src/main/scala/ai/chronon/spark/streaming/GroupBy.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ import java.time.format.DateTimeFormatter
import java.time.{Instant, ZoneId, ZoneOffset}
import java.util.Base64
import scala.collection.JavaConverters._
import scala.concurrent.duration.{DurationInt}
import scala.concurrent.duration.DurationInt
import scala.util.{Success, Try}

class GroupBy(inputStream: DataFrame,
session: SparkSession,
Expand Down Expand Up @@ -75,22 +76,25 @@ class GroupBy(inputStream: DataFrame,
}

def run(local: Boolean = false): StreamingQuery = {
buildDataStream(local).start()
val fetcher = onlineImpl.buildFetcher(local)
fetcher.getGroupByServingInfo(groupByConf.getMetaData.getName) match {
case scala.util.Failure(exception) =>
logger.error(s"Failed to retrieve groupByServingInfo: ${exception.getMessage}")
session.stop()
sys.exit(1)
case scala.util.Success(groupByServingInfo) =>
buildDataStream(groupByServingInfo).start()
}
}

// TODO: Support local by building gbServingInfo based on specified type hints when available.
def buildDataStream(local: Boolean = false): DataStreamWriter[KVStore.PutRequest] = {
def buildDataStream(groupByServingInfo: GroupByServingInfoParsed): DataStreamWriter[KVStore.PutRequest] = {
val streamingTable = groupByConf.metaData.cleanName + "_stream"
val fetcher = onlineImpl.buildFetcher(local)
val groupByServingInfo = fetcher.getGroupByServingInfo(groupByConf.getMetaData.getName).get

val streamDecoder = onlineImpl.streamDecoder(groupByServingInfo)
assert(groupByConf.streamingSource.isDefined,
"No streaming source defined in GroupBy. Please set a topic/mutationTopic.")
val streamingSource = groupByConf.streamingSource.get

val streamingQuery = buildStreamingQuery(streamingTable)

val context = Metrics.Context(Metrics.Environment.GroupByStreaming, groupByConf)
val ingressContext = context.withSuffix("ingress")
import session.implicits._
Expand Down

0 comments on commit 9c9e9b5

Please sign in to comment.