Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add generate_series() udtf (and introduce 'lazy' MemoryExec) #13540

Open
wants to merge 12 commits into
base: main
Choose a base branch
from

Conversation

2010YOUY01
Copy link
Contributor

@2010YOUY01 2010YOUY01 commented Nov 23, 2024

Which issue does this PR close?

Closes #10069

Rationale for this change

To make it easier to test memory-limited queries, i was looking for a datasource which can generate long streams, and itself doesn't consume lots of memory.
There are several existing approaches are close, but can't fully satisfy this requirement:

  • MemoryExec have to buffer all output data up front
  • select * from unnest(generate_series(1,10000)) also have to buffer all output during construction
  • It's possible to do such tests on large files, however it would be hard to setup and takes longer to run

So in this PR, a new StreamingMemoryExec is introduced, it can be used to generate batches lazily by defining a 'iterator' on RecordBatch in its interface. And a new user-defined table function is implemented with this new execution plan.

This function's behavior is kept consistent with DuckDB's generate_series() table function, and only 2 argument variant is implemented for now

I also think this UDTF can be useful to tests/micro-benchmarks in general

> select * from generate_series(1,3);
+-------+
| value |
+-------+
| 1     |
| 2     |
| 3     |
+-------+

What changes are included in this PR?

  1. Move TableFunctionImpl from datafusion crate -> datafusion-catalog crate, so new UDTFs defined in datafusion-functions-table crate won't have circular dependency with datafusion crate (TableFunctionImpl depends on TableProvider in datafusion-catalog crate, so I think it's the best place to move it to)
  2. Introduced StreamingMemoryExec
  3. Implemented a new UDTF generate_series()

Are these changes tested?

Unit tests for StreamingMemoryExec, and sqllogictests for generate_series() UDTF

Are there any user-facing changes?

No

