Skip to content

Commit 2c13262

Browse files
committed
update
1 parent 18cb693 commit 2c13262

File tree

5 files changed

+62
-50
lines changed

5 files changed

+62
-50
lines changed

src/query/service/src/pipelines/builders/builder_union_all.rs

+8-3
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,15 @@ use crate::sessions::QueryContext;
2727

2828
impl PipelineBuilder {
2929
pub fn build_union_all(&mut self, union_all: &UnionAll) -> Result<()> {
30-
self.build_pipeline(&union_all.children[0])?;
30+
self.build_pipeline(union_all.children.last().unwrap())?;
3131
let mut remain_children_receivers = vec![];
32-
for (idx, remaining_child) in union_all.children.iter().skip(1).enumerate() {
33-
remain_children_receivers.push((idx + 1, self.expand_union_all(remaining_child)?));
32+
for (idx, remaining_child) in union_all
33+
.children
34+
.iter()
35+
.take(union_all.children.len() - 1)
36+
.enumerate()
37+
{
38+
remain_children_receivers.push((idx, self.expand_union_all(remaining_child)?));
3439
}
3540
let schemas: Vec<DataSchemaRef> = union_all
3641
.children

src/query/service/src/pipelines/processors/transforms/transform_merge_block.rs

+19-5
Original file line numberDiff line numberDiff line change
@@ -69,22 +69,35 @@ impl TransformMergeBlock {
6969
let columns = if let Some(idx) = idx {
7070
self.check_type(idx, &block)?
7171
} else {
72-
self.output_cols[0]
72+
self.output_cols
73+
.last()
74+
.unwrap()
7375
.iter()
7476
.map(|idx| {
7577
Ok(block
76-
.get_by_offset(self.schemas[0].index_of(&idx.to_string())?)
78+
.get_by_offset(self.schemas.last().unwrap().index_of(&idx.to_string())?)
7779
.clone())
7880
})
7981
.collect::<Result<Vec<_>>>()?
8082
};
81-
Ok(DataBlock::new(columns, num_rows))
83+
let block = DataBlock::new(columns, num_rows);
84+
Ok(block)
8285
}
8386

8487
fn check_type(&self, idx: usize, block: &DataBlock) -> Result<Vec<BlockEntry>> {
8588
let mut columns = vec![];
86-
for (left_idx, right_idx) in self.output_cols[0].iter().zip(self.output_cols[idx].iter()) {
87-
let left_field = self.schemas[0].field_with_name(&left_idx.to_string())?;
89+
for (left_idx, right_idx) in self
90+
.output_cols
91+
.last()
92+
.unwrap()
93+
.iter()
94+
.zip(self.output_cols[idx].iter())
95+
{
96+
let left_field = self
97+
.schemas
98+
.last()
99+
.unwrap()
100+
.field_with_name(&left_idx.to_string())?;
88101
let left_data_type = left_field.data_type();
89102

90103
let right_field = self.schemas[idx].field_with_name(&right_idx.to_string())?;
@@ -93,6 +106,7 @@ impl TransformMergeBlock {
93106
let offset = self.schemas[idx].index_of(&right_idx.to_string())?;
94107
if left_data_type == right_data_type {
95108
columns.push(block.get_by_offset(offset).clone());
109+
continue;
96110
}
97111

98112
if left_data_type.remove_nullable() == right_data_type.remove_nullable() {

src/query/sql/src/executor/format.rs

-1
Original file line numberDiff line numberDiff line change
@@ -974,7 +974,6 @@ fn union_all_to_format_tree(
974974
}
975975

976976
append_profile_info(&mut children, profs, plan.plan_id);
977-
978977
for child in plan.children.iter() {
979978
children.push(to_format_tree(child, metadata, profs)?);
980979
}

src/query/sql/src/executor/physical_plans/physical_union_all.rs

+35-34
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ use crate::executor::explain::PlanStatsInfo;
2727
use crate::executor::PhysicalPlan;
2828
use crate::executor::PhysicalPlanBuilder;
2929
use crate::optimizer::SExpr;
30-
use crate::optimizer::StatInfo;
3130
use crate::plans::BoundColumnRef;
3231
use crate::plans::Operator;
3332
use crate::plans::RelOp;
@@ -75,12 +74,10 @@ impl PhysicalPlanBuilder {
7574
&mut output_cols,
7675
)
7776
.await?;
78-
79-
debug_assert!(union_children.len() >= 2);
80-
let left_schema = union_children[0].output_schema()?;
81-
let right_schema = union_children[1].output_schema()?;
77+
let right_schema = union_children[0].output_schema()?;
78+
let left_schema = union_children[1].output_schema()?;
8279
let mut common_types = vec![];
83-
for (l, r) in output_cols[0].iter().zip(output_cols[1].iter()) {
80+
for (r, l) in output_cols[0].iter().zip(output_cols[1].iter()) {
8481
let left_field = left_schema.field_with_name(&l.to_string())?;
8582
let right_field = right_schema.field_with_name(&r.to_string())?;
8683

@@ -100,10 +97,8 @@ impl PhysicalPlanBuilder {
10097
})?)
10198
}
10299

103-
for (right_plan, right_output_cols) in union_children.iter().zip(output_cols.iter()).skip(2)
104-
{
105-
common_types =
106-
common_super_types_for_union(common_types, right_plan, right_output_cols)?;
100+
for (left_plan, left_output_cols) in union_children.iter().zip(output_cols.iter()).skip(2) {
101+
common_types = common_super_types_for_union(common_types, left_plan, left_output_cols)?;
107102
}
108103

109104
async fn cast_plan(
@@ -168,7 +163,10 @@ impl PhysicalPlanBuilder {
168163
.await?;
169164
}
170165

171-
let fields = output_cols[0]
166+
// Last is the most left union child
167+
let fields = output_cols
168+
.last()
169+
.unwrap()
172170
.iter()
173171
.zip(&common_types)
174172
.map(|(index, ty)| DataField::new(&index.to_string(), ty.clone()))
@@ -192,27 +190,30 @@ impl PhysicalPlanBuilder {
192190
union_children: &mut Vec<Box<PhysicalPlan>>,
193191
output_cols: &mut Vec<Vec<IndexType>>,
194192
) -> Result<()> {
195-
let left_required = union.left_cols.iter().fold(required.clone(), |mut acc, v| {
196-
acc.insert(*v);
197-
acc
198-
});
199-
let left_plan = self.build(s_expr.child(0)?, left_required).await?;
200-
union_children.push(Box::new(left_plan));
201-
output_cols.push(union.left_cols.clone());
202-
let right_s_expr = s_expr.child(1)?;
203-
if right_s_expr.plan.rel_op() != RelOp::UnionAll {
204-
let right_required = union.right_cols.iter().fold(required, |mut acc, v| {
193+
let right_required = union
194+
.right_cols
195+
.iter()
196+
.fold(required.clone(), |mut acc, v| {
197+
acc.insert(*v);
198+
acc
199+
});
200+
let right_plan = self.build(s_expr.child(1)?, right_required).await?;
201+
union_children.push(Box::new(right_plan));
202+
output_cols.push(union.right_cols.clone());
203+
let left_s_expr = s_expr.child(0)?;
204+
if left_s_expr.plan.rel_op() != RelOp::UnionAll {
205+
let left_required = union.left_cols.iter().fold(required, |mut acc, v| {
205206
acc.insert(*v);
206207
acc
207208
});
208-
let plan = self.build(right_s_expr, right_required).await?;
209+
let plan = self.build(left_s_expr, left_required).await?;
209210
union_children.push(Box::new(plan));
210-
output_cols.push(union.right_cols.clone());
211+
output_cols.push(union.left_cols.clone());
211212
return Ok(());
212213
}
213214
self.flatten_union(
214-
right_s_expr,
215-
&right_s_expr.plan().clone().try_into()?,
215+
left_s_expr,
216+
&left_s_expr.plan().clone().try_into()?,
216217
required,
217218
union_children,
218219
output_cols,
@@ -223,23 +224,23 @@ impl PhysicalPlanBuilder {
223224

224225
fn common_super_types_for_union(
225226
common_types: Vec<DataType>,
226-
right_plan: &PhysicalPlan,
227-
right_output_cols: &[IndexType],
227+
left_plan: &PhysicalPlan,
228+
left_output_cols: &[IndexType],
228229
) -> Result<Vec<DataType>> {
229230
let mut new_common_types = Vec::with_capacity(common_types.len());
230-
let right_schema = right_plan.output_schema()?;
231-
for (left_type, idx) in common_types.iter().zip(right_output_cols.iter()) {
232-
let right_field = right_schema.field_with_name(&idx.to_string())?;
231+
let left_schema = left_plan.output_schema()?;
232+
for (right_type, idx) in common_types.iter().zip(left_output_cols.iter()) {
233+
let left_field = left_schema.field_with_name(&idx.to_string())?;
233234
let common_type = common_super_type(
234-
left_type.clone(),
235-
right_field.data_type().clone(),
235+
right_type.clone(),
236+
left_field.data_type().clone(),
236237
&BUILTIN_FUNCTIONS.default_cast_rules,
237238
);
238239
new_common_types.push(common_type.ok_or_else(|| {
239240
ErrorCode::SemanticError(format!(
240241
"SetOperation's types cannot be matched, left type: {:?}, right type: {:?}",
241-
left_type,
242-
right_field.data_type()
242+
left_field.data_type(),
243+
right_type,
243244
))
244245
})?)
245246
}

src/query/sql/src/planner/binder/select.rs

-7
Original file line numberDiff line numberDiff line change
@@ -39,19 +39,15 @@ use databend_common_ast::Visitor;
3939
use databend_common_exception::ErrorCode;
4040
use databend_common_exception::Result;
4141
use databend_common_exception::Span;
42-
use databend_common_expression::type_check::common_super_type;
43-
use databend_common_expression::types::DataType;
4442
use databend_common_expression::ROW_ID_COLUMN_ID;
4543
use databend_common_expression::ROW_ID_COL_NAME;
46-
use databend_common_functions::BUILTIN_FUNCTIONS;
4744
use log::warn;
4845

4946
use super::sort::OrderItem;
5047
use super::Finder;
5148
use crate::binder::join::JoinConditions;
5249
use crate::binder::project_set::SrfCollector;
5350
use crate::binder::scalar_common::split_conjunctions;
54-
use crate::binder::ColumnBindingBuilder;
5551
use crate::binder::CteInfo;
5652
use crate::binder::ExprContext;
5753
use crate::binder::INTERNAL_COLUMN_FACTORY;
@@ -61,8 +57,6 @@ use crate::planner::binder::scalar::ScalarBinder;
6157
use crate::planner::binder::BindContext;
6258
use crate::planner::binder::Binder;
6359
use crate::plans::BoundColumnRef;
64-
use crate::plans::CastExpr;
65-
use crate::plans::EvalScalar;
6660
use crate::plans::Filter;
6761
use crate::plans::JoinType;
6862
use crate::plans::ScalarExpr;
@@ -74,7 +68,6 @@ use crate::ColumnEntry;
7468
use crate::IndexType;
7569
use crate::UdfRewriter;
7670
use crate::VirtualColumnRewriter;
77-
use crate::Visibility;
7871

7972
// A normalized IR for `SELECT` clause.
8073
#[derive(Debug, Default)]

0 commit comments

Comments
 (0)