diff --git a/src/query/catalog/src/table_context.rs b/src/query/catalog/src/table_context.rs index 770e19d60b241..de45201b3a818 100644 --- a/src/query/catalog/src/table_context.rs +++ b/src/query/catalog/src/table_context.rs @@ -418,6 +418,15 @@ pub trait TableContext: Send + Sync { fn set_pruned_partitions_stats(&self, _partitions: PartStatistics) { unimplemented!() } + + /// Calling this function will automatically create a pipeline for broadcast data in `build_distributed_pipeline()` + /// + /// The returned id can be used to get sender and receiver for broadcasting data. + fn get_next_broadcast_id(&self) -> u32; + + fn reset_broadcast_id(&self) { + unimplemented!() + } } pub type AbortChecker = Arc; diff --git a/src/query/service/src/pipelines/builders/builder_broadcast.rs b/src/query/service/src/pipelines/builders/builder_broadcast.rs new file mode 100644 index 0000000000000..fc2b8ab28a9ad --- /dev/null +++ b/src/query/service/src/pipelines/builders/builder_broadcast.rs @@ -0,0 +1,39 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use databend_common_exception::Result; +use databend_common_sql::executor::physical_plans::BroadcastSink; +use databend_common_sql::executor::physical_plans::BroadcastSource; + +use crate::pipelines::processors::transforms::BroadcastSinkProcessor; +use crate::pipelines::processors::transforms::BroadcastSourceProcessor; +use crate::pipelines::PipelineBuilder; + +impl PipelineBuilder { + pub(crate) fn build_broadcast_source(&mut self, source: &BroadcastSource) -> Result<()> { + let receiver = self.ctx.broadcast_source_receiver(source.broadcast_id); + self.main_pipeline.add_source( + |output| BroadcastSourceProcessor::create(self.ctx.clone(), receiver.clone(), output), + 1, + ) + } + + pub(crate) fn build_broadcast_sink(&mut self, sink: &BroadcastSink) -> Result<()> { + self.build_pipeline(&sink.input)?; + self.main_pipeline.resize(1, true)?; + self.main_pipeline.add_sink(|input| { + BroadcastSinkProcessor::create(input, self.ctx.broadcast_sink_sender(sink.broadcast_id)) + }) + } +} diff --git a/src/query/service/src/pipelines/builders/mod.rs b/src/query/service/src/pipelines/builders/mod.rs index 8a2e5a481a349..552580bc7c467 100644 --- a/src/query/service/src/pipelines/builders/mod.rs +++ b/src/query/service/src/pipelines/builders/mod.rs @@ -16,6 +16,7 @@ mod builder_add_stream_column; mod builder_aggregate; mod builder_append_table; mod builder_async_function; +mod builder_broadcast; mod builder_column_mutation; mod builder_commit; mod builder_compact; diff --git a/src/query/service/src/pipelines/pipeline_builder.rs b/src/query/service/src/pipelines/pipeline_builder.rs index fec66ac033e1f..ee9593427c659 100644 --- a/src/query/service/src/pipelines/pipeline_builder.rs +++ b/src/query/service/src/pipelines/pipeline_builder.rs @@ -220,6 +220,8 @@ impl PipelineBuilder { } PhysicalPlan::Shuffle(shuffle) => self.build_shuffle(shuffle), PhysicalPlan::Duplicate(duplicate) => self.build_duplicate(duplicate), + 
PhysicalPlan::BroadcastSource(source) => self.build_broadcast_source(source), + PhysicalPlan::BroadcastSink(sink) => self.build_broadcast_sink(sink), // ============================== // 4. Data Modification Operations diff --git a/src/query/service/src/pipelines/processors/transforms/broadcast.rs b/src/query/service/src/pipelines/processors/transforms/broadcast.rs new file mode 100644 index 0000000000000..e8c26d677863d --- /dev/null +++ b/src/query/service/src/pipelines/processors/transforms/broadcast.rs @@ -0,0 +1,98 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use async_channel::Receiver; +use async_channel::Sender; +use databend_common_catalog::table_context::TableContext; +use databend_common_exception::ErrorCode; +use databend_common_exception::Result; +use databend_common_expression::BlockMetaInfoPtr; +use databend_common_expression::DataBlock; +use databend_common_pipeline_core::processors::InputPort; +use databend_common_pipeline_core::processors::OutputPort; +use databend_common_pipeline_core::processors::ProcessorPtr; +use databend_common_pipeline_sinks::AsyncSink; +use databend_common_pipeline_sinks::AsyncSinker; +use databend_common_pipeline_sources::AsyncSource; +use databend_common_pipeline_sources::AsyncSourcer; + +pub struct BroadcastSourceProcessor { + pub receiver: Receiver, +} + +impl BroadcastSourceProcessor { + pub fn create( + ctx: Arc, + receiver: Receiver, + output_port: Arc, + ) -> Result { + AsyncSourcer::create(ctx, output_port, Self { receiver }) + } +} + +#[async_trait::async_trait] +impl AsyncSource for BroadcastSourceProcessor { + const NAME: &'static str = "BroadcastSource"; + const SKIP_EMPTY_DATA_BLOCK: bool = false; + + #[async_backtrace::framed] + async fn generate(&mut self) -> Result> { + let received = self.receiver.recv().await; + match received { + Ok(meta) => Ok(Some(DataBlock::empty_with_meta(meta))), + Err(_) => { + // The channel is closed, we should return None to stop generating + Ok(None) + } + } + } +} + +pub struct BroadcastSinkProcessor { + received: Vec, + sender: Sender, +} + +impl BroadcastSinkProcessor { + pub fn create(input: Arc, sender: Sender) -> Result { + Ok(ProcessorPtr::create(AsyncSinker::create(input, Self { + received: vec![], + sender, + }))) + } +} + +#[async_trait::async_trait] +impl AsyncSink for BroadcastSinkProcessor { + const NAME: &'static str = "BroadcastSink"; + + async fn on_finish(&mut self) -> Result<()> { + self.sender.close(); + Ok(()) + } + + async fn consume(&mut self, mut data_block: DataBlock) -> Result { + let meta = data_block + .take_meta() + .ok_or_else(|| ErrorCode::Internal("Cannot downcast meta to BroadcastMeta"))?; + log::info!("BroadcastSinkProcessor recv meta: {:?}", meta); + self.sender + .send(meta) + .await + .map_err(|_| ErrorCode::Internal("BroadcastSinkProcessor send error"))?; + Ok(false) + } +} diff --git a/src/query/service/src/pipelines/processors/transforms/mod.rs 
b/src/query/service/src/pipelines/processors/transforms/mod.rs index da8b50455878f..d5eca831d93bb 100644 --- a/src/query/service/src/pipelines/processors/transforms/mod.rs +++ b/src/query/service/src/pipelines/processors/transforms/mod.rs @@ -13,6 +13,8 @@ // limitations under the License. pub mod aggregator; +#[allow(dead_code)] +mod broadcast; mod hash_join; pub(crate) mod range_join; mod runtime_pool; @@ -40,6 +42,8 @@ mod transform_udf_script; mod transform_udf_server; mod window; +pub use broadcast::BroadcastSinkProcessor; +pub use broadcast::BroadcastSourceProcessor; pub use hash_join::*; pub use transform_add_computed_columns::TransformAddComputedColumns; pub use transform_add_const_columns::TransformAddConstColumns; diff --git a/src/query/service/src/pipelines/processors/transforms/transform_recursive_cte_source.rs b/src/query/service/src/pipelines/processors/transforms/transform_recursive_cte_source.rs index 126ae6c959b5c..e14efe1ca66df 100644 --- a/src/query/service/src/pipelines/processors/transforms/transform_recursive_cte_source.rs +++ b/src/query/service/src/pipelines/processors/transforms/transform_recursive_cte_source.rs @@ -351,7 +351,9 @@ async fn create_memory_table_for_cte_scan( | PhysicalPlan::ChunkFillAndReorder(_) | PhysicalPlan::ChunkAppendData(_) | PhysicalPlan::ChunkMerge(_) - | PhysicalPlan::ChunkCommitInsert(_) => {} + | PhysicalPlan::ChunkCommitInsert(_) + | PhysicalPlan::BroadcastSource(_) + | PhysicalPlan::BroadcastSink(_) => {} } Ok(()) } diff --git a/src/query/service/src/schedulers/fragments/fragmenter.rs b/src/query/service/src/schedulers/fragments/fragmenter.rs index a205a4ec6e963..5018dee6569e6 100644 --- a/src/query/service/src/schedulers/fragments/fragmenter.rs +++ b/src/query/service/src/schedulers/fragments/fragmenter.rs @@ -123,9 +123,13 @@ impl Fragmenter { pub fn build_fragment(mut self, plan: &PhysicalPlan) -> Result { let root = self.replace(plan)?; + let fragment_type = match plan { + PhysicalPlan::BroadcastSink(_) => FragmentType::Intermediate, + _ => FragmentType::Root, + }; let mut root_fragment = PlanFragment { plan: root, - fragment_type: FragmentType::Root, + fragment_type, fragment_id: self.ctx.get_fragment_id(), exchange: None, query_id: self.query_id.clone(), @@ -218,6 +222,7 @@ impl PhysicalPlanReplacer for Fragmenter { fragments.append(&mut self.fragments); let probe_input = self.replace(plan.probe.as_ref())?; fragments.append(&mut self.fragments); + self.fragments = fragments; Ok(PhysicalPlan::HashJoin(HashJoin { diff --git a/src/query/service/src/schedulers/fragments/query_fragment_actions.rs b/src/query/service/src/schedulers/fragments/query_fragment_actions.rs index dece1fdfb21c5..9fa10c1585bfc 100644 --- a/src/query/service/src/schedulers/fragments/query_fragment_actions.rs +++ b/src/query/service/src/schedulers/fragments/query_fragment_actions.rs @@ -126,12 +126,16 @@ impl QueryFragmentsActions { self.ctx.get_cluster().local_id() } - pub fn get_root_actions(&self) -> Result<&QueryFragmentActions> { - self.fragments_actions.last().ok_or_else(|| { - ErrorCode::Internal( - "Logical error, call get_root_actions in empty QueryFragmentsActions", - ) - }) + pub fn get_root_fragment_ids(&self) -> Result> { + let mut fragment_ids = Vec::new(); + for fragment_actions in &self.fragments_actions { + let plan = &fragment_actions.fragment_actions[0].physical_plan; + if !matches!(plan, PhysicalPlan::ExchangeSink(_)) { + fragment_ids.push(fragment_actions.fragment_id); + } + } + + Ok(fragment_ids) } pub fn pop_root_actions(&mut self) -> 
Option { diff --git a/src/query/service/src/schedulers/scheduler.rs b/src/query/service/src/schedulers/scheduler.rs index f3259edae81c6..110ec28d7bdb1 100644 --- a/src/query/service/src/schedulers/scheduler.rs +++ b/src/query/service/src/schedulers/scheduler.rs @@ -17,6 +17,7 @@ use std::sync::Arc; use async_trait::async_trait; use databend_common_exception::Result; use databend_common_expression::DataBlock; +use databend_common_sql::executor::build_broadcast_plans; use databend_common_sql::planner::QueryExecutor; use databend_common_sql::Planner; use futures_util::TryStreamExt; @@ -99,11 +100,15 @@ pub async fn build_distributed_pipeline( ctx: &Arc, plan: &PhysicalPlan, ) -> Result { - let fragmenter = Fragmenter::try_create(ctx.clone())?; - - let root_fragment = fragmenter.build_fragment(plan)?; let mut fragments_actions = QueryFragmentsActions::create(ctx.clone()); - root_fragment.get_actions(ctx.clone(), &mut fragments_actions)?; + for plan in build_broadcast_plans(ctx.as_ref())? + .iter() + .chain(std::iter::once(plan)) + { + let fragmenter = Fragmenter::try_create(ctx.clone())?; + let root_fragment = fragmenter.build_fragment(plan)?; + root_fragment.get_actions(ctx.clone(), &mut fragments_actions)?; + } let exchange_manager = ctx.get_exchange_manager(); diff --git a/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs b/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs index 1eb5adb336efd..64b0e23c23617 100644 --- a/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs +++ b/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs @@ -57,7 +57,6 @@ use crate::pipelines::executor::ExecutorSettings; use crate::pipelines::executor::PipelineCompleteExecutor; use crate::pipelines::PipelineBuildResult; use crate::pipelines::PipelineBuilder; -use crate::schedulers::QueryFragmentActions; use crate::schedulers::QueryFragmentsActions; use crate::servers::flight::v1::actions::init_query_fragments; use crate::servers::flight::v1::actions::INIT_QUERY_FRAGMENTS; @@ -443,7 +442,7 @@ impl DataExchangeManager { retry_times: settings.get_flight_max_retry_times()?, retry_interval: settings.get_flight_retry_interval()?, }; - let root_actions = actions.get_root_actions()?; + let mut root_fragment_ids = actions.get_root_fragment_ids()?; let conf = GlobalConfig::instance(); // Initialize query env between cluster nodes @@ -466,7 +465,8 @@ impl DataExchangeManager { } // Get local pipeline of local task - let build_res = self.get_root_pipeline(ctx, root_actions)?; + let main_fragment_id = root_fragment_ids.pop().unwrap(); + let build_res = self.get_root_pipeline(ctx, main_fragment_id, root_fragment_ids)?; let prepared_query = actions.prepared_query()?; let _: HashMap = cluster @@ -479,10 +479,10 @@ impl DataExchangeManager { fn get_root_pipeline( &self, ctx: Arc, - root_actions: &QueryFragmentActions, + main_fragment_id: usize, + fragment_ids: Vec, ) -> Result { let query_id = ctx.get_id(); - let fragment_id = root_actions.fragment_id; let queries_coordinator_guard = self.queries_coordinator.lock(); let queries_coordinator = unsafe { &mut *queries_coordinator_guard.deref().get() }; @@ -490,10 +490,34 @@ impl DataExchangeManager { match queries_coordinator.get_mut(&query_id) { None => Err(ErrorCode::Internal("Query not exists.")), Some(query_coordinator) => { - assert!(query_coordinator.fragment_exchanges.is_empty()); + assert!( + query_coordinator.fragment_exchanges.is_empty(), + "query_coordinator.fragment_exchanges is not empty: {:?}", + 
query_coordinator + .fragment_exchanges + .keys() + .collect::>() + ); let injector = DefaultExchangeInjector::create(); - let mut build_res = - query_coordinator.subscribe_fragment(&ctx, fragment_id, injector)?; + let mut build_res = query_coordinator.subscribe_fragment( + &ctx, + main_fragment_id, + injector.clone(), + )?; + + for fragment_id in fragment_ids { + let sub_build_res = query_coordinator.subscribe_fragment( + &ctx, + fragment_id, + injector.clone(), + )?; + build_res + .sources_pipelines + .push(sub_build_res.main_pipeline); + build_res + .sources_pipelines + .extend(sub_build_res.sources_pipelines); + } let exchanges = std::mem::take(&mut query_coordinator.statistics_exchanges); let statistics_receiver = StatisticsReceiver::spawn_receiver(&ctx, exchanges)?; @@ -775,16 +799,18 @@ impl QueryCoordinator { return Ok(fragment_coordinator.pipeline_build_res.unwrap()); } - let exchange_params = fragment_coordinator.create_exchange_params( - info, - fragment_coordinator - .pipeline_build_res - .as_ref() - .map(|x| x.exchange_injector.clone()) - .ok_or_else(|| { - ErrorCode::Internal("Pipeline build result is none, It's a bug") - })?, - )?; + let exchange_params = fragment_coordinator + .create_exchange_params( + info, + fragment_coordinator + .pipeline_build_res + .as_ref() + .map(|x| x.exchange_injector.clone()) + .ok_or_else(|| { + ErrorCode::Internal("Pipeline build result is none, It's a bug") + })?, + )? + .unwrap(); let mut build_res = fragment_coordinator.pipeline_build_res.unwrap(); // Add exchange data transform. @@ -854,12 +880,19 @@ impl QueryCoordinator { if let Some(mut build_res) = coordinator.pipeline_build_res.take() { build_res.set_max_threads(max_threads as usize); - if !build_res.main_pipeline.is_pulling_pipeline()? { - return Err(ErrorCode::Internal("Logical error, It's a bug")); - } - - // Add exchange data publisher. - ExchangeSink::via(&info.query_ctx, ¶ms, &mut build_res.main_pipeline)?; + if build_res.main_pipeline.is_pulling_pipeline()? { + let Some(params) = params else { + return Err(ErrorCode::Internal( + "pipeline is pulling pipeline, but exchange params is none", + )); + }; + // Add exchange data publisher. + ExchangeSink::via(&info.query_ctx, ¶ms, &mut build_res.main_pipeline)?; + } else if build_res.main_pipeline.is_complete_pipeline()? && params.is_some() { + return Err(ErrorCode::Internal( + "pipeline is complete pipeline, but exchange params is some", + )); + }; if !build_res.main_pipeline.is_complete_pipeline()? 
{ return Err(ErrorCode::Internal("Logical error, It's a bug")); @@ -938,48 +971,47 @@ impl FragmentCoordinator { &self, info: &QueryInfo, exchange_injector: Arc, - ) -> Result { - if let Some(data_exchange) = &self.data_exchange { - return match data_exchange { - DataExchange::Merge(exchange) => { - Ok(ExchangeParams::MergeExchange(MergeExchangeParams { - exchange_injector: exchange_injector.clone(), - schema: self.physical_plan.output_schema()?, - fragment_id: self.fragment_id, - query_id: info.query_id.to_string(), - destination_id: exchange.destination_id.clone(), - allow_adjust_parallelism: exchange.allow_adjust_parallelism, - ignore_exchange: exchange.ignore_exchange, - })) - } - DataExchange::Broadcast(exchange) => { - Ok(ExchangeParams::ShuffleExchange(ShuffleExchangeParams { - exchange_injector: exchange_injector.clone(), - schema: self.physical_plan.output_schema()?, - fragment_id: self.fragment_id, - query_id: info.query_id.to_string(), - executor_id: info.current_executor.to_string(), - destination_ids: exchange.destination_ids.to_owned(), - shuffle_scatter: exchange_injector - .flight_scatter(&info.query_ctx, data_exchange)?, - })) - } - DataExchange::ShuffleDataExchange(exchange) => { - Ok(ExchangeParams::ShuffleExchange(ShuffleExchangeParams { - exchange_injector: exchange_injector.clone(), - schema: self.physical_plan.output_schema()?, - fragment_id: self.fragment_id, - query_id: info.query_id.to_string(), - executor_id: info.current_executor.to_string(), - destination_ids: exchange.destination_ids.to_owned(), - shuffle_scatter: exchange_injector - .flight_scatter(&info.query_ctx, data_exchange)?, - })) - } - }; + ) -> Result> { + let Some(data_exchange) = &self.data_exchange else { + return Ok(None); + }; + match data_exchange { + DataExchange::Merge(exchange) => { + Ok(Some(ExchangeParams::MergeExchange(MergeExchangeParams { + exchange_injector: exchange_injector.clone(), + schema: self.physical_plan.output_schema()?, + fragment_id: self.fragment_id, + query_id: info.query_id.to_string(), + destination_id: exchange.destination_id.clone(), + allow_adjust_parallelism: exchange.allow_adjust_parallelism, + ignore_exchange: exchange.ignore_exchange, + }))) + } + DataExchange::Broadcast(exchange) => Ok(Some(ExchangeParams::ShuffleExchange( + ShuffleExchangeParams { + exchange_injector: exchange_injector.clone(), + schema: self.physical_plan.output_schema()?, + fragment_id: self.fragment_id, + query_id: info.query_id.to_string(), + executor_id: info.current_executor.to_string(), + destination_ids: exchange.destination_ids.to_owned(), + shuffle_scatter: exchange_injector + .flight_scatter(&info.query_ctx, data_exchange)?, + }, + ))), + DataExchange::ShuffleDataExchange(exchange) => Ok(Some( + ExchangeParams::ShuffleExchange(ShuffleExchangeParams { + exchange_injector: exchange_injector.clone(), + schema: self.physical_plan.output_schema()?, + fragment_id: self.fragment_id, + query_id: info.query_id.to_string(), + executor_id: info.current_executor.to_string(), + destination_ids: exchange.destination_ids.to_owned(), + shuffle_scatter: exchange_injector + .flight_scatter(&info.query_ctx, data_exchange)?, + }), + )), } - - Err(ErrorCode::Internal("Cannot find data exchange.")) } pub fn prepare_pipeline(&mut self, ctx: Arc) -> Result<()> { diff --git a/src/query/service/src/servers/flight/v1/exchange/exchange_sink_writer.rs b/src/query/service/src/servers/flight/v1/exchange/exchange_sink_writer.rs index abebc2ba6a254..91bdf774f9bf8 100644 --- 
a/src/query/service/src/servers/flight/v1/exchange/exchange_sink_writer.rs +++ b/src/query/service/src/servers/flight/v1/exchange/exchange_sink_writer.rs @@ -70,10 +70,10 @@ impl AsyncSink for ExchangeWriterSink { async fn consume(&mut self, mut data_block: DataBlock) -> Result { let serialize_meta = match data_block.take_meta() { None => Err(ErrorCode::Internal( - "ExchangeWriterSink only recv ExchangeSerializeMeta.", + "ExchangeWriterSink only recv ExchangeSerializeMeta, but got none.", )), Some(block_meta) => ExchangeSerializeMeta::downcast_from(block_meta).ok_or_else(|| { - ErrorCode::Internal("ExchangeWriterSink only recv ExchangeSerializeMeta.") + ErrorCode::Internal("ExchangeWriterSink only recv ExchangeSerializeMeta") }), }?; diff --git a/src/query/service/src/servers/flight/v1/exchange/serde/exchange_serializer.rs b/src/query/service/src/servers/flight/v1/exchange/serde/exchange_serializer.rs index 5a757f37ba299..11d17ff6641fe 100644 --- a/src/query/service/src/servers/flight/v1/exchange/serde/exchange_serializer.rs +++ b/src/query/service/src/servers/flight/v1/exchange/serde/exchange_serializer.rs @@ -159,11 +159,6 @@ impl BlockMetaTransform for TransformScatterExchangeSeriali fn transform(&mut self, meta: ExchangeShuffleMeta) -> Result> { let mut new_blocks = Vec::with_capacity(meta.blocks.len()); for (index, block) in meta.blocks.into_iter().enumerate() { - if block.is_empty() { - new_blocks.push(block); - continue; - } - new_blocks.push(match self.local_pos == index { true => block, false => serialize_block(0, block, &self.options)?, diff --git a/src/query/service/src/sessions/query_ctx.rs b/src/query/service/src/sessions/query_ctx.rs index 0458bca7eb53d..fd6ce4bf018c2 100644 --- a/src/query/service/src/sessions/query_ctx.rs +++ b/src/query/service/src/sessions/query_ctx.rs @@ -28,6 +28,8 @@ use std::time::Instant; use std::time::SystemTime; use std::time::UNIX_EPOCH; +use async_channel::Receiver; +use async_channel::Sender; use chrono_tz::Tz; use dashmap::mapref::multiple::RefMulti; use dashmap::DashMap; @@ -62,6 +64,7 @@ use databend_common_catalog::table_context::StageAttachment; use databend_common_config::GlobalConfig; use databend_common_exception::ErrorCode; use databend_common_exception::Result; +use databend_common_expression::BlockMetaInfoPtr; use databend_common_expression::BlockThresholds; use databend_common_expression::Expr; use databend_common_expression::FunctionContext; @@ -285,6 +288,29 @@ impl QueryContext { self.shared.attach_table(catalog, database, name, table) } + pub fn broadcast_source_receiver(&self, broadcast_id: u32) -> Receiver { + self.shared.broadcast_source_receiver(broadcast_id) + } + + /// Get a sender to broadcast data + /// + /// Note: The channel must be closed by calling close() after data transmission is completed + pub fn broadcast_source_sender(&self, broadcast_id: u32) -> Sender { + self.shared.broadcast_source_sender(broadcast_id) + } + + /// A receiver to receive broadcast data + /// + /// Note: receive() can be called repeatedly until an Error is returned, indicating + /// that the upstream channel has been closed + pub fn broadcast_sink_receiver(&self, broadcast_id: u32) -> Receiver { + self.shared.broadcast_sink_receiver(broadcast_id) + } + + pub fn broadcast_sink_sender(&self, broadcast_id: u32) -> Sender { + self.shared.broadcast_sink_sender(broadcast_id) + } + pub fn get_exchange_manager(&self) -> Arc { DataExchangeManager::instance() } @@ -1842,6 +1868,16 @@ impl TableContext for QueryContext { fn 
set_pruned_partitions_stats(&self, partitions: PartStatistics) { self.shared.set_pruned_partitions_stats(partitions); } + + fn get_next_broadcast_id(&self) -> u32 { + self.shared + .next_broadcast_id + .fetch_add(1, Ordering::Acquire) + } + + fn reset_broadcast_id(&self) { + self.shared.next_broadcast_id.store(0, Ordering::Release); + } } impl TrySpawn for QueryContext { diff --git a/src/query/service/src/sessions/query_ctx_shared.rs b/src/query/service/src/sessions/query_ctx_shared.rs index 2ab2f3c864d8f..e79ba8c83d709 100644 --- a/src/query/service/src/sessions/query_ctx_shared.rs +++ b/src/query/service/src/sessions/query_ctx_shared.rs @@ -16,6 +16,7 @@ use std::collections::hash_map::Entry; use std::collections::HashMap; use std::collections::HashSet; use std::sync::atomic::AtomicBool; +use std::sync::atomic::AtomicU32; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering; use std::sync::Arc; @@ -23,6 +24,8 @@ use std::sync::Weak; use std::time::Duration; use std::time::SystemTime; +use async_channel::Receiver; +use async_channel::Sender; use dashmap::DashMap; use databend_common_base::base::short_sql; use databend_common_base::base::Progress; @@ -43,6 +46,7 @@ use databend_common_catalog::table_context::StageAttachment; use databend_common_config::GlobalConfig; use databend_common_exception::ErrorCode; use databend_common_exception::Result; +use databend_common_expression::BlockMetaInfoPtr; use databend_common_meta_app::principal::OnErrorMode; use databend_common_meta_app::principal::RoleInfo; use databend_common_meta_app::principal::UserDefinedConnection; @@ -170,6 +174,17 @@ pub struct QueryContextShared { pub(in crate::sessions) selected_segment_locs: Arc>>, pub(in crate::sessions) pruned_partitions_stats: Arc>>, + + pub(in crate::sessions) next_broadcast_id: AtomicU32, + pub(in crate::sessions) broadcast_channels: Arc>>, +} + +#[derive(Default)] +pub struct BroadcastChannel { + pub source_sender: Option>, + pub source_receiver: Option>, + pub sink_sender: Option>, + pub sink_receiver: Option>, } impl QueryContextShared { @@ -236,9 +251,61 @@ impl QueryContextShared { node_memory_usage: Arc::new(RwLock::new(HashMap::new())), selected_segment_locs: Default::default(), pruned_partitions_stats: Arc::new(RwLock::new(None)), + next_broadcast_id: AtomicU32::new(0), + broadcast_channels: Arc::new(Mutex::new(HashMap::new())), })) } + pub fn broadcast_source_receiver(&self, broadcast_id: u32) -> Receiver { + let mut broadcast_channels = self.broadcast_channels.lock(); + let entry = broadcast_channels.entry(broadcast_id).or_default(); + match entry.source_receiver.take() { + Some(receiver) => receiver, + None => { + let (sender, receiver) = async_channel::unbounded(); + entry.source_sender = Some(sender); + receiver + } + } + } + pub fn broadcast_source_sender(&self, broadcast_id: u32) -> Sender { + let mut broadcast_channels = self.broadcast_channels.lock(); + let entry = broadcast_channels.entry(broadcast_id).or_default(); + match entry.source_sender.take() { + Some(sender) => sender, + None => { + let (sender, receiver) = async_channel::unbounded(); + entry.source_receiver = Some(receiver); + sender + } + } + } + + pub fn broadcast_sink_receiver(&self, broadcast_id: u32) -> Receiver { + let mut broadcast_channels = self.broadcast_channels.lock(); + let entry = broadcast_channels.entry(broadcast_id).or_default(); + match entry.sink_receiver.take() { + Some(receiver) => receiver, + None => { + let (sender, receiver) = async_channel::unbounded(); + entry.sink_sender = 
Some(sender); + receiver + } + } + } + pub fn broadcast_sink_sender(&self, broadcast_id: u32) -> Sender { + let mut broadcast_channels = self.broadcast_channels.lock(); + let entry = broadcast_channels.entry(broadcast_id).or_default(); + match entry.sink_sender.take() { + Some(sender) => sender, + None => { + let (sender, receiver) = async_channel::unbounded(); + entry.sink_receiver = Some(receiver); + sender + } + } + } + pub fn set_error(&self, err: ErrorCode) { let err = err.with_context("query context error"); diff --git a/src/query/service/tests/it/sql/exec/get_table_bind_test.rs b/src/query/service/tests/it/sql/exec/get_table_bind_test.rs index ae063e3556796..3957876fa5354 100644 --- a/src/query/service/tests/it/sql/exec/get_table_bind_test.rs +++ b/src/query/service/tests/it/sql/exec/get_table_bind_test.rs @@ -1037,6 +1037,10 @@ impl TableContext for CtxDelegation { async fn get_warehouse_cluster(&self) -> Result> { todo!() } + + fn get_next_broadcast_id(&self) -> u32 { + self.ctx.get_next_broadcast_id() + } } #[tokio::test(flavor = "multi_thread")] diff --git a/src/query/service/tests/it/storages/fuse/operations/commit.rs b/src/query/service/tests/it/storages/fuse/operations/commit.rs index 1dc98e7b13f06..a2d86ade67de4 100644 --- a/src/query/service/tests/it/storages/fuse/operations/commit.rs +++ b/src/query/service/tests/it/storages/fuse/operations/commit.rs @@ -396,6 +396,10 @@ impl TableContext for CtxDelegation { todo!() } + fn get_next_broadcast_id(&self) -> u32 { + self.ctx.get_next_broadcast_id() + } + fn txn_mgr(&self) -> TxnManagerRef { self.ctx.txn_mgr() } diff --git a/src/query/sql/src/executor/format.rs b/src/query/sql/src/executor/format.rs index b7166308a7855..1e3f8879339f3 100644 --- a/src/query/sql/src/executor/format.rs +++ b/src/query/sql/src/executor/format.rs @@ -26,7 +26,7 @@ use databend_common_pipeline_core::processors::PlanProfile; use itertools::Itertools; use super::physical_plans::AddStreamColumn; -use super::PhysicalRuntimeFilter; +use super::physical_plans::PhysicalRuntimeFilter; use crate::binder::MutationType; use crate::executor::explain::PlanStatsInfo; use crate::executor::physical_plans::AggregateExpand; @@ -521,6 +521,12 @@ fn to_format_tree( PhysicalPlan::AsyncFunction(plan) => { async_function_to_format_tree(plan, metadata, profs, context) } + PhysicalPlan::BroadcastSource(_plan) => { + Ok(FormatTreeNode::new("RuntimeFilterSource".to_string())) + } + PhysicalPlan::BroadcastSink(_plan) => { + Ok(FormatTreeNode::new("RuntimeFilterSink".to_string())) + } } } diff --git a/src/query/sql/src/executor/mod.rs b/src/query/sql/src/executor/mod.rs index 73f2b65710b82..3e40852552cac 100644 --- a/src/query/sql/src/executor/mod.rs +++ b/src/query/sql/src/executor/mod.rs @@ -27,6 +27,7 @@ pub use physical_plan::PhysicalPlan; pub use physical_plan_builder::MutationBuildInfo; pub use physical_plan_builder::PhysicalPlanBuilder; pub use physical_plan_visitor::PhysicalPlanReplacer; +pub use physical_plans::build_broadcast_plans; pub use physical_plans::PhysicalRuntimeFilter; pub use physical_plans::PhysicalRuntimeFilters; pub use util::*; diff --git a/src/query/sql/src/executor/physical_plan.rs b/src/query/sql/src/executor/physical_plan.rs index 0c81455aa489f..f6dd20cf95c70 100644 --- a/src/query/sql/src/executor/physical_plan.rs +++ b/src/query/sql/src/executor/physical_plan.rs @@ -26,6 +26,8 @@ use enum_as_inner::EnumAsInner; use itertools::Itertools; use super::physical_plans::AddStreamColumn; +use super::physical_plans::BroadcastSink; +use 
super::physical_plans::BroadcastSource; use super::physical_plans::HilbertPartition; use super::physical_plans::MutationManipulate; use super::physical_plans::MutationOrganize; @@ -155,6 +157,10 @@ pub enum PhysicalPlan { // async function call AsyncFunction(AsyncFunction), + + // broadcast + BroadcastSource(BroadcastSource), + BroadcastSink(BroadcastSink), } impl PhysicalPlan { @@ -407,6 +413,15 @@ impl PhysicalPlan { *next_id += 1; plan.input.adjust_plan_id(next_id); } + PhysicalPlan::BroadcastSource(plan) => { + plan.plan_id = *next_id; + *next_id += 1; + } + PhysicalPlan::BroadcastSink(plan) => { + plan.plan_id = *next_id; + *next_id += 1; + plan.input.adjust_plan_id(next_id); + } } } @@ -463,6 +478,8 @@ impl PhysicalPlan { PhysicalPlan::ChunkMerge(v) => v.plan_id, PhysicalPlan::ChunkCommitInsert(v) => v.plan_id, PhysicalPlan::RecursiveCteScan(v) => v.plan_id, + PhysicalPlan::BroadcastSource(v) => v.plan_id, + PhysicalPlan::BroadcastSink(v) => v.plan_id, } } @@ -508,6 +525,8 @@ impl PhysicalPlan { | PhysicalPlan::CommitSink(_) | PhysicalPlan::DistributedInsertSelect(_) | PhysicalPlan::Recluster(_) + | PhysicalPlan::BroadcastSource(_) + | PhysicalPlan::BroadcastSink(_) | PhysicalPlan::HilbertPartition(_) => Ok(DataSchemaRef::default()), PhysicalPlan::Duplicate(plan) => plan.input.output_schema(), PhysicalPlan::Shuffle(plan) => plan.input.output_schema(), @@ -579,6 +598,8 @@ impl PhysicalPlan { PhysicalPlan::ChunkAppendData(_) => "WriteData".to_string(), PhysicalPlan::ChunkMerge(_) => "ChunkMerge".to_string(), PhysicalPlan::ChunkCommitInsert(_) => "Commit".to_string(), + PhysicalPlan::BroadcastSource(_) => "RuntimeFilterSource".to_string(), + PhysicalPlan::BroadcastSink(_) => "RuntimeFilterSink".to_string(), } } @@ -591,7 +612,8 @@ impl PhysicalPlan { | PhysicalPlan::CompactSource(_) | PhysicalPlan::ReplaceAsyncSourcer(_) | PhysicalPlan::Recluster(_) - | PhysicalPlan::RecursiveCteScan(_) => Box::new(std::iter::empty()), + | PhysicalPlan::RecursiveCteScan(_) + | PhysicalPlan::BroadcastSource(_) => Box::new(std::iter::empty()), PhysicalPlan::HilbertPartition(plan) => Box::new(std::iter::once(plan.input.as_ref())), PhysicalPlan::Filter(plan) => Box::new(std::iter::once(plan.input.as_ref())), PhysicalPlan::EvalScalar(plan) => Box::new(std::iter::once(plan.input.as_ref())), @@ -609,6 +631,7 @@ impl PhysicalPlan { PhysicalPlan::ExpressionScan(plan) => Box::new(std::iter::once(plan.input.as_ref())), PhysicalPlan::Exchange(plan) => Box::new(std::iter::once(plan.input.as_ref())), PhysicalPlan::ExchangeSink(plan) => Box::new(std::iter::once(plan.input.as_ref())), + PhysicalPlan::BroadcastSink(plan) => Box::new(std::iter::once(plan.input.as_ref())), PhysicalPlan::UnionAll(plan) => Box::new( std::iter::once(plan.left.as_ref()).chain(std::iter::once(plan.right.as_ref())), ), @@ -663,6 +686,7 @@ impl PhysicalPlan { | PhysicalPlan::CompactSource(_) | PhysicalPlan::ReplaceAsyncSourcer(_) | PhysicalPlan::Recluster(_) + | PhysicalPlan::BroadcastSource(_) | PhysicalPlan::RecursiveCteScan(_) => Box::new(std::iter::empty()), PhysicalPlan::HilbertPartition(plan) => Box::new(std::iter::once(plan.input.as_mut())), PhysicalPlan::Filter(plan) => Box::new(std::iter::once(plan.input.as_mut())), @@ -723,6 +747,7 @@ impl PhysicalPlan { CopyIntoTableSource::Query(v) => Box::new(std::iter::once(v.as_mut())), CopyIntoTableSource::Stage(v) => Box::new(std::iter::once(v.as_mut())), }, + PhysicalPlan::BroadcastSink(plan) => Box::new(std::iter::once(plan.input.as_mut())), } } @@ -778,7 +803,9 @@ impl PhysicalPlan { | 
PhysicalPlan::ChunkFillAndReorder(_) | PhysicalPlan::ChunkAppendData(_) | PhysicalPlan::ChunkMerge(_) - | PhysicalPlan::ChunkCommitInsert(_) => None, + | PhysicalPlan::ChunkCommitInsert(_) + | PhysicalPlan::BroadcastSource(_) + | PhysicalPlan::BroadcastSink(_) => None, } } diff --git a/src/query/sql/src/executor/physical_plan_visitor.rs b/src/query/sql/src/executor/physical_plan_visitor.rs index 8a1ced834b01a..9948039837727 100644 --- a/src/query/sql/src/executor/physical_plan_visitor.rs +++ b/src/query/sql/src/executor/physical_plan_visitor.rs @@ -15,6 +15,8 @@ use databend_common_exception::Result; use super::physical_plans::AddStreamColumn; +use super::physical_plans::BroadcastSink; +use super::physical_plans::BroadcastSource; use super::physical_plans::CacheScan; use super::physical_plans::ExpressionScan; use super::physical_plans::HilbertPartition; @@ -120,9 +122,24 @@ pub trait PhysicalPlanReplacer { PhysicalPlan::ChunkAppendData(plan) => self.replace_chunk_append_data(plan), PhysicalPlan::ChunkMerge(plan) => self.replace_chunk_merge(plan), PhysicalPlan::ChunkCommitInsert(plan) => self.replace_chunk_commit_insert(plan), + PhysicalPlan::BroadcastSource(plan) => self.replace_runtime_filter_source(plan), + PhysicalPlan::BroadcastSink(plan) => self.replace_runtime_filter_sink(plan), } } + fn replace_runtime_filter_source(&mut self, plan: &BroadcastSource) -> Result { + Ok(PhysicalPlan::BroadcastSource(plan.clone())) + } + + fn replace_runtime_filter_sink(&mut self, plan: &BroadcastSink) -> Result { + let input = self.replace(&plan.input)?; + Ok(PhysicalPlan::BroadcastSink(BroadcastSink { + plan_id: plan.plan_id, + broadcast_id: plan.broadcast_id, + input: Box::new(input), + })) + } + fn replace_recluster(&mut self, plan: &Recluster) -> Result { Ok(PhysicalPlan::Recluster(Box::new(plan.clone()))) } @@ -646,7 +663,8 @@ impl PhysicalPlan { | PhysicalPlan::HilbertPartition(_) | PhysicalPlan::ExchangeSource(_) | PhysicalPlan::CompactSource(_) - | PhysicalPlan::MutationSource(_) => {} + | PhysicalPlan::MutationSource(_) + | PhysicalPlan::BroadcastSource(_) => {} PhysicalPlan::Filter(plan) => { Self::traverse(&plan.input, pre_visit, visit, post_visit); } @@ -772,6 +790,9 @@ impl PhysicalPlan { PhysicalPlan::ChunkCommitInsert(plan) => { Self::traverse(&plan.input, pre_visit, visit, post_visit); } + PhysicalPlan::BroadcastSink(plan) => { + Self::traverse(&plan.input, pre_visit, visit, post_visit); + } } post_visit(plan); } diff --git a/src/query/sql/src/executor/physical_plans/mod.rs b/src/query/sql/src/executor/physical_plans/mod.rs index ab3e282abec34..2e04928a2a55c 100644 --- a/src/query/sql/src/executor/physical_plans/mod.rs +++ b/src/query/sql/src/executor/physical_plans/mod.rs @@ -18,6 +18,8 @@ mod physical_aggregate_expand; mod physical_aggregate_final; mod physical_aggregate_partial; mod physical_async_func; +#[allow(dead_code)] +mod physical_broadcast; mod physical_cache_scan; mod physical_column_mutation; mod physical_commit_sink; @@ -65,6 +67,9 @@ pub use physical_aggregate_final::AggregateFinal; pub use physical_aggregate_partial::AggregatePartial; pub use physical_async_func::AsyncFunction; pub use physical_async_func::AsyncFunctionDesc; +pub use physical_broadcast::BroadcastSink; +pub use physical_broadcast::BroadcastSource; +pub use physical_broadcast::*; pub use physical_cache_scan::CacheScan; pub use physical_column_mutation::ColumnMutation; pub use physical_commit_sink::*; diff --git a/src/query/sql/src/executor/physical_plans/physical_broadcast.rs 
b/src/query/sql/src/executor/physical_plans/physical_broadcast.rs new file mode 100644 index 0000000000000..36e42243e4ff0 --- /dev/null +++ b/src/query/sql/src/executor/physical_plans/physical_broadcast.rs @@ -0,0 +1,64 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use databend_common_catalog::table_context::TableContext; +use databend_common_exception::Result; + +use super::Exchange; +use super::FragmentKind; +use crate::executor::PhysicalPlan; + +#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] +pub struct BroadcastSource { + pub plan_id: u32, + pub broadcast_id: u32, +} + +#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] +pub struct BroadcastSink { + pub plan_id: u32, + pub broadcast_id: u32, + pub input: Box, +} + +pub fn build_broadcast_plan(broadcast_id: u32) -> Result { + let broadcast_source = Box::new(PhysicalPlan::BroadcastSource(BroadcastSource { + plan_id: 0, + broadcast_id, + })); + let exchange = Box::new(PhysicalPlan::Exchange(Exchange { + plan_id: 0, + input: broadcast_source, + kind: FragmentKind::Expansive, + keys: vec![], + allow_adjust_parallelism: true, + ignore_exchange: false, + })); + let broadcast_sink = PhysicalPlan::BroadcastSink(BroadcastSink { + plan_id: 0, + broadcast_id, + input: exchange, + }); + Ok(broadcast_sink) +} + +pub fn build_broadcast_plans(ctx: &dyn TableContext) -> Result> { + let mut plans = vec![]; + let next_broadcast_id = ctx.get_next_broadcast_id(); + ctx.reset_broadcast_id(); + for broadcast_id in 0..next_broadcast_id { + plans.push(build_broadcast_plan(broadcast_id)?); + } + Ok(plans) +} diff --git a/src/query/sql/src/executor/physical_plans/physical_hash_join.rs b/src/query/sql/src/executor/physical_plans/physical_hash_join.rs index c2b7cc21c1b72..59a8bd3239e86 100644 --- a/src/query/sql/src/executor/physical_plans/physical_hash_join.rs +++ b/src/query/sql/src/executor/physical_plans/physical_hash_join.rs @@ -27,8 +27,8 @@ use databend_common_expression::DataSchemaRefExt; use databend_common_expression::RemoteExpr; use databend_common_functions::BUILTIN_FUNCTIONS; +use super::physical_join_filter::PhysicalRuntimeFilters; use super::JoinRuntimeFilter; -use super::PhysicalRuntimeFilters; use crate::executor::explain::PlanStatsInfo; use crate::executor::physical_plans::Exchange; use crate::executor::physical_plans::FragmentKind;
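The per-query channel registry added to `QueryContextShared` pairs endpoints lazily: for a given `broadcast_id`, whichever side asks first creates an unbounded `async_channel` and parks the opposite endpoint in the `BroadcastChannel` slot; the other side's first call then takes whatever was parked. The sketch below isolates that pattern, plus the documented close()/recv-until-Err contract, outside Databend; `PairedChannels`, `Endpoints`, `std::sync::Mutex`, and `futures::executor::block_on` are stand-ins chosen to keep it self-contained, not items from the patch.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

use async_channel::Receiver;
use async_channel::Sender;

struct Endpoints<T> {
    sender: Option<Sender<T>>,
    receiver: Option<Receiver<T>>,
}

impl<T> Default for Endpoints<T> {
    fn default() -> Self {
        Self { sender: None, receiver: None }
    }
}

struct PairedChannels<T> {
    channels: Mutex<HashMap<u32, Endpoints<T>>>,
}

impl<T> PairedChannels<T> {
    fn new() -> Self {
        Self { channels: Mutex::new(HashMap::new()) }
    }

    // First caller creates the channel and parks the receiver for the other side.
    fn sender(&self, id: u32) -> Sender<T> {
        let mut guard = self.channels.lock().unwrap();
        let entry = guard.entry(id).or_default();
        match entry.sender.take() {
            Some(tx) => tx,
            None => {
                let (tx, rx) = async_channel::unbounded();
                entry.receiver = Some(rx);
                tx
            }
        }
    }

    // Mirror image: take the parked receiver, or create the channel and park the sender.
    fn receiver(&self, id: u32) -> Receiver<T> {
        let mut guard = self.channels.lock().unwrap();
        let entry = guard.entry(id).or_default();
        match entry.receiver.take() {
            Some(rx) => rx,
            None => {
                let (tx, rx) = async_channel::unbounded();
                entry.sender = Some(tx);
                rx
            }
        }
    }
}

fn main() {
    let channels: PairedChannels<String> = PairedChannels::new();

    // Producer side of one broadcast id; the consumer picks up the parked endpoint.
    let tx = channels.sender(0);
    let rx = channels.receiver(0);

    futures::executor::block_on(async {
        tx.send("runtime filter meta".to_string()).await.unwrap();
        // Part of the documented contract on broadcast_source_sender: without
        // close(), the receiving loop below would never terminate.
        tx.close();

        // Documented broadcast_sink_receiver usage: keep calling recv() until Err.
        while let Ok(meta) = rx.recv().await {
            println!("received: {meta}");
        }
    });
}
```

A consequence visible in the builders is that each accessor is effectively single-use per id: a second call on the same side would open a fresh channel and overwrite the endpoint parked by the first. That is presumably why `build_broadcast_source` asks the context once and clones the receiver into a single source processor, and why `build_broadcast_sink` resizes the pipeline to one before attaching the sender.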
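`get_next_broadcast_id` and `reset_broadcast_id` act as a per-statement ticket counter: planning hands out sequential ids, and `build_broadcast_plans` later reads how many were allocated (its own call performs one more `fetch_add`) to know how many `BroadcastSource -> Exchange(Expansive) -> BroadcastSink` plans to fragment ahead of the main plan in `build_distributed_pipeline`, then resets the counter for the next statement. A minimal sketch of that bookkeeping, with `BroadcastIds` as an illustrative stand-in for the context:

```rust
use std::sync::atomic::AtomicU32;
use std::sync::atomic::Ordering;

struct BroadcastIds {
    next: AtomicU32,
}

impl BroadcastIds {
    // Mirrors TableContext::get_next_broadcast_id: hand out the next id.
    fn next_id(&self) -> u32 {
        self.next.fetch_add(1, Ordering::Acquire)
    }

    // Mirrors TableContext::reset_broadcast_id.
    fn reset(&self) {
        self.next.store(0, Ordering::Release);
    }
}

fn main() {
    let ids = BroadcastIds { next: AtomicU32::new(0) };

    // During physical-plan building, each consumer of a broadcast channel
    // (e.g. a runtime-filter build side) reserves one id.
    let a = ids.next_id(); // 0
    let b = ids.next_id(); // 1

    // build_broadcast_plans-style bookkeeping: the counter's current value is
    // the number of ids handed out, so read it (one more fetch_add), build one
    // broadcast plan per id in 0..count, and reset for the next statement.
    let count = ids.next_id();
    ids.reset();

    assert_eq!((a, b, count), (0, 1, 2));
    println!("would build {count} broadcast plans (ids 0..{count})");
}
```

The orderings here mirror the patch (Acquire on the increment, Release on the reset); a plain Relaxed counter would likely serve as well, since the value is only consumed on the same planning path.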
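On the scheduling side, the `Fragmenter` now types a `BroadcastSink`-rooted plan as an `Intermediate` fragment, and `get_root_fragment_ids` replaces `get_root_actions`: every fragment whose first action's plan is not an `ExchangeSink` counts as a root, the last such fragment is the query's own root (the main plan is fragmented after the broadcast plans), and the remaining roots are subscribed in `get_root_pipeline` and folded into the build result as extra source pipelines. A toy model of that selection rule, with `PlanKind` and `Fragment` standing in for the real types:

```rust
#[derive(PartialEq)]
enum PlanKind {
    ExchangeSink,
    BroadcastSink,
    QueryRoot,
}

struct Fragment {
    id: usize,
    plan: PlanKind,
}

// Stand-in for get_root_fragment_ids(): anything that is not an ExchangeSink is a root.
fn root_fragment_ids(fragments: &[Fragment]) -> Vec<usize> {
    fragments
        .iter()
        .filter(|f| f.plan != PlanKind::ExchangeSink)
        .map(|f| f.id)
        .collect()
}

fn main() {
    // Broadcast plans are fragmented before the main plan, so their sinks come
    // first and the query's own root is always the last entry.
    let fragments = vec![
        Fragment { id: 0, plan: PlanKind::BroadcastSink },
        Fragment { id: 1, plan: PlanKind::ExchangeSink },
        Fragment { id: 2, plan: PlanKind::QueryRoot },
    ];

    let mut roots = root_fragment_ids(&fragments);
    let main_fragment = roots.pop().unwrap();
    assert_eq!(main_fragment, 2);
    assert_eq!(roots, vec![0]); // subscribed as extra source pipelines
    println!("main fragment: {main_fragment}, extra roots: {roots:?}");
}
```

The `pop()` in `get_root_pipeline` relies on that ordering: if the main plan were ever fragmented before the broadcast plans, it would pick the wrong root, so the `chain(std::iter::once(plan))` order in `build_distributed_pipeline` is load-bearing.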