
maintainer,heartbeatpb: add drain target plumbing #4759

Open
hongyunyan wants to merge 1 commit into master from split/pr-4190-2a-drain-protocol

Conversation


@hongyunyan hongyunyan commented Apr 7, 2026

What problem does this PR solve?

The dispatcher-drain work split out of #4190 still mixes the drain target protocol, maintainer runtime behavior, coordinator scheduling, and public API orchestration in one review path. This PR extracts the drain target protocol and maintainer-manager plumbing into a dedicated review unit so reviewers can focus on how the target is propagated and acknowledged before looking at scheduler behavior.

Issue Number: ref #4190

What is changed and how it works?

Background:

  • coordinator,maintainer: add node liveness foundation #4522 already merged the node liveness foundation into master.
  • The remaining dispatcher-drain work from the old #4523 still mixed protocol, runtime scheduling, and API orchestration.
  • This PR starts a new stacked split from master and keeps only the drain target plumbing.

Motivation:

  • Separate protocol and manager-level state propagation from drain scheduling logic.
  • Let reviewers verify target monotonicity and acknowledgment rules before reading scheduler algorithms.
  • Keep protobuf and messaging changes in the same review unit as the maintainer-manager handlers that consume them.

Summary:

  • add SetDispatcherDrainTargetRequest
  • extend node heartbeat with dispatcher-drain target acknowledgment fields
  • add manager-level drain target state with monotonic update rules and clear-once behavior
  • fan out the latest drain target from maintainer manager to newly created and already running maintainers
  • add focused tests for target apply, stale-update rejection, and heartbeat acknowledgment

How it works:

  • coordinator can send a dispatcher-drain target to the maintainer manager
  • the maintainer manager applies it at node scope and reports the applied target back in node heartbeat
  • the latest target is also seeded into each local maintainer so later runtime layers can consume a consistent snapshot
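The monotonic update and clear-once rules described above can be pictured with a stdlib-only sketch. The type and method names here (drainTargetState, tryUpdate) are assumptions for illustration, not the actual implementation in maintainer/maintainer_manager_node.go:

```go
package main

import (
	"fmt"
	"sync"
)

// drainTargetState is a hypothetical reconstruction of the manager-level
// drain target state; only the epoch-monotonicity idea is shown here.
type drainTargetState struct {
	mu          sync.Mutex
	targetNode  string
	targetEpoch uint64
}

// tryUpdate applies a drain target only if the epoch strictly advances
// (the monotonic rule). An empty targetNode with a newer epoch clears
// the target, which approximates the clear-once behavior.
func (s *drainTargetState) tryUpdate(node string, epoch uint64) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if epoch <= s.targetEpoch {
		return false // stale or duplicate update: reject
	}
	s.targetNode = node
	s.targetEpoch = epoch
	return true
}

func main() {
	s := &drainTargetState{}
	fmt.Println(s.tryUpdate("node-1", 2)) // true: first target applied
	fmt.Println(s.tryUpdate("node-2", 1)) // false: stale epoch rejected
	fmt.Println(s.tryUpdate("", 3))       // true: clear with a newer epoch
}
```

Under these rules a retried or reordered coordinator request can never roll the node back to an older target, which is what lets the heartbeat acknowledgment report a single authoritative (node, epoch) pair.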

Check List

Tests

  • Unit test
    • go test ./maintainer -run 'TestSetDispatcherDrainTarget|TestSetNodeLiveness'
    • go test ./pkg/messaging ./heartbeatpb

Questions

Will it cause performance regression or break compatibility?

This PR does not add a user-facing API by itself. It only adds the internal protocol and manager-level drain target propagation needed by the later runtime and API layers.

Do you need to update user documentation, design documentation or monitoring documentation?

No additional user-facing documentation is needed for this split. It is an internal decomposition of the drain-capture implementation.

Release note

None

Summary by CodeRabbit

  • New Features

    • Added dispatcher drain target coordination at the manager level, enabling control of which nodes dispatchers drain work from.
    • Extended heartbeat reporting to include dispatcher drain progress metrics and target information.
  • Improvements

    • Enhanced scheduler state management with drain-aware target filtering and balance scheduling controls.


ti-chi-bot bot commented Apr 7, 2026

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by:
Once this PR has been reviewed and has the lgtm label, please assign wk989898 for approval. For more information see the Code Review Process.
Please ensure that each of them provides their approval before proceeding.

The full list of commands accepted by this bot can be found here.

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment


coderabbitai bot commented Apr 7, 2026

📝 Walkthrough

Walkthrough

This PR implements dispatcher drain target management across the maintainer subsystem, adding protobuf message definitions for drain progress tracking, manager-level drain target coordination, scheduler-level drain state, and message handling for coordinator-driven drain requests.

Changes

Cohort / File(s) Summary
Protobuf Message Definitions
heartbeatpb/heartbeat.proto
Added DrainProgress message with target node/epoch and dispatcher metrics. Extended MaintainerStatus with drain_progress field and NodeHeartbeat with dispatcher_drain_target_node_id/dispatcher_drain_target_epoch fields. Added SetDispatcherDrainTargetRequest message for drain target coordination requests.
Manager Node State & Drain Coordination
maintainer/maintainer_manager_node.go
Added manager-level dispatcher drain target tracking with mutex-protected state snapshot. Implemented onSetDispatcherDrainTargetRequest handler to validate, apply, and propagate coordinator-driven drain target updates, with monotonic update enforcement and forced node heartbeat acknowledgment.
Maintainer Controller & Drain State
maintainer/maintainer_controller.go, maintainer/maintainer.go
Added DrainState field to Controller, introduced SetDispatcherDrainTarget() method to forward drain target updates. Added public SetDispatcherDrainTarget() method to Maintainer that triggers status change notifications.
Manager Structure Refactoring
maintainer/maintainer_manager.go, maintainer/maintainer_manager_maintainers.go
Refactored Manager to use new managerMaintainerSet abstraction for maintainer lifecycle management. Consolidated maintainer add/remove/dispatch handling, status aggregation, and bootstrap response construction into new abstraction. Updated heartbeat sending loop and cleanup routines.
Scheduler Drain Logic
maintainer/scheduler/drain_common.go
Added DrainState type with RW-mutex-protected drain target tracking. Implemented drain-aware filtering functions and pause logic for balance scheduling based on active drain targets.
Message Type Support
pkg/messaging/message.go
Added TypeSetDispatcherDrainTargetRequest (IOType = 45) constant, updated String(), decodeIOType(), and NewSingleTargetMessage() to handle new drain target request messages.
Drain Target Tests
maintainer/node_liveness_test.go
Added three test cases: TestSetDispatcherDrainTargetApplyAndClear validates target application and clearing, TestSetDispatcherDrainTargetRejectStaleUpdate verifies stale update rejection and monotonic enforcement, and TestSetDispatcherDrainTargetSendsNodeHeartbeatAck confirms heartbeat acknowledgment with drain target fields.
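The RW-mutex-protected DrainState described for maintainer/scheduler/drain_common.go can be sketched with stdlib types only. Field and method names below are assumptions; the real file may expose a different API:

```go
package main

import (
	"fmt"
	"sync"
)

// DrainState is a minimal sketch of scheduler-side drain target tracking.
type DrainState struct {
	mu          sync.RWMutex
	targetNode  string
	targetEpoch uint64
}

// Set records the latest drain target under the write lock.
func (d *DrainState) Set(node string, epoch uint64) {
	d.mu.Lock()
	defer d.mu.Unlock()
	d.targetNode = node
	d.targetEpoch = epoch
}

// Snapshot returns a consistent view so scheduling code can make
// decisions without holding the lock across the whole scheduling pass.
func (d *DrainState) Snapshot() (string, uint64) {
	d.mu.RLock()
	defer d.mu.RUnlock()
	return d.targetNode, d.targetEpoch
}

// IsDraining reports whether node is the active drain target; a scheduler
// can use this to filter the node out of placement candidates.
func (d *DrainState) IsDraining(node string) bool {
	target, _ := d.Snapshot()
	return target != "" && target == node
}

func main() {
	d := &DrainState{}
	d.Set("node-3", 7)
	fmt.Println(d.IsDraining("node-3")) // true
	fmt.Println(d.IsDraining("node-1")) // false
}
```

The snapshot-then-decide shape is what lets later runtime layers consume "a consistent snapshot" of the target, as the PR description puts it, even while the manager fans out a newer epoch concurrently.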

Sequence Diagram(s)

sequenceDiagram
    participant Coordinator
    participant Manager
    participant MaintainerSet
    participant Maintainer
    participant Scheduler

    Coordinator->>Manager: onSetDispatcherDrainTargetRequest<br/>(target_node_id, target_epoch)
    Manager->>Manager: tryUpdateDispatcherDrainTarget<br/>(validate monotonic rules)
    Manager->>MaintainerSet: applyDispatcherDrainTarget<br/>(target, epoch)
    MaintainerSet->>Maintainer: SetDispatcherDrainTarget<br/>(target, epoch)
    Maintainer->>Scheduler: SetDispatcherDrainTarget<br/>(via controller)
    Scheduler->>Scheduler: Update DrainState snapshot
    Manager->>Manager: sendNodeHeartbeat(true)<br/>(forced ack with drain fields)
    Manager->>Coordinator: TypeNodeHeartbeatRequest<br/>(dispatcher_drain_target_node_id,<br/>dispatcher_drain_target_epoch)

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

Suggested labels

lgtm, approved, size/XXL

Suggested reviewers

  • 3AceShowHand
  • wk989898
  • asddongmen

Poem

🐰 Drains and targets dance in harmony,
A heartbeat whispers drain decree,
Managers coordinate with care,
Schedulers filter, nodes aware,
Dispatcher drains flow perfectly! 🎯

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 58.82% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name Status Explanation
Title check ✅ Passed The title 'maintainer,heartbeatpb: add drain target plumbing' clearly summarizes the main change: adding drain target protocol plumbing across the maintainer and heartbeatpb packages.
Description check ✅ Passed The description provides comprehensive context including problem statement, detailed changes, motivation, test commands, compatibility assessment, and release notes per template requirements.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


@ti-chi-bot ti-chi-bot bot added the size/XXL Denotes a PR that changes 1000+ lines, ignoring generated files. label Apr 7, 2026

ti-chi-bot bot commented Apr 7, 2026

@hongyunyan: The following test failed, say /retest to rerun all failed tests or /retest-required to rerun all mandatory failed tests:

Test name Commit Details Required Rerun command
pull-error-log-review 2b2b7ec link true /test pull-error-log-review

Full PR test history. Your PR dashboard.


Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. I understand the commands that are listed here.


ti-chi-bot bot commented Apr 7, 2026

[FORMAT CHECKER NOTIFICATION]

Notice: To remove the do-not-merge/needs-linked-issue label, please provide the linked issue number on one line in the PR body, for example: Issue Number: close #123 or Issue Number: ref #456.

📖 For more info, you can check the "Contribute Code" section in the development guide.


@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request introduces a dispatcher drain mechanism, allowing the coordinator to instruct maintainer managers to drain specific nodes. It adds new protobuf definitions for drain progress and requests, updates the maintainer manager to handle these requests, and implements a node-scoped drain state. I have provided feedback regarding the use of log.Panic for input validation, which could lead to process crashes, and suggested ensuring that node heartbeats are sent to acknowledge drain target updates even when the update is a duplicate or stale.

Comment on lines +168 to +171
err := json.Unmarshal(req.Config, info)
if err != nil {
	log.Panic("decode changefeed fail", zap.Error(err))
}


high

Using log.Panic when unmarshaling the changefeed configuration from a network message is risky. If the coordinator (or a malicious actor) sends a malformed configuration, it will crash the entire maintainer manager process. It is better to log an error and return a failure status or ignore the request.
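A hedged sketch of the alternative this comment suggests: return an error to the caller instead of panicking, so a malformed network message cannot crash the process. The names changefeedInfo and decodeConfig are illustrative stand-ins, not the real types in this PR:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// changefeedInfo stands in for the real config struct; only one field is
// shown and its name is an assumption for this sketch.
type changefeedInfo struct {
	SinkURI string `json:"sink-uri"`
}

// decodeConfig rejects a malformed payload by returning an error, in
// contrast to log.Panic, which would bring the whole capture down.
func decodeConfig(raw []byte) (*changefeedInfo, error) {
	info := &changefeedInfo{}
	if err := json.Unmarshal(raw, info); err != nil {
		return nil, fmt.Errorf("decode changefeed config: %w", err)
	}
	return info, nil
}

func main() {
	if _, err := decodeConfig([]byte(`{not json`)); err != nil {
		// The bad request is dropped and reported; the process keeps running.
		fmt.Println("rejected malformed config without crashing")
	}
}
```

The caller can then log the error with the changefeed ID and ignore or NACK the request, matching the "drop/report" behavior the reviewers ask for.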

Comment on lines +172 to +177
if req.CheckpointTs == 0 {
	log.Panic("add maintainer with invalid checkpointTs",
		zap.Stringer("changefeedID", changefeedID),
		zap.Uint64("checkpointTs", req.CheckpointTs),
		zap.Any("info", info))
}


high

Similar to the JSON unmarshaling issue, using log.Panic for an invalid CheckpointTs can lead to process crashes on invalid input. Consider logging the error and rejecting the request instead of panicking.

Comment on lines +166 to +168
if !m.node.tryUpdateDispatcherDrainTarget(target, req.TargetEpoch) {
	return
}


medium

If tryUpdateDispatcherDrainTarget returns false (e.g., because the update is a duplicate or stale), the manager currently returns immediately without sending a heartbeat. For better responsiveness and to ensure the coordinator receives an acknowledgement even for retries or stale messages, it is recommended to always send a node heartbeat if the request is from the authorized coordinator.

Suggested change
Replace:

if !m.node.tryUpdateDispatcherDrainTarget(target, req.TargetEpoch) {
	return
}

with:

if m.node.tryUpdateDispatcherDrainTarget(target, req.TargetEpoch) {
	log.Info("dispatcher drain target updated",
		zap.Stringer("targetNodeID", target),
		zap.Uint64("targetEpoch", req.TargetEpoch))
	m.maintainers.applyDispatcherDrainTarget(target, req.TargetEpoch)
}
// A manager-level heartbeat is the authoritative acknowledgement that this
// node has applied the latest drain target, even when no maintainers exist.
m.sendNodeHeartbeat(true)

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 4

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@maintainer/maintainer_manager_maintainers.go`:
- Around line 88-92: The log currently emits the whole rejected control message
(zap.Any("request", msg)) which can leak serialized changefeed config and
secrets; change the warning in the coordinator ID check (m.coordinatorID !=
msg.From) to avoid logging msg itself and instead log only safe metadata fields
from the AddMaintainerRequest such as msg.Type, msg.From (use zap.Stringer if
appropriate) and msg.ChangefeedID so the message body/config is never written to
logs.
- Around line 74-75: Register the maintainer in the registry atomically before
seeding it from a snapshot: instead of calling getDispatcherDrainTarget() and
then m.maintainers.handleAddMaintainer(...) which seeds the maintainer before it
is visible, first perform an atomic register (e.g., use a per-changefeed lock or
m.maintainers.LoadOrStore equivalent) to ensure only one add wins and the
maintainer becomes visible, then apply the snapshot/drain-target returned by
getDispatcherDrainTarget() to that already-registered maintainer; update the
same pattern at the other occurrences referenced (the blocks around lines
156-185 and 258-262) to eliminate the separate Load/Store window and prevent
duplicate-add and stale-target races.
- Around line 168-177: The code currently panics on json.Unmarshal(req.Config,
info) errors and when req.CheckpointTs == 0 (using log.Panic), which can crash
the node; instead, detect these two conditions (the json.Unmarshal failure for
req.Config into info, and req.CheckpointTs == 0 for the incoming
req/changefeedID), log a structured error (use log.Error or log.Warn with
zap.Error(err), zap.Stringer("changefeedID", changefeedID),
zap.Uint64("checkpointTs", req.CheckpointTs), zap.Any("info", info)) and reject
the add-maintainer request by returning an error (or otherwise signaling failure
to the caller) rather than calling log.Panic so the process does not exit and
the bad request is dropped/reported.
- Around line 277-279: Replace the bare return of ctx.Err() in the select block
with a wrapped error using errors.Trace to preserve stack information; i.e.,
when handling <-ctx.Done() capture the error (e := ctx.Err() or err :=
ctx.Err()) and return errors.Trace(err) instead of returning ctx.Err() directly
(update the select case in maintainer_manager_maintainers.go where the code
currently returns ctx.Err()).
ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 470fe784-acff-4162-88e8-31176a2ebaca

📥 Commits

Reviewing files that changed from the base of the PR and between 567506c and 2b2b7ec.

⛔ Files ignored due to path filters (1)
  • heartbeatpb/heartbeat.pb.go is excluded by !**/*.pb.go
📒 Files selected for processing (9)
  • heartbeatpb/heartbeat.proto
  • maintainer/maintainer.go
  • maintainer/maintainer_controller.go
  • maintainer/maintainer_manager.go
  • maintainer/maintainer_manager_maintainers.go
  • maintainer/maintainer_manager_node.go
  • maintainer/node_liveness_test.go
  • maintainer/scheduler/drain_common.go
  • pkg/messaging/message.go

Comment on lines +74 to +75
target, epoch := m.getDispatcherDrainTarget()
return m.maintainers.handleAddMaintainer(req, target, epoch)


⚠️ Potential issue | 🟠 Major

Make maintainer registration atomic with drain-target propagation.

This seeds the new maintainer from a snapshot before it is visible in registry. If a newer epoch is published concurrently, the fan-out path skips this maintainer and it can keep the stale target until the next update; the separate Load/Store window also allows duplicate add races. Please register atomically first (for example with per-changefeed locking or LoadOrStore) and only then apply the snapshot.

Also applies to: 156-185, 258-262

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@maintainer/maintainer_manager_maintainers.go` around lines 74 - 75, Register
the maintainer in the registry atomically before seeding it from a snapshot:
instead of calling getDispatcherDrainTarget() and then
m.maintainers.handleAddMaintainer(...) which seeds the maintainer before it is
visible, first perform an atomic register (e.g., use a per-changefeed lock or
m.maintainers.LoadOrStore equivalent) to ensure only one add wins and the
maintainer becomes visible, then apply the snapshot/drain-target returned by
getDispatcherDrainTarget() to that already-registered maintainer; update the
same pattern at the other occurrences referenced (the blocks around lines
156-185 and 258-262) to eliminate the separate Load/Store window and prevent
duplicate-add and stale-target races.

Comment on lines +88 to +92
if m.coordinatorID != msg.From {
	log.Warn("ignore invalid coordinator id",
		zap.Any("request", msg),
		zap.Any("coordinatorID", m.coordinatorID),
		zap.Stringer("from", msg.From))


⚠️ Potential issue | 🟠 Major

Don't log the whole rejected control message.

AddMaintainerRequest carries the serialized changefeed config from coordinator/changefeed/changefeed.go:249-259. zap.Any("request", msg) can therefore leak sink credentials or tokens into logs. Please log only metadata such as Type, From, and changefeedID.
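The metadata-only logging this comment asks for can be sketched with the standard log/slog package standing in for zap. The struct fields here are assumptions that mirror the shape of the real request, not its actual definition:

```go
package main

import (
	"log/slog"
	"os"
)

// addMaintainerRequest is a stand-in for the real message type; only the
// fields needed for the illustration are included.
type addMaintainerRequest struct {
	Type         string
	From         string
	ChangefeedID string
	Config       []byte // may carry sink credentials: never log this field
}

// safeLogFields extracts only metadata that is safe to write to logs,
// deliberately omitting the serialized config payload.
func safeLogFields(msg addMaintainerRequest) []any {
	return []any{
		"type", msg.Type,
		"from", msg.From,
		"changefeedID", msg.ChangefeedID,
	}
}

func main() {
	logger := slog.New(slog.NewTextHandler(os.Stdout, nil))
	msg := addMaintainerRequest{
		Type:         "AddMaintainerRequest",
		From:         "node-2",
		ChangefeedID: "cf-1",
		Config:       []byte(`{"sink-uri":"mysql://user:secret@host"}`),
	}
	// Only the safe metadata reaches the log line; the config stays out.
	logger.Warn("ignore invalid coordinator id", safeLogFields(msg)...)
}
```

With zap the equivalent fix is to replace zap.Any("request", msg) with individual zap.String/zap.Stringer fields for the same metadata.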

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@maintainer/maintainer_manager_maintainers.go` around lines 88 - 92, The log
currently emits the whole rejected control message (zap.Any("request", msg))
which can leak serialized changefeed config and secrets; change the warning in
the coordinator ID check (m.coordinatorID != msg.From) to avoid logging msg
itself and instead log only safe metadata fields from the AddMaintainerRequest
such as msg.Type, msg.From (use zap.Stringer if appropriate) and
msg.ChangefeedID so the message body/config is never written to logs.

Comment on lines +168 to +177
err := json.Unmarshal(req.Config, info)
if err != nil {
	log.Panic("decode changefeed fail", zap.Error(err))
}
if req.CheckpointTs == 0 {
	log.Panic("add maintainer with invalid checkpointTs",
		zap.Stringer("changefeedID", changefeedID),
		zap.Uint64("checkpointTs", req.CheckpointTs),
		zap.Any("info", info))
}


⚠️ Potential issue | 🔴 Critical

Reject bad add requests instead of panicking the capture.

A JSON decode error or CheckpointTs == 0 currently brings the whole node down. coordinator/changefeed/changefeed.go:249-259 forwards CheckpointTs straight from changefeed status, so an uninitialized status can become a process crash here. Drop/report the bad request instead of calling log.Panic.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@maintainer/maintainer_manager_maintainers.go` around lines 168 - 177, The
code currently panics on json.Unmarshal(req.Config, info) errors and when
req.CheckpointTs == 0 (using log.Panic), which can crash the node; instead,
detect these two conditions (the json.Unmarshal failure for req.Config into
info, and req.CheckpointTs == 0 for the incoming req/changefeedID), log a
structured error (use log.Error or log.Warn with zap.Error(err),
zap.Stringer("changefeedID", changefeedID), zap.Uint64("checkpointTs",
req.CheckpointTs), zap.Any("info", info)) and reject the add-maintainer request
by returning an error (or otherwise signaling failure to the caller) rather than
calling log.Panic so the process does not exit and the bad request is
dropped/reported.

Comment on lines +277 to +279
select {
case <-ctx.Done():
	return ctx.Err()


⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

cat -n maintainer/maintainer_manager_maintainers.go | sed -n '270,290p'

Repository: pingcap/ticdc

Length of output: 617


🏁 Script executed:

rg 'ctx\.Err\(\)' --type go -B 3 -A 1 maintainer/

Repository: pingcap/ticdc

Length of output: 1392


🏁 Script executed:

rg 'errors\.Trace\(ctx\.Err' --type go

Repository: pingcap/ticdc

Length of output: 2979


🏁 Script executed:

rg 'return ctx\.Err\(\)' --type go | head -20

Repository: pingcap/ticdc

Length of output: 1057


Wrap the cancellation error before returning it.

Returning bare ctx.Err() loses the stack trace. Per coding guidelines, errors from library calls should be wrapped immediately with errors.Trace(err). This pattern is used consistently throughout the codebase (server, workerpool, sinks, etc.) but is not applied in the maintainer module.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@maintainer/maintainer_manager_maintainers.go` around lines 277 - 279, Replace
the bare return of ctx.Err() in the select block with a wrapped error using
errors.Trace to preserve stack information; i.e., when handling <-ctx.Done()
capture the error (e := ctx.Err() or err := ctx.Err()) and return
errors.Trace(err) instead of returning ctx.Err() directly (update the select
case in maintainer_manager_maintainers.go where the code currently returns
ctx.Err()).


Labels

do-not-merge/needs-linked-issue do-not-merge/needs-triage-completed release-note-none Denotes a PR that doesn't merit a release note. size/XXL Denotes a PR that changes 1000+ lines, ignoring generated files.
