Skip to content

Commit

Permalink
[HUDI-8566] Test coverage of default hoodie avro payload class for MIT
Browse files Browse the repository at this point in the history
  • Loading branch information
Davis-Zhang-Onehouse committed Nov 22, 2024
1 parent ae5833e commit e433b2d
Showing 1 changed file with 91 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1403,4 +1403,95 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo
})
}
}

test("Test MergeInto with payloads default configuration - partitioned and non-partitioned") {
withSQLConf("hoodie.merge.small.file.group.candidates.limit" -> "1") {
withTempDir { tmp =>
Seq(true, false).foreach { isPartitioned =>
Seq("cow", "mor").foreach { tableType =>
val sourceTable = generateTableName
spark.sql(
s"""
|CREATE TABLE $sourceTable (
| id INT,
| name STRING,
| price INT,
| timestamp BIGINT,
| category STRING
|) USING hudi
|LOCATION '${tmp.getCanonicalPath}/$sourceTable'
|""".stripMargin)

spark.sql(
s"""
| INSERT INTO $sourceTable
| VALUES (1, 'John Doe Update Failed', 19, 1, 'A'),
| (3, 'Bob Smith Update Succeeded With a larger Precombine Field', 49, 1599058800, 'A'),
| (4, 'Alice Johnson New Insert', 49, 2, 'B'),
| (5, 'Baker Mayfield Updated given Same precombine Key value', 6, 10, 'A')
|""".stripMargin)

val targetTable = generateTableName
val partitionClause = if (isPartitioned) "PARTITIONED BY (category)" else ""

spark.sql(
s"""
|CREATE TABLE $targetTable (
| id INT,
| name STRING,
| price INT,
| timestamp BIGINT,
| category STRING
|) USING hudi
|$partitionClause
|TBLPROPERTIES (
| type = '$tableType',
| primaryKey = 'id',
| preCombineField = 'timestamp'
| )
|LOCATION '${tmp.getCanonicalPath}/$targetTable'
|""".stripMargin)

val insertPartitionClause = if (isPartitioned) "partition(category)" else ""
spark.sql(
s"""
| INSERT INTO $targetTable $insertPartitionClause
| VALUES (1, 'John Doe Existing larger precombine key prevails', 19, 1598886001, 'A'),
| (2, 'Jane Doe', 24, 1598972400, 'B'),
| (3, 'Bob Smith', 14, 1, 'A'),
| (5, 'Baker Mayfield', 60, 10, 'A')
|""".stripMargin)

spark.sql(
s"""
|MERGE INTO $targetTable t
|USING $sourceTable s
|ON t.price = s.price AND t.category = s.category
|WHEN MATCHED THEN UPDATE SET
| t.id = s.id,
| t.name = s.name,
| t.price = s.price,
| t.timestamp = s.timestamp,
| t.category = s.category
|WHEN NOT MATCHED THEN INSERT
| (id, name, price, timestamp, category)
|VALUES
| (s.id, s.name, s.price, s.timestamp, s.category)
|""".stripMargin)

checkAnswer(s"select id, name, price, timestamp, category from $targetTable ORDER BY id, category")(
Seq(1, "John Doe Existing larger precombine key prevails", 19, 1598886001L, "A"),
Seq(2, "Jane Doe", 24, 1598972400L, "B"),
Seq(3, "Bob Smith Update Succeeded With a larger Precombine Field", 49, 1599058800L, "A"),
Seq(4, "Alice Johnson New Insert", 49, 2L, "B"),
Seq(5, "Baker Mayfield Updated given Same precombine Key value", 6, 10L, "A"))

// Clean up
spark.sql(s"DROP TABLE IF EXISTS $sourceTable")
spark.sql(s"DROP TABLE IF EXISTS $targetTable")
}
}
}
}
}
}

0 comments on commit e433b2d

Please sign in to comment.