feat: support using fragment forest to execute additional broadcast operations. #17872

Open · wants to merge 35 commits into main

Commits (35)

71b069b
update
SkyFan2002 Apr 17, 2025
c990993
avoid committing multiple times
SkyFan2002 Apr 18, 2025
7aa4421
Merge remote-tracking branch 'upstream/main' into shuffle_rf
SkyFan2002 Apr 18, 2025
59be6a3
fix merge
SkyFan2002 Apr 18, 2025
8dafec8
fix fragment type
SkyFan2002 Apr 18, 2025
0060d46
rm unused modify
SkyFan2002 Apr 18, 2025
097f94c
fix
SkyFan2002 Apr 18, 2025
413fd1c
improve err msg
SkyFan2002 Apr 21, 2025
45dc770
fix build pipeline
SkyFan2002 Apr 22, 2025
7939cad
fix exhchange
SkyFan2002 Apr 22, 2025
509fa2d
refactor
SkyFan2002 Apr 22, 2025
5711316
fix root pipeline
SkyFan2002 Apr 24, 2025
baa0f22
fix pipeline
SkyFan2002 Apr 24, 2025
0c2e757
improve error msg
SkyFan2002 Apr 24, 2025
ec1c88b
fix serialize block
SkyFan2002 Apr 24, 2025
aa7e5af
fix sink
SkyFan2002 Apr 24, 2025
cbbfb38
add log
SkyFan2002 Apr 25, 2025
ef53984
fix send
SkyFan2002 Apr 25, 2025
95f767b
merge rf
SkyFan2002 Apr 27, 2025
2a808bf
Merge branch 'main' into shuffle_rf
SkyFan2002 Apr 27, 2025
f668d1c
fix
SkyFan2002 Apr 28, 2025
10e0fae
fix
SkyFan2002 Apr 28, 2025
00b834a
fix
SkyFan2002 Apr 28, 2025
648c9f8
Merge remote-tracking branch 'upstream/main' into shuffle_rf
SkyFan2002 Apr 28, 2025
beff896
fix conflict
SkyFan2002 Apr 28, 2025
b8f9544
update
SkyFan2002 Apr 30, 2025
9d7e431
Merge remote-tracking branch 'upstream/main' into cluster
SkyFan2002 Apr 30, 2025
6a8a352
fix
SkyFan2002 Apr 30, 2025
feef21c
rm unused modify
SkyFan2002 Apr 30, 2025
5d5c0ed
make lint
SkyFan2002 Apr 30, 2025
0357265
fix
SkyFan2002 Apr 30, 2025
f2a9107
rm log
SkyFan2002 May 2, 2025
e05b400
rm empty impl
SkyFan2002 May 2, 2025
1aa0cc6
fix
SkyFan2002 May 30, 2025
b28877e
add comment
SkyFan2002 Apr 30, 2025
9 changes: 9 additions & 0 deletions src/query/catalog/src/table_context.rs
@@ -418,6 +418,15 @@ pub trait TableContext: Send + Sync {
     fn set_pruned_partitions_stats(&self, _partitions: PartStatistics) {
         unimplemented!()
     }
+
+    /// Calling this function makes `build_distributed_pipeline()` automatically create a pipeline for broadcasting data.
+    ///
+    /// The returned id can be used to get the sender and receiver for broadcasting data.
+    fn get_next_broadcast_id(&self) -> u32;
+
+    fn reset_broadcast_id(&self) {
+        unimplemented!()
+    }
 }

 pub type AbortChecker = Arc<dyn CheckAbort + Send + Sync>;
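
As a rough usage sketch of the two new hooks (hypothetical caller code; only `get_next_broadcast_id` and `reset_broadcast_id` come from this diff):

```rust
use std::sync::Arc;

use databend_common_catalog::table_context::TableContext;

// Hypothetical planning helper: reserve one broadcast channel id per extra
// broadcast operation. `build_distributed_pipeline()` then creates a matching
// broadcast pipeline for each reserved id.
fn reserve_broadcast_channels(ctx: &Arc<dyn TableContext>, count: u32) -> Vec<u32> {
    (0..count).map(|_| ctx.get_next_broadcast_id()).collect()
}

// Assumption: after the distributed pipeline has been built, the counter is
// reset so that ids start from zero again for the next plan.
fn cleanup_after_scheduling(ctx: &Arc<dyn TableContext>) {
    ctx.reset_broadcast_id();
}
```
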
39 changes: 39 additions & 0 deletions src/query/service/src/pipelines/builders/builder_broadcast.rs
@@ -0,0 +1,39 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use databend_common_exception::Result;
use databend_common_sql::executor::physical_plans::BroadcastSink;
use databend_common_sql::executor::physical_plans::BroadcastSource;

use crate::pipelines::processors::transforms::BroadcastSinkProcessor;
use crate::pipelines::processors::transforms::BroadcastSourceProcessor;
use crate::pipelines::PipelineBuilder;

impl PipelineBuilder {
    pub(crate) fn build_broadcast_source(&mut self, source: &BroadcastSource) -> Result<()> {
        let receiver = self.ctx.broadcast_source_receiver(source.broadcast_id);
        self.main_pipeline.add_source(
            |output| BroadcastSourceProcessor::create(self.ctx.clone(), receiver.clone(), output),
            1,
        )
    }

    pub(crate) fn build_broadcast_sink(&mut self, sink: &BroadcastSink) -> Result<()> {
        self.build_pipeline(&sink.input)?;
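        // Collapse the pipeline to a single stream so that one sink instance receives every block before broadcasting.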
        self.main_pipeline.resize(1, true)?;
        self.main_pipeline.add_sink(|input| {
            BroadcastSinkProcessor::create(input, self.ctx.broadcast_sink_sender(sink.broadcast_id))
        })
    }
}
1 change: 1 addition & 0 deletions src/query/service/src/pipelines/builders/mod.rs
@@ -16,6 +16,7 @@ mod builder_add_stream_column;
 mod builder_aggregate;
 mod builder_append_table;
 mod builder_async_function;
+mod builder_broadcast;
 mod builder_column_mutation;
 mod builder_commit;
 mod builder_compact;
2 changes: 2 additions & 0 deletions src/query/service/src/pipelines/pipeline_builder.rs
@@ -220,6 +220,8 @@ impl PipelineBuilder {
             }
             PhysicalPlan::Shuffle(shuffle) => self.build_shuffle(shuffle),
             PhysicalPlan::Duplicate(duplicate) => self.build_duplicate(duplicate),
+            PhysicalPlan::BroadcastSource(source) => self.build_broadcast_source(source),
+            PhysicalPlan::BroadcastSink(sink) => self.build_broadcast_sink(sink),

             // ==============================
             // 4. Data Modification Operations
98 changes: 98 additions & 0 deletions src/query/service/src/pipelines/processors/transforms/broadcast.rs
@@ -0,0 +1,98 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use async_channel::Receiver;
use async_channel::Sender;
use databend_common_catalog::table_context::TableContext;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_expression::BlockMetaInfoPtr;
use databend_common_expression::DataBlock;
use databend_common_pipeline_core::processors::InputPort;
use databend_common_pipeline_core::processors::OutputPort;
use databend_common_pipeline_core::processors::ProcessorPtr;
use databend_common_pipeline_sinks::AsyncSink;
use databend_common_pipeline_sinks::AsyncSinker;
use databend_common_pipeline_sources::AsyncSource;
use databend_common_pipeline_sources::AsyncSourcer;

pub struct BroadcastSourceProcessor {
    pub receiver: Receiver<BlockMetaInfoPtr>,
}

impl BroadcastSourceProcessor {
    pub fn create(
        ctx: Arc<dyn TableContext>,
        receiver: Receiver<BlockMetaInfoPtr>,
        output_port: Arc<OutputPort>,
    ) -> Result<ProcessorPtr> {
        AsyncSourcer::create(ctx, output_port, Self { receiver })
    }
}

#[async_trait::async_trait]
impl AsyncSource for BroadcastSourceProcessor {
    const NAME: &'static str = "BroadcastSource";
    const SKIP_EMPTY_DATA_BLOCK: bool = false;

    #[async_backtrace::framed]
    async fn generate(&mut self) -> Result<Option<DataBlock>> {
        let received = self.receiver.recv().await;
        match received {
            Ok(meta) => Ok(Some(DataBlock::empty_with_meta(meta))),
            Err(_) => {
                // The channel is closed, we should return None to stop generating
                Ok(None)
            }
        }
    }
}

pub struct BroadcastSinkProcessor {
    received: Vec<BlockMetaInfoPtr>,
    sender: Sender<BlockMetaInfoPtr>,
}

impl BroadcastSinkProcessor {
    pub fn create(input: Arc<InputPort>, sender: Sender<BlockMetaInfoPtr>) -> Result<ProcessorPtr> {
        Ok(ProcessorPtr::create(AsyncSinker::create(input, Self {
            received: vec![],
            sender,
        })))
    }
}

#[async_trait::async_trait]
impl AsyncSink for BroadcastSinkProcessor {
    const NAME: &'static str = "BroadcastSink";

    async fn on_finish(&mut self) -> Result<()> {
        self.sender.close();
        Ok(())
    }

    async fn consume(&mut self, mut data_block: DataBlock) -> Result<bool> {
        let meta = data_block
            .take_meta()
            .ok_or_else(|| ErrorCode::Internal("Cannot downcast meta to BroadcastMeta"))?;
        log::info!("BroadcastSinkProcessor recv meta: {:?}", meta);
        // [inline review comment, Member]: Need remove.
        self.sender
            .send(meta)
            .await
            .map_err(|_| ErrorCode::Internal("BroadcastSinkProcessor send error"))?;
        Ok(false)
    }
}
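
For intuition, a minimal self-contained sketch of the channel contract these two processors rely on (plain `async_channel` with `u32` payloads standing in for `BlockMetaInfoPtr`; in the real code the endpoints come from the query context):

```rust
// Sink side sends and then closes (as in on_finish above); source side keeps
// receiving until recv() returns Err, i.e. until the channel is closed.
#[tokio::main] // assumption: any async runtime works; tokio is used for the sketch
async fn main() {
    let (tx, rx) = async_channel::unbounded::<u32>();

    let sink = tokio::spawn(async move {
        for v in 0..3 {
            tx.send(v).await.unwrap();
        }
        // Closing the channel is what lets the source stop generating.
        tx.close();
    });

    while let Ok(v) = rx.recv().await {
        println!("broadcast item: {v}");
    }
    sink.await.unwrap();
}
```
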
4 changes: 4 additions & 0 deletions src/query/service/src/pipelines/processors/transforms/mod.rs
@@ -13,6 +13,8 @@
 // limitations under the License.

 pub mod aggregator;
+#[allow(dead_code)]
+mod broadcast;
 mod hash_join;
 pub(crate) mod range_join;
 mod runtime_pool;
@@ -40,6 +42,8 @@ mod transform_udf_script;
 mod transform_udf_server;
 mod window;

+pub use broadcast::BroadcastSinkProcessor;
+pub use broadcast::BroadcastSourceProcessor;
 pub use hash_join::*;
 pub use transform_add_computed_columns::TransformAddComputedColumns;
 pub use transform_add_const_columns::TransformAddConstColumns;
@@ -351,7 +351,9 @@ async fn create_memory_table_for_cte_scan(
         | PhysicalPlan::ChunkFillAndReorder(_)
         | PhysicalPlan::ChunkAppendData(_)
         | PhysicalPlan::ChunkMerge(_)
-        | PhysicalPlan::ChunkCommitInsert(_) => {}
+        | PhysicalPlan::ChunkCommitInsert(_)
+        | PhysicalPlan::BroadcastSource(_)
+        | PhysicalPlan::BroadcastSink(_) => {}
     }
     Ok(())
 }
7 changes: 6 additions & 1 deletion src/query/service/src/schedulers/fragments/fragmenter.rs
@@ -123,9 +123,13 @@

     pub fn build_fragment(mut self, plan: &PhysicalPlan) -> Result<PlanFragment> {
         let root = self.replace(plan)?;
+        let fragment_type = match plan {
+            PhysicalPlan::BroadcastSink(_) => FragmentType::Intermediate,
+            _ => FragmentType::Root,
+        };
         let mut root_fragment = PlanFragment {
             plan: root,
-            fragment_type: FragmentType::Root,
+            fragment_type,
             fragment_id: self.ctx.get_fragment_id(),
             exchange: None,
             query_id: self.query_id.clone(),
@@ -218,6 +222,7 @@ impl PhysicalPlanReplacer for Fragmenter {
         fragments.append(&mut self.fragments);
         let probe_input = self.replace(plan.probe.as_ref())?;
         fragments.append(&mut self.fragments);
+
         self.fragments = fragments;

         Ok(PhysicalPlan::HashJoin(HashJoin {
@@ -126,12 +126,16 @@ impl QueryFragmentsActions {
         self.ctx.get_cluster().local_id()
     }

-    pub fn get_root_actions(&self) -> Result<&QueryFragmentActions> {
-        self.fragments_actions.last().ok_or_else(|| {
-            ErrorCode::Internal(
-                "Logical error, call get_root_actions in empty QueryFragmentsActions",
-            )
-        })
+    pub fn get_root_fragment_ids(&self) -> Result<Vec<usize>> {
+        let mut fragment_ids = Vec::new();
+        for fragment_actions in &self.fragments_actions {
+            let plan = &fragment_actions.fragment_actions[0].physical_plan;
+            if !matches!(plan, PhysicalPlan::ExchangeSink(_)) {
+                fragment_ids.push(fragment_actions.fragment_id);
+            }
+        }
+
+        Ok(fragment_ids)
     }

     pub fn pop_root_actions(&mut self) -> Option<QueryFragmentActions> {
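
With a fragment forest the query can have several root fragments: the main query root plus one per broadcast plan, each being a fragment whose plan does not end in an `ExchangeSink`. A hypothetical caller of the new accessor (sketch only; imports and surrounding types assumed):

```rust
// Sketch: iterate every root of the forest instead of taking a single
// root action, as the removed get_root_actions() did.
fn wire_root_fragments(actions: &QueryFragmentsActions) -> Result<()> {
    for fragment_id in actions.get_root_fragment_ids()? {
        // e.g. attach result or broadcast channels for this root fragment
        log::debug!("root fragment id: {fragment_id}");
    }
    Ok(())
}
```
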
13 changes: 9 additions & 4 deletions src/query/service/src/schedulers/scheduler.rs
@@ -17,6 +17,7 @@ use std::sync::Arc;
 use async_trait::async_trait;
 use databend_common_exception::Result;
 use databend_common_expression::DataBlock;
+use databend_common_sql::executor::build_broadcast_plans;
 use databend_common_sql::planner::QueryExecutor;
 use databend_common_sql::Planner;
 use futures_util::TryStreamExt;
@@ -99,11 +100,15 @@
 pub async fn build_distributed_pipeline(
     ctx: &Arc<QueryContext>,
     plan: &PhysicalPlan,
 ) -> Result<PipelineBuildResult> {
-    let fragmenter = Fragmenter::try_create(ctx.clone())?;
-
-    let root_fragment = fragmenter.build_fragment(plan)?;
     let mut fragments_actions = QueryFragmentsActions::create(ctx.clone());
-    root_fragment.get_actions(ctx.clone(), &mut fragments_actions)?;
+    for plan in build_broadcast_plans(ctx.as_ref())?
+        .iter()
+        .chain(std::iter::once(plan))
+    {
+        let fragmenter = Fragmenter::try_create(ctx.clone())?;
+        let root_fragment = fragmenter.build_fragment(plan)?;
+        root_fragment.get_actions(ctx.clone(), &mut fragments_actions)?;
+    }

     let exchange_manager = ctx.get_exchange_manager();
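
For orientation, a toy model of the fragment-forest scheduling this loop performs (all types invented for illustration; the real code uses `Fragmenter` and `QueryFragmentsActions` as above):

```rust
struct Plan;
struct Fragment;

// Stand-in for Fragmenter::build_fragment + PlanFragment::get_actions.
fn fragment_tree(_plan: &Plan) -> Vec<Fragment> {
    vec![Fragment]
}

// Each broadcast side-plan and the main query plan is an independent tree;
// fragmenting all of them into one shared collection yields the "forest"
// that is then scheduled as a single distributed job.
fn build_forest(broadcast_plans: &[Plan], main_plan: &Plan) -> Vec<Fragment> {
    broadcast_plans
        .iter()
        .chain(std::iter::once(main_plan))
        .flat_map(fragment_tree)
        .collect()
}
```
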