Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
zhyass committed Oct 18, 2024
1 parent 78dae3d commit 293cf2e
Show file tree
Hide file tree
Showing 38 changed files with 122 additions and 111 deletions.
2 changes: 1 addition & 1 deletion src/bendpy/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,6 @@ impl PySessionContext {

async fn plan_sql(ctx: &Arc<QueryContext>, sql: &str) -> Result<PyDataFrame> {
let mut planner = Planner::new(ctx.clone());
let (plan, _) = planner.plan_sql(sql).await?;
let plan = planner.plan_sql(sql).await?;
Ok(PyDataFrame::new(ctx.clone(), plan, default_box_size()))
}
4 changes: 1 addition & 3 deletions src/query/ee/tests/it/aggregating_index/index_refresh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -523,9 +523,7 @@ async fn test_sync_agg_index_after_copy_into() -> Result<()> {

async fn plan_sql(ctx: Arc<QueryContext>, sql: &str) -> Result<Plan> {
let mut planner = Planner::new(ctx);
let (plan, _) = planner.plan_sql(sql).await?;

Ok(plan)
planner.plan_sql(sql).await
}

async fn execute_sql(ctx: Arc<QueryContext>, sql: &str) -> Result<SendableDataBlockStream> {
Expand Down
4 changes: 1 addition & 3 deletions src/query/ee/tests/it/aggregating_index/index_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,7 @@ fn test_fuzz_with_spill() -> Result<()> {

async fn plan_sql(ctx: Arc<QueryContext>, sql: &str) -> Result<Plan> {
let mut planner = Planner::new(ctx.clone());
let (plan, _) = planner.plan_sql(sql).await?;

Ok(plan)
planner.plan_sql(sql).await
}

async fn execute_sql(ctx: Arc<QueryContext>, sql: &str) -> Result<SendableDataBlockStream> {
Expand Down
4 changes: 2 additions & 2 deletions src/query/service/src/interpreters/access/privilege_access.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1108,13 +1108,13 @@ impl AccessChecker for PrivilegeAccess {
}
Plan::CreateView(plan) => {
let mut planner = Planner::new(self.ctx.clone());
let (plan, _) = planner.plan_sql(&plan.subquery).await?;
let plan = planner.plan_sql(&plan.subquery).await?;
self.check(ctx, &plan).await?
}
Plan::AlterView(plan) => {
self.validate_db_access(&plan.catalog, &plan.database, UserPrivilegeType::Alter, false).await?;
let mut planner = Planner::new(self.ctx.clone());
let (plan, _) = planner.plan_sql(&plan.subquery).await?;
let plan = planner.plan_sql(&plan.subquery).await?;
self.check(ctx, &plan).await?
}
Plan::DropView(plan) => {
Expand Down
71 changes: 64 additions & 7 deletions src/query/service/src/interpreters/interpreter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ use std::collections::BTreeMap;
use std::sync::Arc;
use std::time::SystemTime;

use databend_common_ast::ast::AlterTableAction;
use databend_common_ast::ast::AlterTableStmt;
use databend_common_ast::ast::Literal;
use databend_common_ast::ast::ModifyColumnAction;
use databend_common_ast::ast::Statement;
use databend_common_base::base::short_sql;
use databend_common_base::runtime::profile::get_statistics_desc;
Expand Down Expand Up @@ -54,7 +57,10 @@ use crate::pipelines::executor::PipelineCompleteExecutor;
use crate::pipelines::executor::PipelinePullingExecutor;
use crate::pipelines::PipelineBuildResult;
use crate::schedulers::ServiceQueryExecutor;
use crate::sessions::AcquireQueueGuard;
use crate::sessions::QueriesQueueManager;
use crate::sessions::QueryContext;
use crate::sessions::QueryEntry;
use crate::sessions::SessionManager;
use crate::stream::DataBlockStream;
use crate::stream::ProgressStream;
Expand Down Expand Up @@ -197,22 +203,52 @@ fn log_query_finished(ctx: &QueryContext, error: Option<ErrorCode>, has_profiles
}
}

pub async fn plan_sql(
ctx: Arc<QueryContext>,
sql: &str,
acquire_queue: bool,
) -> Result<(Plan, PlanExtras, AcquireQueueGuard)> {
let mut planner = Planner::new_with_sample_executor(
ctx.clone(),
Arc::new(ServiceQueryExecutor::new(ctx.clone())),
);
let extras = planner.parse_sql(sql)?;
if !acquire_queue {
let plan = planner.plan_stmt(&extras.statement).await?;
return Ok((plan, extras, AcquireQueueGuard::create(None)));
}

let need_acquire_lock = need_acquire_lock(&extras.statement);
if need_acquire_lock {
let query_entry = QueryEntry::create_entry(&ctx, &extras.statement, true)?;
let guard = QueriesQueueManager::instance().acquire(query_entry).await?;
let plan = planner.plan_stmt(&extras.statement).await?;
Ok((plan, extras, guard))
} else {
let plan = planner.plan_stmt(&extras.statement).await?;
let query_entry = QueryEntry::create(&ctx, &plan, &extras.statement)?;
let guard = QueriesQueueManager::instance().acquire(query_entry).await?;
Ok((plan, extras, guard))
}
}

/// There are two steps to execute a query:
/// 1. Plan the SQL
/// 2. Execute the plan -- interpreter
///
/// This function is used to plan the SQL. If an error occurs, we will log the query start and finished.
pub async fn interpreter_plan_sql(ctx: Arc<QueryContext>, sql: &str) -> Result<(Plan, PlanExtras)> {
let mut planner = Planner::new_with_sample_executor(
ctx.clone(),
Arc::new(ServiceQueryExecutor::new(ctx.clone())),
);
let result = planner.plan_sql(sql).await;
pub async fn interpreter_plan_sql(
ctx: Arc<QueryContext>,
sql: &str,
acquire_queue: bool,
) -> Result<(Plan, PlanExtras, AcquireQueueGuard)> {
let result = plan_sql(ctx.clone(), sql, acquire_queue).await;

let short_sql = short_sql(
sql.to_string(),
ctx.get_settings().get_short_sql_max_length()?,
);
let mut stmt = if let Ok((_, extras)) = &result {
let mut stmt = if let Ok((_, extras, _)) = &result {
Some(extras.statement.clone())
} else {
// Only log if there's an error
Expand Down Expand Up @@ -299,3 +335,24 @@ pub fn on_execution_finished(info: &ExecutionInfo, query_ctx: Arc<QueryContext>)
Err(error) => Err(error.clone()),
}
}

fn need_acquire_lock(stmt: &Statement) -> bool {
match stmt {
Statement::Replace(_)
| Statement::MergeInto(_)
| Statement::Update(_)
| Statement::Delete(_)
| Statement::OptimizeTable(_)
| Statement::TruncateTable(_) => true,

Statement::AlterTable(AlterTableStmt { action, .. }) => matches!(
action,
AlterTableAction::ReclusterTable { .. }
| AlterTableAction::ModifyColumn {
action: ModifyColumnAction::SetDataType(_),
}
),

_ => false,
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ impl Interpreter for AddTableColumnInterpreter {
field.default_expr().unwrap()
);
let mut planner = Planner::new(self.ctx.clone());
let (plan, _) = planner.plan_sql(&query).await?;
let plan = planner.plan_sql(&query).await?;
if let Plan::DataMutation { s_expr, schema, .. } = plan {
let interpreter =
MutationInterpreter::try_create(self.ctx.clone(), *s_expr, schema)?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ impl AnalyzeTableInterpreter {

async fn plan_sql(&self, sql: String) -> Result<(PhysicalPlan, BindContext)> {
let mut planner = Planner::new(self.ctx.clone());
let (plan, _) = planner.plan_sql(&sql).await?;
let plan = planner.plan_sql(&sql).await?;
let (select_plan, bind_context) = match &plan {
Plan::Query {
s_expr,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ impl Interpreter for DescribeTableInterpreter {
let schema = if tbl_info.engine() == VIEW_ENGINE {
if let Some(query) = tbl_info.options().get(QUERY) {
let mut planner = Planner::new(self.ctx.clone());
let (plan, _) = planner.plan_sql(query).await?;
let plan = planner.plan_sql(query).await?;
infer_table_schema(&plan.schema())
} else {
return Err(ErrorCode::Internal(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ impl ModifyTableColumnInterpreter {

// 2. build plan by sql
let mut planner = Planner::new(self.ctx.clone());
let (plan, _extras) = planner.plan_sql(&sql).await?;
let plan = planner.plan_sql(&sql).await?;

// 3. build physical plan by plan
let (select_plan, select_column_bindings) = match plan {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ impl Interpreter for AlterViewInterpreter {
self.plan.subquery.clone()
} else {
let mut planner = Planner::new(self.ctx.clone());
let (plan, _) = planner.plan_sql(&self.plan.subquery.clone()).await?;
let plan = planner.plan_sql(&self.plan.subquery.clone()).await?;
if plan.schema().fields().len() != self.plan.column_names.len() {
return Err(ErrorCode::BadDataArrayLength(format!(
"column name length mismatch, expect {}, got {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ impl Interpreter for CreateViewInterpreter {
let table_function = catalog.list_table_functions();
let mut options = BTreeMap::new();
let mut planner = Planner::new(self.ctx.clone());
let (plan, _) = planner.plan_sql(&self.plan.subquery.clone()).await?;
let plan = planner.plan_sql(&self.plan.subquery.clone()).await?;
match plan.clone() {
Plan::Query { metadata, .. } => {
let metadata = metadata.read().clone();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ impl Interpreter for DescribeViewInterpreter {
let schema = if engine == VIEW_ENGINE {
if let Some(query) = tbl_info.options().get(QUERY) {
let mut planner = Planner::new(self.ctx.clone());
let (plan, _) = planner.plan_sql(query).await?;
let plan = planner.plan_sql(query).await?;
infer_table_schema(&plan.schema())
} else {
return Err(ErrorCode::Internal(
Expand Down
1 change: 1 addition & 0 deletions src/query/service/src/interpreters/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ pub use access::ManagementModeAccess;
pub use common::InterpreterQueryLog;
pub use hook::HookOperator;
pub use interpreter::interpreter_plan_sql;
pub use interpreter::plan_sql;
pub use interpreter::Interpreter;
pub use interpreter::InterpreterPtr;
pub use interpreter_cluster_key_alter::AlterTableClusterKeyInterpreter;
Expand Down
2 changes: 1 addition & 1 deletion src/query/service/src/interpreters/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ impl Client for ScriptClient {
.await?;

let mut planner = Planner::new(ctx.clone());
let (plan, _) = planner.plan_sql(query).await?;
let plan = planner.plan_sql(query).await?;
let interpreter = InterpreterFactory::get(ctx.clone(), &plan).await?;
let stream = interpreter.execute(ctx.clone()).await?;
let blocks = stream.try_collect::<Vec<_>>().await?;
Expand Down
3 changes: 2 additions & 1 deletion src/query/service/src/local/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ impl SessionExecutor {
) -> Result<(SendableDataBlockStream, Arc<QueryContext>, Plan, Statement)> {
let context = session.create_query_context().await?;
let mut planner = Planner::new(context.clone());
let (plan, extras) = planner.plan_sql(sql).await?;
let extras = planner.parse_sql(sql)?;
let plan = planner.plan_stmt(&extras.statement).await?;

let interpreter = InterpreterFactory::get(context.clone(), &plan).await?;
let ctx = context.clone();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,9 @@ impl FlightSqlServiceImpl {
.map_err(|e| status!("Could not create_query_context", e))?;

let mut planner = Planner::new(context.clone());
planner.plan_sql(query).await
let extras = planner.parse_sql(query)?;
let plan = planner.plan_stmt(&extras.statement).await?;
Ok((plan, extras))
}

#[async_backtrace::framed]
Expand Down
21 changes: 3 additions & 18 deletions src/query/service/src/servers/http/clickhouse_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ use databend_common_expression::DataSchemaRef;
use databend_common_formats::ClickhouseFormatType;
use databend_common_formats::FileFormatOptionsExt;
use databend_common_formats::FileFormatTypeExt;
use databend_common_sql::Planner;
use fastrace::func_path;
use fastrace::prelude::*;
use futures::StreamExt;
Expand All @@ -52,13 +51,12 @@ use serde::Deserialize;
use serde::Serialize;

use crate::interpreters::interpreter_plan_sql;
use crate::interpreters::plan_sql;
use crate::interpreters::InterpreterFactory;
use crate::interpreters::InterpreterPtr;
use crate::servers::http::middleware::sanitize_request_headers;
use crate::servers::http::v1::HttpQueryContext;
use crate::sessions::QueriesQueueManager;
use crate::sessions::QueryContext;
use crate::sessions::QueryEntry;
use crate::sessions::SessionType;
use crate::sessions::TableContext;

Expand Down Expand Up @@ -256,16 +254,11 @@ pub async fn clickhouse_handler_get(
let default_format = get_default_format(&params, headers).map_err(BadRequest)?;
let sql = params.query();
// Use interpreter_plan_sql, we can write the query log if an error occurs.
let (plan, extras) = interpreter_plan_sql(context.clone(), &sql)
let (plan, extras, _guard) = interpreter_plan_sql(context.clone(), &sql, true)
.await
.map_err(|err| err.display_with_sql(&sql))
.map_err(BadRequest)?;

let query_entry = QueryEntry::create(&context, &plan, &extras).map_err(BadRequest)?;
let _guard = QueriesQueueManager::instance()
.acquire(query_entry)
.await
.map_err(BadRequest)?;
let format = get_format_with_default(extras.format, default_format)?;
let interpreter = InterpreterFactory::get(context.clone(), &plan)
.await
Expand Down Expand Up @@ -345,19 +338,11 @@ pub async fn clickhouse_handler_post(
};
info!("receive clickhouse http post, (query + body) = {}", &msg);

let mut planner = Planner::new(ctx.clone());
let (mut plan, extras) = planner
.plan_sql(&sql)
let (mut plan, extras, _guard) = plan_sql(ctx.clone(), &sql, true)
.await
.map_err(|err| err.display_with_sql(&sql))
.map_err(BadRequest)?;

let entry = QueryEntry::create(&ctx, &plan, &extras).map_err(BadRequest)?;
let _guard = QueriesQueueManager::instance()
.acquire(entry)
.await
.map_err(BadRequest)?;

let mut handle = None;
let output_schema = plan.schema();

Expand Down
21 changes: 1 addition & 20 deletions src/query/service/src/servers/http/v1/query/execute_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,8 @@ use crate::servers::http::v1::http_query_handlers::QueryResponseField;
use crate::servers::http::v1::query::http_query::ResponseState;
use crate::servers::http::v1::query::sized_spsc::SizedChannelSender;
use crate::sessions::AcquireQueueGuard;
use crate::sessions::QueriesQueueManager;
use crate::sessions::QueryAffect;
use crate::sessions::QueryContext;
use crate::sessions::QueryEntry;
use crate::sessions::Session;
use crate::sessions::TableContext;

Expand Down Expand Up @@ -346,32 +344,15 @@ impl ExecuteState {
info!("http query prepare to plan sql");

// Use interpreter_plan_sql, we can write the query log if an error occurs.
let (plan, extras) = interpreter_plan_sql(ctx.clone(), &sql)
let (plan, _, queue_guard) = interpreter_plan_sql(ctx.clone(), &sql, true)
.await
.map_err(|err| err.display_with_sql(&sql))
.with_context(make_error)?;

let query_queue_manager = QueriesQueueManager::instance();

info!(
"http query preparing to acquire from query queue, length: {}",
query_queue_manager.length()
);

let entry = QueryEntry::create(&ctx, &plan, &extras).with_context(make_error)?;
let queue_guard = query_queue_manager
.acquire(entry)
.await
.with_context(make_error)?;
{
// set_var may change settings
let mut guard = format_settings.write();
*guard = Some(ctx.get_format_settings().with_context(make_error)?);
}
info!(
"http query finished acquiring from queue, length: {}",
query_queue_manager.length()
);

let interpreter = InterpreterFactory::get(ctx.clone(), &plan)
.await
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,7 @@ use crate::servers::mysql::writers::ProgressReporter;
use crate::servers::mysql::writers::QueryResult;
use crate::servers::mysql::MySQLFederated;
use crate::servers::mysql::MYSQL_VERSION;
use crate::sessions::QueriesQueueManager;
use crate::sessions::QueryContext;
use crate::sessions::QueryEntry;
use crate::sessions::Session;
use crate::sessions::TableContext;
use crate::stream::DataBlockStream;
Expand Down Expand Up @@ -377,10 +375,7 @@ impl InteractiveWorkerBase {
context.set_id(query_id);

// Use interpreter_plan_sql, we can write the query log if an error occurs.
let (plan, extras) = interpreter_plan_sql(context.clone(), query).await?;

let entry = QueryEntry::create(&context, &plan, &extras)?;
let _guard = QueriesQueueManager::instance().acquire(entry).await?;
let (plan, _, _guard) = interpreter_plan_sql(context.clone(), query, true).await?;

let interpreter = InterpreterFactory::get(context.clone(), &plan).await?;
let has_result_set = plan.has_result_set();
Expand Down
Loading

0 comments on commit 293cf2e

Please sign in to comment.