diff --git a/src/query/catalog/src/runtime_filter_info.rs b/src/query/catalog/src/runtime_filter_info.rs
index 5caeae4090789..8bffe222cb0a2 100644
--- a/src/query/catalog/src/runtime_filter_info.rs
+++ b/src/query/catalog/src/runtime_filter_info.rs
@@ -12,6 +12,8 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::collections::HashMap;
+
 use databend_common_base::base::tokio::sync::watch;
 use databend_common_base::base::tokio::sync::watch::Receiver;
 use databend_common_base::base::tokio::sync::watch::Sender;
@@ -19,47 +21,23 @@ use databend_common_expression::Expr;
 use xorf::BinaryFuse16;
 
 #[derive(Clone, Debug, Default)]
-pub struct RuntimeFilterInfo {
-    inlist: Vec<Expr<String>>,
-    min_max: Vec<Expr<String>>,
-    bloom: Vec<(String, BinaryFuse16)>,
+pub struct RuntimeFiltersForScan {
+    pub inlist: HashMap<usize, Expr<String>>,
+    pub min_max: HashMap<usize, Expr<String>>,
+    pub bloom: HashMap<usize, (String, BinaryFuse16)>,
 }
 
-impl RuntimeFilterInfo {
-    pub fn add_inlist(&mut self, expr: Expr<String>) {
-        self.inlist.push(expr);
-    }
-
-    pub fn add_bloom(&mut self, bloom: (String, BinaryFuse16)) {
-        self.bloom.push(bloom);
-    }
-
-    pub fn add_min_max(&mut self, expr: Expr<String>) {
-        self.min_max.push(expr);
-    }
-
-    pub fn get_inlist(&self) -> &Vec<Expr<String>> {
-        &self.inlist
-    }
-
-    pub fn get_bloom(&self) -> &Vec<(String, BinaryFuse16)> {
-        &self.bloom
-    }
-
-    pub fn get_min_max(&self) -> &Vec<Expr<String>> {
-        &self.min_max
-    }
-
-    pub fn blooms(self) -> Vec<(String, BinaryFuse16)> {
-        self.bloom
+impl RuntimeFiltersForScan {
+    pub fn add_inlist(&mut self, rf_id: usize, expr: Expr<String>) {
+        self.inlist.insert(rf_id, expr);
     }
 
-    pub fn inlists(self) -> Vec<Expr<String>> {
-        self.inlist
+    pub fn add_bloom(&mut self, rf_id: usize, bloom: (String, BinaryFuse16)) {
+        self.bloom.insert(rf_id, bloom);
     }
 
-    pub fn min_maxs(self) -> Vec<Expr<String>> {
-        self.min_max
+    pub fn add_min_max(&mut self, rf_id: usize, expr: Expr<String>) {
+        self.min_max.insert(rf_id, expr);
     }
 
     pub fn is_empty(&self) -> bool {
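
Keying each filter kind by its runtime-filter id turns the old append-only `Vec`s into upserts, which is what lets locally built and remotely merged filters land in the same per-scan entry without duplication (see the `Occupied` branch of `QueryContext::set_runtime_filter` later in this diff). A minimal sketch of that upsert behavior, with `String` standing in for `Expr<String>`:

```rust
use std::collections::HashMap;

// Minimal sketch (not the real catalog types): `String` stands in for
// `Expr<String>`. Keying filters by rf_id makes adds idempotent.
#[derive(Default)]
struct FiltersForScan {
    inlist: HashMap<usize, String>,
}

impl FiltersForScan {
    fn add_inlist(&mut self, rf_id: usize, expr: String) {
        // Same rf_id: the newer expression replaces the older one.
        self.inlist.insert(rf_id, expr);
    }
}

fn main() {
    let mut f = FiltersForScan::default();
    f.add_inlist(7, "k IN (1, 2)".into());
    f.add_inlist(7, "k IN (1, 2, 3)".into());
    assert_eq!(f.inlist.len(), 1); // replaced, not appended
}
```
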
diff --git a/src/query/catalog/src/table_context.rs b/src/query/catalog/src/table_context.rs
index 5eb826974ef68..346d1a959364e 100644
--- a/src/query/catalog/src/table_context.rs
+++ b/src/query/catalog/src/table_context.rs
@@ -74,8 +74,8 @@ use crate::plan::PartInfoPtr;
 use crate::plan::PartStatistics;
 use crate::plan::Partitions;
 use crate::query_kind::QueryKind;
-use crate::runtime_filter_info::RuntimeFilterInfo;
 use crate::runtime_filter_info::RuntimeFilterReady;
+use crate::runtime_filter_info::RuntimeFiltersForScan;
 use crate::statistics::data_cache_statistics::DataCacheMetrics;
 use crate::table::Table;
 
@@ -324,7 +324,7 @@ pub trait TableContext: Send + Sync {
 
     fn get_query_profiles(&self) -> Vec<PlanProfile>;
 
-    fn set_runtime_filter(&self, filters: (usize, RuntimeFilterInfo));
+    fn set_runtime_filter(&self, filters: (usize, RuntimeFiltersForScan));
 
     fn set_runtime_filter_ready(&self, table_index: usize, ready: Arc<RuntimeFilterReady>);
 
diff --git a/src/query/pipeline/sources/src/async_source.rs b/src/query/pipeline/sources/src/async_source.rs
index 77360d4045e2d..72904a145b124 100644
--- a/src/query/pipeline/sources/src/async_source.rs
+++ b/src/query/pipeline/sources/src/async_source.rs
@@ -131,7 +131,9 @@ impl<T: 'static + AsyncSource> Processor for AsyncSourcer<T> {
             return Ok(());
         }
         match self.inner.generate().await? {
-            None => self.is_finish = true,
+            None => {
+                self.is_finish = true;
+            }
             Some(data_block) => {
                 if !data_block.is_empty() {
                     let progress_values = ProgressValues {
diff --git a/src/query/service/src/pipelines/builders/builder_join.rs b/src/query/service/src/pipelines/builders/builder_join.rs
index 5dc183497928d..d4d7d5cfa3639 100644
--- a/src/query/service/src/pipelines/builders/builder_join.rs
+++ b/src/query/service/src/pipelines/builders/builder_join.rs
@@ -27,6 +27,7 @@ use crate::pipelines::processors::transforms::range_join::TransformRangeJoinLeft
 use crate::pipelines::processors::transforms::range_join::TransformRangeJoinRight;
 use crate::pipelines::processors::transforms::HashJoinBuildState;
 use crate::pipelines::processors::transforms::HashJoinProbeState;
+use crate::pipelines::processors::transforms::RuntimeFilterChannels;
 use crate::pipelines::processors::transforms::TransformHashJoinBuild;
 use crate::pipelines::processors::transforms::TransformHashJoinProbe;
 use crate::pipelines::processors::HashJoinDesc;
@@ -102,6 +103,7 @@ impl PipelineBuilder {
             self.hash_join_states
                 .insert(build_cache_index, state.clone());
         }
+
         self.expand_build_side_pipeline(&join.build, join, state.clone())?;
         self.build_join_probe(join, state)?;
 
@@ -153,6 +155,13 @@ impl PipelineBuilder {
             &hash_join_plan.build_projections,
             join_state.clone(),
             output_len,
+            hash_join_plan
+                .runtime_filter_plan
+                .as_ref()
+                .map(|_| RuntimeFilterChannels {
+                    rf_src_send: self.ctx.rf_src_send(hash_join_plan.join_id),
+                    rf_sink_recv: self.ctx.rf_sink_recv(hash_join_plan.join_id),
+                }),
         )?;
         build_state.add_runtime_filter_ready();
 
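
Channels are allocated only when the join carries a `runtime_filter_plan`, so local-only joins keep `rf_channels == None` and skip the merge handshake entirely. A minimal sketch of this conditional wiring, with hypothetical `HashJoinPlan`/`RuntimeFilterChannels` stand-ins:

```rust
// Hypothetical stand-ins for the plan and channel types; the point is only
// the `Option` wiring: no `runtime_filter_plan`, no channels, no handshake.
struct RuntimeFilterChannels;

struct HashJoinPlan {
    runtime_filter_plan: Option<String>,
}

fn channels_for(plan: &HashJoinPlan) -> Option<RuntimeFilterChannels> {
    plan.runtime_filter_plan
        .as_ref()
        .map(|_| RuntimeFilterChannels)
}

fn main() {
    let local = HashJoinPlan { runtime_filter_plan: None };
    let distributed = HashJoinPlan { runtime_filter_plan: Some("broadcast".into()) };
    assert!(channels_for(&local).is_none());
    assert!(channels_for(&distributed).is_some());
}
```
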
diff --git a/src/query/service/src/pipelines/builders/builder_runtime_filter.rs b/src/query/service/src/pipelines/builders/builder_runtime_filter.rs
new file mode 100644
index 0000000000000..4be2d23e42699
--- /dev/null
+++ b/src/query/service/src/pipelines/builders/builder_runtime_filter.rs
@@ -0,0 +1,46 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use databend_common_exception::Result;
+use databend_common_sql::executor::physical_plans::RuntimeFilterSink;
+use databend_common_sql::executor::physical_plans::RuntimeFilterSource;
+use databend_common_storages_fuse::TableContext;
+
+use crate::pipelines::processors::transforms::RuntimeFilterSinkProcessor;
+use crate::pipelines::processors::transforms::RuntimeFilterSourceProcessor;
+use crate::pipelines::PipelineBuilder;
+
+impl PipelineBuilder {
+    pub(crate) fn build_runtime_filter_source(
+        &mut self,
+        source: &RuntimeFilterSource,
+    ) -> Result<()> {
+        let receiver = self.ctx.rf_src_recv(source.join_id);
+        self.main_pipeline.add_source(
+            |output| {
+                RuntimeFilterSourceProcessor::create(self.ctx.clone(), receiver.clone(), output)
+            },
+            1,
+        )
+    }
+
+    pub(crate) fn build_runtime_filter_sink(&mut self, sink: &RuntimeFilterSink) -> Result<()> {
+        self.build_pipeline(&sink.input)?;
+        self.main_pipeline.resize(1, true)?;
+        let node_num = self.ctx.get_cluster().nodes.len();
+        self.main_pipeline.add_sink(|input| {
+            RuntimeFilterSinkProcessor::create(input, node_num, self.ctx.rf_sink_send(sink.join_id))
+        })
+    }
+}
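
The `resize(1, true)` funnels all incoming exchange streams into a single `RuntimeFilterSinkProcessor`, which then waits for one filter set per cluster node before releasing the merged result (the `all_recv` check in its `consume`, shown later in this diff). A toy sketch of that counting logic, outside the real `AsyncSink` trait:

```rust
// Toy counting sink, outside the real AsyncSink trait: `consume` returns the
// merged result only once `node_num` per-node filter sets have arrived,
// mirroring the `all_recv = self.node_num == self.recv_num` check.
struct ToySink {
    node_num: usize,
    received: Vec<String>,
}

impl ToySink {
    fn consume(&mut self, filters: String) -> Option<String> {
        self.received.push(filters);
        (self.received.len() == self.node_num).then(|| self.received.join(" & "))
    }
}

fn main() {
    let mut sink = ToySink { node_num: 2, received: vec![] };
    assert!(sink.consume("filters-from-node-a".into()).is_none());
    assert_eq!(
        sink.consume("filters-from-node-b".into()).as_deref(),
        Some("filters-from-node-a & filters-from-node-b")
    );
}
```
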
diff --git a/src/query/service/src/pipelines/builders/mod.rs b/src/query/service/src/pipelines/builders/mod.rs
index 8a2e5a481a349..3118b795f8b47 100644
--- a/src/query/service/src/pipelines/builders/mod.rs
+++ b/src/query/service/src/pipelines/builders/mod.rs
@@ -40,6 +40,7 @@ mod builder_recluster;
 mod builder_recursive_cte;
 mod builder_replace_into;
 mod builder_row_fetch;
+mod builder_runtime_filter;
 mod builder_scalar;
 mod builder_scan;
 mod builder_sort;
diff --git a/src/query/service/src/pipelines/pipeline_builder.rs b/src/query/service/src/pipelines/pipeline_builder.rs
index 49bb9eccdb24e..0799456da42e5 100644
--- a/src/query/service/src/pipelines/pipeline_builder.rs
+++ b/src/query/service/src/pipelines/pipeline_builder.rs
@@ -258,6 +258,8 @@ impl PipelineBuilder {
             PhysicalPlan::ColumnMutation(column_mutation) => {
                 self.build_column_mutation(column_mutation)
             }
+            PhysicalPlan::RuntimeFilterSource(source) => self.build_runtime_filter_source(source),
+            PhysicalPlan::RuntimeFilterSink(sink) => self.build_runtime_filter_sink(sink),
         }?;
 
         self.is_exchange_neighbor = is_exchange_neighbor;
diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/desc.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/desc.rs
index 428ad2f03ae38..f464b3c358846 100644
--- a/src/query/service/src/pipelines/processors/transforms/hash_join/desc.rs
+++ b/src/query/service/src/pipelines/processors/transforms/hash_join/desc.rs
@@ -20,8 +20,8 @@ use databend_common_expression::RemoteExpr;
 use databend_common_functions::BUILTIN_FUNCTIONS;
 use databend_common_sql::executor::cast_expr_to_non_null_boolean;
 use databend_common_sql::executor::physical_plans::HashJoin;
-use databend_common_sql::executor::PhysicalRuntimeFilter;
-use databend_common_sql::executor::PhysicalRuntimeFilters;
+use databend_common_sql::executor::RemoteRuntimeFilterDesc;
+use databend_common_sql::executor::RemoteRuntimeFiltersDesc;
 use parking_lot::RwLock;
 
 use crate::sql::plans::JoinType;
@@ -54,7 +54,7 @@ pub struct HashJoinDesc {
 }
 
 pub struct RuntimeFilterDesc {
-    pub _id: usize,
+    pub id: usize,
     pub build_key: Expr,
     pub probe_key: Expr<String>,
     pub scan_id: usize,
@@ -67,18 +67,18 @@ pub struct RuntimeFiltersDesc {
     pub filters: Vec<RuntimeFilterDesc>,
 }
 
-impl From<&PhysicalRuntimeFilters> for RuntimeFiltersDesc {
-    fn from(runtime_filter: &PhysicalRuntimeFilters) -> Self {
+impl From<&RemoteRuntimeFiltersDesc> for RuntimeFiltersDesc {
+    fn from(runtime_filter: &RemoteRuntimeFiltersDesc) -> Self {
         Self {
             filters: runtime_filter.filters.iter().map(|rf| rf.into()).collect(),
         }
     }
 }
 
-impl From<&PhysicalRuntimeFilter> for RuntimeFilterDesc {
-    fn from(runtime_filter: &PhysicalRuntimeFilter) -> Self {
+impl From<&RemoteRuntimeFilterDesc> for RuntimeFilterDesc {
+    fn from(runtime_filter: &RemoteRuntimeFilterDesc) -> Self {
         Self {
-            _id: runtime_filter.id,
+            id: runtime_filter.id,
             build_key: runtime_filter.build_key.as_expr(&BUILTIN_FUNCTIONS),
             probe_key: runtime_filter.probe_key.as_expr(&BUILTIN_FUNCTIONS),
             scan_id: runtime_filter.scan_id,
@@ -117,7 +117,7 @@ impl HashJoinDesc {
             from_correlated_subquery: join.from_correlated_subquery,
             broadcast: join.broadcast,
             single_to_inner: join.single_to_inner.clone(),
-            runtime_filter: (&join.runtime_filter).into(),
+            runtime_filter: (&join.runtime_filter_desc).into(),
         })
     }
 
diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_build_state.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_build_state.rs
index 0af13ee98f003..f4a1459499f67 100644
--- a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_build_state.rs
+++ b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_build_state.rs
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::collections::HashMap;
 use std::collections::HashSet;
 use std::collections::VecDeque;
 use std::ops::ControlFlow;
@@ -20,9 +21,11 @@ use std::sync::atomic::AtomicUsize;
 use std::sync::atomic::Ordering;
 use std::sync::Arc;
 
+use async_channel::Receiver;
+use async_channel::Sender;
 use databend_common_base::base::tokio::sync::Barrier;
-use databend_common_catalog::runtime_filter_info::RuntimeFilterInfo;
 use databend_common_catalog::runtime_filter_info::RuntimeFilterReady;
+use databend_common_catalog::runtime_filter_info::RuntimeFiltersForScan;
 use databend_common_catalog::table_context::TableContext;
 use databend_common_column::bitmap::Bitmap;
 use databend_common_exception::ErrorCode;
@@ -78,6 +81,7 @@ use crate::pipelines::processors::transforms::hash_join::FixedKeyHashJoinHashTab
 use crate::pipelines::processors::transforms::hash_join::HashJoinHashTable;
 use crate::pipelines::processors::transforms::hash_join::SerializerHashJoinHashTable;
 use crate::pipelines::processors::transforms::hash_join::SingleBinaryHashJoinHashTable;
+use crate::pipelines::processors::transforms::RemoteRuntimeFilters;
 use crate::pipelines::processors::HashJoinState;
 use crate::sessions::QueryContext;
 
@@ -117,6 +121,14 @@ pub struct HashJoinBuildState {
 
     /// Spill related states.
     pub(crate) memory_settings: MemorySettings,
+    pub(crate) rf_channels: Option<RuntimeFilterChannels>,
+}
+
+pub struct RuntimeFilterChannels {
+    /// Send locally built runtime filters to `RuntimeFilterSourceProcessor`.
+    pub(crate) rf_src_send: Sender<RemoteRuntimeFilters>,
+    /// Receive merged runtime filters from `RuntimeFilterSinkProcessor`.
+    pub(crate) rf_sink_recv: Receiver<RemoteRuntimeFilters>,
 }
 
 impl HashJoinBuildState {
@@ -128,6 +140,7 @@ impl HashJoinBuildState {
         build_projections: &ColumnSet,
         hash_join_state: Arc<HashJoinState>,
         num_threads: usize,
+        rf_channels: Option<RuntimeFilterChannels>,
     ) -> Result<Arc<HashJoinBuildState>> {
         let hash_key_types = build_keys
             .iter()
@@ -164,6 +177,7 @@ impl HashJoinBuildState {
             build_hash_table_tasks: Default::default(),
             mutex: Default::default(),
             memory_settings,
+            rf_channels,
         }))
     }
 
@@ -286,6 +300,7 @@ impl HashJoinBuildState {
                     .build_watcher
                     .send(HashTableType::Empty)
                     .map_err(|_| ErrorCode::TokioError("build_watcher channel is closed"))?;
+                self.send_runtime_filter_meta(None)?;
                 self.set_bloom_filter_ready(false)?;
                 return Ok(());
             }
@@ -837,14 +852,16 @@ impl HashJoinBuildState {
 
     fn add_runtime_filter(&self, build_chunks: &[DataBlock], build_num_rows: usize) -> Result<()> {
         let mut bloom_filter_ready = false;
+        let mut runtime_filters = HashMap::new();
         for rf in self.runtime_filter_desc() {
-            let mut runtime_filter = RuntimeFilterInfo::default();
+            let mut runtime_filter = RuntimeFiltersForScan::default();
             if rf.enable_inlist_runtime_filter && build_num_rows < INLIST_RUNTIME_FILTER_THRESHOLD {
                 self.inlist_runtime_filter(
                     &mut runtime_filter,
                     build_chunks,
                     &rf.build_key,
                     &rf.probe_key,
+                    rf.id,
                 )?;
             }
             if rf.enable_bloom_runtime_filter {
@@ -853,6 +870,7 @@ impl HashJoinBuildState {
                     &mut runtime_filter,
                     &rf.build_key,
                     &rf.probe_key,
+                    rf.id,
                 )?;
             }
             if rf.enable_min_max_runtime_filter {
@@ -861,13 +879,15 @@ impl HashJoinBuildState {
                     &mut runtime_filter,
                     &rf.build_key,
                     &rf.probe_key,
+                    rf.id,
                 )?;
             }
             if !runtime_filter.is_empty() {
                 bloom_filter_ready |= !runtime_filter.is_blooms_empty();
-                self.ctx.set_runtime_filter((rf.scan_id, runtime_filter));
+                runtime_filters.insert(rf.scan_id, runtime_filter);
             }
         }
+        self.send_runtime_filter_meta(Some(runtime_filters))?;
         self.set_bloom_filter_ready(bloom_filter_ready)?;
         Ok(())
     }
@@ -875,9 +895,10 @@ impl HashJoinBuildState {
     fn bloom_runtime_filter(
         &self,
         data_blocks: &[DataBlock],
-        runtime_filter: &mut RuntimeFilterInfo,
+        runtime_filter: &mut RuntimeFiltersForScan,
         build_key: &Expr,
         probe_key: &Expr<String>,
+        rf_id: usize,
     ) -> Result<()> {
         if !build_key.data_type().remove_nullable().is_number()
             && !build_key.data_type().remove_nullable().is_string()
@@ -915,23 +936,24 @@ impl HashJoinBuildState {
             hashes_vec.push(hash);
         });
         let filter = BinaryFuse16::try_from(&hashes_vec)?;
-        runtime_filter.add_bloom((id.to_string(), filter));
+        runtime_filter.add_bloom(rf_id, (id.to_string(), filter));
         Ok(())
     }
 
     fn inlist_runtime_filter(
         &self,
-        runtime_filter: &mut RuntimeFilterInfo,
+        runtime_filter: &mut RuntimeFiltersForScan,
         data_blocks: &[DataBlock],
         build_key: &Expr,
         probe_key: &Expr<String>,
+        rf_id: usize,
     ) -> Result<()> {
         if let Some(distinct_build_column) =
             dedup_build_key_column(&self.func_ctx, data_blocks, build_key)?
         {
             if let Some(filter) = inlist_filter(probe_key, distinct_build_column.clone())? {
                 info!("inlist_filter: {:?}", filter.sql_display());
-                runtime_filter.add_inlist(filter);
+                runtime_filter.add_inlist(rf_id, filter);
             }
         }
         Ok(())
@@ -940,9 +962,10 @@ impl HashJoinBuildState {
     fn min_max_runtime_filter(
         &self,
         data_blocks: &[DataBlock],
-        runtime_filter: &mut RuntimeFilterInfo,
+        runtime_filter: &mut RuntimeFiltersForScan,
         build_key: &Expr,
         probe_key: &Expr<String>,
+        rf_id: usize,
     ) -> Result<()> {
         if !build_key.runtime_filter_supported_types() {
             return Ok(());
@@ -1035,7 +1058,7 @@ impl HashJoinBuildState {
             };
             if let Some(min_max_filter) = min_max_filter {
                 info!("min_max_filter: {:?}", min_max_filter.sql_display());
-                runtime_filter.add_min_max(min_max_filter);
+                runtime_filter.add_min_max(rf_id, min_max_filter);
             }
         }
         Ok(())
@@ -1068,4 +1091,26 @@ impl HashJoinBuildState {
             .iter()
             .any(|rf| rf.enable_min_max_runtime_filter)
     }
+
+    fn send_runtime_filter_meta(
+        &self,
+        mut rf: Option<HashMap<usize, RuntimeFiltersForScan>>,
+    ) -> Result<()> {
+        if let Some(channels) = self.rf_channels.as_ref() {
+            channels
+                .rf_src_send
+                .send_blocking(rf.into())
+                .map_err(|_| ErrorCode::TokioError("send runtime filter meta failed"))?;
+            channels.rf_src_send.close();
+            let merged_rf = channels
+                .rf_sink_recv
+                .recv_blocking()
+                .map_err(|_| ErrorCode::TokioError("receive runtime filter meta failed"))?;
+            rf = merged_rf.into();
+        }
+        for (scan_id, runtime_filter) in rf.unwrap_or_default().into_iter() {
+            self.ctx.set_runtime_filter((scan_id, runtime_filter));
+        }
+        Ok(())
+    }
 }
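
`send_runtime_filter_meta` is a blocking rendezvous: the build side pushes its local filters to the `RuntimeFilterSourceProcessor`, closes the sender so the source pipeline can finish, then parks on `rf_sink_recv` until the globally merged filters come back. A minimal sketch of that roundtrip over `async_channel`, with a plain thread standing in for the source/sink pipelines and the cross-node exchange:

```rust
use std::thread;

use async_channel::bounded;

fn main() {
    // rf_src: build side -> source processor; rf_sink: sink processor -> build side.
    let (rf_src_send, rf_src_recv) = bounded::<String>(1);
    let (rf_sink_send, rf_sink_recv) = bounded::<String>(1);

    // Stand-in for the source/sink pipelines plus the cross-node exchange:
    // receive the local filters, "merge" them, send the merged set back.
    let pipelines = thread::spawn(move || {
        let local = rf_src_recv.recv_blocking().unwrap();
        rf_sink_send
            .send_blocking(format!("merged({local})"))
            .unwrap();
    });

    // Build side (cf. send_runtime_filter_meta): send, close, then block
    // until the globally merged filters come back.
    rf_src_send.send_blocking("local filters".to_string()).unwrap();
    rf_src_send.close();
    let merged = rf_sink_recv.recv_blocking().unwrap();
    assert_eq!(merged, "merged(local filters)");
    pipelines.join().unwrap();
}
```
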
diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/mod.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/mod.rs
index d50ee45b166eb..1337e15f35cf3 100644
--- a/src/query/service/src/pipelines/processors/transforms/hash_join/mod.rs
+++ b/src/query/service/src/pipelines/processors/transforms/hash_join/mod.rs
@@ -31,6 +31,7 @@ mod util;
 
 pub use desc::HashJoinDesc;
 pub use hash_join_build_state::HashJoinBuildState;
+pub use hash_join_build_state::RuntimeFilterChannels;
 pub use hash_join_probe_state::HashJoinProbeState;
 pub use hash_join_spiller::HashJoinSpiller;
 pub use hash_join_state::*;
diff --git a/src/query/service/src/pipelines/processors/transforms/mod.rs b/src/query/service/src/pipelines/processors/transforms/mod.rs
index da8b50455878f..904af8803524b 100644
--- a/src/query/service/src/pipelines/processors/transforms/mod.rs
+++ b/src/query/service/src/pipelines/processors/transforms/mod.rs
@@ -15,6 +15,7 @@
 pub mod aggregator;
 mod hash_join;
 pub(crate) mod range_join;
+mod runtime_filter;
 mod runtime_pool;
 mod transform_add_computed_columns;
 mod transform_add_const_columns;
@@ -41,6 +42,9 @@ mod transform_udf_server;
 mod window;
 
 pub use hash_join::*;
+pub use runtime_filter::RemoteRuntimeFilters;
+pub use runtime_filter::RuntimeFilterSinkProcessor;
+pub use runtime_filter::RuntimeFilterSourceProcessor;
 pub use transform_add_computed_columns::TransformAddComputedColumns;
 pub use transform_add_const_columns::TransformAddConstColumns;
 pub use transform_add_internal_columns::TransformAddInternalColumns;
diff --git a/src/query/service/src/pipelines/processors/transforms/runtime_filter.rs b/src/query/service/src/pipelines/processors/transforms/runtime_filter.rs
new file mode 100644
index 0000000000000..4f29c19377521
--- /dev/null
+++ b/src/query/service/src/pipelines/processors/transforms/runtime_filter.rs
@@ -0,0 +1,294 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use async_channel::Receiver;
+use async_channel::Sender;
+use databend_common_catalog::runtime_filter_info::RuntimeFiltersForScan;
+use databend_common_catalog::table_context::TableContext;
+use databend_common_exception::ErrorCode;
+use databend_common_exception::Result;
+use databend_common_expression::types::DataType;
+use databend_common_expression::BlockMetaInfo;
+use databend_common_expression::BlockMetaInfoDowncast;
+use databend_common_expression::DataBlock;
+use databend_common_expression::FunctionID;
+use databend_common_expression::RemoteExpr;
+use databend_common_functions::BUILTIN_FUNCTIONS;
+use databend_common_pipeline_core::processors::InputPort;
+use databend_common_pipeline_core::processors::OutputPort;
+use databend_common_pipeline_core::processors::ProcessorPtr;
+use databend_common_pipeline_sinks::AsyncSink;
+use databend_common_pipeline_sinks::AsyncSinker;
+use databend_common_pipeline_sources::AsyncSource;
+use databend_common_pipeline_sources::AsyncSourcer;
+
+pub struct RuntimeFilterSourceProcessor {
+    pub receiver: Receiver<RemoteRuntimeFilters>,
+}
+
+impl RuntimeFilterSourceProcessor {
+    pub fn create(
+        ctx: Arc<dyn TableContext>,
+        receiver: Receiver<RemoteRuntimeFilters>,
+        output_port: Arc<OutputPort>,
+    ) -> Result<ProcessorPtr> {
+        AsyncSourcer::create(ctx, output_port, Self { receiver })
+    }
+}
+
+#[async_trait::async_trait]
+impl AsyncSource for RuntimeFilterSourceProcessor {
+    const NAME: &'static str = "RuntimeFilterSource";
+    const SKIP_EMPTY_DATA_BLOCK: bool = false;
+
+    #[async_backtrace::framed]
+    async fn generate(&mut self) -> Result<Option<DataBlock>> {
+        let start = std::time::Instant::now();
+        log::info!("RuntimeFilterSource recv() start");
+        let rf = self.receiver.recv().await;
+        log::info!(
+            "RuntimeFilterSource recv() took {:?}, ok: {}",
+            start.elapsed(),
+            rf.is_ok()
+        );
+        match rf {
+            Ok(runtime_filter) => Ok(Some(DataBlock::empty_with_meta(Box::new(runtime_filter)))),
+            Err(_) => {
+                // The channel is closed, we should return None to stop generating
+                Ok(None)
+            }
+        }
+    }
+}
+
+pub struct RuntimeFilterSinkProcessor {
+    node_num: usize,
+    recv_num: usize,
+    rf: Vec<RemoteRuntimeFilters>,
+    sender: Sender<RemoteRuntimeFilters>,
+}
+
+impl RuntimeFilterSinkProcessor {
+    pub fn create(
+        input: Arc<InputPort>,
+        node_num: usize,
+        sender: Sender<RemoteRuntimeFilters>,
+    ) -> Result<ProcessorPtr> {
+        Ok(ProcessorPtr::create(AsyncSinker::create(input, Self {
+            node_num,
+            recv_num: 0,
+            rf: vec![],
+            sender,
+        })))
+    }
+}
+
+#[async_trait::async_trait]
+impl AsyncSink for RuntimeFilterSinkProcessor {
+    const NAME: &'static str = "RuntimeFilterSink";
+
+    async fn on_finish(&mut self) -> Result<()> {
+        Ok(())
+    }
+
+    async fn consume(&mut self, mut data_block: DataBlock) -> Result<bool> {
+        let ptr = data_block.take_meta().ok_or_else(|| {
+            ErrorCode::Internal("RuntimeFilterSink expects a block with meta, but got none")
+        })?;
+        let runtime_filter = RemoteRuntimeFilters::downcast_from(ptr).ok_or_else(|| {
+            ErrorCode::Internal("Cannot downcast meta to RemoteRuntimeFilters")
+        })?;
+        log::info!(
+            "RuntimeFilterSinkProcessor recv runtime filter: {:?}",
+            runtime_filter
+        );
+        self.recv_num += 1;
+        self.rf.push(runtime_filter);
+        let all_recv = self.node_num == self.recv_num;
+        if all_recv {
+            let merged_rf = RemoteRuntimeFilters::merge(&self.rf);
+            self.sender.send(merged_rf).await.map_err(|_| {
+                ErrorCode::Internal("RuntimeFilterSinkProcessor failed to send runtime filter")
+            })?;
+        }
+        Ok(all_recv)
+    }
+}
+
+/// Corresponds one-to-one with a HashJoin operator.
+///
+/// When the build side is empty, `scan_id_to_runtime_filter` is `None`.
+#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, Default)]
+pub struct RemoteRuntimeFilters {
+    scan_id_to_runtime_filter: Option<HashMap<usize, RemoteRuntimeFiltersForScan>>,
+}
+
+impl From<Option<HashMap<usize, RuntimeFiltersForScan>>> for RemoteRuntimeFilters {
+    fn from(rfs: Option<HashMap<usize, RuntimeFiltersForScan>>) -> Self {
+        RemoteRuntimeFilters {
+            scan_id_to_runtime_filter: rfs.map(|rfs| {
+                rfs.into_iter()
+                    .map(|(scan_id, runtime_filter)| (scan_id, runtime_filter.into()))
+                    .collect()
+            }),
+        }
+    }
+}
+
+impl From<RemoteRuntimeFilters> for Option<HashMap<usize, RuntimeFiltersForScan>> {
+    fn from(rfs: RemoteRuntimeFilters) -> Self {
+        rfs.scan_id_to_runtime_filter.map(|rfs| {
+            rfs.into_iter()
+                .map(|(scan_id, runtime_filter)| (scan_id, runtime_filter.into()))
+                .collect()
+        })
+    }
+}
+
+impl From<RemoteRuntimeFiltersForScan> for RuntimeFiltersForScan {
+    fn from(rfs: RemoteRuntimeFiltersForScan) -> Self {
+        Self {
+            inlist: rfs
+                .rf_id_to_inlist
+                .into_iter()
+                .map(|(id, expr)| (id, expr.as_expr(&BUILTIN_FUNCTIONS)))
+                .collect(),
+            min_max: rfs
+                .rf_id_to_min_max
+                .into_iter()
+                .map(|(id, expr)| (id, expr.as_expr(&BUILTIN_FUNCTIONS)))
+                .collect(),
+            bloom: Default::default(),
+        }
+    }
+}
+
+impl From<RuntimeFiltersForScan> for RemoteRuntimeFiltersForScan {
+    fn from(rfs: RuntimeFiltersForScan) -> Self {
+        Self {
+            rf_id_to_inlist: rfs
+                .inlist
+                .iter()
+                .map(|(id, expr)| (*id, expr.as_remote_expr()))
+                .collect(),
+            rf_id_to_min_max: rfs
+                .min_max
+                .iter()
+                .map(|(id, expr)| (*id, expr.as_remote_expr()))
+                .collect(),
+        }
+    }
+}
+impl RemoteRuntimeFilters {
+    pub fn merge(rfs: &[RemoteRuntimeFilters]) -> Self {
+        log::info!("start merge runtime filters: {:?}", rfs);
+        let rfs = rfs
+            .iter()
+            .filter_map(|rfs| rfs.scan_id_to_runtime_filter.as_ref())
+            .collect::<Vec<_>>();
+
+        if rfs.is_empty() {
+            return RemoteRuntimeFilters::default();
+        }
+
+        let mut common_scans: Vec<usize> = rfs[0].keys().cloned().collect();
+        for rf in &rfs[1..] {
+            common_scans.retain(|scan_id| rf.contains_key(scan_id));
+        }
+
+        let mut merged = HashMap::new();
+
+        for scan_id in common_scans {
+            let mut merged_for_scan = RemoteRuntimeFiltersForScan::default();
+            let first_scan = &rfs[0][&scan_id];
+
+            let mut common_inlist_ids: Vec<usize> =
+                first_scan.rf_id_to_inlist.keys().cloned().collect();
+            let mut common_min_max_ids: Vec<usize> =
+                first_scan.rf_id_to_min_max.keys().cloned().collect();
+            for rf in &rfs[1..] {
+                let scan_filter = &rf[&scan_id];
+                common_inlist_ids.retain(|id| scan_filter.rf_id_to_inlist.contains_key(id));
+                common_min_max_ids.retain(|id| scan_filter.rf_id_to_min_max.contains_key(id));
+            }
+
+            for rf_id in &common_inlist_ids {
+                let mut exprs = Vec::new();
+                for rf in rfs.iter() {
+                    exprs.push(rf[&scan_id].rf_id_to_inlist[rf_id].clone());
+                }
+                log::info!("merge inlist: {:?}, rf_id: {:?}", exprs, rf_id);
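+                // `reduce` cannot yield `None`: every id in `common_inlist_ids`
+                // exists on all nodes, so `exprs` is non-empty.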
+                let merged_expr = exprs
+                    .into_iter()
+                    .reduce(|acc, expr| RemoteExpr::FunctionCall {
+                        span: None,
+                        id: Box::new(FunctionID::Builtin {
+                            name: "or".to_string(),
+                            id: 1,
+                        }),
+                        generics: vec![],
+                        args: vec![acc, expr],
+                        return_type: DataType::Nullable(Box::new(DataType::Boolean)),
+                    })
+                    .unwrap();
+                merged_for_scan.rf_id_to_inlist.insert(*rf_id, merged_expr);
+            }
+
+            for rf_id in &common_min_max_ids {
+                let mut exprs = Vec::new();
+                for rf in rfs.iter() {
+                    exprs.push(rf[&scan_id].rf_id_to_min_max[rf_id].clone());
+                }
+                log::info!("merge min_max: {:?}, rf_id: {:?}", exprs, rf_id);
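+                // As with inlist above, `exprs` is non-empty for every common id,
+                // so the `reduce(..).unwrap()` below cannot panic.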
+                let merged_expr = exprs
+                    .into_iter()
+                    .reduce(|acc, expr| RemoteExpr::FunctionCall {
+                        span: None,
+                        id: Box::new(FunctionID::Builtin {
+                            name: "or".to_string(),
+                            id: 1,
+                        }),
+                        generics: vec![],
+                        args: vec![acc, expr],
+                        return_type: DataType::Nullable(Box::new(DataType::Boolean)),
+                    })
+                    .unwrap();
+                merged_for_scan.rf_id_to_min_max.insert(*rf_id, merged_expr);
+            }
+
+            merged.insert(scan_id, merged_for_scan);
+        }
+
+        RemoteRuntimeFilters {
+            scan_id_to_runtime_filter: Some(merged),
+        }
+    }
+}
+
+#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, Default)]
+pub struct RemoteRuntimeFiltersForScan {
+    rf_id_to_inlist: HashMap<usize, RemoteExpr<String>>,
+    rf_id_to_min_max: HashMap<usize, RemoteExpr<String>>,
+}
+
+#[typetag::serde(name = "runtime_filters_for_join")]
+impl BlockMetaInfo for RemoteRuntimeFilters {
+    fn clone_self(&self) -> Box<dyn BlockMetaInfo> {
+        Box::new(self.clone())
+    }
+}
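
Two properties of `RemoteRuntimeFilters::merge` are worth spelling out: a filter survives only if every node produced it (key intersection over scan ids and rf ids), since a filter missing on one node would wrongly prune rows that node's build side could still match; and the survivors are OR-combined so the merged predicate admits the union of all build keys. Note also that `RemoteRuntimeFiltersForScan` carries no bloom filters (`bloom: Default::default()` in the conversion above), so only inlist and min-max filters cross the wire. A minimal sketch of the merge rule, with strings standing in for `RemoteExpr`:

```rust
use std::collections::HashMap;

// Strings stand in for RemoteExpr. A filter survives only if every node
// produced it; the survivors are OR-ed so the merged predicate still admits
// the union of all nodes' build keys.
fn merge(per_node: &[HashMap<usize, String>]) -> HashMap<usize, String> {
    let Some(first) = per_node.first() else {
        return HashMap::new();
    };
    let mut common: Vec<usize> = first.keys().copied().collect();
    for node in &per_node[1..] {
        common.retain(|id| node.contains_key(id));
    }
    common
        .into_iter()
        .map(|id| {
            let merged = per_node
                .iter()
                .map(|node| node[&id].clone())
                .reduce(|acc, e| format!("({acc}) OR ({e})"))
                .unwrap(); // non-empty: id is common to all nodes
            (id, merged)
        })
        .collect()
}

fn main() {
    let node_a = HashMap::from([
        (1, "k IN (1)".to_string()),
        (2, "k > 0".to_string()),
    ]);
    let node_b = HashMap::from([(1, "k IN (2)".to_string())]);
    let merged = merge(&[node_a, node_b]);
    assert_eq!(merged.len(), 1); // rf 2 dropped: node_b never built it
    assert_eq!(merged[&1], "(k IN (1)) OR (k IN (2))");
}
```
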
diff --git a/src/query/service/src/pipelines/processors/transforms/transform_recursive_cte_source.rs b/src/query/service/src/pipelines/processors/transforms/transform_recursive_cte_source.rs
index 126ae6c959b5c..4516180f93220 100644
--- a/src/query/service/src/pipelines/processors/transforms/transform_recursive_cte_source.rs
+++ b/src/query/service/src/pipelines/processors/transforms/transform_recursive_cte_source.rs
@@ -351,7 +351,9 @@ async fn create_memory_table_for_cte_scan(
         | PhysicalPlan::ChunkFillAndReorder(_)
         | PhysicalPlan::ChunkAppendData(_)
         | PhysicalPlan::ChunkMerge(_)
-        | PhysicalPlan::ChunkCommitInsert(_) => {}
+        | PhysicalPlan::ChunkCommitInsert(_)
+        | PhysicalPlan::RuntimeFilterSource(_)
+        | PhysicalPlan::RuntimeFilterSink(_) => {}
     }
     Ok(())
 }
diff --git a/src/query/service/src/schedulers/fragments/fragmenter.rs b/src/query/service/src/schedulers/fragments/fragmenter.rs
index a205a4ec6e963..161117159256d 100644
--- a/src/query/service/src/schedulers/fragments/fragmenter.rs
+++ b/src/query/service/src/schedulers/fragments/fragmenter.rs
@@ -123,9 +123,13 @@ impl Fragmenter {
 
     pub fn build_fragment(mut self, plan: &PhysicalPlan) -> Result<PlanFragment> {
         let root = self.replace(plan)?;
+        let fragment_type = match plan {
+            PhysicalPlan::RuntimeFilterSink(_) => FragmentType::Intermediate,
+            _ => FragmentType::Root,
+        };
         let mut root_fragment = PlanFragment {
             plan: root,
-            fragment_type: FragmentType::Root,
+            fragment_type,
             fragment_id: self.ctx.get_fragment_id(),
             exchange: None,
             query_id: self.query_id.clone(),
@@ -218,6 +222,11 @@ impl PhysicalPlanReplacer for Fragmenter {
         fragments.append(&mut self.fragments);
         let probe_input = self.replace(plan.probe.as_ref())?;
         fragments.append(&mut self.fragments);
+
+        let runtime_filter_plan = match &plan.runtime_filter_plan {
+            Some(runtime_filter_plan) => Some(Box::new(self.replace(runtime_filter_plan)?)),
+            None => None,
+        };
         self.fragments = fragments;
 
         Ok(PhysicalPlan::HashJoin(HashJoin {
@@ -241,7 +250,9 @@ impl PhysicalPlanReplacer for Fragmenter {
             broadcast: plan.broadcast,
             single_to_inner: plan.single_to_inner.clone(),
             build_side_cache_info: plan.build_side_cache_info.clone(),
-            runtime_filter: plan.runtime_filter.clone(),
+            runtime_filter_desc: plan.runtime_filter_desc.clone(),
+            runtime_filter_plan,
+            join_id: plan.join_id,
         }))
     }
 
diff --git a/src/query/service/src/schedulers/fragments/query_fragment_actions.rs b/src/query/service/src/schedulers/fragments/query_fragment_actions.rs
index dece1fdfb21c5..9fa10c1585bfc 100644
--- a/src/query/service/src/schedulers/fragments/query_fragment_actions.rs
+++ b/src/query/service/src/schedulers/fragments/query_fragment_actions.rs
@@ -126,12 +126,16 @@ impl QueryFragmentsActions {
         self.ctx.get_cluster().local_id()
     }
 
-    pub fn get_root_actions(&self) -> Result<&QueryFragmentActions> {
-        self.fragments_actions.last().ok_or_else(|| {
-            ErrorCode::Internal(
-                "Logical error, call get_root_actions in empty QueryFragmentsActions",
-            )
-        })
+    pub fn get_root_fragment_ids(&self) -> Result<Vec<usize>> {
+        let mut fragment_ids = Vec::new();
+        for fragment_actions in &self.fragments_actions {
+            let plan = &fragment_actions.fragment_actions[0].physical_plan;
+            if !matches!(plan, PhysicalPlan::ExchangeSink(_)) {
+                fragment_ids.push(fragment_actions.fragment_id);
+            }
+        }
+
+        Ok(fragment_ids)
     }
 
     pub fn pop_root_actions(&mut self) -> Option<QueryFragmentActions> {
diff --git a/src/query/service/src/schedulers/scheduler.rs b/src/query/service/src/schedulers/scheduler.rs
index f3259edae81c6..6a6224d788341 100644
--- a/src/query/service/src/schedulers/scheduler.rs
+++ b/src/query/service/src/schedulers/scheduler.rs
@@ -99,11 +99,15 @@ pub async fn build_distributed_pipeline(
     ctx: &Arc<QueryContext>,
     plan: &PhysicalPlan,
 ) -> Result<PipelineBuildResult> {
-    let fragmenter = Fragmenter::try_create(ctx.clone())?;
-
-    let root_fragment = fragmenter.build_fragment(plan)?;
     let mut fragments_actions = QueryFragmentsActions::create(ctx.clone());
-    root_fragment.get_actions(ctx.clone(), &mut fragments_actions)?;
+    for plan in collect_runtime_filter_broadcast_plans(plan)?
+        .iter()
+        .chain(std::iter::once(plan))
+    {
+        let fragmenter = Fragmenter::try_create(ctx.clone())?;
+        let root_fragment = fragmenter.build_fragment(plan)?;
+        root_fragment.get_actions(ctx.clone(), &mut fragments_actions)?;
+    }
 
     let exchange_manager = ctx.get_exchange_manager();
 
@@ -123,6 +127,27 @@ pub async fn build_distributed_pipeline(
     }
 }
 
+fn collect_runtime_filter_broadcast_plans(plan: &PhysicalPlan) -> Result<Vec<PhysicalPlan>> {
+    let mut runtime_filter_broadcast_plans = Vec::new();
+
+    let mut collect = |plan: &PhysicalPlan| {
+        if let PhysicalPlan::HashJoin(hash_join) = plan {
+            if let Some(runtime_filter_plan) = &hash_join.runtime_filter_plan {
+                runtime_filter_broadcast_plans.push(runtime_filter_plan.as_ref().clone());
+            }
+        }
+    };
+
+    PhysicalPlan::traverse(
+        plan,
+        &mut |_| true,
+        &mut collect,
+        &mut |_| {},
+    );
+
+    Ok(runtime_filter_broadcast_plans)
+}
+
 pub struct ServiceQueryExecutor {
     ctx: Arc<QueryContext>,
 }
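
The chain order here matters: runtime-filter broadcast plans are fragmented first and the main plan last, so the main root fragment id lands at the tail of `get_root_fragment_ids` and the `pop()` right before `get_root_pipeline` (later in this diff) recovers it, leaving the runtime-filter fragments to be subscribed as extra source pipelines. A minimal sketch of that ordering assumption:

```rust
fn main() {
    // Broadcast plans first, main plan last (the chain order above) ...
    let rf_plans = vec!["rf_broadcast_join_1", "rf_broadcast_join_2"];
    let main_plan = "main_plan";
    let mut root_ids: Vec<&str> = rf_plans
        .iter()
        .copied()
        .chain(std::iter::once(main_plan))
        .collect();

    // ... so pop() recovers the main fragment, and the remaining ids are the
    // runtime-filter fragments subscribed as extra source pipelines.
    assert_eq!(root_ids.pop(), Some("main_plan"));
    assert_eq!(root_ids, vec!["rf_broadcast_join_1", "rf_broadcast_join_2"]);
}
```
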
diff --git a/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs b/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs
index 1eb5adb336efd..64b0e23c23617 100644
--- a/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs
+++ b/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs
@@ -57,7 +57,6 @@ use crate::pipelines::executor::ExecutorSettings;
 use crate::pipelines::executor::PipelineCompleteExecutor;
 use crate::pipelines::PipelineBuildResult;
 use crate::pipelines::PipelineBuilder;
-use crate::schedulers::QueryFragmentActions;
 use crate::schedulers::QueryFragmentsActions;
 use crate::servers::flight::v1::actions::init_query_fragments;
 use crate::servers::flight::v1::actions::INIT_QUERY_FRAGMENTS;
@@ -443,7 +442,7 @@ impl DataExchangeManager {
             retry_times: settings.get_flight_max_retry_times()?,
             retry_interval: settings.get_flight_retry_interval()?,
         };
-        let root_actions = actions.get_root_actions()?;
+        let mut root_fragment_ids = actions.get_root_fragment_ids()?;
         let conf = GlobalConfig::instance();
 
         // Initialize query env between cluster nodes
@@ -466,7 +465,8 @@ impl DataExchangeManager {
         }
 
         // Get local pipeline of local task
-        let build_res = self.get_root_pipeline(ctx, root_actions)?;
+        let main_fragment_id = root_fragment_ids
+            .pop()
+            .expect("root fragment ids must contain the main fragment");
+        let build_res = self.get_root_pipeline(ctx, main_fragment_id, root_fragment_ids)?;
 
         let prepared_query = actions.prepared_query()?;
         let _: HashMap<String, ()> = cluster
@@ -479,10 +479,10 @@ impl DataExchangeManager {
     fn get_root_pipeline(
         &self,
         ctx: Arc<QueryContext>,
-        root_actions: &QueryFragmentActions,
+        main_fragment_id: usize,
+        fragment_ids: Vec<usize>,
     ) -> Result<PipelineBuildResult> {
         let query_id = ctx.get_id();
-        let fragment_id = root_actions.fragment_id;
 
         let queries_coordinator_guard = self.queries_coordinator.lock();
         let queries_coordinator = unsafe { &mut *queries_coordinator_guard.deref().get() };
@@ -490,10 +490,34 @@ impl DataExchangeManager {
         match queries_coordinator.get_mut(&query_id) {
             None => Err(ErrorCode::Internal("Query not exists.")),
             Some(query_coordinator) => {
-                assert!(query_coordinator.fragment_exchanges.is_empty());
+                assert!(
+                    query_coordinator.fragment_exchanges.is_empty(),
+                    "query_coordinator.fragment_exchanges is not empty: {:?}",
+                    query_coordinator
+                        .fragment_exchanges
+                        .keys()
+                        .collect::<Vec<_>>()
+                );
                 let injector = DefaultExchangeInjector::create();
-                let mut build_res =
-                    query_coordinator.subscribe_fragment(&ctx, fragment_id, injector)?;
+                let mut build_res = query_coordinator.subscribe_fragment(
+                    &ctx,
+                    main_fragment_id,
+                    injector.clone(),
+                )?;
+
+                for fragment_id in fragment_ids {
+                    let sub_build_res = query_coordinator.subscribe_fragment(
+                        &ctx,
+                        fragment_id,
+                        injector.clone(),
+                    )?;
+                    build_res
+                        .sources_pipelines
+                        .push(sub_build_res.main_pipeline);
+                    build_res
+                        .sources_pipelines
+                        .extend(sub_build_res.sources_pipelines);
+                }
 
                 let exchanges = std::mem::take(&mut query_coordinator.statistics_exchanges);
                 let statistics_receiver = StatisticsReceiver::spawn_receiver(&ctx, exchanges)?;
@@ -775,16 +799,18 @@ impl QueryCoordinator {
                 return Ok(fragment_coordinator.pipeline_build_res.unwrap());
             }
 
-            let exchange_params = fragment_coordinator.create_exchange_params(
-                info,
-                fragment_coordinator
-                    .pipeline_build_res
-                    .as_ref()
-                    .map(|x| x.exchange_injector.clone())
-                    .ok_or_else(|| {
-                        ErrorCode::Internal("Pipeline build result is none, It's a bug")
-                    })?,
-            )?;
+            let exchange_params = fragment_coordinator
+                .create_exchange_params(
+                    info,
+                    fragment_coordinator
+                        .pipeline_build_res
+                        .as_ref()
+                        .map(|x| x.exchange_injector.clone())
+                        .ok_or_else(|| {
+                            ErrorCode::Internal("Pipeline build result is none, It's a bug")
+                        })?,
+                )?
+                .ok_or_else(|| ErrorCode::Internal("Cannot find data exchange, it's a bug"))?;
             let mut build_res = fragment_coordinator.pipeline_build_res.unwrap();
 
             // Add exchange data transform.
@@ -854,12 +880,19 @@ impl QueryCoordinator {
             if let Some(mut build_res) = coordinator.pipeline_build_res.take() {
                 build_res.set_max_threads(max_threads as usize);
 
-                if !build_res.main_pipeline.is_pulling_pipeline()? {
-                    return Err(ErrorCode::Internal("Logical error, It's a bug"));
-                }
-
-                // Add exchange data publisher.
-                ExchangeSink::via(&info.query_ctx, &params, &mut build_res.main_pipeline)?;
+                if build_res.main_pipeline.is_pulling_pipeline()? {
+                    let Some(params) = params else {
+                        return Err(ErrorCode::Internal(
+                            "pipeline is pulling pipeline, but exchange params is none",
+                        ));
+                    };
+                    // Add exchange data publisher.
+                    ExchangeSink::via(&info.query_ctx, &params, &mut build_res.main_pipeline)?;
+                } else if build_res.main_pipeline.is_complete_pipeline()? && params.is_some() {
+                    return Err(ErrorCode::Internal(
+                        "pipeline is complete pipeline, but exchange params is some",
+                    ));
+                }
 
                 if !build_res.main_pipeline.is_complete_pipeline()? {
                     return Err(ErrorCode::Internal("Logical error, It's a bug"));
@@ -938,48 +971,47 @@ impl FragmentCoordinator {
         &self,
         info: &QueryInfo,
         exchange_injector: Arc<dyn ExchangeInjector>,
-    ) -> Result<ExchangeParams> {
-        if let Some(data_exchange) = &self.data_exchange {
-            return match data_exchange {
-                DataExchange::Merge(exchange) => {
-                    Ok(ExchangeParams::MergeExchange(MergeExchangeParams {
-                        exchange_injector: exchange_injector.clone(),
-                        schema: self.physical_plan.output_schema()?,
-                        fragment_id: self.fragment_id,
-                        query_id: info.query_id.to_string(),
-                        destination_id: exchange.destination_id.clone(),
-                        allow_adjust_parallelism: exchange.allow_adjust_parallelism,
-                        ignore_exchange: exchange.ignore_exchange,
-                    }))
-                }
-                DataExchange::Broadcast(exchange) => {
-                    Ok(ExchangeParams::ShuffleExchange(ShuffleExchangeParams {
-                        exchange_injector: exchange_injector.clone(),
-                        schema: self.physical_plan.output_schema()?,
-                        fragment_id: self.fragment_id,
-                        query_id: info.query_id.to_string(),
-                        executor_id: info.current_executor.to_string(),
-                        destination_ids: exchange.destination_ids.to_owned(),
-                        shuffle_scatter: exchange_injector
-                            .flight_scatter(&info.query_ctx, data_exchange)?,
-                    }))
-                }
-                DataExchange::ShuffleDataExchange(exchange) => {
-                    Ok(ExchangeParams::ShuffleExchange(ShuffleExchangeParams {
-                        exchange_injector: exchange_injector.clone(),
-                        schema: self.physical_plan.output_schema()?,
-                        fragment_id: self.fragment_id,
-                        query_id: info.query_id.to_string(),
-                        executor_id: info.current_executor.to_string(),
-                        destination_ids: exchange.destination_ids.to_owned(),
-                        shuffle_scatter: exchange_injector
-                            .flight_scatter(&info.query_ctx, data_exchange)?,
-                    }))
-                }
-            };
+    ) -> Result<Option<ExchangeParams>> {
+        let Some(data_exchange) = &self.data_exchange else {
+            return Ok(None);
+        };
+        match data_exchange {
+            DataExchange::Merge(exchange) => {
+                Ok(Some(ExchangeParams::MergeExchange(MergeExchangeParams {
+                    exchange_injector: exchange_injector.clone(),
+                    schema: self.physical_plan.output_schema()?,
+                    fragment_id: self.fragment_id,
+                    query_id: info.query_id.to_string(),
+                    destination_id: exchange.destination_id.clone(),
+                    allow_adjust_parallelism: exchange.allow_adjust_parallelism,
+                    ignore_exchange: exchange.ignore_exchange,
+                })))
+            }
+            DataExchange::Broadcast(exchange) => Ok(Some(ExchangeParams::ShuffleExchange(
+                ShuffleExchangeParams {
+                    exchange_injector: exchange_injector.clone(),
+                    schema: self.physical_plan.output_schema()?,
+                    fragment_id: self.fragment_id,
+                    query_id: info.query_id.to_string(),
+                    executor_id: info.current_executor.to_string(),
+                    destination_ids: exchange.destination_ids.to_owned(),
+                    shuffle_scatter: exchange_injector
+                        .flight_scatter(&info.query_ctx, data_exchange)?,
+                },
+            ))),
+            DataExchange::ShuffleDataExchange(exchange) => Ok(Some(
+                ExchangeParams::ShuffleExchange(ShuffleExchangeParams {
+                    exchange_injector: exchange_injector.clone(),
+                    schema: self.physical_plan.output_schema()?,
+                    fragment_id: self.fragment_id,
+                    query_id: info.query_id.to_string(),
+                    executor_id: info.current_executor.to_string(),
+                    destination_ids: exchange.destination_ids.to_owned(),
+                    shuffle_scatter: exchange_injector
+                        .flight_scatter(&info.query_ctx, data_exchange)?,
+                }),
+            )),
         }
-
-        Err(ErrorCode::Internal("Cannot find data exchange."))
     }
 
     pub fn prepare_pipeline(&mut self, ctx: Arc<QueryContext>) -> Result<()> {
diff --git a/src/query/service/src/servers/flight/v1/exchange/exchange_sink_writer.rs b/src/query/service/src/servers/flight/v1/exchange/exchange_sink_writer.rs
index abebc2ba6a254..91bdf774f9bf8 100644
--- a/src/query/service/src/servers/flight/v1/exchange/exchange_sink_writer.rs
+++ b/src/query/service/src/servers/flight/v1/exchange/exchange_sink_writer.rs
@@ -70,10 +70,10 @@ impl AsyncSink for ExchangeWriterSink {
     async fn consume(&mut self, mut data_block: DataBlock) -> Result<bool> {
         let serialize_meta = match data_block.take_meta() {
             None => Err(ErrorCode::Internal(
-                "ExchangeWriterSink only recv ExchangeSerializeMeta.",
+                "ExchangeWriterSink only recv ExchangeSerializeMeta, but got none.",
             )),
             Some(block_meta) => ExchangeSerializeMeta::downcast_from(block_meta).ok_or_else(|| {
-                ErrorCode::Internal("ExchangeWriterSink only recv ExchangeSerializeMeta.")
+                ErrorCode::Internal("ExchangeWriterSink only recv ExchangeSerializeMeta")
             }),
         }?;
 
diff --git a/src/query/service/src/servers/flight/v1/exchange/serde/exchange_serializer.rs b/src/query/service/src/servers/flight/v1/exchange/serde/exchange_serializer.rs
index 5a757f37ba299..11d17ff6641fe 100644
--- a/src/query/service/src/servers/flight/v1/exchange/serde/exchange_serializer.rs
+++ b/src/query/service/src/servers/flight/v1/exchange/serde/exchange_serializer.rs
@@ -159,11 +159,6 @@ impl BlockMetaTransform<ExchangeShuffleMeta> for TransformScatterExchangeSeriali
     fn transform(&mut self, meta: ExchangeShuffleMeta) -> Result<Vec<DataBlock>> {
         let mut new_blocks = Vec::with_capacity(meta.blocks.len());
         for (index, block) in meta.blocks.into_iter().enumerate() {
-            if block.is_empty() {
-                new_blocks.push(block);
-                continue;
-            }
-
             new_blocks.push(match self.local_pos == index {
                 true => block,
                 false => serialize_block(0, block, &self.options)?,
diff --git a/src/query/service/src/sessions/query_ctx.rs b/src/query/service/src/sessions/query_ctx.rs
index 458882d1561df..e44a7ad6c3914 100644
--- a/src/query/service/src/sessions/query_ctx.rs
+++ b/src/query/service/src/sessions/query_ctx.rs
@@ -28,6 +28,8 @@ use std::time::Instant;
 use std::time::SystemTime;
 use std::time::UNIX_EPOCH;
 
+use async_channel::Receiver;
+use async_channel::Sender;
 use chrono_tz::Tz;
 use dashmap::mapref::multiple::RefMulti;
 use dashmap::DashMap;
@@ -52,8 +54,8 @@ use databend_common_catalog::plan::PartStatistics;
 use databend_common_catalog::plan::Partitions;
 use databend_common_catalog::plan::StageTableInfo;
 use databend_common_catalog::query_kind::QueryKind;
-use databend_common_catalog::runtime_filter_info::RuntimeFilterInfo;
 use databend_common_catalog::runtime_filter_info::RuntimeFilterReady;
+use databend_common_catalog::runtime_filter_info::RuntimeFiltersForScan;
 use databend_common_catalog::statistics::data_cache_statistics::DataCacheMetrics;
 use databend_common_catalog::table_args::TableArgs;
 use databend_common_catalog::table_context::ContextError;
@@ -135,6 +137,7 @@ use crate::clusters::Cluster;
 use crate::clusters::ClusterHelper;
 use crate::locks::LockManager;
 use crate::pipelines::executor::PipelineExecutor;
+use crate::pipelines::processors::transforms::RemoteRuntimeFilters;
 use crate::servers::flight::v1::exchange::DataExchangeManager;
 use crate::sessions::query_affect::QueryAffect;
 use crate::sessions::query_ctx_shared::MemoryUpdater;
@@ -281,6 +284,22 @@ impl QueryContext {
         }
     }
 
+    pub fn rf_src_recv(&self, join_id: u32) -> Receiver<RemoteRuntimeFilters> {
+        self.shared.rf_src_recv(join_id)
+    }
+
+    pub fn rf_src_send(&self, join_id: u32) -> Sender<RemoteRuntimeFilters> {
+        self.shared.rf_src_send(join_id)
+    }
+
+    pub fn rf_sink_recv(&self, join_id: u32) -> Receiver<RemoteRuntimeFilters> {
+        self.shared.rf_sink_recv(join_id)
+    }
+
+    pub fn rf_sink_send(&self, join_id: u32) -> Sender<RemoteRuntimeFilters> {
+        self.shared.rf_sink_send(join_id)
+    }
+
     pub fn attach_table(&self, catalog: &str, database: &str, name: &str, table: Arc<dyn Table>) {
         self.shared.attach_table(catalog, database, name, table)
     }
@@ -1381,21 +1400,21 @@ impl TableContext for QueryContext {
         runtime_filters.clear();
     }
 
-    fn set_runtime_filter(&self, filters: (IndexType, RuntimeFilterInfo)) {
+    fn set_runtime_filter(&self, filters: (IndexType, RuntimeFiltersForScan)) {
         let mut runtime_filters = self.shared.runtime_filters.write();
         match runtime_filters.entry(filters.0) {
             Entry::Vacant(v) => {
                 v.insert(filters.1);
             }
             Entry::Occupied(mut v) => {
-                for filter in filters.1.get_inlist() {
-                    v.get_mut().add_inlist(filter.clone());
+                for (rf_id, filter) in filters.1.inlist.into_iter() {
+                    v.get_mut().add_inlist(rf_id, filter);
                 }
-                for filter in filters.1.get_min_max() {
-                    v.get_mut().add_min_max(filter.clone());
+                for (rf_id, filter) in filters.1.min_max.into_iter() {
+                    v.get_mut().add_min_max(rf_id, filter);
                 }
-                for filter in filters.1.blooms() {
-                    v.get_mut().add_bloom(filter);
+                for (rf_id, filter) in filters.1.bloom.into_iter() {
+                    v.get_mut().add_bloom(rf_id, filter);
                 }
             }
         }
@@ -1446,7 +1465,7 @@ impl TableContext for QueryContext {
     fn get_bloom_runtime_filter_with_id(&self, id: IndexType) -> Vec<(String, BinaryFuse16)> {
         let runtime_filters = self.shared.runtime_filters.read();
         match runtime_filters.get(&id) {
-            Some(v) => (v.get_bloom()).clone(),
+            Some(v) => v.bloom.values().cloned().collect(),
             None => vec![],
         }
     }
@@ -1454,7 +1473,7 @@ impl TableContext for QueryContext {
     fn get_inlist_runtime_filter_with_id(&self, id: IndexType) -> Vec<Expr<String>> {
         let runtime_filters = self.shared.runtime_filters.read();
         match runtime_filters.get(&id) {
-            Some(v) => (v.get_inlist()).clone(),
+            Some(v) => v.inlist.values().cloned().collect(),
             None => vec![],
         }
     }
@@ -1462,14 +1481,14 @@ impl TableContext for QueryContext {
     fn get_min_max_runtime_filter_with_id(&self, id: IndexType) -> Vec<Expr<String>> {
         let runtime_filters = self.shared.runtime_filters.read();
         match runtime_filters.get(&id) {
-            Some(v) => (v.get_min_max()).clone(),
+            Some(v) => v.min_max.values().cloned().collect(),
             None => vec![],
         }
     }
 
     fn has_bloom_runtime_filters(&self, id: usize) -> bool {
         if let Some(runtime_filter) = self.shared.runtime_filters.read().get(&id) {
-            return !runtime_filter.get_bloom().is_empty();
+            return !runtime_filter.bloom.is_empty();
         }
         false
     }
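One consequence of the map-backed getters: `values()` iterates in an unspecified order, so the returned `Vec` is no longer in insertion order as it was with the old `Vec`-based getters. If a deterministic order ever matters downstream, sorting by `rf_id` restores one; a hedged sketch:

```rust
use std::collections::HashMap;

// Collect map values in ascending rf_id order instead of hash order.
fn values_sorted_by_id<T: Clone>(map: &HashMap<usize, T>) -> Vec<T> {
    let mut entries: Vec<_> = map.iter().collect();
    entries.sort_by_key(|(rf_id, _)| **rf_id);
    entries.into_iter().map(|(_, v)| v.clone()).collect()
}
```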
diff --git a/src/query/service/src/sessions/query_ctx_shared.rs b/src/query/service/src/sessions/query_ctx_shared.rs
index 2ab2f3c864d8f..6d74defe45344 100644
--- a/src/query/service/src/sessions/query_ctx_shared.rs
+++ b/src/query/service/src/sessions/query_ctx_shared.rs
@@ -23,6 +23,8 @@ use std::sync::Weak;
 use std::time::Duration;
 use std::time::SystemTime;
 
+use async_channel::Receiver;
+use async_channel::Sender;
 use dashmap::DashMap;
 use databend_common_base::base::short_sql;
 use databend_common_base::base::Progress;
@@ -35,8 +37,8 @@ use databend_common_catalog::catalog::CatalogManager;
 use databend_common_catalog::merge_into_join::MergeIntoJoin;
 use databend_common_catalog::plan::PartStatistics;
 use databend_common_catalog::query_kind::QueryKind;
-use databend_common_catalog::runtime_filter_info::RuntimeFilterInfo;
 use databend_common_catalog::runtime_filter_info::RuntimeFilterReady;
+use databend_common_catalog::runtime_filter_info::RuntimeFiltersForScan;
 use databend_common_catalog::statistics::data_cache_statistics::DataCacheMetrics;
 use databend_common_catalog::table_context::ContextError;
 use databend_common_catalog::table_context::StageAttachment;
@@ -68,6 +70,7 @@ use uuid::Uuid;
 use crate::clusters::Cluster;
 use crate::clusters::ClusterDiscovery;
 use crate::pipelines::executor::PipelineExecutor;
+use crate::pipelines::processors::transforms::RemoteRuntimeFilters;
 use crate::sessions::query_affect::QueryAffect;
 use crate::sessions::Session;
 use crate::storages::Table;
@@ -144,7 +147,7 @@ pub struct QueryContextShared {
 
     pub(in crate::sessions) query_profiles: Arc<RwLock<HashMap<Option<u32>, PlanProfile>>>,
 
-    pub(in crate::sessions) runtime_filters: Arc<RwLock<HashMap<IndexType, RuntimeFilterInfo>>>,
+    pub(in crate::sessions) runtime_filters: Arc<RwLock<HashMap<IndexType, RuntimeFiltersForScan>>>,
 
     pub(in crate::sessions) runtime_filter_ready:
         Arc<RwLock<HashMap<IndexType, Vec<Arc<RuntimeFilterReady>>>>>,
@@ -169,9 +172,19 @@ pub struct QueryContextShared {
     // Used by hilbert clustering when do recluster.
     pub(in crate::sessions) selected_segment_locs: Arc<RwLock<HashSet<Location>>>,
 
+    // join_id -> (sender, receiver); each Option-wrapped endpoint is taken once
+    pub(in crate::sessions) rf_source: Arc<Mutex<HashMap<u32, RuntimeFilterChannel>>>,
+
+    pub(in crate::sessions) rf_sink: Arc<Mutex<HashMap<u32, RuntimeFilterChannel>>>,
+
     pub(in crate::sessions) pruned_partitions_stats: Arc<RwLock<Option<PartStatistics>>>,
 }
 
+type RuntimeFilterChannel = (
+    Option<Sender<RemoteRuntimeFilters>>,
+    Option<Receiver<RemoteRuntimeFilters>>,
+);
+
 impl QueryContextShared {
     pub fn try_create(
         session: Arc<Session>,
@@ -235,10 +248,64 @@ impl QueryContextShared {
             mem_stat: Arc::new(RwLock::new(None)),
             node_memory_usage: Arc::new(RwLock::new(HashMap::new())),
             selected_segment_locs: Default::default(),
+            rf_source: Arc::new(Mutex::new(HashMap::new())),
+            rf_sink: Arc::new(Mutex::new(HashMap::new())),
             pruned_partitions_stats: Arc::new(RwLock::new(None)),
         }))
     }
 
+    // Each endpoint below is created lazily and handed out exactly once: the
+    // first caller for a join_id creates the channel, takes its own endpoint,
+    // and leaves the other behind for the peer; a second take of the same
+    // endpoint would panic on the `unwrap`.
+    pub fn rf_src_recv(&self, join_id: u32) -> Receiver<RemoteRuntimeFilters> {
+        let mut rf_source = self.rf_source.lock();
+        match rf_source.get_mut(&join_id).map(|(_, receiver)| receiver) {
+            Some(receiver) => receiver.take().unwrap(),
+            None => {
+                let (sender, receiver) = async_channel::unbounded();
+                rf_source.insert(join_id, (Some(sender), None));
+                receiver
+            }
+        }
+    }
+
+    pub fn rf_src_send(&self, join_id: u32) -> Sender<RemoteRuntimeFilters> {
+        let mut rf_source = self.rf_source.lock();
+        match rf_source.get_mut(&join_id).map(|(sender, _)| sender) {
+            Some(sender) => sender.take().unwrap(),
+            None => {
+                let (sender, receiver) = async_channel::unbounded();
+                rf_source.insert(join_id, (None, Some(receiver)));
+                sender
+            }
+        }
+    }
+
+    pub fn rf_sink_recv(&self, join_id: u32) -> Receiver<RemoteRuntimeFilters> {
+        let mut rf_sink = self.rf_sink.lock();
+        match rf_sink.get_mut(&join_id).map(|(_, receiver)| receiver) {
+            Some(receiver) => receiver.take().unwrap(),
+            None => {
+                let (sender, receiver) = async_channel::unbounded();
+                rf_sink.insert(join_id, (Some(sender), None));
+                receiver
+            }
+        }
+    }
+
+    pub fn rf_sink_send(&self, join_id: u32) -> Sender<RemoteRuntimeFilters> {
+        let mut rf_sink = self.rf_sink.lock();
+        match rf_sink.get_mut(&join_id).map(|(sender, _)| sender) {
+            Some(sender) => sender.take().unwrap(),
+            None => {
+                let (sender, receiver) = async_channel::unbounded();
+                rf_sink.insert(join_id, (None, Some(receiver)));
+                sender
+            }
+        }
+    }
+
     pub fn set_error<C>(&self, err: ErrorCode<C>) {
         let err = err.with_context("query context error");
 
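The four methods above share a rendezvous pattern: the first caller for a `join_id` creates the channel and takes its endpoint, leaving the other endpoint for the peer. A standalone sketch of the same pattern, written with `entry`/`or_insert_with` to collapse the two branches (std `Mutex` and a generic payload stand in for the real types):

```rust
use std::collections::HashMap;
use std::sync::Mutex;

use async_channel::{Receiver, Sender};

// Each slot starts with both endpoints present; taking one leaves the other
// behind for the peer, mirroring the Some/None bookkeeping in the PR.
type Endpoints<T> = (Option<Sender<T>>, Option<Receiver<T>>);

struct Rendezvous<T> {
    channels: Mutex<HashMap<u32, Endpoints<T>>>,
}

impl<T> Rendezvous<T> {
    fn recv_end(&self, join_id: u32) -> Receiver<T> {
        let mut map = self.channels.lock().unwrap();
        let (_sender, receiver) = map.entry(join_id).or_insert_with(|| {
            let (s, r) = async_channel::unbounded();
            (Some(s), Some(r))
        });
        receiver.take().expect("receiver already taken for this join_id")
    }

    fn send_end(&self, join_id: u32) -> Sender<T> {
        let mut map = self.channels.lock().unwrap();
        let (sender, _receiver) = map.entry(join_id).or_insert_with(|| {
            let (s, r) = async_channel::unbounded();
            (Some(s), Some(r))
        });
        sender.take().expect("sender already taken for this join_id")
    }
}
```

Behaviorally this matches the PR code: each endpoint can be taken exactly once per `join_id`, and a second take panics.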
diff --git a/src/query/service/tests/it/pipelines/builders/runtime_filter.rs b/src/query/service/tests/it/pipelines/builders/runtime_filter.rs
index ce1e81871f3f1..b739612ea5a44 100644
--- a/src/query/service/tests/it/pipelines/builders/runtime_filter.rs
+++ b/src/query/service/tests/it/pipelines/builders/runtime_filter.rs
@@ -100,6 +100,7 @@ async fn join_build_state(
         &join.build_projections,
         join_state.clone(),
         1,
+        None,
     )?;
     Ok(build_state)
 }
diff --git a/src/query/service/tests/it/sql/exec/get_table_bind_test.rs b/src/query/service/tests/it/sql/exec/get_table_bind_test.rs
index 0c2bebe146f42..272c96e97b748 100644
--- a/src/query/service/tests/it/sql/exec/get_table_bind_test.rs
+++ b/src/query/service/tests/it/sql/exec/get_table_bind_test.rs
@@ -31,8 +31,8 @@ use databend_common_catalog::plan::DataSourcePlan;
 use databend_common_catalog::plan::PartInfoPtr;
 use databend_common_catalog::plan::Partitions;
 use databend_common_catalog::query_kind::QueryKind;
-use databend_common_catalog::runtime_filter_info::RuntimeFilterInfo;
 use databend_common_catalog::runtime_filter_info::RuntimeFilterReady;
+use databend_common_catalog::runtime_filter_info::RuntimeFiltersForScan;
 use databend_common_catalog::statistics::data_cache_statistics::DataCacheMetrics;
 use databend_common_catalog::table::Table;
 use databend_common_catalog::table_context::ContextError;
@@ -924,7 +924,7 @@ impl TableContext for CtxDelegation {
         todo!()
     }
 
-    fn set_runtime_filter(&self, _filters: (IndexType, RuntimeFilterInfo)) {
+    fn set_runtime_filter(&self, _filters: (IndexType, RuntimeFiltersForScan)) {
         todo!()
     }
 
diff --git a/src/query/service/tests/it/storages/fuse/operations/commit.rs b/src/query/service/tests/it/storages/fuse/operations/commit.rs
index e41d6c7c2fb73..eeed126767ae1 100644
--- a/src/query/service/tests/it/storages/fuse/operations/commit.rs
+++ b/src/query/service/tests/it/storages/fuse/operations/commit.rs
@@ -30,8 +30,8 @@ use databend_common_catalog::plan::DataSourcePlan;
 use databend_common_catalog::plan::PartInfoPtr;
 use databend_common_catalog::plan::Partitions;
 use databend_common_catalog::query_kind::QueryKind;
-use databend_common_catalog::runtime_filter_info::RuntimeFilterInfo;
 use databend_common_catalog::runtime_filter_info::RuntimeFilterReady;
+use databend_common_catalog::runtime_filter_info::RuntimeFiltersForScan;
 use databend_common_catalog::statistics::data_cache_statistics::DataCacheMetrics;
 use databend_common_catalog::table::Table;
 use databend_common_catalog::table_context::ContextError;
@@ -789,7 +789,7 @@ impl TableContext for CtxDelegation {
         todo!()
     }
 
-    fn set_runtime_filter(&self, _filters: (IndexType, RuntimeFilterInfo)) {
+    fn set_runtime_filter(&self, _filters: (IndexType, RuntimeFiltersForScan)) {
         todo!()
     }
 
diff --git a/src/query/sql/src/executor/format.rs b/src/query/sql/src/executor/format.rs
index b7166308a7855..c0d37da3cf022 100644
--- a/src/query/sql/src/executor/format.rs
+++ b/src/query/sql/src/executor/format.rs
@@ -26,7 +26,7 @@ use databend_common_pipeline_core::processors::PlanProfile;
 use itertools::Itertools;
 
 use super::physical_plans::AddStreamColumn;
-use super::PhysicalRuntimeFilter;
+use super::RemoteRuntimeFilterDesc;
 use crate::binder::MutationType;
 use crate::executor::explain::PlanStatsInfo;
 use crate::executor::physical_plans::AggregateExpand;
@@ -340,7 +340,7 @@ pub fn format_partial_tree(
 }
 
 struct FormatContext {
-    scan_id_to_runtime_filters: HashMap<IndexType, Vec<PhysicalRuntimeFilter>>,
+    scan_id_to_runtime_filters: HashMap<IndexType, Vec<RemoteRuntimeFilterDesc>>,
 }
 
 #[recursive::recursive]
@@ -521,6 +521,12 @@ fn to_format_tree(
         PhysicalPlan::AsyncFunction(plan) => {
             async_function_to_format_tree(plan, metadata, profs, context)
         }
+        PhysicalPlan::RuntimeFilterSource(_plan) => {
+            Ok(FormatTreeNode::new("RuntimeFilterSource".to_string()))
+        }
+        PhysicalPlan::RuntimeFilterSink(_plan) => {
+            Ok(FormatTreeNode::new("RuntimeFilterSink".to_string()))
+        }
     }
 }
 
@@ -1517,7 +1523,7 @@ fn hash_join_to_format_tree(
     profs: &HashMap<u32, PlanProfile>,
     context: &mut FormatContext,
 ) -> Result<FormatTreeNode<String>> {
-    for rf in plan.runtime_filter.filters.iter() {
+    for rf in plan.runtime_filter_desc.filters.iter() {
         context
             .scan_id_to_runtime_filters
             .entry(rf.scan_id)
@@ -1551,7 +1557,7 @@ fn hash_join_to_format_tree(
     probe_child.payload = format!("{}(Probe)", probe_child.payload);
 
     let mut build_runtime_filters = vec![];
-    for rf in plan.runtime_filter.filters.iter() {
+    for rf in plan.runtime_filter_desc.filters.iter() {
         let mut s = format!(
             "filter id:{}, build key:{}, probe key:{}, filter type:",
             rf.id,
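The first loop in `hash_join_to_format_tree` buckets each filter under its probe-side scan, so the scan's node in the EXPLAIN tree can later list the filters that will reach it. The idiom in isolation (toy types, not the PR's):

```rust
use std::collections::HashMap;

// Toy version of the bucketing above: (filter_id, scan_id) pairs are grouped
// into scan_id -> [filter_id], mirroring scan_id_to_runtime_filters.
fn group_by_scan(filters: &[(usize, usize)]) -> HashMap<usize, Vec<usize>> {
    let mut by_scan: HashMap<usize, Vec<usize>> = HashMap::new();
    for &(filter_id, scan_id) in filters {
        by_scan.entry(scan_id).or_default().push(filter_id);
    }
    by_scan
}
```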
diff --git a/src/query/sql/src/executor/mod.rs b/src/query/sql/src/executor/mod.rs
index 73f2b65710b82..72925a854d6a8 100644
--- a/src/query/sql/src/executor/mod.rs
+++ b/src/query/sql/src/executor/mod.rs
@@ -27,6 +27,6 @@ pub use physical_plan::PhysicalPlan;
 pub use physical_plan_builder::MutationBuildInfo;
 pub use physical_plan_builder::PhysicalPlanBuilder;
 pub use physical_plan_visitor::PhysicalPlanReplacer;
-pub use physical_plans::PhysicalRuntimeFilter;
-pub use physical_plans::PhysicalRuntimeFilters;
+pub use physical_plans::RemoteRuntimeFilterDesc;
+pub use physical_plans::RemoteRuntimeFiltersDesc;
 pub use util::*;
diff --git a/src/query/sql/src/executor/physical_plan.rs b/src/query/sql/src/executor/physical_plan.rs
index 0c81455aa489f..bc7191c62cc56 100644
--- a/src/query/sql/src/executor/physical_plan.rs
+++ b/src/query/sql/src/executor/physical_plan.rs
@@ -31,6 +31,8 @@ use super::physical_plans::MutationManipulate;
 use super::physical_plans::MutationOrganize;
 use super::physical_plans::MutationSource;
 use super::physical_plans::MutationSplit;
+use super::physical_plans::RuntimeFilterSink;
+use super::physical_plans::RuntimeFilterSource;
 use crate::executor::physical_plans::AggregateExpand;
 use crate::executor::physical_plans::AggregateFinal;
 use crate::executor::physical_plans::AggregatePartial;
@@ -155,6 +157,10 @@ pub enum PhysicalPlan {
 
     // async function call
     AsyncFunction(AsyncFunction),
+
+    // runtime filter
+    RuntimeFilterSource(RuntimeFilterSource),
+    RuntimeFilterSink(RuntimeFilterSink),
 }
 
 impl PhysicalPlan {
@@ -407,6 +413,15 @@ impl PhysicalPlan {
                 *next_id += 1;
                 plan.input.adjust_plan_id(next_id);
             }
+            PhysicalPlan::RuntimeFilterSource(plan) => {
+                plan.plan_id = *next_id;
+                *next_id += 1;
+            }
+            PhysicalPlan::RuntimeFilterSink(plan) => {
+                plan.plan_id = *next_id;
+                *next_id += 1;
+                plan.input.adjust_plan_id(next_id);
+            }
         }
     }
 
@@ -463,6 +478,8 @@ impl PhysicalPlan {
             PhysicalPlan::ChunkMerge(v) => v.plan_id,
             PhysicalPlan::ChunkCommitInsert(v) => v.plan_id,
             PhysicalPlan::RecursiveCteScan(v) => v.plan_id,
+            PhysicalPlan::RuntimeFilterSource(v) => v.plan_id,
+            PhysicalPlan::RuntimeFilterSink(v) => v.plan_id,
         }
     }
 
@@ -508,6 +525,8 @@ impl PhysicalPlan {
             | PhysicalPlan::CommitSink(_)
             | PhysicalPlan::DistributedInsertSelect(_)
             | PhysicalPlan::Recluster(_)
+            | PhysicalPlan::RuntimeFilterSource(_)
+            | PhysicalPlan::RuntimeFilterSink(_)
             | PhysicalPlan::HilbertPartition(_) => Ok(DataSchemaRef::default()),
             PhysicalPlan::Duplicate(plan) => plan.input.output_schema(),
             PhysicalPlan::Shuffle(plan) => plan.input.output_schema(),
@@ -579,6 +598,8 @@ impl PhysicalPlan {
             PhysicalPlan::ChunkAppendData(_) => "WriteData".to_string(),
             PhysicalPlan::ChunkMerge(_) => "ChunkMerge".to_string(),
             PhysicalPlan::ChunkCommitInsert(_) => "Commit".to_string(),
+            PhysicalPlan::RuntimeFilterSource(_) => "RuntimeFilterSource".to_string(),
+            PhysicalPlan::RuntimeFilterSink(_) => "RuntimeFilterSink".to_string(),
         }
     }
 
@@ -591,7 +612,8 @@ impl PhysicalPlan {
             | PhysicalPlan::CompactSource(_)
             | PhysicalPlan::ReplaceAsyncSourcer(_)
             | PhysicalPlan::Recluster(_)
-            | PhysicalPlan::RecursiveCteScan(_) => Box::new(std::iter::empty()),
+            | PhysicalPlan::RecursiveCteScan(_)
+            | PhysicalPlan::RuntimeFilterSource(_) => Box::new(std::iter::empty()),
             PhysicalPlan::HilbertPartition(plan) => Box::new(std::iter::once(plan.input.as_ref())),
             PhysicalPlan::Filter(plan) => Box::new(std::iter::once(plan.input.as_ref())),
             PhysicalPlan::EvalScalar(plan) => Box::new(std::iter::once(plan.input.as_ref())),
@@ -609,6 +631,7 @@ impl PhysicalPlan {
             PhysicalPlan::ExpressionScan(plan) => Box::new(std::iter::once(plan.input.as_ref())),
             PhysicalPlan::Exchange(plan) => Box::new(std::iter::once(plan.input.as_ref())),
             PhysicalPlan::ExchangeSink(plan) => Box::new(std::iter::once(plan.input.as_ref())),
+            PhysicalPlan::RuntimeFilterSink(plan) => Box::new(std::iter::once(plan.input.as_ref())),
             PhysicalPlan::UnionAll(plan) => Box::new(
                 std::iter::once(plan.left.as_ref()).chain(std::iter::once(plan.right.as_ref())),
             ),
@@ -663,6 +686,7 @@ impl PhysicalPlan {
             | PhysicalPlan::CompactSource(_)
             | PhysicalPlan::ReplaceAsyncSourcer(_)
             | PhysicalPlan::Recluster(_)
+            | PhysicalPlan::RuntimeFilterSource(_)
             | PhysicalPlan::RecursiveCteScan(_) => Box::new(std::iter::empty()),
             PhysicalPlan::HilbertPartition(plan) => Box::new(std::iter::once(plan.input.as_mut())),
             PhysicalPlan::Filter(plan) => Box::new(std::iter::once(plan.input.as_mut())),
@@ -723,6 +747,7 @@ impl PhysicalPlan {
                 CopyIntoTableSource::Query(v) => Box::new(std::iter::once(v.as_mut())),
                 CopyIntoTableSource::Stage(v) => Box::new(std::iter::once(v.as_mut())),
             },
+            PhysicalPlan::RuntimeFilterSink(plan) => Box::new(std::iter::once(plan.input.as_mut())),
         }
     }
 
@@ -778,7 +803,9 @@ impl PhysicalPlan {
             | PhysicalPlan::ChunkFillAndReorder(_)
             | PhysicalPlan::ChunkAppendData(_)
             | PhysicalPlan::ChunkMerge(_)
-            | PhysicalPlan::ChunkCommitInsert(_) => None,
+            | PhysicalPlan::ChunkCommitInsert(_)
+            | PhysicalPlan::RuntimeFilterSource(_)
+            | PhysicalPlan::RuntimeFilterSink(_) => None,
         }
     }
 
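Across all of these exhaustive matches the two variants follow one rule: `RuntimeFilterSource` is a leaf (no children, default output schema, pre-order id only), while `RuntimeFilterSink` is a unary node that recurses into its input. A toy model of the pre-order numbering (a stand-in enum, not Databend's):

```rust
// Parents are numbered before children; leaves take an id but recurse no further.
enum Plan {
    Source { plan_id: u32 },                 // leaf, like RuntimeFilterSource
    Sink { plan_id: u32, input: Box<Plan> }, // unary, like RuntimeFilterSink
}

fn adjust_plan_id(plan: &mut Plan, next_id: &mut u32) {
    match plan {
        Plan::Source { plan_id } => {
            *plan_id = *next_id;
            *next_id += 1;
        }
        Plan::Sink { plan_id, input } => {
            *plan_id = *next_id;
            *next_id += 1;
            adjust_plan_id(input, next_id);
        }
    }
}
```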
diff --git a/src/query/sql/src/executor/physical_plan_builder.rs b/src/query/sql/src/executor/physical_plan_builder.rs
index c50adc9f30db4..b25b6a84b9b02 100644
--- a/src/query/sql/src/executor/physical_plan_builder.rs
+++ b/src/query/sql/src/executor/physical_plan_builder.rs
@@ -39,6 +39,7 @@ pub struct PhysicalPlanBuilder {
     pub(crate) dry_run: bool,
     // DataMutation info, used to build MergeInto physical plan
     pub(crate) mutation_build_info: Option<MutationBuildInfo>,
+    pub(crate) next_hash_join_id: u32,
 }
 
 impl PhysicalPlanBuilder {
@@ -50,6 +51,7 @@ impl PhysicalPlanBuilder {
             func_ctx,
             dry_run,
             mutation_build_info: None,
+            next_hash_join_id: 0,
         }
     }
 
diff --git a/src/query/sql/src/executor/physical_plan_visitor.rs b/src/query/sql/src/executor/physical_plan_visitor.rs
index 8a1ced834b01a..031c8359dfc1a 100644
--- a/src/query/sql/src/executor/physical_plan_visitor.rs
+++ b/src/query/sql/src/executor/physical_plan_visitor.rs
@@ -22,6 +22,8 @@ use super::physical_plans::MutationManipulate;
 use super::physical_plans::MutationOrganize;
 use super::physical_plans::MutationSplit;
 use super::physical_plans::RecursiveCteScan;
+use super::physical_plans::RuntimeFilterSink;
+use super::physical_plans::RuntimeFilterSource;
 use crate::executor::physical_plan::PhysicalPlan;
 use crate::executor::physical_plans::AggregateExpand;
 use crate::executor::physical_plans::AggregateFinal;
@@ -120,9 +122,27 @@ pub trait PhysicalPlanReplacer {
             PhysicalPlan::ChunkAppendData(plan) => self.replace_chunk_append_data(plan),
             PhysicalPlan::ChunkMerge(plan) => self.replace_chunk_merge(plan),
             PhysicalPlan::ChunkCommitInsert(plan) => self.replace_chunk_commit_insert(plan),
+            PhysicalPlan::RuntimeFilterSource(plan) => self.replace_runtime_filter_source(plan),
+            PhysicalPlan::RuntimeFilterSink(plan) => self.replace_runtime_filter_sink(plan),
         }
     }
 
+    fn replace_runtime_filter_source(
+        &mut self,
+        plan: &RuntimeFilterSource,
+    ) -> Result<PhysicalPlan> {
+        Ok(PhysicalPlan::RuntimeFilterSource(plan.clone()))
+    }
+
+    fn replace_runtime_filter_sink(&mut self, plan: &RuntimeFilterSink) -> Result<PhysicalPlan> {
+        let input = self.replace(&plan.input)?;
+        Ok(PhysicalPlan::RuntimeFilterSink(RuntimeFilterSink {
+            plan_id: plan.plan_id,
+            join_id: plan.join_id,
+            input: Box::new(input),
+        }))
+    }
+
     fn replace_recluster(&mut self, plan: &Recluster) -> Result<PhysicalPlan> {
         Ok(PhysicalPlan::Recluster(Box::new(plan.clone())))
     }
@@ -253,6 +273,12 @@ pub trait PhysicalPlanReplacer {
         let build = self.replace(&plan.build)?;
         let probe = self.replace(&plan.probe)?;
 
+        let runtime_filter_plan = if let Some(runtime_filter_plan) = &plan.runtime_filter_plan {
+            Some(Box::new(self.replace(runtime_filter_plan)?))
+        } else {
+            None
+        };
+
         Ok(PhysicalPlan::HashJoin(HashJoin {
             plan_id: plan.plan_id,
             projections: plan.projections.clone(),
@@ -271,10 +297,12 @@ pub trait PhysicalPlanReplacer {
             output_schema: plan.output_schema.clone(),
             need_hold_hash_table: plan.need_hold_hash_table,
             stat_info: plan.stat_info.clone(),
-            runtime_filter: plan.runtime_filter.clone(),
+            runtime_filter_desc: plan.runtime_filter_desc.clone(),
             broadcast: plan.broadcast,
             single_to_inner: plan.single_to_inner.clone(),
             build_side_cache_info: plan.build_side_cache_info.clone(),
+            runtime_filter_plan,
+            join_id: plan.join_id,
         }))
     }
 
@@ -646,7 +674,8 @@ impl PhysicalPlan {
                 | PhysicalPlan::HilbertPartition(_)
                 | PhysicalPlan::ExchangeSource(_)
                 | PhysicalPlan::CompactSource(_)
-                | PhysicalPlan::MutationSource(_) => {}
+                | PhysicalPlan::MutationSource(_)
+                | PhysicalPlan::RuntimeFilterSource(_) => {}
                 PhysicalPlan::Filter(plan) => {
                     Self::traverse(&plan.input, pre_visit, visit, post_visit);
                 }
@@ -772,6 +801,9 @@ impl PhysicalPlan {
                 PhysicalPlan::ChunkCommitInsert(plan) => {
                     Self::traverse(&plan.input, pre_visit, visit, post_visit);
                 }
+                PhysicalPlan::RuntimeFilterSink(plan) => {
+                    Self::traverse(&plan.input, pre_visit, visit, post_visit);
+                }
             }
             post_visit(plan);
         }
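Because the trait ships default methods for both new variants, existing replacers keep compiling; a custom replacer only overrides the nodes it wants to rewrite. A hypothetical replacer that splices `RuntimeFilterSink` nodes out of a plan, leaning on the defaults for everything else:

```rust
// Hypothetical, not part of this PR: dropping a sink keeps rewriting its input,
// so the rest of the tree still flows through the default replace methods.
struct DropRuntimeFilterSinks;

impl PhysicalPlanReplacer for DropRuntimeFilterSinks {
    fn replace_runtime_filter_sink(&mut self, plan: &RuntimeFilterSink) -> Result<PhysicalPlan> {
        self.replace(&plan.input)
    }
}
```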
diff --git a/src/query/sql/src/executor/physical_plans/mod.rs b/src/query/sql/src/executor/physical_plans/mod.rs
index ab3e282abec34..82a40fe356181 100644
--- a/src/query/sql/src/executor/physical_plans/mod.rs
+++ b/src/query/sql/src/executor/physical_plans/mod.rs
@@ -82,8 +82,10 @@ pub use physical_filter::Filter;
 pub use physical_hash_join::HashJoin;
 pub use physical_join::PhysicalJoinType;
 pub use physical_join_filter::JoinRuntimeFilter;
-pub use physical_join_filter::PhysicalRuntimeFilter;
-pub use physical_join_filter::PhysicalRuntimeFilters;
+pub use physical_join_filter::RemoteRuntimeFilterDesc;
+pub use physical_join_filter::RemoteRuntimeFiltersDesc;
+pub use physical_join_filter::RuntimeFilterSink;
+pub use physical_join_filter::RuntimeFilterSource;
 pub use physical_limit::Limit;
 pub use physical_multi_table_insert::*;
 pub use physical_mutation::*;
diff --git a/src/query/sql/src/executor/physical_plans/physical_hash_join.rs b/src/query/sql/src/executor/physical_plans/physical_hash_join.rs
index c2b7cc21c1b72..dd9cc320e6cee 100644
--- a/src/query/sql/src/executor/physical_plans/physical_hash_join.rs
+++ b/src/query/sql/src/executor/physical_plans/physical_hash_join.rs
@@ -27,8 +27,9 @@ use databend_common_expression::DataSchemaRefExt;
 use databend_common_expression::RemoteExpr;
 use databend_common_functions::BUILTIN_FUNCTIONS;
 
+use super::physical_join_filter::build_runtime_filter_plan;
+use super::physical_join_filter::RemoteRuntimeFiltersDesc;
 use super::JoinRuntimeFilter;
-use super::PhysicalRuntimeFilters;
 use crate::executor::explain::PlanStatsInfo;
 use crate::executor::physical_plans::Exchange;
 use crate::executor::physical_plans::FragmentKind;
@@ -107,7 +108,9 @@ pub struct HashJoin {
     // a HashMap for mapping the column indexes to the BlockEntry indexes in DataBlock.
     pub build_side_cache_info: Option<(usize, HashMap<IndexType, usize>)>,
 
-    pub runtime_filter: PhysicalRuntimeFilters,
+    pub runtime_filter_desc: RemoteRuntimeFiltersDesc,
+    pub runtime_filter_plan: Option<Box<PhysicalPlan>>,
+    pub join_id: u32,
 }
 
 impl HashJoin {
@@ -815,7 +818,9 @@ impl PhysicalPlanBuilder {
         probe_to_build: Vec<(usize, (bool, bool))>,
         output_schema: DataSchemaRef,
         build_side_cache_info: Option<(usize, HashMap<IndexType, usize>)>,
-        runtime_filter: PhysicalRuntimeFilters,
+        runtime_filter_desc: RemoteRuntimeFiltersDesc,
+        runtime_filter_plan: Option<Box<PhysicalPlan>>,
+        join_id: u32,
         stat_info: PlanStatsInfo,
     ) -> Result<PhysicalPlan> {
         Ok(PhysicalPlan::HashJoin(HashJoin {
@@ -839,7 +844,9 @@ impl PhysicalPlanBuilder {
             broadcast: is_broadcast,
             single_to_inner: join.single_to_inner.clone(),
             build_side_cache_info,
-            runtime_filter,
+            runtime_filter_desc,
+            runtime_filter_plan,
+            join_id,
         }))
     }
 
@@ -915,7 +922,7 @@ impl PhysicalPlanBuilder {
         let non_equi_conditions = self.process_non_equi_conditions(join, &merged_schema)?;
 
         // Step 11: Build runtime filter
-        let runtime_filter = self
+        let runtime_filter_desc = self
             .build_runtime_filter(
                 join,
                 s_expr,
@@ -925,6 +932,18 @@ impl PhysicalPlanBuilder {
             )
             .await?;
 
+        let join_id = self.next_hash_join_id;
+        self.next_hash_join_id += 1;
+
+        let runtime_filter_plan = if !runtime_filter_desc.filters.is_empty()
+            && !self.ctx.get_cluster().is_empty()
+            && !is_broadcast
+        {
+            Some(build_runtime_filter_plan(join_id)?)
+        } else {
+            None
+        };
+
         // Step 12: Create and return the HashJoin
         self.create_hash_join(
             join,
@@ -941,7 +960,9 @@ impl PhysicalPlanBuilder {
             probe_to_build,
             output_schema,
             build_side_cache_info,
-            runtime_filter,
+            runtime_filter_desc,
+            runtime_filter_plan,
+            join_id,
             stat_info,
         )
     }
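The gating for `runtime_filter_plan` is the counterpart of the check removed from `build_runtime_filter_desc` below: a broadcast join replicates the whole build side on every node, so its filters can stay local, and a standalone query has no peers to exchange with. The predicate in isolation (hypothetical helper, not in the PR):

```rust
// Only a distributed, non-broadcast (i.e. shuffle) hash join with at least one
// filter needs the RuntimeFilterSource/Sink exchange plan.
fn needs_runtime_filter_exchange(
    has_filters: bool,
    is_cluster: bool,
    is_broadcast: bool,
) -> bool {
    has_filters && is_cluster && !is_broadcast
}
```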
@@ -953,8 +974,8 @@ impl PhysicalPlanBuilder {
         is_broadcast: bool,
         build_keys: &[RemoteExpr],
         probe_keys: Vec<Option<(RemoteExpr<String>, usize, usize)>>,
-    ) -> Result<PhysicalRuntimeFilters> {
-        JoinRuntimeFilter::build_runtime_filter(
+    ) -> Result<RemoteRuntimeFiltersDesc> {
+        JoinRuntimeFilter::build_runtime_filter_desc(
             self.ctx.clone(),
             &self.metadata,
             join,
diff --git a/src/query/sql/src/executor/physical_plans/physical_join_filter.rs b/src/query/sql/src/executor/physical_plans/physical_join_filter.rs
index b28d0b581bbee..fa13a877f584d 100644
--- a/src/query/sql/src/executor/physical_plans/physical_join_filter.rs
+++ b/src/query/sql/src/executor/physical_plans/physical_join_filter.rs
@@ -22,6 +22,9 @@ use databend_common_expression::RemoteExpr;
 use databend_common_functions::BUILTIN_FUNCTIONS;
 use databend_storages_common_table_meta::table::get_change_type;
 
+use super::Exchange;
+use super::FragmentKind;
+use crate::executor::PhysicalPlan;
 use crate::optimizer::ir::RelExpr;
 use crate::optimizer::ir::SExpr;
 use crate::plans::Join;
@@ -30,12 +33,12 @@ use crate::IndexType;
 use crate::MetadataRef;
 
 #[derive(Clone, Debug, serde::Serialize, serde::Deserialize, Default)]
-pub struct PhysicalRuntimeFilters {
-    pub filters: Vec<PhysicalRuntimeFilter>,
+pub struct RemoteRuntimeFiltersDesc {
+    pub filters: Vec<RemoteRuntimeFilterDesc>,
 }
 
 #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
-pub struct PhysicalRuntimeFilter {
+pub struct RemoteRuntimeFilterDesc {
     pub id: usize,
     pub build_key: RemoteExpr,
     pub probe_key: RemoteExpr<String>,
@@ -136,7 +139,7 @@ impl JoinRuntimeFilter {
     }
 
     /// Build runtime filters for a join operation
-    pub async fn build_runtime_filter(
+    pub async fn build_runtime_filter_desc(
         ctx: Arc<dyn TableContext>,
         metadata: &MetadataRef,
         join: &Join,
@@ -144,18 +147,12 @@ impl JoinRuntimeFilter {
         is_broadcast: bool,
         build_keys: &[RemoteExpr],
         probe_keys: Vec<Option<(RemoteExpr<String>, usize, usize)>>,
-    ) -> Result<PhysicalRuntimeFilters> {
+    ) -> Result<RemoteRuntimeFiltersDesc> {
         // Early return if runtime filters are not supported for this join type
         if !Self::supported_join_type_for_runtime_filter(&join.join_type) {
             return Ok(Default::default());
         }
 
-        // For cluster, only support runtime filter for broadcast join
-        let is_cluster = !ctx.get_cluster().is_empty();
-        if is_cluster && !is_broadcast {
-            return Ok(Default::default());
-        }
-
         let mut filters = Vec::new();
 
         // Process each probe key that has runtime filter information
@@ -177,6 +174,8 @@ impl JoinRuntimeFilter {
 
             // Determine which filter types to enable based on data type and statistics
             let enable_bloom_runtime_filter = {
+                // Shuffle joins do not support the bloom runtime filter for now.
+                let is_shuffle = !ctx.get_cluster().is_empty() && !is_broadcast;
                 let is_supported_type = Self::is_type_supported_for_bloom_filter(&data_type);
                 let enable_bloom_runtime_filter_based_on_stats = Self::adjust_bloom_runtime_filter(
                     ctx.clone(),
@@ -185,14 +184,14 @@ impl JoinRuntimeFilter {
                     s_expr,
                 )
                 .await?;
-                is_supported_type && enable_bloom_runtime_filter_based_on_stats
+                !is_shuffle && is_supported_type && enable_bloom_runtime_filter_based_on_stats
             };
 
             let enable_min_max_runtime_filter =
                 Self::is_type_supported_for_min_max_filter(&data_type);
 
             // Create and add the runtime filter
-            let runtime_filter = PhysicalRuntimeFilter {
+            let runtime_filter = RemoteRuntimeFilterDesc {
                 id,
                 build_key: build_key.clone(),
                 probe_key,
@@ -204,6 +203,40 @@ impl JoinRuntimeFilter {
             filters.push(runtime_filter);
         }
 
-        Ok(PhysicalRuntimeFilters { filters })
+        Ok(RemoteRuntimeFiltersDesc { filters })
     }
 }
+
+#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
+pub struct RuntimeFilterSource {
+    pub plan_id: u32,
+    pub join_id: u32,
+}
+
+#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
+pub struct RuntimeFilterSink {
+    pub plan_id: u32,
+    pub join_id: u32,
+    pub input: Box<PhysicalPlan>,
+}
+
+pub fn build_runtime_filter_plan(join_id: u32) -> Result<Box<PhysicalPlan>> {
+    let runtime_filter_source = Box::new(PhysicalPlan::RuntimeFilterSource(RuntimeFilterSource {
+        plan_id: 0,
+        join_id,
+    }));
+    let exchange = Box::new(PhysicalPlan::Exchange(Exchange {
+        plan_id: 0,
+        input: runtime_filter_source,
+        kind: FragmentKind::Expansive,
+        keys: vec![],
+        allow_adjust_parallelism: true,
+        ignore_exchange: false,
+    }));
+    let runtime_filter_sink = Box::new(PhysicalPlan::RuntimeFilterSink(RuntimeFilterSink {
+        plan_id: 0,
+        input: exchange,
+        join_id,
+    }));
+    Ok(runtime_filter_sink)
+}
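`build_runtime_filter_plan` wires `RuntimeFilterSource -> Exchange(Expansive) -> RuntimeFilterSink`, so locally built filters cross the exchange before reaching the sink; the `plan_id: 0` values are placeholders that `adjust_plan_id` renumbers later. A hedged shape check, assuming the match arms shown in the `physical_plan.rs` hunks above belong to `PhysicalPlan::name()` and `PhysicalPlan::children()` (the join id 42 is arbitrary, and the `"Exchange"` name string is assumed from the existing plan naming):

```rust
// Sketch of verifying the constructed plan shape, e.g. inside a unit test.
let plan = build_runtime_filter_plan(42).unwrap();
assert_eq!(plan.name(), "RuntimeFilterSink");
let exchange = plan.children().next().expect("sink has exactly one input");
assert_eq!(exchange.name(), "Exchange"); // assumed name string
let source = exchange.children().next().expect("exchange has exactly one input");
assert_eq!(source.name(), "RuntimeFilterSource");
```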