From 293cf2eba3fd904ffdfeef1395e5328a5230e376 Mon Sep 17 00:00:00 2001 From: zhyass Date: Sat, 19 Oct 2024 02:39:59 +0800 Subject: [PATCH] fix --- src/bendpy/src/context.rs | 2 +- .../it/aggregating_index/index_refresh.rs | 4 +- .../tests/it/aggregating_index/index_scan.rs | 4 +- .../interpreters/access/privilege_access.rs | 4 +- .../service/src/interpreters/interpreter.rs | 71 +++++++++++++++++-- .../interpreter_table_add_column.rs | 2 +- .../interpreters/interpreter_table_analyze.rs | 2 +- .../interpreter_table_describe.rs | 2 +- .../interpreter_table_modify_column.rs | 2 +- .../interpreters/interpreter_view_alter.rs | 2 +- .../interpreters/interpreter_view_create.rs | 2 +- .../interpreters/interpreter_view_describe.rs | 2 +- src/query/service/src/interpreters/mod.rs | 1 + src/query/service/src/interpreters/util.rs | 2 +- src/query/service/src/local/executor.rs | 3 +- .../flight_sql/flight_sql_service/query.rs | 4 +- .../src/servers/http/clickhouse_handler.rs | 21 +----- .../servers/http/v1/query/execute_state.rs | 21 +----- .../servers/mysql/mysql_interactive_worker.rs | 7 +- src/query/service/src/sessions/queue_mgr.rs | 17 ++--- .../others/suggested_background_tasks.rs | 2 +- src/query/service/src/test_kits/context.rs | 2 +- src/query/service/src/test_kits/fixture.rs | 2 +- src/query/service/tests/it/frame/dataframe.rs | 2 +- .../service/tests/it/parquet_rs/utils.rs | 2 +- .../it/pipelines/builders/runtime_filter.rs | 3 +- .../tests/it/servers/admin/v1/status.rs | 2 +- .../service/tests/it/sessions/queue_mgr.rs | 12 ++-- .../tests/it/sql/exec/get_table_bind_test.rs | 2 +- src/query/service/tests/it/sql/exec/mod.rs | 4 +- .../tests/it/sql/planner/builders/binder.rs | 2 +- .../storages/fuse/operations/alter_table.rs | 2 +- .../fuse/operations/internal_column.rs | 2 +- .../it/storages/fuse/operations/optimize.rs | 2 +- .../storages/fuse/operations/table_analyze.rs | 4 +- src/query/sql/src/planner/binder/ddl/table.rs | 2 +- src/query/sql/src/planner/planner.rs | 7 +- .../storages/system/src/columns_table.rs | 4 +- 38 files changed, 122 insertions(+), 111 deletions(-) diff --git a/src/bendpy/src/context.rs b/src/bendpy/src/context.rs index e7ba5243cece..ad1e7836a206 100644 --- a/src/bendpy/src/context.rs +++ b/src/bendpy/src/context.rs @@ -190,6 +190,6 @@ impl PySessionContext { async fn plan_sql(ctx: &Arc, sql: &str) -> Result { let mut planner = Planner::new(ctx.clone()); - let (plan, _) = planner.plan_sql(sql).await?; + let plan = planner.plan_sql(sql).await?; Ok(PyDataFrame::new(ctx.clone(), plan, default_box_size())) } diff --git a/src/query/ee/tests/it/aggregating_index/index_refresh.rs b/src/query/ee/tests/it/aggregating_index/index_refresh.rs index 28157ae63eb1..5defac9bae4f 100644 --- a/src/query/ee/tests/it/aggregating_index/index_refresh.rs +++ b/src/query/ee/tests/it/aggregating_index/index_refresh.rs @@ -523,9 +523,7 @@ async fn test_sync_agg_index_after_copy_into() -> Result<()> { async fn plan_sql(ctx: Arc, sql: &str) -> Result { let mut planner = Planner::new(ctx); - let (plan, _) = planner.plan_sql(sql).await?; - - Ok(plan) + planner.plan_sql(sql).await } async fn execute_sql(ctx: Arc, sql: &str) -> Result { diff --git a/src/query/ee/tests/it/aggregating_index/index_scan.rs b/src/query/ee/tests/it/aggregating_index/index_scan.rs index 85674a95795f..5fcf24721490 100644 --- a/src/query/ee/tests/it/aggregating_index/index_scan.rs +++ b/src/query/ee/tests/it/aggregating_index/index_scan.rs @@ -85,9 +85,7 @@ fn test_fuzz_with_spill() -> Result<()> { async fn plan_sql(ctx: Arc, sql: &str) -> Result { let mut planner = Planner::new(ctx.clone()); - let (plan, _) = planner.plan_sql(sql).await?; - - Ok(plan) + planner.plan_sql(sql).await } async fn execute_sql(ctx: Arc, sql: &str) -> Result { diff --git a/src/query/service/src/interpreters/access/privilege_access.rs b/src/query/service/src/interpreters/access/privilege_access.rs index 0eaa433bc235..9a8d692b40e2 100644 --- a/src/query/service/src/interpreters/access/privilege_access.rs +++ b/src/query/service/src/interpreters/access/privilege_access.rs @@ -1108,13 +1108,13 @@ impl AccessChecker for PrivilegeAccess { } Plan::CreateView(plan) => { let mut planner = Planner::new(self.ctx.clone()); - let (plan, _) = planner.plan_sql(&plan.subquery).await?; + let plan = planner.plan_sql(&plan.subquery).await?; self.check(ctx, &plan).await? } Plan::AlterView(plan) => { self.validate_db_access(&plan.catalog, &plan.database, UserPrivilegeType::Alter, false).await?; let mut planner = Planner::new(self.ctx.clone()); - let (plan, _) = planner.plan_sql(&plan.subquery).await?; + let plan = planner.plan_sql(&plan.subquery).await?; self.check(ctx, &plan).await? } Plan::DropView(plan) => { diff --git a/src/query/service/src/interpreters/interpreter.rs b/src/query/service/src/interpreters/interpreter.rs index 1f220e17edf4..c88e26cfd7e2 100644 --- a/src/query/service/src/interpreters/interpreter.rs +++ b/src/query/service/src/interpreters/interpreter.rs @@ -16,7 +16,10 @@ use std::collections::BTreeMap; use std::sync::Arc; use std::time::SystemTime; +use databend_common_ast::ast::AlterTableAction; +use databend_common_ast::ast::AlterTableStmt; use databend_common_ast::ast::Literal; +use databend_common_ast::ast::ModifyColumnAction; use databend_common_ast::ast::Statement; use databend_common_base::base::short_sql; use databend_common_base::runtime::profile::get_statistics_desc; @@ -54,7 +57,10 @@ use crate::pipelines::executor::PipelineCompleteExecutor; use crate::pipelines::executor::PipelinePullingExecutor; use crate::pipelines::PipelineBuildResult; use crate::schedulers::ServiceQueryExecutor; +use crate::sessions::AcquireQueueGuard; +use crate::sessions::QueriesQueueManager; use crate::sessions::QueryContext; +use crate::sessions::QueryEntry; use crate::sessions::SessionManager; use crate::stream::DataBlockStream; use crate::stream::ProgressStream; @@ -197,22 +203,52 @@ fn log_query_finished(ctx: &QueryContext, error: Option, has_profiles } } +pub async fn plan_sql( + ctx: Arc, + sql: &str, + acquire_queue: bool, +) -> Result<(Plan, PlanExtras, AcquireQueueGuard)> { + let mut planner = Planner::new_with_sample_executor( + ctx.clone(), + Arc::new(ServiceQueryExecutor::new(ctx.clone())), + ); + let extras = planner.parse_sql(sql)?; + if !acquire_queue { + let plan = planner.plan_stmt(&extras.statement).await?; + return Ok((plan, extras, AcquireQueueGuard::create(None))); + } + + let need_acquire_lock = need_acquire_lock(&extras.statement); + if need_acquire_lock { + let query_entry = QueryEntry::create_entry(&ctx, &extras.statement, true)?; + let guard = QueriesQueueManager::instance().acquire(query_entry).await?; + let plan = planner.plan_stmt(&extras.statement).await?; + Ok((plan, extras, guard)) + } else { + let plan = planner.plan_stmt(&extras.statement).await?; + let query_entry = QueryEntry::create(&ctx, &plan, &extras.statement)?; + let guard = QueriesQueueManager::instance().acquire(query_entry).await?; + Ok((plan, extras, guard)) + } +} + /// There are two steps to execute a query: /// 1. Plan the SQL /// 2. Execute the plan -- interpreter /// /// This function is used to plan the SQL. If an error occurs, we will log the query start and finished. -pub async fn interpreter_plan_sql(ctx: Arc, sql: &str) -> Result<(Plan, PlanExtras)> { - let mut planner = Planner::new_with_sample_executor( - ctx.clone(), - Arc::new(ServiceQueryExecutor::new(ctx.clone())), - ); - let result = planner.plan_sql(sql).await; +pub async fn interpreter_plan_sql( + ctx: Arc, + sql: &str, + acquire_queue: bool, +) -> Result<(Plan, PlanExtras, AcquireQueueGuard)> { + let result = plan_sql(ctx.clone(), sql, acquire_queue).await; + let short_sql = short_sql( sql.to_string(), ctx.get_settings().get_short_sql_max_length()?, ); - let mut stmt = if let Ok((_, extras)) = &result { + let mut stmt = if let Ok((_, extras, _)) = &result { Some(extras.statement.clone()) } else { // Only log if there's an error @@ -299,3 +335,24 @@ pub fn on_execution_finished(info: &ExecutionInfo, query_ctx: Arc) Err(error) => Err(error.clone()), } } + +fn need_acquire_lock(stmt: &Statement) -> bool { + match stmt { + Statement::Replace(_) + | Statement::MergeInto(_) + | Statement::Update(_) + | Statement::Delete(_) + | Statement::OptimizeTable(_) + | Statement::TruncateTable(_) => true, + + Statement::AlterTable(AlterTableStmt { action, .. }) => matches!( + action, + AlterTableAction::ReclusterTable { .. } + | AlterTableAction::ModifyColumn { + action: ModifyColumnAction::SetDataType(_), + } + ), + + _ => false, + } +} diff --git a/src/query/service/src/interpreters/interpreter_table_add_column.rs b/src/query/service/src/interpreters/interpreter_table_add_column.rs index e2493d9d3438..19e65e7db0ed 100644 --- a/src/query/service/src/interpreters/interpreter_table_add_column.rs +++ b/src/query/service/src/interpreters/interpreter_table_add_column.rs @@ -133,7 +133,7 @@ impl Interpreter for AddTableColumnInterpreter { field.default_expr().unwrap() ); let mut planner = Planner::new(self.ctx.clone()); - let (plan, _) = planner.plan_sql(&query).await?; + let plan = planner.plan_sql(&query).await?; if let Plan::DataMutation { s_expr, schema, .. } = plan { let interpreter = MutationInterpreter::try_create(self.ctx.clone(), *s_expr, schema)?; diff --git a/src/query/service/src/interpreters/interpreter_table_analyze.rs b/src/query/service/src/interpreters/interpreter_table_analyze.rs index f3a5d2b53bcc..f90aaf81d5ee 100644 --- a/src/query/service/src/interpreters/interpreter_table_analyze.rs +++ b/src/query/service/src/interpreters/interpreter_table_analyze.rs @@ -61,7 +61,7 @@ impl AnalyzeTableInterpreter { async fn plan_sql(&self, sql: String) -> Result<(PhysicalPlan, BindContext)> { let mut planner = Planner::new(self.ctx.clone()); - let (plan, _) = planner.plan_sql(&sql).await?; + let plan = planner.plan_sql(&sql).await?; let (select_plan, bind_context) = match &plan { Plan::Query { s_expr, diff --git a/src/query/service/src/interpreters/interpreter_table_describe.rs b/src/query/service/src/interpreters/interpreter_table_describe.rs index c0cdb536a8af..c297810e50b8 100644 --- a/src/query/service/src/interpreters/interpreter_table_describe.rs +++ b/src/query/service/src/interpreters/interpreter_table_describe.rs @@ -63,7 +63,7 @@ impl Interpreter for DescribeTableInterpreter { let schema = if tbl_info.engine() == VIEW_ENGINE { if let Some(query) = tbl_info.options().get(QUERY) { let mut planner = Planner::new(self.ctx.clone()); - let (plan, _) = planner.plan_sql(query).await?; + let plan = planner.plan_sql(query).await?; infer_table_schema(&plan.schema()) } else { return Err(ErrorCode::Internal( diff --git a/src/query/service/src/interpreters/interpreter_table_modify_column.rs b/src/query/service/src/interpreters/interpreter_table_modify_column.rs index 958988527b63..141bb911cffc 100644 --- a/src/query/service/src/interpreters/interpreter_table_modify_column.rs +++ b/src/query/service/src/interpreters/interpreter_table_modify_column.rs @@ -357,7 +357,7 @@ impl ModifyTableColumnInterpreter { // 2. build plan by sql let mut planner = Planner::new(self.ctx.clone()); - let (plan, _extras) = planner.plan_sql(&sql).await?; + let plan = planner.plan_sql(&sql).await?; // 3. build physical plan by plan let (select_plan, select_column_bindings) = match plan { diff --git a/src/query/service/src/interpreters/interpreter_view_alter.rs b/src/query/service/src/interpreters/interpreter_view_alter.rs index 16b7c4ae460b..bfd658be2ef0 100644 --- a/src/query/service/src/interpreters/interpreter_view_alter.rs +++ b/src/query/service/src/interpreters/interpreter_view_alter.rs @@ -60,7 +60,7 @@ impl Interpreter for AlterViewInterpreter { self.plan.subquery.clone() } else { let mut planner = Planner::new(self.ctx.clone()); - let (plan, _) = planner.plan_sql(&self.plan.subquery.clone()).await?; + let plan = planner.plan_sql(&self.plan.subquery.clone()).await?; if plan.schema().fields().len() != self.plan.column_names.len() { return Err(ErrorCode::BadDataArrayLength(format!( "column name length mismatch, expect {}, got {}", diff --git a/src/query/service/src/interpreters/interpreter_view_create.rs b/src/query/service/src/interpreters/interpreter_view_create.rs index 0a33608ef92b..374c37f06d45 100644 --- a/src/query/service/src/interpreters/interpreter_view_create.rs +++ b/src/query/service/src/interpreters/interpreter_view_create.rs @@ -59,7 +59,7 @@ impl Interpreter for CreateViewInterpreter { let table_function = catalog.list_table_functions(); let mut options = BTreeMap::new(); let mut planner = Planner::new(self.ctx.clone()); - let (plan, _) = planner.plan_sql(&self.plan.subquery.clone()).await?; + let plan = planner.plan_sql(&self.plan.subquery.clone()).await?; match plan.clone() { Plan::Query { metadata, .. } => { let metadata = metadata.read().clone(); diff --git a/src/query/service/src/interpreters/interpreter_view_describe.rs b/src/query/service/src/interpreters/interpreter_view_describe.rs index f65491fecc91..d39065e08242 100644 --- a/src/query/service/src/interpreters/interpreter_view_describe.rs +++ b/src/query/service/src/interpreters/interpreter_view_describe.rs @@ -64,7 +64,7 @@ impl Interpreter for DescribeViewInterpreter { let schema = if engine == VIEW_ENGINE { if let Some(query) = tbl_info.options().get(QUERY) { let mut planner = Planner::new(self.ctx.clone()); - let (plan, _) = planner.plan_sql(query).await?; + let plan = planner.plan_sql(query).await?; infer_table_schema(&plan.schema()) } else { return Err(ErrorCode::Internal( diff --git a/src/query/service/src/interpreters/mod.rs b/src/query/service/src/interpreters/mod.rs index 2142577b4a9e..2afb8e5b5a9d 100644 --- a/src/query/service/src/interpreters/mod.rs +++ b/src/query/service/src/interpreters/mod.rs @@ -150,6 +150,7 @@ pub use access::ManagementModeAccess; pub use common::InterpreterQueryLog; pub use hook::HookOperator; pub use interpreter::interpreter_plan_sql; +pub use interpreter::plan_sql; pub use interpreter::Interpreter; pub use interpreter::InterpreterPtr; pub use interpreter_cluster_key_alter::AlterTableClusterKeyInterpreter; diff --git a/src/query/service/src/interpreters/util.rs b/src/query/service/src/interpreters/util.rs index 26b7e6789a57..ea02b4b337b3 100644 --- a/src/query/service/src/interpreters/util.rs +++ b/src/query/service/src/interpreters/util.rs @@ -101,7 +101,7 @@ impl Client for ScriptClient { .await?; let mut planner = Planner::new(ctx.clone()); - let (plan, _) = planner.plan_sql(query).await?; + let plan = planner.plan_sql(query).await?; let interpreter = InterpreterFactory::get(ctx.clone(), &plan).await?; let stream = interpreter.execute(ctx.clone()).await?; let blocks = stream.try_collect::>().await?; diff --git a/src/query/service/src/local/executor.rs b/src/query/service/src/local/executor.rs index bbf6b8f2c422..ab97995dfb7d 100644 --- a/src/query/service/src/local/executor.rs +++ b/src/query/service/src/local/executor.rs @@ -135,7 +135,8 @@ impl SessionExecutor { ) -> Result<(SendableDataBlockStream, Arc, Plan, Statement)> { let context = session.create_query_context().await?; let mut planner = Planner::new(context.clone()); - let (plan, extras) = planner.plan_sql(sql).await?; + let extras = planner.parse_sql(sql)?; + let plan = planner.plan_stmt(&extras.statement).await?; let interpreter = InterpreterFactory::get(context.clone(), &plan).await?; let ctx = context.clone(); diff --git a/src/query/service/src/servers/flight_sql/flight_sql_service/query.rs b/src/query/service/src/servers/flight_sql/flight_sql_service/query.rs index 89ca52ac5b97..ebde976ebaf4 100644 --- a/src/query/service/src/servers/flight_sql/flight_sql_service/query.rs +++ b/src/query/service/src/servers/flight_sql/flight_sql_service/query.rs @@ -121,7 +121,9 @@ impl FlightSqlServiceImpl { .map_err(|e| status!("Could not create_query_context", e))?; let mut planner = Planner::new(context.clone()); - planner.plan_sql(query).await + let extras = planner.parse_sql(query)?; + let plan = planner.plan_stmt(&extras.statement).await?; + Ok((plan, extras)) } #[async_backtrace::framed] diff --git a/src/query/service/src/servers/http/clickhouse_handler.rs b/src/query/service/src/servers/http/clickhouse_handler.rs index 22f41136144f..8bfa38b2f73b 100644 --- a/src/query/service/src/servers/http/clickhouse_handler.rs +++ b/src/query/service/src/servers/http/clickhouse_handler.rs @@ -27,7 +27,6 @@ use databend_common_expression::DataSchemaRef; use databend_common_formats::ClickhouseFormatType; use databend_common_formats::FileFormatOptionsExt; use databend_common_formats::FileFormatTypeExt; -use databend_common_sql::Planner; use fastrace::func_path; use fastrace::prelude::*; use futures::StreamExt; @@ -52,13 +51,12 @@ use serde::Deserialize; use serde::Serialize; use crate::interpreters::interpreter_plan_sql; +use crate::interpreters::plan_sql; use crate::interpreters::InterpreterFactory; use crate::interpreters::InterpreterPtr; use crate::servers::http::middleware::sanitize_request_headers; use crate::servers::http::v1::HttpQueryContext; -use crate::sessions::QueriesQueueManager; use crate::sessions::QueryContext; -use crate::sessions::QueryEntry; use crate::sessions::SessionType; use crate::sessions::TableContext; @@ -256,16 +254,11 @@ pub async fn clickhouse_handler_get( let default_format = get_default_format(¶ms, headers).map_err(BadRequest)?; let sql = params.query(); // Use interpreter_plan_sql, we can write the query log if an error occurs. - let (plan, extras) = interpreter_plan_sql(context.clone(), &sql) + let (plan, extras, _guard) = interpreter_plan_sql(context.clone(), &sql, true) .await .map_err(|err| err.display_with_sql(&sql)) .map_err(BadRequest)?; - let query_entry = QueryEntry::create(&context, &plan, &extras).map_err(BadRequest)?; - let _guard = QueriesQueueManager::instance() - .acquire(query_entry) - .await - .map_err(BadRequest)?; let format = get_format_with_default(extras.format, default_format)?; let interpreter = InterpreterFactory::get(context.clone(), &plan) .await @@ -345,19 +338,11 @@ pub async fn clickhouse_handler_post( }; info!("receive clickhouse http post, (query + body) = {}", &msg); - let mut planner = Planner::new(ctx.clone()); - let (mut plan, extras) = planner - .plan_sql(&sql) + let (mut plan, extras, _guard) = plan_sql(ctx.clone(), &sql, true) .await .map_err(|err| err.display_with_sql(&sql)) .map_err(BadRequest)?; - let entry = QueryEntry::create(&ctx, &plan, &extras).map_err(BadRequest)?; - let _guard = QueriesQueueManager::instance() - .acquire(entry) - .await - .map_err(BadRequest)?; - let mut handle = None; let output_schema = plan.schema(); diff --git a/src/query/service/src/servers/http/v1/query/execute_state.rs b/src/query/service/src/servers/http/v1/query/execute_state.rs index 779e22dc9e29..b384c69afd27 100644 --- a/src/query/service/src/servers/http/v1/query/execute_state.rs +++ b/src/query/service/src/servers/http/v1/query/execute_state.rs @@ -45,10 +45,8 @@ use crate::servers::http::v1::http_query_handlers::QueryResponseField; use crate::servers::http::v1::query::http_query::ResponseState; use crate::servers::http::v1::query::sized_spsc::SizedChannelSender; use crate::sessions::AcquireQueueGuard; -use crate::sessions::QueriesQueueManager; use crate::sessions::QueryAffect; use crate::sessions::QueryContext; -use crate::sessions::QueryEntry; use crate::sessions::Session; use crate::sessions::TableContext; @@ -346,32 +344,15 @@ impl ExecuteState { info!("http query prepare to plan sql"); // Use interpreter_plan_sql, we can write the query log if an error occurs. - let (plan, extras) = interpreter_plan_sql(ctx.clone(), &sql) + let (plan, _, queue_guard) = interpreter_plan_sql(ctx.clone(), &sql, true) .await .map_err(|err| err.display_with_sql(&sql)) .with_context(make_error)?; - - let query_queue_manager = QueriesQueueManager::instance(); - - info!( - "http query preparing to acquire from query queue, length: {}", - query_queue_manager.length() - ); - - let entry = QueryEntry::create(&ctx, &plan, &extras).with_context(make_error)?; - let queue_guard = query_queue_manager - .acquire(entry) - .await - .with_context(make_error)?; { // set_var may change settings let mut guard = format_settings.write(); *guard = Some(ctx.get_format_settings().with_context(make_error)?); } - info!( - "http query finished acquiring from queue, length: {}", - query_queue_manager.length() - ); let interpreter = InterpreterFactory::get(ctx.clone(), &plan) .await diff --git a/src/query/service/src/servers/mysql/mysql_interactive_worker.rs b/src/query/service/src/servers/mysql/mysql_interactive_worker.rs index 796697c30c3f..35046f59b8cd 100644 --- a/src/query/service/src/servers/mysql/mysql_interactive_worker.rs +++ b/src/query/service/src/servers/mysql/mysql_interactive_worker.rs @@ -56,9 +56,7 @@ use crate::servers::mysql::writers::ProgressReporter; use crate::servers::mysql::writers::QueryResult; use crate::servers::mysql::MySQLFederated; use crate::servers::mysql::MYSQL_VERSION; -use crate::sessions::QueriesQueueManager; use crate::sessions::QueryContext; -use crate::sessions::QueryEntry; use crate::sessions::Session; use crate::sessions::TableContext; use crate::stream::DataBlockStream; @@ -377,10 +375,7 @@ impl InteractiveWorkerBase { context.set_id(query_id); // Use interpreter_plan_sql, we can write the query log if an error occurs. - let (plan, extras) = interpreter_plan_sql(context.clone(), query).await?; - - let entry = QueryEntry::create(&context, &plan, &extras)?; - let _guard = QueriesQueueManager::instance().acquire(entry).await?; + let (plan, _, _guard) = interpreter_plan_sql(context.clone(), query, true).await?; let interpreter = InterpreterFactory::get(context.clone(), &plan).await?; let has_result_set = plan.has_result_set(); diff --git a/src/query/service/src/sessions/queue_mgr.rs b/src/query/service/src/sessions/queue_mgr.rs index 3a48b61b0af5..fee02649935b 100644 --- a/src/query/service/src/sessions/queue_mgr.rs +++ b/src/query/service/src/sessions/queue_mgr.rs @@ -28,6 +28,7 @@ use std::time::Instant; use std::time::SystemTime; use databend_common_ast::ast::ExplainKind; +use databend_common_ast::ast::Statement; use databend_common_base::base::GlobalInstance; use databend_common_catalog::table_context::TableContext; use databend_common_exception::ErrorCode; @@ -41,7 +42,6 @@ use databend_common_metrics::session::incr_session_queue_acquire_timeout_count; use databend_common_metrics::session::record_session_queue_acquire_duration_ms; use databend_common_metrics::session::set_session_queued_queries; use databend_common_sql::plans::Plan; -use databend_common_sql::PlanExtras; use log::info; use parking_lot::Mutex; use pin_project_lite::pin_project; @@ -316,9 +316,9 @@ pub struct QueryEntry { } impl QueryEntry { - fn create_entry( + pub fn create_entry( ctx: &Arc, - plan_extras: &PlanExtras, + stmt: &Statement, need_acquire_to_queue: bool, ) -> Result { let settings = ctx.get_settings(); @@ -327,7 +327,7 @@ impl QueryEntry { need_acquire_to_queue, query_id: ctx.get_id(), create_time: ctx.get_created_time(), - sql: plan_extras.statement.to_mask_sql(), + sql: stmt.to_mask_sql(), user_info: ctx.get_current_user()?, timeout: match settings.get_statement_queued_timeout()? { 0 => Duration::from_secs(60 * 60 * 24 * 365 * 35), @@ -336,13 +336,9 @@ impl QueryEntry { }) } - pub fn create( - ctx: &Arc, - plan: &Plan, - plan_extras: &PlanExtras, - ) -> Result { + pub fn create(ctx: &Arc, plan: &Plan, stmt: &Statement) -> Result { let need_add_to_queue = Self::is_heavy_action(plan); - QueryEntry::create_entry(ctx, plan_extras, need_add_to_queue) + QueryEntry::create_entry(ctx, stmt, need_add_to_queue) } /// Check a plan is heavy action or not. @@ -402,6 +398,7 @@ impl QueryEntry { | Plan::VacuumTable(_) | Plan::VacuumTemporaryFiles(_) | Plan::RefreshIndex(_) + | Plan::ReclusterTable { .. } | Plan::TruncateTable(_) => { return true; } diff --git a/src/query/service/src/table_functions/others/suggested_background_tasks.rs b/src/query/service/src/table_functions/others/suggested_background_tasks.rs index 3aa67581400d..a2053c953908 100644 --- a/src/query/service/src/table_functions/others/suggested_background_tasks.rs +++ b/src/query/service/src/table_functions/others/suggested_background_tasks.rs @@ -160,7 +160,7 @@ impl SuggestedBackgroundTasksSource { sql: String, ) -> Result> { // Use interpreter_plan_sql, we can write the query log if an error occurs. - let (plan, _) = interpreter_plan_sql(ctx.clone(), sql.as_str()).await?; + let (plan, _, _) = interpreter_plan_sql(ctx.clone(), sql.as_str(), false).await?; let data_schema = plan.schema(); let interpreter = InterpreterFactory::get(ctx.clone(), &plan).await?; diff --git a/src/query/service/src/test_kits/context.rs b/src/query/service/src/test_kits/context.rs index f95ea84e9094..f2a54a286516 100644 --- a/src/query/service/src/test_kits/context.rs +++ b/src/query/service/src/test_kits/context.rs @@ -31,7 +31,7 @@ use crate::sql::Planner; /// If you no need to care the ctx please use TestFixture.execute_query(). pub async fn execute_query(ctx: Arc, query: &str) -> Result { let mut planner = Planner::new(ctx.clone()); - let (plan, _) = planner.plan_sql(query).await?; + let plan = planner.plan_sql(query).await?; let executor = InterpreterFactory::get(ctx.clone(), &plan).await?; executor.execute(ctx.clone()).await } diff --git a/src/query/service/src/test_kits/fixture.rs b/src/query/service/src/test_kits/fixture.rs index 09fb1534168a..81ce77a2bfe5 100644 --- a/src/query/service/src/test_kits/fixture.rs +++ b/src/query/service/src/test_kits/fixture.rs @@ -877,7 +877,7 @@ impl TestFixture { pub async fn execute_query(&self, query: &str) -> Result { let ctx = self.new_query_ctx().await?; let mut planner = Planner::new(ctx.clone()); - let (plan, _) = planner.plan_sql(query).await?; + let plan = planner.plan_sql(query).await?; let executor = InterpreterFactory::get(ctx.clone(), &plan).await?; executor.execute(ctx).await } diff --git a/src/query/service/tests/it/frame/dataframe.rs b/src/query/service/tests/it/frame/dataframe.rs index c482126556dc..4f820dff8586 100644 --- a/src/query/service/tests/it/frame/dataframe.rs +++ b/src/query/service/tests/it/frame/dataframe.rs @@ -411,7 +411,7 @@ async fn test_box_display() { for sql in cases { let mut planner = Planner::new(ctx.clone()); - let (plan, _) = planner.plan_sql(sql).await.unwrap(); + let plan = planner.plan_sql(sql).await.unwrap(); let interpreter = InterpreterFactory::get(ctx.clone(), &plan).await.unwrap(); let stream = interpreter.execute(ctx.clone()).await.unwrap(); diff --git a/src/query/service/tests/it/parquet_rs/utils.rs b/src/query/service/tests/it/parquet_rs/utils.rs index d168734347eb..9073358eda83 100644 --- a/src/query/service/tests/it/parquet_rs/utils.rs +++ b/src/query/service/tests/it/parquet_rs/utils.rs @@ -31,7 +31,7 @@ pub async fn create_parquet_test_fixture() -> TestFixture { pub async fn get_data_source_plan(ctx: Arc, sql: &str) -> Result { let mut planner = Planner::new(ctx.clone()); - let (plan, _) = planner.plan_sql(sql).await?; + let plan = planner.plan_sql(sql).await?; let plan = if let Plan::Query { s_expr, metadata, diff --git a/src/query/service/tests/it/pipelines/builders/runtime_filter.rs b/src/query/service/tests/it/pipelines/builders/runtime_filter.rs index 811153ec118b..d171655ffecf 100644 --- a/src/query/service/tests/it/pipelines/builders/runtime_filter.rs +++ b/src/query/service/tests/it/pipelines/builders/runtime_filter.rs @@ -32,8 +32,7 @@ use databend_query::test_kits::TestFixture; async fn plan_sql(ctx: Arc, sql: &str) -> Result { let mut planner = Planner::new(ctx.clone()); - let (plan, _) = planner.plan_sql(sql).await?; - Ok(plan) + planner.plan_sql(sql).await } async fn execute_sql(ctx: Arc, sql: &str) -> Result { diff --git a/src/query/service/tests/it/servers/admin/v1/status.rs b/src/query/service/tests/it/servers/admin/v1/status.rs index a02c4ce65f78..62703247bdb3 100644 --- a/src/query/service/tests/it/servers/admin/v1/status.rs +++ b/src/query/service/tests/it/servers/admin/v1/status.rs @@ -64,7 +64,7 @@ async fn run_query(query_ctx: &Arc) -> Result .get_current_session() .set_authed_user(user, None) .await?; - let (plan, _) = interpreter_plan_sql(query_ctx.clone(), sql).await?; + let (plan, _, _) = interpreter_plan_sql(query_ctx.clone(), sql, false).await?; InterpreterFactory::get(query_ctx.clone(), &plan).await } diff --git a/src/query/service/tests/it/sessions/queue_mgr.rs b/src/query/service/tests/it/sessions/queue_mgr.rs index ceb92b064af5..91b566918d21 100644 --- a/src/query/service/tests/it/sessions/queue_mgr.rs +++ b/src/query/service/tests/it/sessions/queue_mgr.rs @@ -314,19 +314,19 @@ async fn test_heavy_actions() -> Result<()> { { let sql = "create table t1(a int)"; - let (plan, _extras) = planner.plan_sql(sql).await?; + let plan = planner.plan_sql(sql).await?; let interpreter = InterpreterFactory::get(ctx.clone(), &plan).await?; let _ = interpreter.execute(ctx.clone()).await?; } { let sql = "create table t2(a int)"; - let (plan, _extras) = planner.plan_sql(sql).await?; + let plan = planner.plan_sql(sql).await?; let interpreter = InterpreterFactory::get(ctx.clone(), &plan).await?; let _ = interpreter.execute(ctx.clone()).await?; } { let sql = "create stage s1"; - let (plan, _extras) = planner.plan_sql(sql).await?; + let plan = planner.plan_sql(sql).await?; let interpreter = InterpreterFactory::get(ctx.clone(), &plan).await?; let _ = interpreter.execute(ctx.clone()).await?; } @@ -335,9 +335,9 @@ async fn test_heavy_actions() -> Result<()> { // Check. for query in queries.iter() { let mut planner = Planner::new(ctx.clone()); - let (plan, extras) = planner.plan_sql(query.sql).await?; - - let query_entry = QueryEntry::create(&ctx, &plan, &extras)?; + let extras = planner.parse_sql(query.sql)?; + let plan = planner.plan_stmt(&extras.statement).await?; + let query_entry = QueryEntry::create(&ctx, &plan, &extras.statement)?; if query.add_to_queue != query_entry.need_acquire_to_queue() { error!( "query: {:?}, query-entry: {:?}", diff --git a/src/query/service/tests/it/sql/exec/get_table_bind_test.rs b/src/query/service/tests/it/sql/exec/get_table_bind_test.rs index a73205db1886..48151422580c 100644 --- a/src/query/service/tests/it/sql/exec/get_table_bind_test.rs +++ b/src/query/service/tests/it/sql/exec/get_table_bind_test.rs @@ -1020,7 +1020,7 @@ async fn test_get_same_table_once() -> Result<()> { let ctx = Arc::new(CtxDelegation::new(ctx, faked_catalog)); let mut planner = Planner::new(ctx.clone()); - let (_, _) = planner.plan_sql(query.as_str()).await?; + let _ = planner.plan_sql(query.as_str()).await?; assert_eq!( ctx.table_without_cache diff --git a/src/query/service/tests/it/sql/exec/mod.rs b/src/query/service/tests/it/sql/exec/mod.rs index 47674ad222be..792a234cdb8d 100644 --- a/src/query/service/tests/it/sql/exec/mod.rs +++ b/src/query/service/tests/it/sql/exec/mod.rs @@ -77,7 +77,7 @@ pub async fn test_snapshot_consistency() -> Result<()> { ); // a. thread 1: read table - let (query_plan, _) = planner.plan_sql(&query).await?; + let query_plan = planner.plan_sql(&query).await?; if let Plan::Query { s_expr: _s_expr, @@ -148,7 +148,7 @@ pub async fn test_snapshot_consistency() -> Result<()> { let compact_task = async move { let compact_sql = format!("optimize table {}.{} compact", db2, tbl2); - let (compact_plan, _) = planner2.plan_sql(&compact_sql).await?; + let compact_plan = planner2.plan_sql(&compact_sql).await?; if let Plan::OptimizeCompactBlock { s_expr, need_purge } = compact_plan { let optimize_interpreter = OptimizeCompactBlockInterpreter::try_create( ctx.clone(), diff --git a/src/query/service/tests/it/sql/planner/builders/binder.rs b/src/query/service/tests/it/sql/planner/builders/binder.rs index a3829436c27a..d68268a8301b 100644 --- a/src/query/service/tests/it/sql/planner/builders/binder.rs +++ b/src/query/service/tests/it/sql/planner/builders/binder.rs @@ -21,7 +21,7 @@ async fn test_query_kind() -> Result<()> { fixture.default_db_name(), fixture.default_table_name() ); - let (_, _) = planner.plan_sql(&sql).await?; + let _ = planner.plan_sql(&sql).await?; let kind = ctx.get_query_kind(); assert_eq!(kind, QueryKind::CopyIntoTable); Ok(()) diff --git a/src/query/service/tests/it/storages/fuse/operations/alter_table.rs b/src/query/service/tests/it/storages/fuse/operations/alter_table.rs index 8c4646879ba7..929ff6c61cc9 100644 --- a/src/query/service/tests/it/storages/fuse/operations/alter_table.rs +++ b/src/query/service/tests/it/storages/fuse/operations/alter_table.rs @@ -255,7 +255,7 @@ async fn test_fuse_table_optimize_alter_table() -> Result<()> { let query = format!("optimize table {db_name}.{tbl_name} compact"); let ctx = fixture.new_query_ctx().await?; let mut planner = Planner::new(ctx.clone()); - let (plan, _) = planner.plan_sql(&query).await?; + let plan = planner.plan_sql(&query).await?; let interpreter = InterpreterFactory::get(ctx.clone(), &plan).await?; ctx.get_settings().set_max_threads(1)?; let data_stream = interpreter.execute(ctx.clone()).await?; diff --git a/src/query/service/tests/it/storages/fuse/operations/internal_column.rs b/src/query/service/tests/it/storages/fuse/operations/internal_column.rs index a759e15d1eb0..78edb8ea09f9 100644 --- a/src/query/service/tests/it/storages/fuse/operations/internal_column.rs +++ b/src/query/service/tests/it/storages/fuse/operations/internal_column.rs @@ -198,7 +198,7 @@ async fn test_internal_column() -> Result<()> { let query = format!("optimize table {db}.{tbl} compact"); let ctx = fixture.new_query_ctx().await?; let mut planner = Planner::new(ctx.clone()); - let (plan, _) = planner.plan_sql(&query).await?; + let plan = planner.plan_sql(&query).await?; let interpreter = InterpreterFactory::get(ctx.clone(), &plan).await?; let data_stream = interpreter.execute(ctx.clone()).await?; let _ = data_stream.try_collect::>().await; diff --git a/src/query/service/tests/it/storages/fuse/operations/optimize.rs b/src/query/service/tests/it/storages/fuse/operations/optimize.rs index 02f300cf3680..ddc0efbdfed7 100644 --- a/src/query/service/tests/it/storages/fuse/operations/optimize.rs +++ b/src/query/service/tests/it/storages/fuse/operations/optimize.rs @@ -64,7 +64,7 @@ async fn test_fuse_table_optimize() -> Result<()> { let query = format!("optimize table {db_name}.{tbl_name} compact"); let mut planner = Planner::new(ctx.clone()); - let (plan, _) = planner.plan_sql(&query).await?; + let plan = planner.plan_sql(&query).await?; let interpreter = InterpreterFactory::get(ctx.clone(), &plan).await?; // `PipelineBuilder` will parallelize the table reading according to value of setting `max_threads`, diff --git a/src/query/service/tests/it/storages/fuse/operations/table_analyze.rs b/src/query/service/tests/it/storages/fuse/operations/table_analyze.rs index 854baa2b19d4..45dcb37f07c0 100644 --- a/src/query/service/tests/it/storages/fuse/operations/table_analyze.rs +++ b/src/query/service/tests/it/storages/fuse/operations/table_analyze.rs @@ -85,7 +85,7 @@ async fn test_table_modify_column_ndv_statistics() -> Result<()> { ctx.evict_table_from_cache("default", "default", "t")?; let query = "delete from default.t where c=1"; let mut planner = Planner::new(ctx.clone()); - let (plan, _) = planner.plan_sql(query).await?; + let plan = planner.plan_sql(query).await?; if let Plan::DataMutation { s_expr, schema, .. } = plan { do_mutation(ctx.clone(), *s_expr.clone(), schema.clone()).await?; } @@ -117,7 +117,7 @@ async fn test_table_update_analyze_statistics() -> Result<()> { // update let query = format!("update {}.{} set id = 3 where id = 0", db_name, tb_name); let mut planner = Planner::new(ctx.clone()); - let (plan, _) = planner.plan_sql(&query).await?; + let plan = planner.plan_sql(&query).await?; if let Plan::DataMutation { s_expr, schema, .. } = plan { do_mutation(ctx.clone(), *s_expr.clone(), schema.clone()).await?; } diff --git a/src/query/sql/src/planner/binder/ddl/table.rs b/src/query/sql/src/planner/binder/ddl/table.rs index 394c2323ff1e..4833e873760e 100644 --- a/src/query/sql/src/planner/binder/ddl/table.rs +++ b/src/query/sql/src/planner/binder/ddl/table.rs @@ -1577,7 +1577,7 @@ impl Binder { if table.engine() == VIEW_ENGINE { if let Some(query) = table.get_table_info().options().get(QUERY) { let mut planner = Planner::new(self.ctx.clone()); - let (plan, _) = planner.plan_sql(query).await?; + let plan = planner.plan_sql(query).await?; Ok((infer_table_schema(&plan.schema())?, vec![], None)) } else { Err(ErrorCode::Internal( diff --git a/src/query/sql/src/planner/planner.rs b/src/query/sql/src/planner/planner.rs index c4c2ac9b8087..8bb991026c85 100644 --- a/src/query/sql/src/planner/planner.rs +++ b/src/query/sql/src/planner/planner.rs @@ -83,14 +83,13 @@ impl Planner { #[async_backtrace::framed] #[fastrace::trace] - pub async fn plan_sql(&mut self, sql: &str) -> Result<(Plan, PlanExtras)> { + pub async fn plan_sql(&mut self, sql: &str) -> Result { let extras = self.parse_sql(sql)?; - let plan = self.plan_stmt(&extras.statement).await?; - Ok((plan, extras)) + self.plan_stmt(&extras.statement).await } #[fastrace::trace] - pub fn parse_sql(&mut self, sql: &str) -> Result { + pub fn parse_sql(&self, sql: &str) -> Result { let settings = self.ctx.get_settings(); let sql_dialect = settings.get_sql_dialect()?; // compile prql to sql for prql dialect diff --git a/src/query/storages/system/src/columns_table.rs b/src/query/storages/system/src/columns_table.rs index cecd2c0518df..1ca072606e82 100644 --- a/src/query/storages/system/src/columns_table.rs +++ b/src/query/storages/system/src/columns_table.rs @@ -157,9 +157,7 @@ impl ColumnsTable { let fields = if let Some(query) = table.options().get(QUERY) { let mut planner = Planner::new(ctx.clone()); match planner.plan_sql(query).await { - Ok((plan, _)) => { - infer_table_schema(&plan.schema())?.fields().clone() - } + Ok(plan) => infer_table_schema(&plan.schema())?.fields().clone(), Err(e) => { // If VIEW SELECT QUERY plan err, should return empty. not destroy the query. warn!(