maintainer,heartbeatpb: add drain target plumbing #4759
hongyunyan wants to merge 1 commit into master from
Conversation
[APPROVALNOTIFIER] This PR is NOT APPROVED. This pull-request has been approved by: The full list of commands accepted by this bot can be found here. Needs approval from an approver in each of these files. Approvers can indicate their approval by writing
📝 Walkthrough
This PR implements dispatcher drain target management across the maintainer subsystem, adding protobuf message definitions for drain progress tracking, manager-level drain target coordination, scheduler-level drain state, and message handling for coordinator-driven drain requests.
Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Coordinator
    participant Manager
    participant MaintainerSet
    participant Maintainer
    participant Scheduler
    Coordinator->>Manager: onSetDispatcherDrainTargetRequest<br/>(target_node_id, target_epoch)
    Manager->>Manager: tryUpdateDispatcherDrainTarget<br/>(validate monotonic rules)
    Manager->>MaintainerSet: applyDispatcherDrainTarget<br/>(target, epoch)
    MaintainerSet->>Maintainer: SetDispatcherDrainTarget<br/>(target, epoch)
    Maintainer->>Scheduler: SetDispatcherDrainTarget<br/>(via controller)
    Scheduler->>Scheduler: Update DrainState snapshot
    Manager->>Manager: sendNodeHeartbeat(true)<br/>(forced ack with drain fields)
    Manager->>Coordinator: TypeNodeHeartbeatRequest<br/>(dispatcher_drain_target_node_id,<br/>dispatcher_drain_target_epoch)
```
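The "validate monotonic rules" step in the diagram can be sketched as a small epoch-gated update; the struct and field names below are assumptions for illustration, not the PR's actual types:

```go
package main

import (
	"fmt"
	"sync"
)

// drainState is a hypothetical sketch of the node-scoped drain target state;
// the real types and field names in the PR may differ.
type drainState struct {
	mu          sync.Mutex
	targetNode  string
	targetEpoch uint64
}

// tryUpdate accepts a new drain target only if its epoch is strictly newer,
// mirroring the "validate monotonic rules" step in the sequence diagram.
func (d *drainState) tryUpdate(target string, epoch uint64) bool {
	d.mu.Lock()
	defer d.mu.Unlock()
	if epoch <= d.targetEpoch {
		return false // duplicate or stale update: ignore
	}
	d.targetNode = target
	d.targetEpoch = epoch
	return true
}

func main() {
	s := &drainState{}
	fmt.Println(s.tryUpdate("node-2", 5)) // true: first target accepted
	fmt.Println(s.tryUpdate("node-2", 5)) // false: duplicate epoch rejected
	fmt.Println(s.tryUpdate("node-3", 4)) // false: stale epoch rejected
	fmt.Println(s.tryUpdate("node-3", 6)) // true: newer epoch accepted
}
```

A returned `false` is what lets the manager distinguish a retry or stale message from a genuinely new target before fanning it out to maintainers.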
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~20 minutes
🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed (1 warning)
@hongyunyan: The following test failed, say
Full PR test history. Your PR dashboard. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. I understand the commands that are listed here.
[FORMAT CHECKER NOTIFICATION] Notice: To remove the 📖 For more info, you can check the "Contribute Code" section in the development guide.
Code Review
This pull request introduces a dispatcher drain mechanism, allowing the coordinator to instruct maintainer managers to drain specific nodes. It adds new protobuf definitions for drain progress and requests, updates the maintainer manager to handle these requests, and implements a node-scoped drain state. I have provided feedback regarding the use of log.Panic for input validation, which could lead to process crashes, and suggested ensuring that node heartbeats are sent to acknowledge drain target updates even when the update is a duplicate or stale.
```go
err := json.Unmarshal(req.Config, info)
if err != nil {
	log.Panic("decode changefeed fail", zap.Error(err))
}
```
Using log.Panic when unmarshaling the changefeed configuration from a network message is risky. If the coordinator (or a malicious actor) sends a malformed configuration, it will crash the entire maintainer manager process. It is better to log an error and return a failure status or ignore the request.
```go
if req.CheckpointTs == 0 {
	log.Panic("add maintainer with invalid checkpointTs",
		zap.Stringer("changefeedID", changefeedID),
		zap.Uint64("checkpointTs", req.CheckpointTs),
		zap.Any("info", info))
}
```
```go
if !m.node.tryUpdateDispatcherDrainTarget(target, req.TargetEpoch) {
	return
}
```
If tryUpdateDispatcherDrainTarget returns false (e.g., because the update is a duplicate or stale), the manager currently returns immediately without sending a heartbeat. For better responsiveness and to ensure the coordinator receives an acknowledgement even for retries or stale messages, it is recommended to always send a node heartbeat if the request is from the authorized coordinator.
Current code:

```go
if !m.node.tryUpdateDispatcherDrainTarget(target, req.TargetEpoch) {
	return
}
```

Suggested change:

```go
if m.node.tryUpdateDispatcherDrainTarget(target, req.TargetEpoch) {
	log.Info("dispatcher drain target updated",
		zap.Stringer("targetNodeID", target),
		zap.Uint64("targetEpoch", req.TargetEpoch))
	m.maintainers.applyDispatcherDrainTarget(target, req.TargetEpoch)
}
// A manager-level heartbeat is the authoritative acknowledgement that this
// node has applied the latest drain target, even when no maintainers exist.
m.sendNodeHeartbeat(true)
```
Actionable comments posted: 4
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 470fe784-acff-4162-88e8-31176a2ebaca
⛔ Files ignored due to path filters (1)
heartbeatpb/heartbeat.pb.go is excluded by !**/*.pb.go
📒 Files selected for processing (9)
heartbeatpb/heartbeat.proto
maintainer/maintainer.go
maintainer/maintainer_controller.go
maintainer/maintainer_manager.go
maintainer/maintainer_manager_maintainers.go
maintainer/maintainer_manager_node.go
maintainer/node_liveness_test.go
maintainer/scheduler/drain_common.go
pkg/messaging/message.go
```go
target, epoch := m.getDispatcherDrainTarget()
return m.maintainers.handleAddMaintainer(req, target, epoch)
```
Make maintainer registration atomic with drain-target propagation.
This seeds the new maintainer from a snapshot before it is visible in registry. If a newer epoch is published concurrently, the fan-out path skips this maintainer and it can keep the stale target until the next update; the separate Load/Store window also allows duplicate add races. Please register atomically first (for example with per-changefeed locking or LoadOrStore) and only then apply the snapshot.
Also applies to: 156-185, 258-262
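The register-then-seed ordering suggested here can be sketched with `sync.Map.LoadOrStore`; every name below (`registry`, `addMaintainer`, `applyDrainTarget`) is illustrative, not the PR's actual API:

```go
package main

import (
	"fmt"
	"sync"
)

// maintainer is a placeholder for the real maintainer type.
type maintainer struct {
	mu          sync.Mutex
	drainTarget string
	drainEpoch  uint64
}

// applyDrainTarget must itself be monotonic, so that a concurrent fan-out of a
// newer epoch is never overwritten by the older snapshot applied during add.
func (m *maintainer) applyDrainTarget(target string, epoch uint64) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if epoch <= m.drainEpoch {
		return
	}
	m.drainTarget = target
	m.drainEpoch = epoch
}

var registry sync.Map // changefeedID -> *maintainer

// addMaintainer registers atomically first, then seeds the drain target, so
// there is no window where the maintainer exists but is invisible to fan-out.
func addMaintainer(changefeedID, target string, epoch uint64) bool {
	actual, loaded := registry.LoadOrStore(changefeedID, &maintainer{})
	if loaded {
		return false // another add already won the race; drop this duplicate
	}
	// The maintainer is visible now; applying the snapshot afterwards cannot
	// leave it permanently stale behind an update it already received.
	actual.(*maintainer).applyDrainTarget(target, epoch)
	return true
}

func main() {
	fmt.Println(addMaintainer("cf-1", "node-2", 3)) // true: first add wins
	fmt.Println(addMaintainer("cf-1", "node-2", 3)) // false: duplicate dropped
	m, _ := registry.Load("cf-1")
	fmt.Println(m.(*maintainer).drainEpoch) // 3
}
```

The key property is that `LoadOrStore` collapses the separate Load/Store window into one atomic step, which removes both the duplicate-add race and the stale-target race at once.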
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@maintainer/maintainer_manager_maintainers.go` around lines 74 - 75, Register
the maintainer in the registry atomically before seeding it from a snapshot:
instead of calling getDispatcherDrainTarget() and then
m.maintainers.handleAddMaintainer(...) which seeds the maintainer before it is
visible, first perform an atomic register (e.g., use a per-changefeed lock or
m.maintainers.LoadOrStore equivalent) to ensure only one add wins and the
maintainer becomes visible, then apply the snapshot/drain-target returned by
getDispatcherDrainTarget() to that already-registered maintainer; update the
same pattern at the other occurrences referenced (the blocks around lines
156-185 and 258-262) to eliminate the separate Load/Store window and prevent
duplicate-add and stale-target races.
```go
if m.coordinatorID != msg.From {
	log.Warn("ignore invalid coordinator id",
		zap.Any("request", msg),
		zap.Any("coordinatorID", m.coordinatorID),
		zap.Stringer("from", msg.From))
```
Don't log the whole rejected control message.
AddMaintainerRequest carries the serialized changefeed config from coordinator/changefeed/changefeed.go:249-259. zap.Any("request", msg) can therefore leak sink credentials or tokens into logs. Please log only metadata such as Type, From, and changefeedID.
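A metadata-only warning might look like the sketch below. It uses the stdlib `log/slog` rather than the project's zap logger, and all field values are illustrative; the point is that only a fixed allowlist of safe fields ever reaches the log, never the message body:

```go
package main

import (
	"fmt"
	"log/slog"
	"os"
)

// safeWarnFields returns only metadata that is safe to log from a rejected
// control message; the serialized changefeed config (which may contain sink
// credentials or tokens) is deliberately excluded from the allowlist.
func safeWarnFields(msgType, from, coordinatorID, changefeedID string) []any {
	return []any{
		"msgType", msgType,
		"from", from,
		"coordinatorID", coordinatorID,
		"changefeedID", changefeedID,
	}
}

func main() {
	logger := slog.New(slog.NewTextHandler(os.Stdout, nil))
	logger.Warn("ignore invalid coordinator id",
		safeWarnFields("AddMaintainerRequest", "node-2", "node-1", "cf-1")...)
	fmt.Println(len(safeWarnFields("a", "b", "c", "d"))) // 8 key/value elements
}
```

With zap the same idea translates to replacing `zap.Any("request", msg)` with individual `zap.Stringer`/`zap.String` fields for the allowed metadata.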
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@maintainer/maintainer_manager_maintainers.go` around lines 88 - 92, The log
currently emits the whole rejected control message (zap.Any("request", msg))
which can leak serialized changefeed config and secrets; change the warning in
the coordinator ID check (m.coordinatorID != msg.From) to avoid logging msg
itself and instead log only safe metadata fields from the AddMaintainerRequest
such as msg.Type, msg.From (use zap.Stringer if appropriate) and
msg.ChangefeedID so the message body/config is never written to logs.
```go
err := json.Unmarshal(req.Config, info)
if err != nil {
	log.Panic("decode changefeed fail", zap.Error(err))
}
if req.CheckpointTs == 0 {
	log.Panic("add maintainer with invalid checkpointTs",
		zap.Stringer("changefeedID", changefeedID),
		zap.Uint64("checkpointTs", req.CheckpointTs),
		zap.Any("info", info))
}
```
Reject bad add requests instead of panicking the capture.
A JSON decode error or CheckpointTs == 0 currently brings the whole node down. coordinator/changefeed/changefeed.go:249-259 forwards CheckpointTs straight from changefeed status, so an uninitialized status can become a process crash here. Drop/report the bad request instead of calling log.Panic.
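One way to express the suggested fix is to pull both checks into a validator that returns an error instead of panicking; `changefeedInfo` and `validateAddMaintainer` below are hypothetical names, not the PR's actual code:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// changefeedInfo is a placeholder for the real changefeed config type.
type changefeedInfo struct {
	SinkURI string `json:"sink-uri"`
}

// validateAddMaintainer rejects malformed add requests with an error so a bad
// network message is dropped and reported instead of crashing the process.
func validateAddMaintainer(config []byte, checkpointTs uint64) (*changefeedInfo, error) {
	info := &changefeedInfo{}
	if err := json.Unmarshal(config, info); err != nil {
		return nil, fmt.Errorf("decode changefeed config failed, rejecting add request: %w", err)
	}
	if checkpointTs == 0 {
		return nil, fmt.Errorf("add maintainer with invalid checkpointTs 0")
	}
	return info, nil
}

func main() {
	// A malformed config is now rejected instead of bringing the node down.
	if _, err := validateAddMaintainer([]byte("{not json"), 42); err != nil {
		fmt.Println("rejected:", err)
	}
	// An uninitialized checkpoint is likewise rejected, not fatal.
	if _, err := validateAddMaintainer([]byte(`{"sink-uri":"blackhole://"}`), 0); err != nil {
		fmt.Println("rejected:", err)
	}
}
```

The caller can then log the returned error with the structured fields from the original snippet and signal failure back to the coordinator.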
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@maintainer/maintainer_manager_maintainers.go` around lines 168 - 177, The
code currently panics on json.Unmarshal(req.Config, info) errors and when
req.CheckpointTs == 0 (using log.Panic), which can crash the node; instead,
detect these two conditions (the json.Unmarshal failure for req.Config into
info, and req.CheckpointTs == 0 for the incoming req/changefeedID), log a
structured error (use log.Error or log.Warn with zap.Error(err),
zap.Stringer("changefeedID", changefeedID), zap.Uint64("checkpointTs",
req.CheckpointTs), zap.Any("info", info)) and reject the add-maintainer request
by returning an error (or otherwise signaling failure to the caller) rather than
calling log.Panic so the process does not exit and the bad request is
dropped/reported.
```go
select {
case <-ctx.Done():
	return ctx.Err()
```
🧩 Analysis chain

🏁 Scripts executed (repository: pingcap/ticdc):

```shell
cat -n maintainer/maintainer_manager_maintainers.go | sed -n '270,290p'
rg 'ctx\.Err\(\)' --type go -B 3 -A 1 maintainer/
rg 'errors\.Trace\(ctx\.Err' --type go
rg 'return ctx\.Err\(\)' --type go | head -20
```
Wrap the cancellation error before returning it.
Returning bare ctx.Err() loses the stack trace. Per coding guidelines, errors from library calls should be wrapped immediately with errors.Trace(err). This pattern is used consistently throughout the codebase (server, workerpool, sinks, etc.) but is not applied in the maintainer module.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@maintainer/maintainer_manager_maintainers.go` around lines 277 - 279, Replace
the bare return of ctx.Err() in the select block with a wrapped error using
errors.Trace to preserve stack information; i.e., when handling <-ctx.Done()
capture the error (e := ctx.Err() or err := ctx.Err()) and return
errors.Trace(err) instead of returning ctx.Err() directly (update the select
case in maintainer_manager_maintainers.go where the code currently returns
ctx.Err()).
What problem does this PR solve?
The dispatcher-drain work split out of #4190 still mixes the drain target protocol, maintainer runtime behavior, coordinator scheduling, and public API orchestration in one review path. This PR extracts the drain target protocol and maintainer-manager plumbing into a dedicated review unit so reviewers can focus on how the target is propagated and acknowledged before looking at scheduler behavior.
Issue Number: ref #4190
What is changed and how it works?
Background:
The earlier attempt (#4523) still mixed the protocol, runtime scheduling, and API orchestration. This PR is rebased on master and keeps only the drain target plumbing.
Motivation:
Summary:
Adds SetDispatcherDrainTargetRequest.
How it works:
Check List
Tests
go test ./maintainer -run 'TestSetDispatcherDrainTarget|TestSetNodeLiveness'
go test ./pkg/messaging ./heartbeatpb
Will it cause performance regression or break compatibility?
This PR does not add a user-facing API by itself. It only adds the internal protocol and manager-level drain target propagation needed by the later runtime and API layers.
Do you need to update user documentation, design documentation or monitoring documentation?
No additional user-facing documentation is needed for this split. It is an internal decomposition of the drain-capture implementation.
Release note