Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: change catalog provider and schema provider methods to be asynchronous #13582

Open
wants to merge 4 commits into
base: main
Choose a base branch
from

Conversation

westonpace
Copy link
Member

@westonpace westonpace commented Nov 27, 2024

Which issue does this PR close?

Closes #10339

Rationale for this change

Many catalogs are remote (and/or disk based) and offer only asynchronous APIs. For example, Polaris, Unity, and Hive. Integrating with this catalogs is impossible since something like ctx.sql("SELECT * FROM db.schm.tbl") first enters an async context (sql) then a synchronous context (calling the catalog provider to resolve db) and then we need to go into an asynchronous context to interact with the catalog and this async -> sync -> async path is generally forbidden.

What changes are included in this PR?

The heart of the change is making all non-trivial methods async in CatalogProvider, SchemaProvider, and CatalogProviderList.

These changes had rather far-reaching ramifications discussed more below.

Are these changes tested?

Yes, in the sense that these traits were all tested previously. I did not add any new testing.

Are there any user-facing changes?

Yes, there are significant user-facing breaking changes beyond the obvious change that these public traits are now async.

Notable but expected

The following methods are now async and were previously sync

SessionContext::register_catalog
SessionContext::catalog_names
SessionContext::catalog
SessionContext::register_table
SessionContext::deregister_table
SessionContext::table_exist

Perhaps surprising

The SessionStateBuilder::build method and SessionStateBuilder::new_from_existing methods are now async and the From impls to go between SessionState and SessionStateBuilder were removed.

The new_from_existing change is because the method does a (now async) lookup into the existing catalog list (which may be remote) to determine if a default catalog exists so that it knows if it needs to create a new one or not.

The build change is because, if no default catalog exists, then a default catalog is created and registered with the (potentially remote) catalog list.

The SessionContext::register_batch and SessionContext::register_table methods are used frequently and it may make sense to think of them as synchronous since in-memory tables and batches are not typically thought of as something a "catalog" provides. It would be possible for SessionContext to have both "a catalog" (which is async) and "a collection of in-memory session-specific tables" (which is sync) and thus keep these methods synchronous. However, that felt like more complexity than justified by this change.

Caveats

In most cases I simply propagated the async. In some benchmarks I am using now_or_never().expect() to avoid the need to create a tokio runtime.

In a few of the SessionContext factory functions, where a custom catalog cannot possibly be provided (e.g. SessionContext::default) I use now_or_never since it is safe to assume the default in-memory catalog is synchronous.

Details for review

The only non-trivial implementation changes were in SessionState. There is a RwLock there and it required some fiddling (and a few Arc clones of the catalog list) to avoid holding the lock across an await boundary. This could be potentially simplified by changing the RwLock to an async lock but I'm not sure that's justified yet.

@github-actions github-actions bot added core Core DataFusion crate sqllogictest SQL Logic Tests (.slt) substrait catalog Related to the catalog crate proto Related to proto crate labels Nov 27, 2024
@westonpace westonpace changed the title feat: change catalog provider and schema provider methods to be asynchronous feat!: change catalog provider and schema provider methods to be asynchronous Nov 27, 2024
pub trait CatalogProvider: Debug + Sync + Send {
/// Returns the catalog provider as [`Any`]
/// so that it can be downcast to a specific implementation.
fn as_any(&self) -> &dyn Any;

/// Retrieves the list of available schema names in this catalog.
fn schema_names(&self) -> Vec<String>;
async fn schema_names(&self) -> Vec<String>;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps this should/could return a stream?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BoxStream<String>? Or BoxStream<Result<String>>. In retrospect it seems that errors are likely to occur if we're using a real remote catalog.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably the latter, not sure about whether it should be '_ or 'static

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll try and get away with '_. Otherwise the memory provider will need to clone its contents.

If callers need static they can Arc the provider, clone it, and wrap with the call.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason to perhaps prefer 'static is python compatibility - apache/arrow-rs#6587

Copy link
Member Author

@westonpace westonpace Nov 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok. How about we retreat to:

async fn schema_names(&self) -> Result<Box<dyn Iterator<Item = &str>>>;

with the rationale that real world implementations should be caching results anyways. Note there is once again an implicit '_ bound on the result but since we're returning a future and not a stream it's easily removed if needed.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there is a case where there are so many tables / schemas / catalogs that caching the full list is too expensive then the planner is probably going to have other problems anyways. I'm guessing the planner is iterating through this list and constructing a hash map it uses to resolve lookups and so the thing is going to get cached one way or another.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That seems like the worst of both worlds 😅

What is the issue with a static stream, its fairly extensively used in DF?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the issue with a static stream, its fairly extensively used in DF?

Just me being foolish. Having now gone full circle on the iterator approach I'm ready to do static stream again 😮‍💨

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok. I have switched to BoxStream<'static, Result<String>>

&self,
name: String,
catalog: Arc<dyn CatalogProvider>,
) -> Option<Arc<dyn CatalogProvider>>;

/// Retrieves the list of available catalog names
fn catalog_names(&self) -> Vec<String>;
async fn catalog_names(&self) -> Vec<String>;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could also be a stream

@github-actions github-actions bot added the documentation Improvements or additions to documentation label Nov 27, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
catalog Related to the catalog crate core Core DataFusion crate documentation Improvements or additions to documentation proto Related to proto crate sqllogictest SQL Logic Tests (.slt) substrait
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Make all SchemaProvider trait APIs async
2 participants