feat: support using fragment forest to execute additional broadcast operations. #17872

Open
wants to merge 35 commits into
base: main

SkyFan2002
Member

@SkyFan2002 SkyFan2002 commented Apr 30, 2025

I hereby agree to the terms of the CLA available at: https://docs.databend.com/dev/policies/cla/

Summary

This PR introduces a new mechanism for broadcasting additional metadata outside the main data pipeline, aiming to simplify the implementation of runtime filters in shuffle joins and to facilitate range shuffle joins.

Interface

Please refer to the three functions annotated in this commit: b28877e

Implementation

When building the distributed pipeline (build_distributed_pipeline), additional fragments dedicated to broadcasting metadata are created. These fragments form trees of their own, independent of the main fragment tree, and are connected to the main fragment via channels.
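
As a rough illustration of the idea (not the code in this PR), the sketch below simulates a side channel that carries metadata separately from the data path. Nodes are modelled as threads and the channels as `std::sync::mpsc` channels; `Meta` and `broadcast_round` are hypothetical names introduced only for this example, and the real fragments exchange data over the cluster's exchange layer rather than in-process channels.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

/// Hypothetical metadata message; not a type from the PR.
#[derive(Clone, Debug, PartialEq)]
pub struct Meta {
    pub node_id: usize,
    pub payload: String,
}

/// Simulates one broadcast round across `payloads.len()` nodes.
/// Each node sends its metadata to every node (itself included) over a
/// dedicated channel, then blocks until it has one message from each node.
/// Returns, per node, the messages it received, sorted by sender id.
pub fn broadcast_round(payloads: Vec<String>) -> Vec<Vec<Meta>> {
    let n = payloads.len();
    // One inbox per node; every node gets a sender to every inbox.
    let (senders, receivers): (Vec<Sender<Meta>>, Vec<Receiver<Meta>>) =
        (0..n).map(|_| channel()).unzip();

    let handles: Vec<_> = receivers
        .into_iter()
        .zip(payloads)
        .enumerate()
        .map(|(node_id, (inbox, payload))| {
            let peers = senders.clone();
            thread::spawn(move || {
                let meta = Meta { node_id, payload };
                for peer in &peers {
                    peer.send(meta.clone()).expect("peer inbox closed");
                }
                drop(peers);
                // Wait for exactly one message from each of the n nodes.
                let mut received: Vec<Meta> = (0..n)
                    .map(|_| inbox.recv().expect("senders dropped too early"))
                    .collect();
                received.sort_by_key(|m| m.node_id);
                received
            })
        })
        .collect();
    drop(senders);

    handles
        .into_iter()
        .map(|h| h.join().expect("node thread panicked"))
        .collect()
}
```

Calling `broadcast_round` with three payloads returns three views, one per simulated node, each holding the metadata of all three nodes. The main data pipeline is not involved at any point, which is the property the extra fragments are meant to provide.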

Example

This commit implements a simple demo to illustrate its usage. On the build side of the hash join, basic information (node_id, num_rows, build_key) is broadcast, and once all broadcast data has been received, it is printed to the log.

Execute the following SQL statements on a 3-node cluster:

create or replace table t as select number as a from numbers(1);
create or replace table t1 as select number as a from numbers(10);
create or replace table t2 as select number as a from numbers(100);
select * from t, t1, t2 where t.a = t1.a and t1.a = t2.a;

Observe the logs on each node:

sky@hp:~/databend/.databend$ grep "build_info" logs_1/databend-query-test_cluster.2025-04-30-20.0
fb6a8e86-e905-489e-bcb9-2ff57b3831cb 2025-04-30T20:00:55.156467+08:00  INFO databend_query::pipelines::processors::transforms::hash_join::hash_join_build_state: hash_join_build_state.rs:311 receive build_info: BuildInfo { node_id: "eXjUTahh2GEShIyGF1B0a2", num_rows: 1, build_key: [ColumnRef { span: Some(7..8), id: 0, data_type: Nullable(Number(UInt64)), display_name: "t.a (#0)" }] }
fb6a8e86-e905-489e-bcb9-2ff57b3831cb 2025-04-30T20:00:55.158979+08:00  INFO databend_query::pipelines::processors::transforms::hash_join::hash_join_build_state: hash_join_build_state.rs:311 receive build_info: BuildInfo { node_id: "093MVACY8nT17KOudNhbE1", num_rows: 1, build_key: [ColumnRef { span: Some(7..8), id: 0, data_type: Nullable(Number(UInt64)), display_name: "t.a (#0)" }] }
fb6a8e86-e905-489e-bcb9-2ff57b3831cb 2025-04-30T20:00:55.159109+08:00  INFO databend_query::pipelines::processors::transforms::hash_join::hash_join_build_state: hash_join_build_state.rs:311 receive build_info: BuildInfo { node_id: "HJlRxQwwUnvkjaJ1Zy90g5", num_rows: 1, build_key: [ColumnRef { span: Some(7..8), id: 0, data_type: Nullable(Number(UInt64)), display_name: "t.a (#0)" }] }
fb6a8e86-e905-489e-bcb9-2ff57b3831cb 2025-04-30T20:00:55.164335+08:00  INFO databend_query::pipelines::processors::transforms::hash_join::hash_join_build_state: hash_join_build_state.rs:311 receive build_info: BuildInfo { node_id: "eXjUTahh2GEShIyGF1B0a2", num_rows: 1, build_key: [ColumnRef { span: Some(7..8), id: 1, data_type: Nullable(Number(UInt64)), display_name: "t.a (#0)" }] }
fb6a8e86-e905-489e-bcb9-2ff57b3831cb 2025-04-30T20:00:55.165755+08:00  INFO databend_query::pipelines::processors::transforms::hash_join::hash_join_build_state: hash_join_build_state.rs:311 receive build_info: BuildInfo { node_id: "093MVACY8nT17KOudNhbE1", num_rows: 1, build_key: [ColumnRef { span: Some(7..8), id: 1, data_type: Nullable(Number(UInt64)), display_name: "t.a (#0)" }] }
fb6a8e86-e905-489e-bcb9-2ff57b3831cb 2025-04-30T20:00:55.165954+08:00  INFO databend_query::pipelines::processors::transforms::hash_join::hash_join_build_state: hash_join_build_state.rs:311 receive build_info: BuildInfo { node_id: "HJlRxQwwUnvkjaJ1Zy90g5", num_rows: 1, build_key: [ColumnRef { span: Some(7..8), id: 1, data_type: Nullable(Number(UInt64)), display_name: "t.a (#0)" }] }

sky@hp:~/databend/.databend$ grep "build_info" logs_2/databend-query-test_cluster.2025-04-30-20.0
fb6a8e86-e905-489e-bcb9-2ff57b3831cb 2025-04-30T20:00:55.157708+08:00  INFO databend_query::pipelines::processors::transforms::hash_join::hash_join_build_state: hash_join_build_state.rs:311 receive build_info: BuildInfo { node_id: "eXjUTahh2GEShIyGF1B0a2", num_rows: 1, build_key: [ColumnRef { span: Some(7..8), id: 0, data_type: Nullable(Number(UInt64)), display_name: "t.a (#0)" }] }
fb6a8e86-e905-489e-bcb9-2ff57b3831cb 2025-04-30T20:00:55.158105+08:00  INFO databend_query::pipelines::processors::transforms::hash_join::hash_join_build_state: hash_join_build_state.rs:311 receive build_info: BuildInfo { node_id: "HJlRxQwwUnvkjaJ1Zy90g5", num_rows: 1, build_key: [ColumnRef { span: Some(7..8), id: 0, data_type: Nullable(Number(UInt64)), display_name: "t.a (#0)" }] }
fb6a8e86-e905-489e-bcb9-2ff57b3831cb 2025-04-30T20:00:55.158782+08:00  INFO databend_query::pipelines::processors::transforms::hash_join::hash_join_build_state: hash_join_build_state.rs:311 receive build_info: BuildInfo { node_id: "093MVACY8nT17KOudNhbE1", num_rows: 1, build_key: [ColumnRef { span: Some(7..8), id: 0, data_type: Nullable(Number(UInt64)), display_name: "t.a (#0)" }] }
fb6a8e86-e905-489e-bcb9-2ff57b3831cb 2025-04-30T20:00:55.165091+08:00  INFO databend_query::pipelines::processors::transforms::hash_join::hash_join_build_state: hash_join_build_state.rs:311 receive build_info: BuildInfo { node_id: "HJlRxQwwUnvkjaJ1Zy90g5", num_rows: 1, build_key: [ColumnRef { span: Some(7..8), id: 1, data_type: Nullable(Number(UInt64)), display_name: "t.a (#0)" }] }
fb6a8e86-e905-489e-bcb9-2ff57b3831cb 2025-04-30T20:00:55.165232+08:00  INFO databend_query::pipelines::processors::transforms::hash_join::hash_join_build_state: hash_join_build_state.rs:311 receive build_info: BuildInfo { node_id: "eXjUTahh2GEShIyGF1B0a2", num_rows: 1, build_key: [ColumnRef { span: Some(7..8), id: 1, data_type: Nullable(Number(UInt64)), display_name: "t.a (#0)" }] }
fb6a8e86-e905-489e-bcb9-2ff57b3831cb 2025-04-30T20:00:55.165729+08:00  INFO databend_query::pipelines::processors::transforms::hash_join::hash_join_build_state: hash_join_build_state.rs:311 receive build_info: BuildInfo { node_id: "093MVACY8nT17KOudNhbE1", num_rows: 1, build_key: [ColumnRef { span: Some(7..8), id: 1, data_type: Nullable(Number(UInt64)), display_name: "t.a (#0)" }] }

sky@hp:~/databend/.databend$ grep "build_info" logs_3/databend-query-test_cluster.2025-04-30-20.0
fb6a8e86-e905-489e-bcb9-2ff57b3831cb 2025-04-30T20:00:55.157723+08:00  INFO databend_query::pipelines::processors::transforms::hash_join::hash_join_build_state: hash_join_build_state.rs:311 receive build_info: BuildInfo { node_id: "eXjUTahh2GEShIyGF1B0a2", num_rows: 1, build_key: [ColumnRef { span: Some(7..8), id: 0, data_type: Nullable(Number(UInt64)), display_name: "t.a (#0)" }] }
fb6a8e86-e905-489e-bcb9-2ff57b3831cb 2025-04-30T20:00:55.158232+08:00  INFO databend_query::pipelines::processors::transforms::hash_join::hash_join_build_state: hash_join_build_state.rs:311 receive build_info: BuildInfo { node_id: "093MVACY8nT17KOudNhbE1", num_rows: 1, build_key: [ColumnRef { span: Some(7..8), id: 0, data_type: Nullable(Number(UInt64)), display_name: "t.a (#0)" }] }
fb6a8e86-e905-489e-bcb9-2ff57b3831cb 2025-04-30T20:00:55.158711+08:00  INFO databend_query::pipelines::processors::transforms::hash_join::hash_join_build_state: hash_join_build_state.rs:311 receive build_info: BuildInfo { node_id: "HJlRxQwwUnvkjaJ1Zy90g5", num_rows: 1, build_key: [ColumnRef { span: Some(7..8), id: 0, data_type: Nullable(Number(UInt64)), display_name: "t.a (#0)" }] }
fb6a8e86-e905-489e-bcb9-2ff57b3831cb 2025-04-30T20:00:55.165042+08:00  INFO databend_query::pipelines::processors::transforms::hash_join::hash_join_build_state: hash_join_build_state.rs:311 receive build_info: BuildInfo { node_id: "093MVACY8nT17KOudNhbE1", num_rows: 1, build_key: [ColumnRef { span: Some(7..8), id: 1, data_type: Nullable(Number(UInt64)), display_name: "t.a (#0)" }] }
fb6a8e86-e905-489e-bcb9-2ff57b3831cb 2025-04-30T20:00:55.165153+08:00  INFO databend_query::pipelines::processors::transforms::hash_join::hash_join_build_state: hash_join_build_state.rs:311 receive build_info: BuildInfo { node_id: "eXjUTahh2GEShIyGF1B0a2", num_rows: 1, build_key: [ColumnRef { span: Some(7..8), id: 1, data_type: Nullable(Number(UInt64)), display_name: "t.a (#0)" }] }
fb6a8e86-e905-489e-bcb9-2ff57b3831cb 2025-04-30T20:00:55.165747+08:00  INFO databend_query::pipelines::processors::transforms::hash_join::hash_join_build_state: hash_join_build_state.rs:311 receive build_info: BuildInfo { node_id: "HJlRxQwwUnvkjaJ1Zy90g5", num_rows: 1, build_key: [ColumnRef { span: Some(7..8), id: 1, data_type: Nullable(Number(UInt64)), display_name: "t.a (#0)" }] }
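
The "wait until every node's build info has arrived" step of the demo can be sketched as a small collector. This is a minimal sketch under assumptions, not the code in hash_join_build_state.rs: `BuildInfoCollector`, `on_receive` and `total_rows` are hypothetical names, and `build_key` is simplified to a string instead of the `ColumnRef` list shown in the logs above.

```rust
use std::collections::BTreeMap;

/// Simplified stand-in for the `BuildInfo` seen in the logs.
/// `build_key` is a plain string here (assumption for illustration).
#[derive(Clone, Debug, PartialEq)]
pub struct BuildInfo {
    pub node_id: String,
    pub num_rows: u64,
    pub build_key: String,
}

/// Hypothetical collector: gathers one `BuildInfo` per node and reports
/// completion once all expected nodes have been heard from.
pub struct BuildInfoCollector {
    expected_nodes: usize,
    received: BTreeMap<String, BuildInfo>,
}

impl BuildInfoCollector {
    pub fn new(expected_nodes: usize) -> Self {
        Self {
            expected_nodes,
            received: BTreeMap::new(),
        }
    }

    /// Records one message, keyed by node id (a repeated node id overwrites
    /// the earlier entry). Returns the full set, ordered by node id, once
    /// every expected node has reported; otherwise returns `None`.
    pub fn on_receive(&mut self, info: BuildInfo) -> Option<Vec<BuildInfo>> {
        self.received.insert(info.node_id.clone(), info);
        if self.received.len() == self.expected_nodes {
            Some(self.received.values().cloned().collect())
        } else {
            None
        }
    }

    /// Sum of `num_rows` over all messages received so far.
    pub fn total_rows(&self) -> u64 {
        self.received.values().map(|info| info.num_rows).sum()
    }
}
```

With `expected_nodes` set to 3, the first two calls to `on_receive` return `None` and the third returns all three entries. A runtime filter or range-shuffle decision could then be computed from the complete set, for example from `total_rows`.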

Tests

  • Unit Test
  • Logic Test
  • Benchmark Test
  • No Test - Explain why

Type of change

  • Bug Fix (non-breaking change which fixes an issue)
  • New Feature (non-breaking change which adds functionality)
  • Breaking Change (fix or feature that could cause existing functionality not to work as expected)
  • Documentation Update
  • Refactoring
  • Performance Improvement
  • Other (please describe):

@github-actions github-actions bot added the pr-feature this PR introduces a new feature to the codebase label Apr 30, 2025
# Conflicts:
#	src/query/service/src/pipelines/pipeline_builder.rs
@BohuTANG BohuTANG marked this pull request as draft April 30, 2025 08:10
@SkyFan2002 SkyFan2002 marked this pull request as ready for review April 30, 2025 11:51