Skip to content

Commit

Permalink
add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
yuli_han authored and yuli-han committed Jun 25, 2024
1 parent 41378c0 commit caa0d1e
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 6 deletions.
11 changes: 8 additions & 3 deletions online/src/main/scala/ai/chronon/online/MetadataStore.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ import ai.chronon.api.Constants.{ChrononMetadataKey, UTF8}
import ai.chronon.api.Extensions.{JoinOps, MetadataOps, StringOps, WindowOps, WindowUtils}
import ai.chronon.api._
import ai.chronon.online.KVStore.{GetRequest, PutRequest, TimedValue}
import ai.chronon.online.MetadataEndPoint.NameByTeamEndPointName
import ai.chronon.online.MetadataEndPoint.{ConfByKeyEndPointName, NameByTeamEndPointName}
import ai.chronon.online.Metrics.Name.Exception
import com.google.gson.{Gson, GsonBuilder}
import org.apache.thrift.TBase

Expand Down Expand Up @@ -206,8 +207,12 @@ class MetadataStore(kvStore: KVStore, val dataset: String = ChrononMetadataKey,
|key: $k
|conf: $v""".stripMargin)
val kBytes = k.getBytes()
// if value is a single string, use it as is, else join the strings into a json list
val vBytes = if (v.size == 1) v.head.getBytes() else StringArrayConverter.stringsToBytes(v)
// The value is a single string by default, for NameByTeamEndPointName, it's a list of strings
val vBytes = if (datasetName == NameByTeamEndPointName) {
StringArrayConverter.stringsToBytes(v)
} else {
v.head.getBytes()
}
PutRequest(keyBytes = kBytes,
valueBytes = vBytes,
dataset = datasetName,
Expand Down
13 changes: 10 additions & 3 deletions spark/src/test/scala/ai/chronon/spark/test/FetcherTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class FetcherTest extends TestCase {
finally src.close()
}.replaceAll("\\s+", "")

val acceptedEndPoints = List(MetadataEndPoint.ConfByKeyEndPointName)
val acceptedEndPoints = List(MetadataEndPoint.ConfByKeyEndPointName, MetadataEndPoint.NameByTeamEndPointName)
val inMemoryKvStore = OnlineUtils.buildInMemoryKVStore("FetcherTest")
val singleFileDataSet = ChrononMetadataKey + "_single_file_test"
val singleFileMetadataStore = new MetadataStore(inMemoryKvStore, singleFileDataSet, timeoutMillis = 10000)
Expand All @@ -81,13 +81,17 @@ class FetcherTest extends TestCase {
case (endPoint, kvMap) => singleFileMetadataStore.put(kvMap, singleFileDataSet)
}
singleFilePut.flatMap(putRequests => Await.result(putRequests, Duration.Inf))

val response = inMemoryKvStore.get(GetRequest(joinPath.getBytes(), singleFileDataSet))
val res = Await.result(response, Duration.Inf)
assertTrue(res.latest.isSuccess)
val actual = new String(res.values.get.head.bytes)

assertEquals(expected, actual.replaceAll("\\s+", ""))

val teamMetadataResponse = inMemoryKvStore.getString("joins/relevance", singleFileDataSet, 10000)
val teamMetadataRes = teamMetadataResponse.get
assert(teamMetadataRes.equals("joins/team/example_join.v1"))

val directoryDataSetDataSet = ChrononMetadataKey + "_directory_test"
val directoryMetadataStore = new MetadataStore(inMemoryKvStore, directoryDataSetDataSet, timeoutMillis = 10000)
inMemoryKvStore.create(directoryDataSetDataSet)
Expand All @@ -102,9 +106,12 @@ class FetcherTest extends TestCase {
val dirRes = Await.result(dirResponse, Duration.Inf)
assertTrue(dirRes.latest.isSuccess)
val dirActual = new String(dirRes.values.get.head.bytes)

assertEquals(expected, dirActual.replaceAll("\\s+", ""))

val teamMetadataDirResponse = inMemoryKvStore.getString("group_bys/team", directoryDataSetDataSet, 10000)
val teamMetadataDirRes = teamMetadataDirResponse.get
assert(teamMetadataDirRes.equals("group_bys/team/example_group_by.v1"))

val emptyResponse =
inMemoryKvStore.get(GetRequest("NoneExistKey".getBytes(), "NonExistDataSetName"))
val emptyRes = Await.result(emptyResponse, Duration.Inf)
Expand Down

0 comments on commit caa0d1e

Please sign in to comment.