@github-actions github-actions bot added physical-expr Physical Expressions core Core DataFusion crate sqllogictest SQL Logic Tests (.slt) catalog Related to the catalog crate labels Nov 23, 2024
) -> Result<Self> {
let cache = PlanProperties::new(
EquivalenceProperties::new(Arc::clone(&schema)),
Partitioning::RoundRobinBatch(generators.len()),
Copy link
Contributor Author

@2010YOUY01 2010YOUY01 Nov 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this field mean: Let's say a exec only have 1 generator, the optimizer will try to insert a RepartitionExec to the output of StreamingMemoryExec, and use round robin to repartition output batches to target_partitions number?
I found it works on some aggregate queries, but not for a sort query

> explain select * from generate_series(1, 10000) as t1(v1) order by v1;
+---------------+----------------------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                                           |
+---------------+----------------------------------------------------------------------------------------------------------------+
| logical_plan  | Sort: t1.v1 ASC NULLS LAST                                                                                     |
|               |   SubqueryAlias: t1                                                                                            |
|               |     Projection: tmp_table.value AS v1                                                                          |
|               |       TableScan: tmp_table projection=[value]                                                                  |
| physical_plan | SortExec: expr=[v1@0 ASC NULLS LAST], preserve_partitioning=[false]                                            |
|               |   ProjectionExec: expr=[value@0 as v1]                                                                         |
|               |     StreamingMemoryExec: partitions=1, batch_generators=[generate_series: start=1, end=10000, batch_size=8192] |
|               |                                                                                                                |
+---------------+----------------------------------------------------------------------------------------------------------------+

Parallelized sort should look like

> explain select * from lineitem order by l_orderkey;
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan  | Sort: lineitem.l_orderkey ASC NULLS LAST                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                              |
|               |   TableScan: lineitem projection=[l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                         |
| physical_plan | SortPreservingMergeExec: [l_orderkey@0 ASC NULLS LAST]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                |
|               |   SortExec: expr=[l_orderkey@0 ASC NULLS LAST], preserve_partitioning=[true]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                          |
|               |     ParquetExec: file_groups={14 groups: [[Users/yongting/Code/datafusion/benchmarks/data/tpch_sf1/lineitem/part-0.parquet:0..11525426], [Users/yongting/Code/datafusion/benchmarks/data/tpch_sf1/lineitem/part-0.parquet:11525426..20311205, Users/yongting/Code/datafusion/benchmarks/data/tpch_sf1/lineitem/part-1.parquet:0..2739647], [Users/yongting/Code/datafusion/benchmarks/data/tpch_sf1/lineitem/part-1.parquet:2739647..14265073], [Users/yongting/Code/datafusion/benchmarks/data/tpch_sf1/lineitem/part-1.parquet:14265073..20193593, Users/yongting/Code/datafusion/benchmarks/data/tpch_sf1/lineitem/part-2.parquet:0..5596906], [Users/yongting/Code/datafusion/benchmarks/data/tpch_sf1/lineitem/part-2.parquet:5596906..17122332], ...]}, projection=[l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment] |
|               |                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                       |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

I want to know is it correct here, and this sort query in theory can be made parallelized by changing somewhere in optimization phase? 🤔

/// Stream that generates record batches on demand
pub struct StreamingMemoryStream {
schema: SchemaRef,
generator: Box<dyn StreamingBatchGenerator>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we use Arc given it makes sense to have generator generate across threads

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, Arc is more flexible: implementation can choose to let a StreamingBatchGenerate share between multiple streams, or create separate generators for each stream

/// Schema representing the data
schema: SchemaRef,
/// Functions to generate batches for each partition
batch_generators: Vec<Box<dyn StreamingBatchGenerator>>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
batch_generators: Vec<Box<dyn StreamingBatchGenerator>>,
batch_generators: Vec<Arc<dyn StreamingBatchGenerator>>,

@@ -297,3 +299,40 @@ pub trait TableProviderFactory: Debug + Sync + Send {
cmd: &CreateExternalTable,
) -> Result<Arc<dyn TableProvider>>;
}

/// A trait for table function implementations
pub trait TableFunctionImpl: Debug + Sync + Send {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
pub trait TableFunctionImpl: Debug + Sync + Send {
/// Returns this function's name
fn name(&self) -> &str;

This is much more consistent with other UDFs

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the part of the refactor, changing this requires several fixes. We can leave it to a separate PR to make this PR smaller

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also recommend to take name() inside TableFunctionImpl

datafusion/sqllogictest/test_files/table_functions.slt Outdated Show resolved Hide resolved
datafusion/sqllogictest/test_files/table_functions.slt Outdated Show resolved Hide resolved
statement error DataFusion error: Error during planning: Second argument must be an integer literal
SELECT * FROM generate_series(1, NULL)

statement error DataFusion error: Error during planning: generate_series expects 2 arguments
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

D SELECT * FROM generate_series(1, 5, null);
┌─────────────────┐
│ generate_series │
│      int64      │
├─────────────────┤
│     0 rows      │
└─────────────────┘

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll open an issue for 3-arg support later

@2010YOUY01
Copy link
Contributor Author

2010YOUY01 commented Nov 27, 2024

Thank you for the comments @jayzhan211 , I have updated.

Now I think the concurrent generator might not be very straightforward, I'll first leave some rationale here.
If we can come to agreement I'll add more doc to it (else revert back to Box version for simplicity)


Rationale for StreamingMemoryStream design

Suppose we want to run select ... from generate_series(1,100) in two partitions
And the underlying batch generator is wrapped with Mutex

pub struct StreamingMemoryStream {
    ...
    generator: Arc<Mutex<dyn StreamingBatchGenerator>>,
}

It's possible to implement the UDTF in 3 ways:
1.

[generator_1_50]   --- [StreamingMemoryStream  stream1] --> xxStream1
[generator_50_100] --- [StreamingMemoryStream  stream2] --> xxStream1
[generator_1_100] --- [StreamingMemoryStream  stream1] --> Repartition --> xxStream1
                                                                       |-> xxStream2                                                                                
[generator_1_100] --- [StreamingMemoryStream  stream1] --> xxStream1
                  |-- [StreamingMemoryStream  stream2] --> xxStream2                                                                                                                                      

1 and 2 is the common pattern for datafusion scanning operators to do plan-time parallelism, generator won't be accessed by multiple threads thus Mutex is redundant
3 make the StreamingBatchGenerator being able to concurrently accessed by multiple streams.

The Mutex is added to make it possible for case 3 (so the interface can be more general-purpose for future use cases)

@jayzhan211
Copy link
Contributor

Thank you for the comments @jayzhan211 , I have updated.

Now I think the concurrent generator might not be very straightforward, I'll first leave some rationale here. If we can come to agreement I'll add more doc to it (else revert back to Box version for simplicity)

Rationale for StreamingMemoryStream design

Suppose we want to run select ... from generate_series(1,100) in two partitions And the underlying batch generator is wrapped with Mutex

pub struct StreamingMemoryStream {
    ...
    generator: Arc<Mutex<dyn StreamingBatchGenerator>>,
}

It's possible to implement the UDTF in 3 ways: 1.

[generator_1_50]   --- [StreamingMemoryStream  stream1] --> xxStream1
[generator_50_100] --- [StreamingMemoryStream  stream2] --> xxStream1
[generator_1_100] --- [StreamingMemoryStream  stream1] --> Repartition --> xxStream1
                                                                       |-> xxStream2                                                                                
[generator_1_100] --- [StreamingMemoryStream  stream1] --> xxStream1
                  |-- [StreamingMemoryStream  stream2] --> xxStream2                                                                                                                                      

1 and 2 is the common pattern for datafusion scanning operators to do plan-time parallelism, generator won't be accessed by multiple threads thus Mutex is redundant 3 make the StreamingBatchGenerator being able to concurrently accessed by multiple streams.

The Mutex is added to make it possible for case 3 (so the interface can be more general-purpose for future use cases)

Given the quick look, I'm not sure whether we need 3rd case. It seems the 1st case runs execution parallelly too. I would need to think about the advantage of the 3rd case over 1st case.

For 3rd case, if we need it, we might use Arc<RwLock<dyn T>> if generate_next_batch takes &self. It might be more efficiently.

Copy link
Contributor

@berkaysynnada berkaysynnada left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @2010YOUY01. I reviewed the changes and LGTM. I have a few minor comments and one question: I noticed another approach of generate_series(), which can be used like this:
SELECT generate_series(1, 5)
I assume it is not a udtf in that context. Does this implementation leave room for that usage?

@@ -297,3 +299,40 @@ pub trait TableProviderFactory: Debug + Sync + Send {
cmd: &CreateExternalTable,
) -> Result<Arc<dyn TableProvider>>;
}

/// A trait for table function implementations
pub trait TableFunctionImpl: Debug + Sync + Send {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also recommend to take name() inside TableFunctionImpl

pub fn default_table_functions() -> Vec<Arc<TableFunction>> {
functions_table::all_default_table_functions()
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we get the defaults from a singleton ? Like other default getters, with get_or_init or smth similar

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a good idea, updated

#[derive(Debug, Clone)]
struct GenerateSeriesState {
schema: SchemaRef,
_start: i64,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any reason to keep this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This field is for display, i added a comment to explain

// Check input `exprs` type and number. Input validity check (e.g. start <= end)
// will be performed in `TableProvider::scan`
fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
// TODO: support 3 arguments following DuckDB:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is also a one length argument usage, perhaps can be added as todo

@@ -365,8 +366,165 @@ impl RecordBatchStream for MemoryStream {
}
}

pub trait StreamingBatchGenerator: Send + Sync + fmt::Debug + fmt::Display {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not very satisfied with these Streaming... naming. Can we describe the behavior better with a different naming for these? Perhaps we could use 'lazy' term

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree Lazy* is less ambiguous, updated all prefixes

@2010YOUY01
Copy link
Contributor Author

Thank you @2010YOUY01. I reviewed the changes and LGTM. I have a few minor comments and one question: I noticed another approach of generate_series(), which can be used like this: SELECT generate_series(1, 5) I assume it is not a udtf in that context. Does this implementation leave room for that usage?

Thank you for the review @berkaysynnada

The answer to the question is yes, they are different function with the same name.
Using same name is probably not the best idea, but we do this to keep the behavior the same as DuckDB.
I added a sql test to make sure they can work together (like select generate_series() from generate_series())


@jayzhan211 I thought about the sharing generator issue again, I think making a interface more flexible is important, so I kept the lock, and updated RwLock as you suggested

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
catalog Related to the catalog crate core Core DataFusion crate physical-expr Physical Expressions sqllogictest SQL Logic Tests (.slt)
Projects
None yet
Development

Successfully merging this pull request may close these issues.

suggest add synax select from generate_series()
3 participants