From d27db54f8573d86a83bbb9c75e46fff4b617dbde Mon Sep 17 00:00:00 2001 From: Gabriel Date: Sat, 14 Feb 2026 17:20:00 +0800 Subject: [PATCH] [fix](local exchange) Do global shuffle for shuffled join/set operator (#60758) Make followed_by_shuffled_operator() virtual and override it in hash join and set operators to correctly report shuffle requirements. For multi-child operators (hash join, set operations), the method now returns true if the operator itself is shuffled (and not colocated) or if it is followed by a shuffled operator, ensuring proper global shuffle behavior in local exchange. Changes: - Make followed_by_shuffled_operator() virtual in the base OperatorBase class - Override in HashJoinBuildSinkOperatorX and HashJoinProbeOperatorX to return (is_shuffled_operator() && !is_colocated_operator()) || _followed_by_shuffled_operator - Override in PartitionedHashJoinProbeOperatorX and PartitionedHashJoinSinkOperatorX to delegate to the inner operator - Override in SetProbeSinkOperatorX and SetSinkOperatorX to return !_is_colocate This fixes incorrect data distribution when shuffled join/set operators are used in pipelines with local exchange. 
--- be/src/pipeline/exec/hashjoin_build_sink.h | 4 ++++ be/src/pipeline/exec/hashjoin_probe_operator.h | 4 ++++ be/src/pipeline/exec/operator.h | 6 ++++-- be/src/pipeline/exec/partitioned_hash_join_probe_operator.h | 3 +++ be/src/pipeline/exec/partitioned_hash_join_sink_operator.h | 3 +++ be/src/pipeline/exec/set_probe_sink_operator.h | 4 ++++ be/src/pipeline/exec/set_sink_operator.h | 4 ++++ 7 files changed, 26 insertions(+), 2 deletions(-) diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/hashjoin_build_sink.h index 2bb9407ef5cd9a..f0cf73a82dd403 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.h +++ b/be/src/pipeline/exec/hashjoin_build_sink.h @@ -148,6 +148,10 @@ class HashJoinBuildSinkOperatorX MOCK_REMOVE(final) return _join_distribution == TJoinDistributionType::BUCKET_SHUFFLE || _join_distribution == TJoinDistributionType::COLOCATE; } + bool followed_by_shuffled_operator() const override { + return (is_shuffled_operator() && !is_colocated_operator()) || + _followed_by_shuffled_operator; + } std::vector& is_null_safe_eq_join() { return _is_null_safe_eq_join; } bool allow_left_semi_direct_return(RuntimeState* state) const { diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h b/be/src/pipeline/exec/hashjoin_probe_operator.h index eff32f432ffeca..6c5232b3720d9b 100644 --- a/be/src/pipeline/exec/hashjoin_probe_operator.h +++ b/be/src/pipeline/exec/hashjoin_probe_operator.h @@ -153,6 +153,10 @@ class HashJoinProbeOperatorX MOCK_REMOVE(final) return _join_distribution == TJoinDistributionType::BUCKET_SHUFFLE || _join_distribution == TJoinDistributionType::COLOCATE; } + bool followed_by_shuffled_operator() const override { + return (is_shuffled_operator() && !is_colocated_operator()) || + _followed_by_shuffled_operator; + } bool need_finalize_variant_column() const { return _need_finalize_variant_column; } diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index 5ab3fdf6e7fca2..875d9ce18ef2c6 
100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -163,12 +163,14 @@ class OperatorBase { */ [[nodiscard]] virtual bool is_shuffled_operator() const { return false; } /** - * Return True if this operator is followed by a shuffled operator. + * For multiple children's operators, return true if this is a shuffled operator or this is followed by a shuffled operator (HASH JOIN and SET OPERATION). + * + * For single child's operators, return true if this operator is followed by a shuffled operator. * For example, in the plan fragment: * `UNION` -> `SHUFFLED HASH JOIN` * The `SHUFFLED HASH JOIN` is a shuffled operator so the UNION operator is followed by a shuffled operator. */ - [[nodiscard]] bool followed_by_shuffled_operator() const { + [[nodiscard]] virtual bool followed_by_shuffled_operator() const { return _followed_by_shuffled_operator; } /** diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h index 1532acfda337aa..6419e4676cedd5 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h +++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h @@ -164,6 +164,9 @@ class PartitionedHashJoinProbeOperatorX final bool is_colocated_operator() const override { return _inner_probe_operator->is_colocated_operator(); } + bool followed_by_shuffled_operator() const override { + return _inner_probe_operator->followed_by_shuffled_operator(); + } void update_operator(const TPlanNode& tnode, bool followed_by_shuffled_operator, bool require_bucket_distribution) override { diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h index e098d071daa5f3..59eed7aac6637b 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h @@ -150,6 +150,9 @@ class PartitionedHashJoinSinkOperatorX bool 
is_shuffled_operator() const override { return _inner_sink_operator->is_shuffled_operator(); } + bool followed_by_shuffled_operator() const override { + return _inner_sink_operator->followed_by_shuffled_operator(); + } void update_operator(const TPlanNode& tnode, bool followed_by_shuffled_operator, bool require_bucket_distribution) override { diff --git a/be/src/pipeline/exec/set_probe_sink_operator.h b/be/src/pipeline/exec/set_probe_sink_operator.h index c0ee8b7731ed4b..896088fce0df27 100644 --- a/be/src/pipeline/exec/set_probe_sink_operator.h +++ b/be/src/pipeline/exec/set_probe_sink_operator.h @@ -115,6 +115,10 @@ class SetProbeSinkOperatorX final : public DataSinkOperatorX& local_state); diff --git a/be/src/pipeline/exec/set_sink_operator.h b/be/src/pipeline/exec/set_sink_operator.h index 513612fb0916ea..11c243ef24db80 100644 --- a/be/src/pipeline/exec/set_sink_operator.h +++ b/be/src/pipeline/exec/set_sink_operator.h @@ -119,6 +119,10 @@ class SetSinkOperatorX final : public DataSinkOperatorX