-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add generate_series() udtf (and introduce 'lazy' MemoryExec
)
#13540
base: main
Are you sure you want to change the base?
Conversation
) -> Result<Self> { | ||
let cache = PlanProperties::new( | ||
EquivalenceProperties::new(Arc::clone(&schema)), | ||
Partitioning::RoundRobinBatch(generators.len()), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this field mean: Let's say a exec only have 1 generator, the optimizer will try to insert a RepartitionExec
to the output of StreamingMemoryExec
, and use round robin to repartition output batches to target_partitions
number?
I found it works on some aggregate queries, but not for a sort query
> explain select * from generate_series(1, 10000) as t1(v1) order by v1;
+---------------+----------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+----------------------------------------------------------------------------------------------------------------+
| logical_plan | Sort: t1.v1 ASC NULLS LAST |
| | SubqueryAlias: t1 |
| | Projection: tmp_table.value AS v1 |
| | TableScan: tmp_table projection=[value] |
| physical_plan | SortExec: expr=[v1@0 ASC NULLS LAST], preserve_partitioning=[false] |
| | ProjectionExec: expr=[value@0 as v1] |
| | StreamingMemoryExec: partitions=1, batch_generators=[generate_series: start=1, end=10000, batch_size=8192] |
| | |
+---------------+----------------------------------------------------------------------------------------------------------------+
Parallelized sort should look like
> explain select * from lineitem order by l_orderkey;
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Sort: lineitem.l_orderkey ASC NULLS LAST |
| | TableScan: lineitem projection=[l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment] |
| physical_plan | SortPreservingMergeExec: [l_orderkey@0 ASC NULLS LAST] |
| | SortExec: expr=[l_orderkey@0 ASC NULLS LAST], preserve_partitioning=[true] |
| | ParquetExec: file_groups={14 groups: [[Users/yongting/Code/datafusion/benchmarks/data/tpch_sf1/lineitem/part-0.parquet:0..11525426], [Users/yongting/Code/datafusion/benchmarks/data/tpch_sf1/lineitem/part-0.parquet:11525426..20311205, Users/yongting/Code/datafusion/benchmarks/data/tpch_sf1/lineitem/part-1.parquet:0..2739647], [Users/yongting/Code/datafusion/benchmarks/data/tpch_sf1/lineitem/part-1.parquet:2739647..14265073], [Users/yongting/Code/datafusion/benchmarks/data/tpch_sf1/lineitem/part-1.parquet:14265073..20193593, Users/yongting/Code/datafusion/benchmarks/data/tpch_sf1/lineitem/part-2.parquet:0..5596906], [Users/yongting/Code/datafusion/benchmarks/data/tpch_sf1/lineitem/part-2.parquet:5596906..17122332], ...]}, projection=[l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment] |
| | |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
I want to know is it correct here, and this sort query in theory can be made parallelized by changing somewhere in optimization phase? 🤔
/// Stream that generates record batches on demand | ||
pub struct StreamingMemoryStream { | ||
schema: SchemaRef, | ||
generator: Box<dyn StreamingBatchGenerator>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we use Arc
given it makes sense to have generator generate across threads
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point, Arc
is more flexible: implementation can choose to let a StreamingBatchGenerate
share between multiple streams, or create separate generators for each stream
/// Schema representing the data | ||
schema: SchemaRef, | ||
/// Functions to generate batches for each partition | ||
batch_generators: Vec<Box<dyn StreamingBatchGenerator>>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
batch_generators: Vec<Box<dyn StreamingBatchGenerator>>, | |
batch_generators: Vec<Arc<dyn StreamingBatchGenerator>>, |
@@ -297,3 +299,40 @@ pub trait TableProviderFactory: Debug + Sync + Send { | |||
cmd: &CreateExternalTable, | |||
) -> Result<Arc<dyn TableProvider>>; | |||
} | |||
|
|||
/// A trait for table function implementations | |||
pub trait TableFunctionImpl: Debug + Sync + Send { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pub trait TableFunctionImpl: Debug + Sync + Send { | |
/// Returns this function's name | |
fn name(&self) -> &str; |
This is much more consistent with other UDFs
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the part of the refactor, changing this requires several fixes. We can leave it to a separate PR to make this PR smaller
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I also recommend to take name()
inside TableFunctionImpl
statement error DataFusion error: Error during planning: Second argument must be an integer literal | ||
SELECT * FROM generate_series(1, NULL) | ||
|
||
statement error DataFusion error: Error during planning: generate_series expects 2 arguments |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
D SELECT * FROM generate_series(1, 5, null);
┌─────────────────┐
│ generate_series │
│ int64 │
├─────────────────┤
│ 0 rows │
└─────────────────┘
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll open an issue for 3-arg support later
Thank you for the comments @jayzhan211 , I have updated. Now I think the concurrent generator might not be very straightforward, I'll first leave some rationale here. Rationale for
|
Given the quick look, I'm not sure whether we need 3rd case. It seems the 1st case runs execution parallelly too. I would need to think about the advantage of the 3rd case over 1st case. For 3rd case, if we need it, we might use |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you @2010YOUY01. I reviewed the changes and LGTM. I have a few minor comments and one question: I noticed another approach of generate_series()
, which can be used like this:
SELECT generate_series(1, 5)
I assume it is not a udtf in that context. Does this implementation leave room for that usage?
@@ -297,3 +299,40 @@ pub trait TableProviderFactory: Debug + Sync + Send { | |||
cmd: &CreateExternalTable, | |||
) -> Result<Arc<dyn TableProvider>>; | |||
} | |||
|
|||
/// A trait for table function implementations | |||
pub trait TableFunctionImpl: Debug + Sync + Send { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I also recommend to take name()
inside TableFunctionImpl
pub fn default_table_functions() -> Vec<Arc<TableFunction>> { | ||
functions_table::all_default_table_functions() | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we get the defaults from a singleton ? Like other default getters, with get_or_init or smth similar
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a good idea, updated
#[derive(Debug, Clone)] | ||
struct GenerateSeriesState { | ||
schema: SchemaRef, | ||
_start: i64, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there any reason to keep this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This field is for display, i added a comment to explain
// Check input `exprs` type and number. Input validity check (e.g. start <= end) | ||
// will be performed in `TableProvider::scan` | ||
fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> { | ||
// TODO: support 3 arguments following DuckDB: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is also a one length argument usage, perhaps can be added as todo
@@ -365,8 +366,165 @@ impl RecordBatchStream for MemoryStream { | |||
} | |||
} | |||
|
|||
pub trait StreamingBatchGenerator: Send + Sync + fmt::Debug + fmt::Display { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not very satisfied with these Streaming...
naming. Can we describe the behavior better with a different naming for these? Perhaps we could use 'lazy' term
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree Lazy*
is less ambiguous, updated all prefixes
Thank you for the review @berkaysynnada The answer to the question is yes, they are different function with the same name. @jayzhan211 I thought about the sharing generator issue again, I think making a interface more flexible is important, so I kept the lock, and updated |
Which issue does this PR close?
Closes #10069
Rationale for this change
To make it easier to test memory-limited queries, i was looking for a datasource which can generate long streams, and itself doesn't consume lots of memory.
There are several existing approaches are close, but can't fully satisfy this requirement:
MemoryExec
have to buffer all output data up frontselect * from unnest(generate_series(1,10000))
also have to buffer all output during constructionSo in this PR, a new
StreamingMemoryExec
is introduced, it can be used to generate batches lazily by defining a 'iterator' onRecordBatch
in its interface. And a new user-defined table function is implemented with this new execution plan.This function's behavior is kept consistent with DuckDB's generate_series() table function, and only 2 argument variant is implemented for now
I also think this UDTF can be useful to tests/micro-benchmarks in general
What changes are included in this PR?
TableFunctionImpl
fromdatafusion
crate ->datafusion-catalog
crate, so new UDTFs defined indatafusion-functions-table
crate won't have circular dependency withdatafusion
crate (TableFunctionImpl
depends onTableProvider
indatafusion-catalog
crate, so I think it's the best place to move it to)StreamingMemoryExec
generate_series()
Are these changes tested?
Unit tests for
StreamingMemoryExec
, andsqllogictest
s forgenerate_series()
UDTFAre there any user-facing changes?
No