diff --git a/src/query/service/src/pipelines/builders/builder_union_all.rs b/src/query/service/src/pipelines/builders/builder_union_all.rs index 7ed40bc87e808..1c262a7c923d2 100644 --- a/src/query/service/src/pipelines/builders/builder_union_all.rs +++ b/src/query/service/src/pipelines/builders/builder_union_all.rs @@ -15,6 +15,7 @@ use async_channel::Receiver; use databend_common_exception::Result; use databend_common_expression::DataBlock; +use databend_common_expression::DataSchemaRef; use databend_common_pipeline_core::processors::ProcessorPtr; use databend_common_pipeline_sinks::UnionReceiveSink; use databend_common_sql::executor::physical_plans::UnionAll; @@ -26,17 +27,29 @@ use crate::sessions::QueryContext; impl PipelineBuilder { pub fn build_union_all(&mut self, union_all: &UnionAll) -> Result<()> { - self.build_pipeline(&union_all.left)?; - let union_all_receiver = self.expand_union_all(&union_all.right)?; + self.build_pipeline(union_all.children.last().unwrap())?; + let mut remain_children_receivers = vec![]; + for (idx, remaining_child) in union_all + .children + .iter() + .take(union_all.children.len() - 1) + .enumerate() + { + remain_children_receivers.push((idx, self.expand_union_all(remaining_child)?)); + } + let schemas: Vec = union_all + .children + .iter() + .map(|plan| plan.output_schema()) + .collect::>()?; self.main_pipeline .add_transform(|transform_input_port, transform_output_port| { Ok(ProcessorPtr::create(TransformMergeBlock::try_create( transform_input_port, transform_output_port, - union_all.left.output_schema()?, - union_all.right.output_schema()?, - union_all.pairs.clone(), - union_all_receiver.clone(), + schemas.clone(), + union_all.output_cols.clone(), + remain_children_receivers.clone(), )?)) })?; Ok(()) diff --git a/src/query/service/src/pipelines/processors/transforms/transform_merge_block.rs b/src/query/service/src/pipelines/processors/transforms/transform_merge_block.rs index a2735d9131c9a..0c654857f5086 100644 --- a/src/query/service/src/pipelines/processors/transforms/transform_merge_block.rs +++ b/src/query/service/src/pipelines/processors/transforms/transform_merge_block.rs @@ -27,6 +27,7 @@ use databend_common_pipeline_core::processors::Event; use databend_common_pipeline_core::processors::InputPort; use databend_common_pipeline_core::processors::OutputPort; use databend_common_pipeline_core::processors::Processor; +use databend_common_sql::IndexType; pub struct TransformMergeBlock { finished: bool, @@ -35,22 +36,20 @@ pub struct TransformMergeBlock { input_data: Option, output_data: Option, - left_schema: DataSchemaRef, - right_schema: DataSchemaRef, - pairs: Vec<(String, String)>, + schemas: Vec, + output_cols: Vec>, - receiver: Receiver, - receiver_result: Option, + receivers: Vec<(usize, Receiver)>, + receiver_results: Vec<(usize, DataBlock)>, } impl TransformMergeBlock { pub fn try_create( input: Arc, output: Arc, - left_schema: DataSchemaRef, - right_schema: DataSchemaRef, - pairs: Vec<(String, String)>, - receiver: Receiver, + schemas: Vec, + output_cols: Vec>, + receivers: Vec<(usize, Receiver)>, ) -> Result> { Ok(Box::new(TransformMergeBlock { finished: false, @@ -58,67 +57,75 @@ impl TransformMergeBlock { output, input_data: None, output_data: None, - left_schema, - right_schema, - pairs, - receiver, - receiver_result: None, + schemas, + output_cols, + receivers, + receiver_results: vec![], })) } - fn project_block(&self, block: DataBlock, is_left: bool) -> Result { + fn project_block(&self, block: DataBlock, idx: Option) -> Result { let num_rows = block.num_rows(); - let columns = self - .pairs - .iter() - .map(|(left, right)| { - if is_left { + let columns = if let Some(idx) = idx { + self.check_type(idx, &block)? + } else { + self.output_cols + .last() + .unwrap() + .iter() + .map(|idx| { Ok(block - .get_by_offset(self.left_schema.index_of(left)?) + .get_by_offset(self.schemas.last().unwrap().index_of(&idx.to_string())?) .clone()) - } else { - // If block from right, check if block schema matches self scheme(left schema) - // If unmatched, covert block columns types or report error - self.check_type(left, right, &block) - } - }) - .collect::>>()?; - Ok(DataBlock::new(columns, num_rows)) + }) + .collect::>>()? + }; + let block = DataBlock::new(columns, num_rows); + Ok(block) } - fn check_type( - &self, - left_name: &str, - right_name: &str, - block: &DataBlock, - ) -> Result { - let left_field = self.left_schema.field_with_name(left_name)?; - let left_data_type = left_field.data_type(); - - let right_field = self.right_schema.field_with_name(right_name)?; - let right_data_type = right_field.data_type(); - - let index = self.right_schema.index_of(right_name)?; - - if left_data_type == right_data_type { - return Ok(block.get_by_offset(index).clone()); - } + fn check_type(&self, idx: usize, block: &DataBlock) -> Result> { + let mut columns = vec![]; + for (left_idx, right_idx) in self + .output_cols + .last() + .unwrap() + .iter() + .zip(self.output_cols[idx].iter()) + { + let left_field = self + .schemas + .last() + .unwrap() + .field_with_name(&left_idx.to_string())?; + let left_data_type = left_field.data_type(); + + let right_field = self.schemas[idx].field_with_name(&right_idx.to_string())?; + let right_data_type = right_field.data_type(); + + let offset = self.schemas[idx].index_of(&right_idx.to_string())?; + if left_data_type == right_data_type { + columns.push(block.get_by_offset(offset).clone()); + continue; + } - if left_data_type.remove_nullable() == right_data_type.remove_nullable() { - let origin_column = block.get_by_offset(index).clone(); - let mut builder = ColumnBuilder::with_capacity(left_data_type, block.num_rows()); - let value = origin_column.value.as_ref(); - for idx in 0..block.num_rows() { - let scalar = value.index(idx).unwrap(); - builder.push(scalar); + if left_data_type.remove_nullable() == right_data_type.remove_nullable() { + let origin_column = block.get_by_offset(offset).clone(); + let mut builder = ColumnBuilder::with_capacity(left_data_type, block.num_rows()); + let value = origin_column.value.as_ref(); + for idx in 0..block.num_rows() { + let scalar = value.index(idx).unwrap(); + builder.push(scalar); + } + let col = builder.build(); + columns.push(BlockEntry::new(left_data_type.clone(), Value::Column(col))); + } else { + return Err(ErrorCode::IllegalDataType( + "The data type on both sides of the union does not match", + )); } - let col = builder.build(); - Ok(BlockEntry::new(left_data_type.clone(), Value::Column(col))) - } else { - Err(ErrorCode::IllegalDataType( - "The data type on both sides of the union does not match", - )) } + Ok(columns) } } @@ -148,12 +155,7 @@ impl Processor for TransformMergeBlock { return Ok(Event::NeedConsume); } - if self.input_data.is_some() || self.receiver_result.is_some() { - return Ok(Event::Sync); - } - - if let Ok(result) = self.receiver.try_recv() { - self.receiver_result = Some(result); + if self.input_data.is_some() || !self.receiver_results.is_empty() { return Ok(Event::Sync); } @@ -175,28 +177,25 @@ impl Processor for TransformMergeBlock { } fn process(&mut self) -> Result<()> { + let mut blocks = vec![]; + for (idx, receive_result) in self.receiver_results.iter() { + blocks.push(self.project_block(receive_result.clone(), Some(*idx))?); + } + self.receiver_results.clear(); if let Some(input_data) = self.input_data.take() { - if let Some(receiver_result) = self.receiver_result.take() { - self.output_data = Some(DataBlock::concat(&[ - self.project_block(input_data, true)?, - self.project_block(receiver_result, false)?, - ])?); - } else { - self.output_data = Some(self.project_block(input_data, true)?); - } - } else if let Some(receiver_result) = self.receiver_result.take() { - self.output_data = Some(self.project_block(receiver_result, false)?); + blocks.push(self.project_block(input_data, None)?); } - + self.output_data = Some(DataBlock::concat(&blocks)?); Ok(()) } #[async_backtrace::framed] async fn async_process(&mut self) -> Result<()> { if !self.finished { - if let Ok(result) = self.receiver.recv().await { - self.receiver_result = Some(result); - return Ok(()); + for (idx, receiver) in self.receivers.iter() { + if let Ok(result) = receiver.recv().await { + self.receiver_results.push((*idx, result)); + } } self.finished = true; } diff --git a/src/query/service/src/schedulers/fragments/fragmenter.rs b/src/query/service/src/schedulers/fragments/fragmenter.rs index 8c209f75e92d6..0aeb29eba9342 100644 --- a/src/query/service/src/schedulers/fragments/fragmenter.rs +++ b/src/query/service/src/schedulers/fragments/fragmenter.rs @@ -255,27 +255,24 @@ impl PhysicalPlanReplacer for Fragmenter { fn replace_union(&mut self, plan: &UnionAll) -> Result { let mut fragments = vec![]; - let left_input = self.replace(plan.left.as_ref())?; - let left_state = self.state.clone(); - - // Consume current fragments to prevent them being consumed by `right_input`. - fragments.append(&mut self.fragments); - let right_input = self.replace(plan.right.as_ref())?; - let right_state = self.state.clone(); - - fragments.append(&mut self.fragments); + let mut children = vec![]; + let mut states = vec![]; + for child in plan.children.iter() { + children.push(Box::new(self.replace(child)?)); + states.push(self.state.clone()); + fragments.append(&mut self.fragments); + } self.fragments = fragments; // If any of the input is a source fragment, the union all is a source fragment. - if left_state == State::SelectLeaf || right_state == State::SelectLeaf { + if states.iter().any(|state| state == &State::SelectLeaf) { self.state = State::SelectLeaf; } else { self.state = State::Other; } Ok(PhysicalPlan::UnionAll(UnionAll { - left: Box::new(left_input), - right: Box::new(right_input), + children, ..plan.clone() })) } diff --git a/src/query/sql/src/executor/format.rs b/src/query/sql/src/executor/format.rs index b80efc713b302..7ef1367c5d0bb 100644 --- a/src/query/sql/src/executor/format.rs +++ b/src/query/sql/src/executor/format.rs @@ -159,14 +159,14 @@ impl PhysicalPlan { )) } PhysicalPlan::UnionAll(union_all) => { - let left_child = union_all.left.format_join(metadata)?; - let right_child = union_all.right.format_join(metadata)?; - - let children = vec![ - FormatTreeNode::with_children("Left".to_string(), vec![left_child]), - FormatTreeNode::with_children("Right".to_string(), vec![right_child]), - ]; - + let mut children = Vec::with_capacity(union_all.children.len()); + for (idx, child) in union_all.children.iter().enumerate() { + let child = child.format_join(metadata)?; + children.push(FormatTreeNode::with_children( + format!("#{:?}", idx + 1), + vec![child], + )) + } Ok(FormatTreeNode::with_children( "UnionAll".to_string(), children, @@ -1013,11 +1013,9 @@ fn union_all_to_format_tree( } append_profile_info(&mut children, profs, plan.plan_id); - - children.extend(vec![ - to_format_tree(&plan.left, metadata, profs)?, - to_format_tree(&plan.right, metadata, profs)?, - ]); + for child in plan.children.iter() { + children.push(to_format_tree(child, metadata, profs)?); + } Ok(FormatTreeNode::with_children( "UnionAll".to_string(), diff --git a/src/query/sql/src/executor/physical_plan.rs b/src/query/sql/src/executor/physical_plan.rs index ac92a9779d2b9..6c41f5f0883a1 100644 --- a/src/query/sql/src/executor/physical_plan.rs +++ b/src/query/sql/src/executor/physical_plan.rs @@ -230,8 +230,9 @@ impl PhysicalPlan { PhysicalPlan::UnionAll(plan) => { plan.plan_id = *next_id; *next_id += 1; - plan.left.adjust_plan_id(next_id); - plan.right.adjust_plan_id(next_id); + for child in plan.children.iter_mut() { + child.adjust_plan_id(next_id); + } } PhysicalPlan::CteScan(plan) => { plan.plan_id = *next_id; @@ -573,9 +574,7 @@ impl PhysicalPlan { ), PhysicalPlan::Exchange(plan) => Box::new(std::iter::once(plan.input.as_ref())), PhysicalPlan::ExchangeSink(plan) => Box::new(std::iter::once(plan.input.as_ref())), - PhysicalPlan::UnionAll(plan) => Box::new( - std::iter::once(plan.left.as_ref()).chain(std::iter::once(plan.right.as_ref())), - ), + PhysicalPlan::UnionAll(plan) => Box::new(plan.children.iter().map(|child| &(**child))), PhysicalPlan::DistributedInsertSelect(plan) => { Box::new(std::iter::once(plan.input.as_ref())) } @@ -823,11 +822,6 @@ impl PhysicalPlan { PhysicalPlan::CteScan(v) => { format!("CTE index: {}, sub index: {}", v.cte_idx.0, v.cte_idx.1) } - PhysicalPlan::UnionAll(v) => v - .pairs - .iter() - .map(|(l, r)| format!("#{} <- #{}", l, r)) - .join(", "), _ => String::new(), }) } diff --git a/src/query/sql/src/executor/physical_plan_visitor.rs b/src/query/sql/src/executor/physical_plan_visitor.rs index bb61163e0e32e..0583e169d0ff5 100644 --- a/src/query/sql/src/executor/physical_plan_visitor.rs +++ b/src/query/sql/src/executor/physical_plan_visitor.rs @@ -370,14 +370,15 @@ pub trait PhysicalPlanReplacer { } fn replace_union(&mut self, plan: &UnionAll) -> Result { - let left = self.replace(&plan.left)?; - let right = self.replace(&plan.right)?; + let mut children = Vec::with_capacity(plan.children.len()); + for child in plan.children.iter() { + children.push(Box::new(self.replace(child)?)); + } Ok(PhysicalPlan::UnionAll(UnionAll { plan_id: plan.plan_id, - left: Box::new(left), - right: Box::new(right), + children, schema: plan.schema.clone(), - pairs: plan.pairs.clone(), + output_cols: plan.output_cols.clone(), stat_info: plan.stat_info.clone(), })) } @@ -671,8 +672,9 @@ impl PhysicalPlan { Self::traverse(&plan.input, pre_visit, visit, post_visit); } PhysicalPlan::UnionAll(plan) => { - Self::traverse(&plan.left, pre_visit, visit, post_visit); - Self::traverse(&plan.right, pre_visit, visit, post_visit); + for child in plan.children.iter() { + Self::traverse(child, pre_visit, visit, post_visit); + } } PhysicalPlan::DistributedInsertSelect(plan) => { Self::traverse(&plan.input, pre_visit, visit, post_visit); diff --git a/src/query/sql/src/executor/physical_plans/physical_union_all.rs b/src/query/sql/src/executor/physical_plans/physical_union_all.rs index f17806a6bedbd..9e2f6361ae861 100644 --- a/src/query/sql/src/executor/physical_plans/physical_union_all.rs +++ b/src/query/sql/src/executor/physical_plans/physical_union_all.rs @@ -28,6 +28,8 @@ use crate::executor::PhysicalPlan; use crate::executor::PhysicalPlanBuilder; use crate::optimizer::SExpr; use crate::plans::BoundColumnRef; +use crate::plans::Operator; +use crate::plans::RelOp; use crate::plans::ScalarItem; use crate::ColumnBindingBuilder; use crate::ColumnSet; @@ -39,9 +41,8 @@ use crate::Visibility; pub struct UnionAll { // A unique id of operator in a `PhysicalPlan` tree, only used for display. pub plan_id: u32, - pub left: Box, - pub right: Box, - pub pairs: Vec<(String, String)>, + pub children: Vec>, + pub output_cols: Vec>, pub schema: DataSchemaRef, // Only used for explain @@ -62,32 +63,30 @@ impl PhysicalPlanBuilder { required: ColumnSet, stat_info: PlanStatsInfo, ) -> Result { - // 1. Prune unused Columns. - let left_required = union_all.pairs.iter().fold(required.clone(), |mut acc, v| { - acc.insert(v.0); - acc - }); - let right_required = union_all.pairs.iter().fold(required, |mut acc, v| { - acc.insert(v.1); - acc - }); - - // 2. Build physical plan. - let left_plan = self.build(s_expr.child(0)?, left_required).await?; - let right_plan = self.build(s_expr.child(1)?, right_required).await?; - let left_schema = left_plan.output_schema()?; - let right_schema = right_plan.output_schema()?; - - let common_types = union_all.pairs.iter().map(|(l, r)| { - let left_field = left_schema.field_with_name(&l.to_string()).unwrap(); - let right_field = right_schema.field_with_name(&r.to_string()).unwrap(); + // Flatten union plan + let mut union_children = vec![]; + let mut output_cols = vec![]; + self.flatten_union( + s_expr, + union_all, + required, + &mut union_children, + &mut output_cols, + ) + .await?; + let right_schema = union_children[0].output_schema()?; + let left_schema = union_children[1].output_schema()?; + let mut common_types = vec![]; + for (r, l) in output_cols[0].iter().zip(output_cols[1].iter()) { + let left_field = left_schema.field_with_name(&l.to_string())?; + let right_field = right_schema.field_with_name(&r.to_string())?; let common_type = common_super_type( left_field.data_type().clone(), right_field.data_type().clone(), &BUILTIN_FUNCTIONS.default_cast_rules, ); - common_type.ok_or_else(|| { + common_types.push(common_type.ok_or_else(|| { ErrorCode::SemanticError(format!( "SetOperation's types cannot be matched, left column {:?}, type: {:?}, right column {:?}, type: {:?}", left_field.name(), @@ -95,17 +94,21 @@ impl PhysicalPlanBuilder { right_field.name(), right_field.data_type() )) - }) - }).collect::>>()?; + })?) + } + + for (left_plan, left_output_cols) in union_children.iter().zip(output_cols.iter()).skip(2) { + common_types = common_super_types_for_union(common_types, left_plan, left_output_cols)?; + } async fn cast_plan( plan_builder: &mut PhysicalPlanBuilder, - plan: PhysicalPlan, + plan: Box, plan_schema: &DataSchema, indexes: &[IndexType], common_types: &[DataType], stat_info: PlanStatsInfo, - ) -> Result { + ) -> Result> { debug_assert!(indexes.len() == common_types.len()); let scalar_items = indexes .iter() @@ -134,46 +137,36 @@ impl PhysicalPlanBuilder { let new_plan = if scalar_items.is_empty() { plan } else { - plan_builder.create_eval_scalar( + Box::new(plan_builder.create_eval_scalar( &crate::plans::EvalScalar { items: scalar_items, }, indexes.to_vec(), - plan, + *plan, stat_info, - )? + )?) }; Ok(new_plan) } - let left_indexes = union_all.pairs.iter().map(|(l, _)| *l).collect::>(); - let right_indexes = union_all.pairs.iter().map(|(_, r)| *r).collect::>(); - let left_plan = cast_plan( - self, - left_plan, - left_schema.as_ref(), - &left_indexes, - &common_types, - stat_info.clone(), - ) - .await?; - let right_plan = cast_plan( - self, - right_plan, - right_schema.as_ref(), - &right_indexes, - &common_types, - stat_info.clone(), - ) - .await?; + for (plan, idxes) in union_children.iter_mut().zip(output_cols.iter()) { + let schema = plan.output_schema()?; + *plan = cast_plan( + self, + plan.clone(), + &schema, + idxes, + &common_types, + stat_info.clone(), + ) + .await?; + } - let pairs = union_all - .pairs - .iter() - .map(|(l, r)| (l.to_string(), r.to_string())) - .collect::>(); - let fields = left_indexes + // Last is the most left union child + let fields = output_cols + .last() + .unwrap() .iter() .zip(&common_types) .map(|(index, ty)| DataField::new(&index.to_string(), ty.clone())) @@ -181,12 +174,75 @@ impl PhysicalPlanBuilder { Ok(PhysicalPlan::UnionAll(UnionAll { plan_id: 0, - left: Box::new(left_plan), - right: Box::new(right_plan), - pairs, + children: union_children, schema: DataSchemaRefExt::create(fields), - + output_cols, stat_info: Some(stat_info), })) } + + #[async_recursion::async_recursion] + async fn flatten_union( + &mut self, + s_expr: &SExpr, + union: &crate::plans::UnionAll, + required: ColumnSet, + union_children: &mut Vec>, + output_cols: &mut Vec>, + ) -> Result<()> { + let right_required = union + .right_cols + .iter() + .fold(required.clone(), |mut acc, v| { + acc.insert(*v); + acc + }); + let right_plan = self.build(s_expr.child(1)?, right_required).await?; + union_children.push(Box::new(right_plan)); + output_cols.push(union.right_cols.clone()); + let left_s_expr = s_expr.child(0)?; + if left_s_expr.plan.rel_op() != RelOp::UnionAll { + let left_required = union.left_cols.iter().fold(required, |mut acc, v| { + acc.insert(*v); + acc + }); + let plan = self.build(left_s_expr, left_required).await?; + union_children.push(Box::new(plan)); + output_cols.push(union.left_cols.clone()); + return Ok(()); + } + self.flatten_union( + left_s_expr, + &left_s_expr.plan().clone().try_into()?, + required, + union_children, + output_cols, + ) + .await + } +} + +fn common_super_types_for_union( + common_types: Vec, + left_plan: &PhysicalPlan, + left_output_cols: &[IndexType], +) -> Result> { + let mut new_common_types = Vec::with_capacity(common_types.len()); + let left_schema = left_plan.output_schema()?; + for (right_type, idx) in common_types.iter().zip(left_output_cols.iter()) { + let left_field = left_schema.field_with_name(&idx.to_string())?; + let common_type = common_super_type( + right_type.clone(), + left_field.data_type().clone(), + &BUILTIN_FUNCTIONS.default_cast_rules, + ); + new_common_types.push(common_type.ok_or_else(|| { + ErrorCode::SemanticError(format!( + "SetOperation's types cannot be matched, left type: {:?}, right type: {:?}", + left_field.data_type(), + right_type, + )) + })?) + } + Ok(new_common_types) } diff --git a/src/query/sql/src/planner/binder/select.rs b/src/query/sql/src/planner/binder/select.rs index 9ab5be85899b3..2579ee933c99f 100644 --- a/src/query/sql/src/planner/binder/select.rs +++ b/src/query/sql/src/planner/binder/select.rs @@ -40,11 +40,8 @@ use databend_common_ast::ast::TableReference; use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_exception::Span; -use databend_common_expression::type_check::common_super_type; -use databend_common_expression::types::DataType; use databend_common_expression::ROW_ID_COLUMN_ID; use databend_common_expression::ROW_ID_COL_NAME; -use databend_common_functions::BUILTIN_FUNCTIONS; use databend_common_license::license::Feature; use databend_common_license::license_manager::get_license_manager; use derive_visitor::Drive; @@ -56,7 +53,6 @@ use super::Finder; use crate::binder::join::JoinConditions; use crate::binder::project_set::SrfCollector; use crate::binder::scalar_common::split_conjunctions; -use crate::binder::ColumnBindingBuilder; use crate::binder::CteInfo; use crate::binder::ExprContext; use crate::binder::INTERNAL_COLUMN_FACTORY; @@ -66,8 +62,6 @@ use crate::planner::binder::scalar::ScalarBinder; use crate::planner::binder::BindContext; use crate::planner::binder::Binder; use crate::plans::BoundColumnRef; -use crate::plans::CastExpr; -use crate::plans::EvalScalar; use crate::plans::Filter; use crate::plans::JoinType; use crate::plans::ScalarExpr; @@ -79,7 +73,6 @@ use crate::ColumnEntry; use crate::IndexType; use crate::UdfRewriter; use crate::VirtualColumnRewriter; -use crate::Visibility; // A normalized IR for `SELECT` clause. #[derive(Debug, Default)] @@ -523,7 +516,6 @@ impl Binder { } (SetOperator::Union, true) => self.bind_union( left.span(), - right.span(), left_bind_context, right_bind_context, left_expr, @@ -532,7 +524,6 @@ impl Binder { ), (SetOperator::Union, false) => self.bind_union( left.span(), - right.span(), left_bind_context, right_bind_context, left_expr, @@ -549,50 +540,18 @@ impl Binder { pub fn bind_union( &mut self, left_span: Span, - right_span: Span, left_context: BindContext, right_context: BindContext, left_expr: SExpr, right_expr: SExpr, distinct: bool, ) -> Result<(SExpr, BindContext)> { - let mut coercion_types = Vec::with_capacity(left_context.columns.len()); - for (left_col, right_col) in left_context - .columns - .iter() - .zip(right_context.columns.iter()) - { - if left_col.data_type != right_col.data_type { - if let Some(data_type) = common_super_type( - *left_col.data_type.clone(), - *right_col.data_type.clone(), - &BUILTIN_FUNCTIONS.default_cast_rules, - ) { - coercion_types.push(data_type); - } else { - return Err(ErrorCode::SemanticError(format!( - "SetOperation's types cannot be matched, left column {:?}, type: {:?}, right column {:?}, type: {:?}", - left_col.column_name, - left_col.data_type, - right_col.column_name, - right_col.data_type - ))); - } - } else { - coercion_types.push(*left_col.data_type.clone()); - } - } - let (new_bind_context, pairs, left_expr, right_expr) = self.coercion_union_type( - left_span, - right_span, - left_context, - right_context, - left_expr, - right_expr, - coercion_types, - )?; - - let union_plan = UnionAll { pairs }; + let left_cols = left_context.columns.iter().map(|l| l.index).collect(); + let right_cols = right_context.columns.iter().map(|r| r.index).collect(); + let union_plan = UnionAll { + left_cols, + right_cols, + }; let mut new_expr = SExpr::create_binary( Arc::new(union_plan.into()), Arc::new(left_expr), @@ -602,14 +561,14 @@ impl Binder { if distinct { new_expr = self.bind_distinct( left_span, - &new_bind_context, - new_bind_context.all_column_bindings(), + &left_context, + left_context.all_column_bindings(), &mut HashMap::new(), new_expr, )?; } - Ok((new_expr, new_bind_context)) + Ok((new_expr, left_context)) } pub fn bind_intersect( @@ -703,114 +662,6 @@ impl Binder { Ok((s_expr, left_context)) } - #[allow(clippy::type_complexity)] - #[allow(clippy::too_many_arguments)] - fn coercion_union_type( - &self, - left_span: Span, - right_span: Span, - left_bind_context: BindContext, - right_bind_context: BindContext, - mut left_expr: SExpr, - mut right_expr: SExpr, - coercion_types: Vec, - ) -> Result<(BindContext, Vec<(IndexType, IndexType)>, SExpr, SExpr)> { - let mut left_scalar_items = Vec::with_capacity(left_bind_context.columns.len()); - let mut right_scalar_items = Vec::with_capacity(right_bind_context.columns.len()); - let mut new_bind_context = BindContext::new(); - let mut pairs = Vec::with_capacity(left_bind_context.columns.len()); - for (idx, (left_col, right_col)) in left_bind_context - .columns - .iter() - .zip(right_bind_context.columns.iter()) - .enumerate() - { - let left_index = if *left_col.data_type != coercion_types[idx] { - let new_column_index = self - .metadata - .write() - .add_derived_column(left_col.column_name.clone(), coercion_types[idx].clone()); - let column_binding = ColumnBindingBuilder::new( - left_col.column_name.clone(), - new_column_index, - Box::new(coercion_types[idx].clone()), - Visibility::Visible, - ) - .build(); - let left_coercion_expr = CastExpr { - span: left_span, - is_try: false, - argument: Box::new( - BoundColumnRef { - span: left_span, - column: left_col.clone(), - } - .into(), - ), - target_type: Box::new(coercion_types[idx].clone()), - }; - left_scalar_items.push(ScalarItem { - scalar: left_coercion_expr.into(), - index: new_column_index, - }); - new_bind_context.add_column_binding(column_binding); - new_column_index - } else { - new_bind_context.add_column_binding(left_col.clone()); - left_col.index - }; - let right_index = if *right_col.data_type != coercion_types[idx] { - let new_column_index = self - .metadata - .write() - .add_derived_column(right_col.column_name.clone(), coercion_types[idx].clone()); - let right_coercion_expr = CastExpr { - span: right_span, - is_try: false, - argument: Box::new( - BoundColumnRef { - span: right_span, - column: right_col.clone(), - } - .into(), - ), - target_type: Box::new(coercion_types[idx].clone()), - }; - right_scalar_items.push(ScalarItem { - scalar: right_coercion_expr.into(), - index: new_column_index, - }); - new_column_index - } else { - right_col.index - }; - pairs.push((left_index, right_index)); - } - if !left_scalar_items.is_empty() { - left_expr = SExpr::create_unary( - Arc::new( - EvalScalar { - items: left_scalar_items, - } - .into(), - ), - Arc::new(left_expr), - ); - } - if !right_scalar_items.is_empty() { - right_expr = SExpr::create_unary( - Arc::new( - EvalScalar { - items: right_scalar_items, - } - .into(), - ), - Arc::new(right_expr), - ); - } - Ok((new_bind_context, pairs, left_expr, right_expr)) - } - #[allow(clippy::too_many_arguments)] fn analyze_lazy_materialization( &self, diff --git a/src/query/sql/src/planner/dataframe.rs b/src/query/sql/src/planner/dataframe.rs index d675aed2c429f..bd00070e7a1df 100644 --- a/src/query/sql/src/planner/dataframe.rs +++ b/src/query/sql/src/planner/dataframe.rs @@ -510,7 +510,6 @@ impl Dataframe { pub async fn union(mut self, dataframe: Dataframe) -> Result { let (s_expr, bind_context) = self.binder.bind_union( - None, None, self.bind_context, dataframe.bind_context, @@ -525,7 +524,6 @@ impl Dataframe { pub async fn union_distinct(mut self, dataframe: Dataframe) -> Result { let (s_expr, bind_context) = self.binder.bind_union( - None, None, self.bind_context, dataframe.bind_context, diff --git a/src/query/sql/src/planner/optimizer/rule/rewrite/rule_push_down_filter_union.rs b/src/query/sql/src/planner/optimizer/rule/rewrite/rule_push_down_filter_union.rs index ae9f77c8b0fa5..7c1e1208a23f2 100644 --- a/src/query/sql/src/planner/optimizer/rule/rewrite/rule_push_down_filter_union.rs +++ b/src/query/sql/src/planner/optimizer/rule/rewrite/rule_push_down_filter_union.rs @@ -74,8 +74,12 @@ impl Rule for RulePushDownFilterUnion { let union: UnionAll = union_s_expr.plan().clone().try_into()?; // Create a filter which matches union's right child. - let index_pairs: HashMap = - union.pairs.iter().map(|pair| (pair.0, pair.1)).collect(); + let index_pairs: HashMap = union + .left_cols + .iter() + .zip(union.right_cols.iter()) + .map(|(l, r)| (*l, *r)) + .collect(); let new_predicates = filter .predicates .iter() diff --git a/src/query/sql/src/planner/plans/union_all.rs b/src/query/sql/src/planner/plans/union_all.rs index 9b6ecc7d578c9..7f8bc95081427 100644 --- a/src/query/sql/src/planner/plans/union_all.rs +++ b/src/query/sql/src/planner/plans/union_all.rs @@ -31,14 +31,16 @@ use crate::IndexType; #[derive(Clone, Debug, PartialEq, Eq, Hash)] pub struct UnionAll { - // Pairs of unioned columns - pub pairs: Vec<(IndexType, IndexType)>, + // Left output columns + pub left_cols: Vec, + // Right output columns + pub right_cols: Vec, } impl UnionAll { pub fn used_columns(&self) -> Result { let mut used_columns = ColumnSet::new(); - for (left, right) in &self.pairs { + for (left, right) in self.left_cols.iter().zip(self.right_cols.iter()) { used_columns.insert(*left); used_columns.insert(*right); }