Skip to content

feat: support runtime filter in shuffle join #17803

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 25 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 13 additions & 35 deletions src/query/catalog/src/runtime_filter_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,54 +12,32 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;

use databend_common_base::base::tokio::sync::watch;
use databend_common_base::base::tokio::sync::watch::Receiver;
use databend_common_base::base::tokio::sync::watch::Sender;
use databend_common_expression::Expr;
use xorf::BinaryFuse16;

#[derive(Clone, Debug, Default)]
pub struct RuntimeFilterInfo {
inlist: Vec<Expr<String>>,
min_max: Vec<Expr<String>>,
bloom: Vec<(String, BinaryFuse16)>,
pub struct RuntimeFiltersForScan {
pub inlist: HashMap<usize, Expr<String>>,
pub min_max: HashMap<usize, Expr<String>>,
pub bloom: HashMap<usize, (String, BinaryFuse16)>,
}

impl RuntimeFilterInfo {
pub fn add_inlist(&mut self, expr: Expr<String>) {
self.inlist.push(expr);
}

pub fn add_bloom(&mut self, bloom: (String, BinaryFuse16)) {
self.bloom.push(bloom);
}

pub fn add_min_max(&mut self, expr: Expr<String>) {
self.min_max.push(expr);
}

pub fn get_inlist(&self) -> &Vec<Expr<String>> {
&self.inlist
}

pub fn get_bloom(&self) -> &Vec<(String, BinaryFuse16)> {
&self.bloom
}

pub fn get_min_max(&self) -> &Vec<Expr<String>> {
&self.min_max
}

pub fn blooms(self) -> Vec<(String, BinaryFuse16)> {
self.bloom
impl RuntimeFiltersForScan {
pub fn add_inlist(&mut self, rf_id: usize, expr: Expr<String>) {
self.inlist.insert(rf_id, expr);
}

pub fn inlists(self) -> Vec<Expr<String>> {
self.inlist
pub fn add_bloom(&mut self, rf_id: usize, bloom: (String, BinaryFuse16)) {
self.bloom.insert(rf_id, bloom);
}

pub fn min_maxs(self) -> Vec<Expr<String>> {
self.min_max
pub fn add_min_max(&mut self, rf_id: usize, expr: Expr<String>) {
self.min_max.insert(rf_id, expr);
}

pub fn is_empty(&self) -> bool {
Expand Down
4 changes: 2 additions & 2 deletions src/query/catalog/src/table_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ use crate::plan::PartInfoPtr;
use crate::plan::PartStatistics;
use crate::plan::Partitions;
use crate::query_kind::QueryKind;
use crate::runtime_filter_info::RuntimeFilterInfo;
use crate::runtime_filter_info::RuntimeFilterReady;
use crate::runtime_filter_info::RuntimeFiltersForScan;
use crate::statistics::data_cache_statistics::DataCacheMetrics;
use crate::table::Table;

Expand Down Expand Up @@ -324,7 +324,7 @@ pub trait TableContext: Send + Sync {

fn get_query_profiles(&self) -> Vec<PlanProfile>;

fn set_runtime_filter(&self, filters: (usize, RuntimeFilterInfo));
fn set_runtime_filter(&self, filters: (usize, RuntimeFiltersForScan));

fn set_runtime_filter_ready(&self, table_index: usize, ready: Arc<RuntimeFilterReady>);

Expand Down
4 changes: 3 additions & 1 deletion src/query/pipeline/sources/src/async_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,9 @@ impl<T: 'static + AsyncSource> Processor for AsyncSourcer<T> {
return Ok(());
}
match self.inner.generate().await? {
None => self.is_finish = true,
None => {
self.is_finish = true;
}
Some(data_block) => {
if !data_block.is_empty() {
let progress_values = ProgressValues {
Expand Down
9 changes: 9 additions & 0 deletions src/query/service/src/pipelines/builders/builder_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use crate::pipelines::processors::transforms::range_join::TransformRangeJoinLeft
use crate::pipelines::processors::transforms::range_join::TransformRangeJoinRight;
use crate::pipelines::processors::transforms::HashJoinBuildState;
use crate::pipelines::processors::transforms::HashJoinProbeState;
use crate::pipelines::processors::transforms::RuntimeFilterChannels;
use crate::pipelines::processors::transforms::TransformHashJoinBuild;
use crate::pipelines::processors::transforms::TransformHashJoinProbe;
use crate::pipelines::processors::HashJoinDesc;
Expand Down Expand Up @@ -102,6 +103,7 @@ impl PipelineBuilder {
self.hash_join_states
.insert(build_cache_index, state.clone());
}

self.expand_build_side_pipeline(&join.build, join, state.clone())?;
self.build_join_probe(join, state)?;

Expand Down Expand Up @@ -153,6 +155,13 @@ impl PipelineBuilder {
&hash_join_plan.build_projections,
join_state.clone(),
output_len,
hash_join_plan
.runtime_filter_plan
.as_ref()
.map(|_| RuntimeFilterChannels {
rf_src_send: self.ctx.rf_src_send(hash_join_plan.join_id),
rf_sink_recv: self.ctx.rf_sink_recv(hash_join_plan.join_id),
}),
)?;
build_state.add_runtime_filter_ready();

Expand Down
46 changes: 46 additions & 0 deletions src/query/service/src/pipelines/builders/builder_runtime_filter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use databend_common_exception::Result;
use databend_common_sql::executor::physical_plans::RuntimeFilterSink;
use databend_common_sql::executor::physical_plans::RuntimeFilterSource;
use databend_common_storages_fuse::TableContext;

use crate::pipelines::processors::transforms::RuntimeFilterSinkProcessor;
use crate::pipelines::processors::transforms::RuntimeFilterSourceProcessor;
use crate::pipelines::PipelineBuilder;

impl PipelineBuilder {
    /// Builds the pipeline for a `RuntimeFilterSource` plan node: a single
    /// source processor that emits the runtime filters produced by the hash
    /// join identified by `source.join_id`.
    pub(crate) fn build_runtime_filter_source(
        &mut self,
        // NOTE: renamed from `_source` — the leading underscore signalled an
        // unused binding, but the parameter is read below (`source.join_id`).
        source: &RuntimeFilterSource,
    ) -> Result<()> {
        // Receiver end of the per-join channel; the matching sender is handed
        // to the build side via `ctx.rf_src_send(join_id)`.
        let receiver = self.ctx.rf_src_recv(source.join_id);
        self.main_pipeline.add_source(
            |output| {
                RuntimeFilterSourceProcessor::create(self.ctx.clone(), receiver.clone(), output)
            },
            // A single output is sufficient: the filters for one join are
            // produced once, not as a parallel stream.
            1,
        )
    }

    /// Builds the pipeline for a `RuntimeFilterSink` plan node: runs the input
    /// pipeline, funnels it down to one stream, and terminates it with a sink
    /// that forwards collected runtime filters through `rf_sink_send`.
    pub(crate) fn build_runtime_filter_sink(&mut self, sink: &RuntimeFilterSink) -> Result<()> {
        self.build_pipeline(&sink.input)?;
        // Collapse to a single input so one sink processor observes every
        // incoming filter block.
        self.main_pipeline.resize(1, true)?;
        // Cluster size is passed so the sink knows how many nodes' filters to
        // expect — presumably one contribution per node; confirm in
        // RuntimeFilterSinkProcessor.
        let node_num = self.ctx.get_cluster().nodes.len();
        self.main_pipeline.add_sink(|input| {
            RuntimeFilterSinkProcessor::create(input, node_num, self.ctx.rf_sink_send(sink.join_id))
        })
    }
}
1 change: 1 addition & 0 deletions src/query/service/src/pipelines/builders/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ mod builder_recluster;
mod builder_recursive_cte;
mod builder_replace_into;
mod builder_row_fetch;
mod builder_runtime_filter;
mod builder_scalar;
mod builder_scan;
mod builder_sort;
Expand Down
2 changes: 2 additions & 0 deletions src/query/service/src/pipelines/pipeline_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,8 @@ impl PipelineBuilder {
PhysicalPlan::ColumnMutation(column_mutation) => {
self.build_column_mutation(column_mutation)
}
PhysicalPlan::RuntimeFilterSource(source) => self.build_runtime_filter_source(source),
PhysicalPlan::RuntimeFilterSink(sink) => self.build_runtime_filter_sink(sink),
}?;

self.is_exchange_neighbor = is_exchange_neighbor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ use databend_common_expression::RemoteExpr;
use databend_common_functions::BUILTIN_FUNCTIONS;
use databend_common_sql::executor::cast_expr_to_non_null_boolean;
use databend_common_sql::executor::physical_plans::HashJoin;
use databend_common_sql::executor::PhysicalRuntimeFilter;
use databend_common_sql::executor::PhysicalRuntimeFilters;
use databend_common_sql::executor::RemoteRuntimeFilterDesc;
use databend_common_sql::executor::RemoteRuntimeFiltersDesc;
use parking_lot::RwLock;

use crate::sql::plans::JoinType;
Expand Down Expand Up @@ -54,7 +54,7 @@ pub struct HashJoinDesc {
}

pub struct RuntimeFilterDesc {
pub _id: usize,
pub id: usize,
pub build_key: Expr,
pub probe_key: Expr<String>,
pub scan_id: usize,
Expand All @@ -67,18 +67,18 @@ pub struct RuntimeFiltersDesc {
pub filters: Vec<RuntimeFilterDesc>,
}

impl From<&PhysicalRuntimeFilters> for RuntimeFiltersDesc {
fn from(runtime_filter: &PhysicalRuntimeFilters) -> Self {
impl From<&RemoteRuntimeFiltersDesc> for RuntimeFiltersDesc {
fn from(runtime_filter: &RemoteRuntimeFiltersDesc) -> Self {
Self {
filters: runtime_filter.filters.iter().map(|rf| rf.into()).collect(),
}
}
}

impl From<&PhysicalRuntimeFilter> for RuntimeFilterDesc {
fn from(runtime_filter: &PhysicalRuntimeFilter) -> Self {
impl From<&RemoteRuntimeFilterDesc> for RuntimeFilterDesc {
fn from(runtime_filter: &RemoteRuntimeFilterDesc) -> Self {
Self {
_id: runtime_filter.id,
id: runtime_filter.id,
build_key: runtime_filter.build_key.as_expr(&BUILTIN_FUNCTIONS),
probe_key: runtime_filter.probe_key.as_expr(&BUILTIN_FUNCTIONS),
scan_id: runtime_filter.scan_id,
Expand Down Expand Up @@ -117,7 +117,7 @@ impl HashJoinDesc {
from_correlated_subquery: join.from_correlated_subquery,
broadcast: join.broadcast,
single_to_inner: join.single_to_inner.clone(),
runtime_filter: (&join.runtime_filter).into(),
runtime_filter: (&join.runtime_filter_desc).into(),
})
}

Expand Down
Loading
Loading