Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions be/src/pipeline/exec/hashjoin_build_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,10 @@ class HashJoinBuildSinkOperatorX MOCK_REMOVE(final)
return _join_distribution == TJoinDistributionType::BUCKET_SHUFFLE ||
_join_distribution == TJoinDistributionType::COLOCATE;
}
bool followed_by_shuffled_operator() const override {
return (is_shuffled_operator() && !is_colocated_operator()) ||
_followed_by_shuffled_operator;
}
std::vector<bool>& is_null_safe_eq_join() { return _is_null_safe_eq_join; }

bool allow_left_semi_direct_return(RuntimeState* state) const {
Expand Down
4 changes: 4 additions & 0 deletions be/src/pipeline/exec/hashjoin_probe_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,10 @@ class HashJoinProbeOperatorX MOCK_REMOVE(final)
return _join_distribution == TJoinDistributionType::BUCKET_SHUFFLE ||
_join_distribution == TJoinDistributionType::COLOCATE;
}
bool followed_by_shuffled_operator() const override {
return (is_shuffled_operator() && !is_colocated_operator()) ||
_followed_by_shuffled_operator;
}

bool need_finalize_variant_column() const { return _need_finalize_variant_column; }

Expand Down
6 changes: 4 additions & 2 deletions be/src/pipeline/exec/operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,12 +163,14 @@ class OperatorBase {
*/
[[nodiscard]] virtual bool is_shuffled_operator() const { return false; }
/**
* Return True if this operator is followed by a shuffled operator.
* For multiple children's operators, return true if this is a shuffled operator or this is followed by a shuffled operator (HASH JOIN and SET OPERATION).
*
* For single child's operators, return true if this operator is followed by a shuffled operator.
* For example, in the plan fragment:
* `UNION` -> `SHUFFLED HASH JOIN`
* The `SHUFFLED HASH JOIN` is a shuffled operator so the UNION operator is followed by a shuffled operator.
*/
[[nodiscard]] bool followed_by_shuffled_operator() const {
[[nodiscard]] virtual bool followed_by_shuffled_operator() const {
return _followed_by_shuffled_operator;
}
/**
Expand Down
3 changes: 3 additions & 0 deletions be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,9 @@ class PartitionedHashJoinProbeOperatorX final
bool is_colocated_operator() const override {
return _inner_probe_operator->is_colocated_operator();
}
bool followed_by_shuffled_operator() const override {
return _inner_probe_operator->followed_by_shuffled_operator();
}

void update_operator(const TPlanNode& tnode, bool followed_by_shuffled_operator,
bool require_bucket_distribution) override {
Expand Down
3 changes: 3 additions & 0 deletions be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,9 @@ class PartitionedHashJoinSinkOperatorX
bool is_shuffled_operator() const override {
return _inner_sink_operator->is_shuffled_operator();
}
bool followed_by_shuffled_operator() const override {
return _inner_sink_operator->followed_by_shuffled_operator();
}

void update_operator(const TPlanNode& tnode, bool followed_by_shuffled_operator,
bool require_bucket_distribution) override {
Expand Down
4 changes: 4 additions & 0 deletions be/src/pipeline/exec/set_probe_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,10 @@ class SetProbeSinkOperatorX final : public DataSinkOperatorX<SetProbeSinkLocalSt

bool is_shuffled_operator() const override { return true; }
bool is_colocated_operator() const override { return _is_colocate; }
bool followed_by_shuffled_operator() const override {
return (is_shuffled_operator() && !is_colocated_operator()) ||
Base::_followed_by_shuffled_operator;
}

private:
void _finalize_probe(SetProbeSinkLocalState<is_intersect>& local_state);
Expand Down
4 changes: 4 additions & 0 deletions be/src/pipeline/exec/set_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,10 @@ class SetSinkOperatorX final : public DataSinkOperatorX<SetSinkLocalState<is_int

bool is_shuffled_operator() const override { return true; }
bool is_colocated_operator() const override { return _is_colocate; }
bool followed_by_shuffled_operator() const override {
return (is_shuffled_operator() && !is_colocated_operator()) ||
Base::_followed_by_shuffled_operator;
}

private:
template <class HashTableContext, bool is_intersected>
Expand Down
Loading