-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat!: change catalog provider and schema provider methods to be asynchronous #13582
base: main
Are you sure you want to change the base?
Conversation
datafusion/catalog/src/catalog.rs
Outdated
pub trait CatalogProvider: Debug + Sync + Send { | ||
/// Returns the catalog provider as [`Any`] | ||
/// so that it can be downcast to a specific implementation. | ||
fn as_any(&self) -> &dyn Any; | ||
|
||
/// Retrieves the list of available schema names in this catalog. | ||
fn schema_names(&self) -> Vec<String>; | ||
async fn schema_names(&self) -> Vec<String>; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps this should/could return a stream?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
BoxStream<String>
? Or BoxStream<Result<String>>
. In retrospect it seems that errors are likely to occur if we're using a real remote catalog.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably the latter, not sure about whether it should be '_
or 'static
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll try and get away with '_
. Otherwise the memory provider will need to clone its contents.
If callers need static they can Arc the provider, clone it, and wrap with the call.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The reason to perhaps prefer 'static
is python compatibility - apache/arrow-rs#6587
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok. How about we retreat to:
async fn schema_names(&self) -> Result<Box<dyn Iterator<Item = &str>>>;
with the rationale that real world implementations should be caching results anyways. Note there is once again an implicit '_
bound on the result but since we're returning a future and not a stream it's easily removed if needed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If there is a case where there are so many tables / schemas / catalogs that caching the full list is too expensive then the planner is probably going to have other problems anyways. I'm guessing the planner is iterating through this list and constructing a hash map it uses to resolve lookups and so the thing is going to get cached one way or another.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That seems like the worst of both worlds 😅
What is the issue with a static stream, its fairly extensively used in DF?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is the issue with a static stream, its fairly extensively used in DF?
Just me being foolish. Having now gone full circle on the iterator approach I'm ready to do static stream again 😮💨
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok. I have switched to BoxStream<'static, Result<String>>
datafusion/catalog/src/catalog.rs
Outdated
&self, | ||
name: String, | ||
catalog: Arc<dyn CatalogProvider>, | ||
) -> Option<Arc<dyn CatalogProvider>>; | ||
|
||
/// Retrieves the list of available catalog names | ||
fn catalog_names(&self) -> Vec<String>; | ||
async fn catalog_names(&self) -> Vec<String>; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This could also be a stream
…ng>>. Add Result to a few APIs
Which issue does this PR close?
Closes #10339
Rationale for this change
Many catalogs are remote (and/or disk based) and offer only asynchronous APIs. For example, Polaris, Unity, and Hive. Integrating with this catalogs is impossible since something like
ctx.sql("SELECT * FROM db.schm.tbl")
first enters an async context (sql
) then a synchronous context (calling the catalog provider to resolvedb
) and then we need to go into an asynchronous context to interact with the catalog and this async -> sync -> async path is generally forbidden.What changes are included in this PR?
The heart of the change is making all non-trivial methods async in
CatalogProvider
,SchemaProvider
, andCatalogProviderList
.These changes had rather far-reaching ramifications discussed more below.
Are these changes tested?
Yes, in the sense that these traits were all tested previously. I did not add any new testing.
Are there any user-facing changes?
Yes, there are significant user-facing breaking changes beyond the obvious change that these public traits are now async.
Notable but expected
The following methods are now async and were previously sync
Perhaps surprising
The
SessionStateBuilder::build
method andSessionStateBuilder::new_from_existing
methods are now async and theFrom
impls to go betweenSessionState
andSessionStateBuilder
were removed.The
new_from_existing
change is because the method does a (now async) lookup into the existing catalog list (which may be remote) to determine if a default catalog exists so that it knows if it needs to create a new one or not.The
build
change is because, if no default catalog exists, then a default catalog is created and registered with the (potentially remote) catalog list.The
SessionContext::register_batch
andSessionContext::register_table
methods are used frequently and it may make sense to think of them as synchronous since in-memory tables and batches are not typically thought of as something a "catalog" provides. It would be possible forSessionContext
to have both "a catalog" (which is async) and "a collection of in-memory session-specific tables" (which is sync) and thus keep these methods synchronous. However, that felt like more complexity than justified by this change.Caveats
In most cases I simply propagated the async. In some benchmarks I am using
now_or_never().expect()
to avoid the need to create a tokio runtime.In a few of the
SessionContext
factory functions, where a custom catalog cannot possibly be provided (e.g.SessionContext::default
) I usenow_or_never
since it is safe to assume the default in-memory catalog is synchronous.Details for review
The only non-trivial implementation changes were in
SessionState
. There is aRwLock
there and it required some fiddling (and a few Arc clones of the catalog list) to avoid holding the lock across an await boundary. This could be potentially simplified by changing theRwLock
to an async lock but I'm not sure that's justified yet.