[SPARK-47094][SQL] SPJ : fix bucket reducer function #47126
base: master
Conversation
@szehon-ho please take a look.
Thanks, some preliminary comments.
As this is just fixing a test transform, I think we should just add one minimal negative test for this (to assert no SPJ in this case).
@@ -1558,7 +1558,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
     }
   }

-  test("SPARK-47094: Support compatible buckets with common divisor") {
+  test("SPARK-47094: SPJ:Support compatible buckets with common divisor") {
Nit: space before "Support"? Maybe the "SPJ" is redundant here, actually.
All the other SPJ tests have this "SPJ:" prefix, but I can remove it too.
I see, it's fine then; maybe just add the space.
Seq((table1, partition1), (table2, partition2)).foreach { case (tab, part) =>
  createTable(tab, columns2, part)
  val insertStr = s"INSERT INTO testcat.ns.$tab VALUES " +
I think I added a ton of values to test the positive case, but do you think we can just have the bare minimum values on each side?
}
test("SPARK-47094: SPJ: Support compatible buckets common divisor is one of the numbers")
I feel this is already covered by "Support compatible bucket", and would just say there is no need to add this test?
I wanted to check if we can test the fact that there is no grouping happening on the side which has same number of bucket as gcd. Not sure whether we can assert that or not.
> I wanted to check if we can test the fact that there is no grouping happening on the side which has same number of bucket as gcd. Not sure whether we can assert that or not.
Do you mean expected scans?
I'm still not seeing the difference. This tests the case that one bucket count divides into the other, (4, 8) and (3, 9); what's the difference from the existing test of (2, 4) and (2, 6) above ("Support compatible bucket")? It also asserts the expected scans up there too.
@@ -101,8 +101,8 @@ object BucketFunction extends ScalarFunction[Int] with ReducibleFunction[Int, Int] {

   if (otherFunc == BucketFunction) {
     val gcd = this.gcd(thisNumBuckets, otherNumBuckets)
-    if (gcd != thisNumBuckets) {
+    if (gcd>1 && gcd!=thisNumBuckets) {
       return BucketReducer(thisNumBuckets, gcd)
Nit: spacing (gcd > 1 && gcd != thisNumBuckets)
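For readers following along, the guard under discussion can be sketched in isolation. This is an illustrative Java sketch, not the actual Spark source (Spark's DataSource V2 function API is Java, hence the language choice): the method name `reducedBuckets` and the `-1` "no reducer" sentinel are invented here for the example, while the gcd condition mirrors the fixed line in the diff above.

```java
// Hypothetical sketch of the fixed reducer selection; not the actual Spark code.
public final class BucketReducerSketch {

    static int gcd(int a, int b) {
        return b == 0 ? a : gcd(b, a % b);
    }

    // Returns the reduced bucket count for this side, or -1 when no reduction applies.
    static int reducedBuckets(int thisNumBuckets, int otherNumBuckets) {
        int gcd = gcd(thisNumBuckets, otherNumBuckets);
        // Before the fix: any gcd != thisNumBuckets produced a reducer, so coprime
        // bucket counts (gcd == 1) collapsed every partition into a single bucket.
        // After the fix: only reduce when a non-trivial common divisor exists.
        if (gcd > 1 && gcd != thisNumBuckets) {
            return gcd;
        }
        return -1; // no reducer: this side already matches the gcd, or the counts are coprime
    }

    public static void main(String[] args) {
        System.out.println(reducedBuckets(8, 4));  // 4: 8 buckets reduce to 4
        System.out.println(reducedBuckets(4, 9));  // -1: coprime counts, no reduction
        System.out.println(reducedBuckets(4, 8));  // -1: this side already equals the gcd
    }
}
```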
|t1.store_id, t1.dept_id, t1.data, t2.data
|FROM testcat.ns.$table1 t1 JOIN testcat.ns.$table2 t2
|ON t1.store_id = t2.store_id AND t1.dept_id = t2.dept_id
|ORDER BY t1.store_id, t1.dept_id, t1.data, t2.data
Nit: no need for the ORDER BY just to check the plan; it's only there to check the result.
So previously, when it was reduced to 1, was it a correctness issue or just a performance issue?
@@ -1310,7 +1310,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
     }
   }

-  test("SPARK-44647: test join key is subset of cluster key " +
+  test("SPARK-44647: SPJ: test join key is subset of cluster key " +
These changes look unnecessary.
All the other SPJ tests have the "SPJ:" prefix; it was missing in this one, so this is more for consistency.
Performance issue: if it reduces to 1, there will be only one task doing the work.
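To make that failure mode concrete, here is a hypothetical sketch (not Spark code; the names are invented for the example) of what reducing bucket ids by a divisor of 1 would do: every bucket id maps to 0, so a single task ends up reading all the data, while a legitimate divisor preserves parallelism.

```java
import java.util.Arrays;

// Illustrative only: why a divisor of 1 collapses all buckets into one group.
public final class GcdOneCollapse {

    static int reduce(int bucketId, int divisor) {
        return bucketId % divisor;
    }

    public static void main(String[] args) {
        int[] bucketIds = {0, 1, 2, 3, 4, 5};
        // Coprime bucket counts such as 4 and 9 have gcd == 1; a reducer built from
        // that gcd sends every row to bucket 0, leaving one task to do all the work.
        int[] collapsed = Arrays.stream(bucketIds).map(b -> reduce(b, 1)).toArray();
        System.out.println(Arrays.toString(collapsed)); // [0, 0, 0, 0, 0, 0]
        // A non-trivial divisor, e.g. gcd(8, 4) == 4, still spreads rows across groups.
        int[] reduced = Arrays.stream(bucketIds).map(b -> reduce(b, 4)).toArray();
        System.out.println(Arrays.toString(reduced)); // [0, 1, 2, 3, 0, 1]
    }
}
```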
@viirya it seems it is a test transform, but it is good to have a good example.
Oh okay, I didn't see that it is test-only code.
Force-pushed from 4d1505d to 8493934
Force-pushed from 8493934 to b85847c
What changes were proposed in this pull request?
The SPJ compatible-buckets feature includes an implementation of a reducible bucket function. This patch fixes the implementation and makes it the same as the Apache Iceberg one.
Why are the changes needed?
With this fix, incompatible bucket counts no longer produce a reducer when their GCD is 1, so the buckets are not reduced to a single bucket when the two sides have incompatible numbers of buckets.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
With unit tests.
Was this patch authored or co-authored using generative AI tooling?
No.