…ce CRD definitions
Note: Reviews paused. It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior in the review settings and use the review commands to manage reviews.
📝 Walkthrough

Introduce a cluster-scoped History CRD and HistoryManager; migrate controllers to asynchronously upsert/delete History resources instead of synchronously creating/patching Decision status; add deepcopy and CRD manifests, RBAC, and tests; remove the explanation subsystem and related wiring; adjust API types to use v1alpha1.SchedulingIntent.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Controller as "Pipeline Controller"
    participant HistoryMgr as "HistoryManager"
    participant K8sAPI as "Kubernetes API"
    participant Recorder as "EventRecorder"
    Controller->>HistoryMgr: Upsert(ctx, decision, intent, az, pipelineErr)
    activate HistoryMgr
    HistoryMgr->>K8sAPI: Get History by name
    alt History not found
        K8sAPI-->>HistoryMgr: NotFound
        HistoryMgr->>K8sAPI: Create History (Spec)
    else History exists
        K8sAPI-->>HistoryMgr: Existing History
        HistoryMgr->>K8sAPI: Status().Update(CurrentDecision + archived History)
    end
    HistoryMgr->>Recorder: Event(SchedulingSucceeded|SchedulingFailed)
    HistoryMgr-->>Controller: return error?
    deactivate HistoryMgr
```
Estimated code review effort: 🎯 5 (Critical) | ⏱️ ~120 minutes
🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed (1 warning)
Actionable comments posted: 5
Caution: Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (5)
internal/scheduling/manila/decisions_cleanup_test.go (1)
350-356: ⚠️ Potential issue | 🟡 Minor: Assert deletions against `History`, not `Decision`.

At line 352 this assertion still reads a `v1alpha1.Decision`, so it passes even when the `History` object was never deleted. After the CRD swap, the test no longer validates the cleanup path.

Suggested fix:

```diff
-var decision v1alpha1.Decision
+var history v1alpha1.History
 err := client.Get(context.Background(),
-	types.NamespacedName{Name: expectedDeleted}, &decision)
+	types.NamespacedName{Name: expectedDeleted}, &history)
 if err == nil {
-	t.Errorf("Expected decision %s to be deleted but it still exists", expectedDeleted)
+	t.Errorf("Expected history %s to be deleted but it still exists", expectedDeleted)
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/scheduling/manila/decisions_cleanup_test.go` around lines 350-356: the test is asserting deletion against `v1alpha1.Decision` but should check `v1alpha1.History`; update the loop that looks up `expectedDeleted` to fetch a `v1alpha1.History` (e.g., `var history v1alpha1.History`) and call `client.Get` with `types.NamespacedName{Name: expectedDeleted}` into that history variable, then assert that an error is returned (or `IsNotFound`) instead of checking a `Decision`; ensure all references to the retrieved object in this block use the `History` type and variable name.

internal/scheduling/nova/decisions_cleanup_test.go (1)
352-360: ⚠️ Potential issue | 🟠 Major: Fetch `History`, not `Decision`, in the deletion assertion.

This still queries `v1alpha1.Decision`, so the test will pass even if the `History` object was never deleted.

Suggested fix:

```diff
 if !tt.expectError {
-	// Verify expected decisions were deleted
+	// Verify expected histories were deleted
 	for _, expectedDeleted := range tt.expectedDeleted {
-		var decision v1alpha1.Decision
+		var history v1alpha1.History
 		err := client.Get(context.Background(),
-			types.NamespacedName{Name: expectedDeleted}, &decision)
+			types.NamespacedName{Name: expectedDeleted}, &history)
 		if err == nil {
-			t.Errorf("Expected decision %s to be deleted but it still exists", expectedDeleted)
+			t.Errorf("Expected history %s to be deleted but it still exists", expectedDeleted)
 		}
 	}
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/scheduling/nova/decisions_cleanup_test.go` around lines 352-360: the test is asserting deletion of History objects but currently fetches `v1alpha1.Decision`; change the lookup to fetch a `v1alpha1.History` instead. Replace the local variable (e.g., `var decision v1alpha1.Decision`) with `var history v1alpha1.History` and call `client.Get(..., &history)` when checking each `tt.expectedDeleted` (using the same `types.NamespacedName{Name: expectedDeleted}`), and update the error message to reference the History name so the assertion fails if the History was not actually deleted.

internal/scheduling/pods/filter_weigher_pipeline_controller.go (1)
76-96: ⚠️ Potential issue | 🟠 Major: `pod.Name` is too weak for the new history key.

`HistorySpec` only carries `SchedulingDomain` and `ResourceID`, and this controller still identifies the scheduling by pod name. Same-named pods in different namespaces will overwrite or delete each other’s history.

Suggested fix:

```diff
 decision := &v1alpha1.Decision{
 	ObjectMeta: metav1.ObjectMeta{
 		GenerateName: "pod-",
 	},
 	Spec: v1alpha1.DecisionSpec{
 		SchedulingDomain: v1alpha1.SchedulingDomainPods,
-		ResourceID:       pod.Name,
+		ResourceID:       client.ObjectKeyFromObject(pod).String(),
 		PipelineRef: corev1.ObjectReference{
 			Name: "pods-scheduler",
 		},
@@
 DeleteFunc: func(ctx context.Context, evt event.DeleteEvent, queue workqueue.TypedRateLimitingInterface[reconcile.Request]) {
 	pod := evt.Object.(*corev1.Pod)
-	if err := c.HistoryManager.Delete(ctx, v1alpha1.SchedulingDomainPods, pod.Name); err != nil {
+	resourceID := client.ObjectKeyFromObject(pod).String()
+	if err := c.HistoryManager.Delete(ctx, v1alpha1.SchedulingDomainPods, resourceID); err != nil {
 		log := ctrl.LoggerFrom(ctx)
 		log.Error(err, "failed to delete history CRD for pod", "pod", pod.Name)
 	}
```

Also applies to: 224-229
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/scheduling/pods/filter_weigher_pipeline_controller.go` around lines 76-96: the Decision resource uses `pod.Name` as `Decision.Spec.ResourceID`, which is too weak (it collides across namespaces); update `ProcessNewPod` (and the similar creation at the other location) to set `ResourceID` to a unique identifier combining namespace and name (e.g., "namespace/name") or, better, `pod.UID` (preferred) so `HistorySpec`'s `ResourceID` is unique per pod; keep `Decision.Spec.PodRef` the same but replace the `ResourceID` assignment from `pod.Name` to either `fmt.Sprintf("%s/%s", pod.Namespace, pod.Name)` or `string(pod.UID)` consistently wherever `Decision.Spec.ResourceID` is set.

internal/scheduling/machines/filter_weigher_pipeline_controller.go (1)
77-97: ⚠️ Potential issue | 🟠 Major: `machine.Name` is not unique enough for persisted history.

With the new `History` CRD, the effective key is `SchedulingDomain` + `ResourceID`. Using only the machine name means same-named machines in different namespaces can clobber each other’s history.

Suggested fix:

```diff
 decision := &v1alpha1.Decision{
 	ObjectMeta: metav1.ObjectMeta{
 		GenerateName: "machine-",
 	},
 	Spec: v1alpha1.DecisionSpec{
 		SchedulingDomain: v1alpha1.SchedulingDomainMachines,
-		ResourceID:       machine.Name,
+		ResourceID:       client.ObjectKeyFromObject(machine).String(),
 		PipelineRef: corev1.ObjectReference{
 			Name: "machines-scheduler",
 		},
@@
 DeleteFunc: func(ctx context.Context, evt event.DeleteEvent, queue workqueue.TypedRateLimitingInterface[reconcile.Request]) {
 	machine := evt.Object.(*ironcorev1alpha1.Machine)
-	if err := c.HistoryManager.Delete(ctx, v1alpha1.SchedulingDomainMachines, machine.Name); err != nil {
+	resourceID := client.ObjectKeyFromObject(machine).String()
+	if err := c.HistoryManager.Delete(ctx, v1alpha1.SchedulingDomainMachines, resourceID); err != nil {
 		log := ctrl.LoggerFrom(ctx)
 		log.Error(err, "failed to delete history CRD for machine", "machine", machine.Name)
 	}
```

Also applies to: 213-218
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/scheduling/machines/filter_weigher_pipeline_controller.go` around lines 77-97: the `Decision.ResourceID` is currently set to `machine.Name`, which can collide across namespaces; update `FilterWeigherPipelineController.ProcessNewMachine` (and the other Decision creation site with the similar block) to use a namespace-qualified key (for example `fmt.Sprintf("%s/%s", machine.Namespace, machine.Name)` or `types.NamespacedName.String()`) so `Decision.Spec.ResourceID` is unique per namespace+name; ensure both places that build `v1alpha1.Decision.Spec.ResourceID` (and any analogous `ResourceID` assignments) are changed to the namespaced form.

internal/scheduling/cinder/decisions_cleanup_test.go (1)
299-307: ⚠️ Potential issue | 🟠 Major: The deletion check is still looking up the wrong kind.

After this migration, querying `v1alpha1.Decision` here returns `NotFound` even when the `History` object still exists, so this test no longer protects the cleanup path.

Suggested fix:

```diff
 if !tt.expectError {
-	// Verify expected decisions were deleted
+	// Verify expected histories were deleted
 	for _, expectedDeleted := range tt.expectedDeleted {
-		var decision v1alpha1.Decision
+		var history v1alpha1.History
 		err := client.Get(context.Background(),
-			types.NamespacedName{Name: expectedDeleted}, &decision)
+			types.NamespacedName{Name: expectedDeleted}, &history)
 		if err == nil {
-			t.Errorf("Expected decision %s to be deleted but it still exists", expectedDeleted)
+			t.Errorf("Expected history %s to be deleted but it still exists", expectedDeleted)
 		}
 	}
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/scheduling/cinder/decisions_cleanup_test.go` around lines 299 - 307, The test is querying the wrong Kubernetes kind (v1alpha1.Decision) when asserting deletions; change the client.Get call to look up the History kind instead (replace v1alpha1.Decision with v1alpha1.History and adjust the variable name accordingly) so the test verifies the History object was actually deleted, and update the error message to reference the History resource name; keep the existing use of types.NamespacedName{Name: expectedDeleted} and the nil-check logic.
🧹 Nitpick comments (2)
internal/scheduling/lib/filter_weigher_pipeline_test.go (1)
224-228: Consider validating the returned step results.

The test correctly adapts to the new `runFilters` signature, but discarding `stepResults` misses an opportunity to verify that filter activations are properly captured. Since this is now a key feature for history tracking, adding assertions would strengthen coverage.

💡 Optional enhancement to verify step results:

```diff
-req, _ := p.runFilters(slog.Default(), request)
+req, stepResults := p.runFilters(slog.Default(), request)
 if len(req.Hosts) != 2 {
 	t.Fatalf("expected 2 step results, got %d", len(req.Hosts))
 }
+if len(stepResults) != 1 {
+	t.Fatalf("expected 1 step result, got %d", len(stepResults))
+}
+if stepResults[0].StepName != "mock_filter" {
+	t.Errorf("expected step name 'mock_filter', got %s", stepResults[0].StepName)
+}
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/scheduling/lib/filter_weigher_pipeline_test.go` around lines 224-228: the test currently ignores the second return value from `p.runFilters` (the `stepResults`) after calling `req, _ := p.runFilters(slog.Default(), request)`; update the test to capture and assert on `stepResults` (e.g., `req, stepResults := ...`), verifying its length and that entries correspond to expected filters/steps and activation states; specifically assert that `stepResults` contains the expected number of steps, that each `StepResult` references the correct filter identifier (by name or ID used in the test), and that the activation/decision fields match the expected outcomes for the given request, while keeping the existing host count assertion for `req.Hosts`.

internal/scheduling/lib/history_manager.go (1)
181-188: Consider copying OrderedHosts to avoid shared slice reference.

The slice is assigned directly from `decision.Status.Result.OrderedHosts`. If the caller later modifies the original slice, it could inadvertently affect the stored history. While unlikely in current usage patterns, a defensive copy would be safer.

♻️ Defensive copy of OrderedHosts:

```diff
 if decision.Status.Result != nil {
 	current.TargetHost = decision.Status.Result.TargetHost
 	hosts := decision.Status.Result.OrderedHosts
 	if len(hosts) > 3 {
 		hosts = hosts[:3]
 	}
-	current.OrderedHosts = hosts
+	if len(hosts) > 0 {
+		current.OrderedHosts = make([]string, len(hosts))
+		copy(current.OrderedHosts, hosts)
+	}
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/scheduling/lib/history_manager.go` around lines 181 - 188, The code assigns decision.Status.Result.OrderedHosts directly to current.OrderedHosts which shares the underlying slice and can lead to accidental mutation; update the logic in the block that sets current.OrderedHosts (where decision.Status.Result.OrderedHosts is read) to create a defensive copy of the slice (copy the elements into a new slice, truncating to 3 if needed) before assigning to current.OrderedHosts so the stored history never references the original slice.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@internal/scheduling/cinder/filter_weigher_pipeline_controller.go`:
- Around line 98-103: The fire-and-forget call to c.HistoryManager.Upsert inside
the CreateDecisions branch should be made in-band: remove the goroutine so
c.HistoryManager.Upsert(...) is executed synchronously (use the existing ctx,
decision, v1alpha1.SchedulingIntentUnknown and err parameters), check its
returned error, log it via ctrl.LoggerFrom(ctx).Error and propagate/return the
error from ProcessNewDecisionFromAPI() (or surface it to the caller) instead of
swallowing it; ensure callers observing pipelineConf.Spec.CreateDecisions can
rely on the history write completing before the function returns.
In `@internal/scheduling/machines/filter_weigher_pipeline_controller.go`:
- Around line 120-124: The async goroutine that calls c.HistoryManager.Upsert
with context.Background() can run after the machine is deleted and recreate
stale history; instead perform the Upsert synchronously (remove the goroutine)
or, if you must async, capture and use the request-scoped ctx and verify the
machine still exists before calling c.HistoryManager.Upsert. Concretely, replace
the fire-and-forget go func around c.HistoryManager.Upsert(decision,
v1alpha1.SchedulingIntentUnknown, err) with a direct call or add a pre-check
using the controller's store/client (e.g. check c.MachineStore.Exists/Get or
c.Client.Get for the machine referenced by decision) and only call
HistoryManager.Upsert if the resource still exists; apply the same change to the
duplicate block around lines 213-218.
In `@internal/scheduling/manila/filter_weigher_pipeline_controller.go`:
- Around line 98-103: The current fire-and-forget goroutine around
HistoryManager.Upsert causes ProcessNewDecisionFromAPI (and callers) to believe
persistence succeeded even if the Upsert fails; make the Upsert call in-band
instead of spawning a goroutine: remove the anonymous go func and call
c.HistoryManager.Upsert(ctx, decision, v1alpha1.SchedulingIntentUnknown, err)
synchronously, check its returned error, log it via ctrl.LoggerFrom(ctx).Error,
and return or propagate that error from ProcessNewDecisionFromAPI (or merge it
into the existing err) so callers observe a failure when history persistence
fails; this change touches the CreateDecisions check and the Upsert invocation
in filter_weigher_pipeline_controller.go and ensures HistoryManager.Upsert is
not async.
In `@internal/scheduling/pods/filter_weigher_pipeline_controller.go`:
- Around line 119-123: ProcessNewPod() currently spawns a detached goroutine
that calls c.HistoryManager.Upsert, which can race with Delete() (which runs
synchronously without processMu) and re-create history for a deleted pod; change
the Upsert to run synchronously and under the controller's processMu to prevent
the race (i.e., remove the goroutine and acquire c.processMu before calling
HistoryManager.Upsert inside ProcessNewPod()), and apply the same fix to the
other occurrence that mirrors lines 224-229 so both Upsert calls use the mutex
and are not deferred to a background goroutine.
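Taken together, the inline comments above amount to one discipline: run the history write in-band and serialise it with deletes under a single mutex. A minimal Go sketch of that discipline, using a plain map in place of the real HistoryManager and Kubernetes client (all names here are illustrative assumptions, not the controllers' actual API):

```go
package main

import (
	"fmt"
	"sync"
)

// historyStore stands in for the History CRD client. Upsert and Delete take
// the same mutex, and Upsert runs synchronously, so a delete observed after
// an upsert can never be overtaken by a late background history write.
type historyStore struct {
	mu      sync.Mutex
	records map[string]string
}

// Upsert records the target host for a resource, in-band with the caller.
func (s *historyStore) Upsert(resourceID, targetHost string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.records[resourceID] = targetHost
}

// Delete removes the history record under the same mutex as Upsert.
func (s *historyStore) Delete(resourceID string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.records, resourceID)
}

// Exists reports whether a record is currently stored.
func (s *historyStore) Exists(resourceID string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	_, ok := s.records[resourceID]
	return ok
}

func main() {
	s := &historyStore{records: map[string]string{}}
	s.Upsert("default/pod-a", "host-1") // completes before control returns
	s.Delete("default/pod-a")           // cannot race a pending upsert
	fmt.Println(s.Exists("default/pod-a"))
}
```

Because the upsert finishes before the function returns, the stale-history window the reviews describe cannot open.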
---
Outside diff comments:
In `@internal/scheduling/cinder/decisions_cleanup_test.go`:
- Around line 299-307: The test is querying the wrong Kubernetes kind
(v1alpha1.Decision) when asserting deletions; change the client.Get call to look
up the History kind instead (replace v1alpha1.Decision with v1alpha1.History and
adjust the variable name accordingly) so the test verifies the History object
was actually deleted, and update the error message to reference the History
resource name; keep the existing use of types.NamespacedName{Name:
expectedDeleted} and the nil-check logic.
In `@internal/scheduling/machines/filter_weigher_pipeline_controller.go`:
- Around line 77-97: The Decision.ResourceID is currently set to machine.Name
which can collide across namespaces; update
FilterWeigherPipelineController.ProcessNewMachine (and the other Decision
creation site around the similar block) to use a namespace-qualified key (for
example fmt.Sprintf("%s/%s", machine.Namespace, machine.Name) or k8s
types.NamespacedName.String()) so Decision.Spec.ResourceID is unique per
namespace+name; ensure both places that build v1alpha1.Decision.Spec.ResourceID
(and any analogous ResourceID assignments) are changed to the namespaced form.
In `@internal/scheduling/manila/decisions_cleanup_test.go`:
- Around line 350-356: The test is asserting deletion against v1alpha1.Decision
but should check v1alpha1.History; update the loop that looks up expectedDeleted
to fetch a v1alpha1.History (e.g., var history v1alpha1.History) and call
client.Get with types.NamespacedName{Name: expectedDeleted} into that history
variable, then assert that an error is returned (or IsNotFound) instead of
checking a Decision—ensure all references to the retrieved object in this block
use the History type and variable name.
In `@internal/scheduling/nova/decisions_cleanup_test.go`:
- Around line 352-360: The test is asserting deletion of History objects but
currently fetches v1alpha1.Decision; change the lookup to fetch a
v1alpha1.History instead. Replace the local variable (e.g., var decision
v1alpha1.Decision) with var history v1alpha1.History and call client.Get(...,
&history) when checking each tt.expectedDeleted (using the same
types.NamespacedName{Name: expectedDeleted}), and update the error message to
reference the History name so the assertion fails if the History was not
actually deleted.
In `@internal/scheduling/pods/filter_weigher_pipeline_controller.go`:
- Around line 76-96: The Decision resource uses pod.Name as
Decision.Spec.ResourceID which is too weak (collides across namespaces); update
ProcessNewPod (and the similar creation at the other location) to set ResourceID
to a unique identifier combining namespace and name (e.g., "namespace/name") or
better, use pod.UID (preferred) so HistorySpec's ResourceID is unique per pod;
change the Decision.Spec.PodRef to remain the same but replace ResourceID
assignment from pod.Name to either fmt.Sprintf("%s/%s", pod.Namespace, pod.Name)
or string(pod.UID) consistently wherever Decision.Spec.ResourceID is set.
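Both ResourceID comments above reduce to building a namespace-qualified key. A tiny illustrative helper (the `resourceKey` name is an assumption for this sketch; the real code could equally use `client.ObjectKeyFromObject(obj).String()` or `string(pod.UID)`):

```go
package main

import "fmt"

// resourceKey builds a "namespace/name" identifier so that same-named
// objects in different namespaces map to distinct history records.
func resourceKey(namespace, name string) string {
	return fmt.Sprintf("%s/%s", namespace, name)
}

func main() {
	// Two pods named "web" in different namespaces no longer collide.
	fmt.Println(resourceKey("team-a", "web")) // team-a/web
	fmt.Println(resourceKey("team-b", "web")) // team-b/web
}
```

Using the UID instead would additionally distinguish a deleted-and-recreated object with the same name, at the cost of less readable history names.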
---
Nitpick comments:
In `@internal/scheduling/lib/filter_weigher_pipeline_test.go`:
- Around line 224-228: The test currently ignores the second return value from
p.runFilters (the stepResults) after calling req, _ :=
p.runFilters(slog.Default(), request); update the test to capture and assert on
stepResults (e.g., stepResults := ... or req, stepResults := ...), verifying its
length and that entries correspond to expected filters/steps and activation
states; specifically assert that stepResults contains the expected number of
steps, that each StepResult references the correct filter identifier (by name or
ID used in the test) and that the activation/decision fields match the expected
outcomes for the given request, while keeping the existing host count assertion
for req.Hosts.
In `@internal/scheduling/lib/history_manager.go`:
- Around line 181-188: The code assigns decision.Status.Result.OrderedHosts
directly to current.OrderedHosts which shares the underlying slice and can lead
to accidental mutation; update the logic in the block that sets
current.OrderedHosts (where decision.Status.Result.OrderedHosts is read) to
create a defensive copy of the slice (copy the elements into a new slice,
truncating to 3 if needed) before assigning to current.OrderedHosts so the
stored history never references the original slice.
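The defensive-copy nitpick above can be shown in isolation. A hypothetical `copyTopHosts` helper (name assumed for this sketch) truncates to the top three hosts and copies before storing, so later mutation of the caller's slice cannot leak into the stored history:

```go
package main

import "fmt"

// copyTopHosts returns an independent copy of at most the first three hosts.
func copyTopHosts(hosts []string) []string {
	if len(hosts) > 3 {
		hosts = hosts[:3]
	}
	if len(hosts) == 0 {
		return nil
	}
	out := make([]string, len(hosts))
	copy(out, hosts) // break the aliasing with the caller's backing array
	return out
}

func main() {
	ordered := []string{"host-1", "host-2", "host-3", "host-4"}
	stored := copyTopHosts(ordered)
	ordered[0] = "mutated" // caller mutates its slice after the upsert
	fmt.Println(stored)    // the stored copy is unaffected
}
```

Without the `copy`, `stored` would share the backing array with `ordered`, and the mutation would silently rewrite the archived record.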
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: c7318694-8787-4c2d-8835-ed238e44afda
📒 Files selected for processing (41)
- api/external/nova/messages.go
- api/external/nova/messages_test.go
- api/v1alpha1/history_types.go
- api/v1alpha1/pipeline_types.go
- api/v1alpha1/zz_generated.deepcopy.go
- cmd/main.go
- helm/bundles/cortex-cinder/values.yaml
- helm/bundles/cortex-ironcore/values.yaml
- helm/bundles/cortex-manila/values.yaml
- helm/bundles/cortex-nova/values.yaml
- helm/bundles/cortex-pods/values.yaml
- helm/library/cortex/files/crds/cortex.cloud_histories.yaml
- helm/library/cortex/files/crds/cortex.cloud_pipelines.yaml
- helm/library/cortex/templates/rbac/role.yaml
- internal/scheduling/cinder/decisions_cleanup.go
- internal/scheduling/cinder/decisions_cleanup_test.go
- internal/scheduling/cinder/filter_weigher_pipeline_controller.go
- internal/scheduling/cinder/filter_weigher_pipeline_controller_test.go
- internal/scheduling/explanation/controller.go
- internal/scheduling/explanation/controller_test.go
- internal/scheduling/explanation/explainer.go
- internal/scheduling/explanation/explainer_test.go
- internal/scheduling/explanation/templates.go
- internal/scheduling/explanation/types.go
- internal/scheduling/lib/filter_weigher_pipeline.go
- internal/scheduling/lib/filter_weigher_pipeline_test.go
- internal/scheduling/lib/history_manager.go
- internal/scheduling/lib/history_manager_test.go
- internal/scheduling/lib/pipeline_controller.go
- internal/scheduling/machines/filter_weigher_pipeline_controller.go
- internal/scheduling/machines/filter_weigher_pipeline_controller_test.go
- internal/scheduling/manila/decisions_cleanup.go
- internal/scheduling/manila/decisions_cleanup_test.go
- internal/scheduling/manila/filter_weigher_pipeline_controller.go
- internal/scheduling/manila/filter_weigher_pipeline_controller_test.go
- internal/scheduling/nova/decisions_cleanup.go
- internal/scheduling/nova/decisions_cleanup_test.go
- internal/scheduling/nova/filter_weigher_pipeline_controller.go
- internal/scheduling/nova/filter_weigher_pipeline_controller_test.go
- internal/scheduling/pods/filter_weigher_pipeline_controller.go
- internal/scheduling/pods/filter_weigher_pipeline_controller_test.go
💤 Files with no reviewable changes (12)
- cmd/main.go
- helm/bundles/cortex-manila/values.yaml
- helm/bundles/cortex-cinder/values.yaml
- helm/bundles/cortex-pods/values.yaml
- helm/bundles/cortex-nova/values.yaml
- internal/scheduling/explanation/types.go
- internal/scheduling/explanation/explainer_test.go
- internal/scheduling/explanation/controller.go
- helm/bundles/cortex-ironcore/values.yaml
- internal/scheduling/explanation/templates.go
- internal/scheduling/explanation/controller_test.go
- internal/scheduling/explanation/explainer.go
Resolved review threads (outdated):
- internal/scheduling/cinder/filter_weigher_pipeline_controller.go
- internal/scheduling/machines/filter_weigher_pipeline_controller.go
- internal/scheduling/manila/filter_weigher_pipeline_controller.go
Actionable comments posted: 2
♻️ Duplicate comments (5)
internal/scheduling/machines/filter_weigher_pipeline_controller.go (1)
119-126: ⚠️ Potential issue | 🟠 Major: Async upsert races with synchronous delete.

Same issue as the pods controller: the goroutine-based `Upsert` can execute after `Delete()` in the `DeleteFunc` handler, recreating a stale history record for a deleted machine.

🛠️ Suggested fix: make upsert synchronous

```diff
 if pipelineConf.Spec.CreateDecisions {
-	go func() {
-		if upsertErr := c.HistoryManager.Upsert(context.Background(), decision, v1alpha1.SchedulingIntentUnknown, nil, err); upsertErr != nil {
-			ctrl.LoggerFrom(ctx).Error(upsertErr, "failed to create/update history")
-		}
-	}()
+	if upsertErr := c.HistoryManager.Upsert(ctx, decision, v1alpha1.SchedulingIntentUnknown, nil, err); upsertErr != nil {
+		ctrl.LoggerFrom(ctx).Error(upsertErr, "failed to create/update history")
+	}
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/scheduling/machines/filter_weigher_pipeline_controller.go` around lines 119-126: the asynchronous goroutine calling `c.HistoryManager.Upsert` when `pipelineConf.Spec.CreateDecisions` is true can race with the `DeleteFunc` and recreate stale history; remove the goroutine and perform the `Upsert` synchronously (call `c.HistoryManager.Upsert` directly, handle/log any `upsertErr`) so the `Upsert` completes before returning from the function (use the existing `ctx` rather than `context.Background()`); update the block guarded by `pipelineConf.Spec.CreateDecisions` that references `c.HistoryManager.Upsert` and ensure the function returns `err` only after `Upsert` finishes.

internal/scheduling/manila/filter_weigher_pipeline_controller.go (1)
98-104: ⚠️ Potential issue | 🟠 Major: Fire-and-forget history upsert can silently fail.

The goroutine-based `Upsert` means `ProcessNewDecisionFromAPI()` can return success while the History write never completes. Since the PR moves persistence behind `CreateDecisions`, callers may observe no persisted record despite a successful return. Consider making this synchronous or using a bounded context with error propagation.

🛠️ Suggested fix for synchronous upsert:

```diff
 if pipelineConf.Spec.CreateDecisions {
-	go func() {
-		if upsertErr := c.HistoryManager.Upsert(context.Background(), decision, v1alpha1.SchedulingIntentUnknown, nil, err); upsertErr != nil {
-			ctrl.LoggerFrom(ctx).Error(upsertErr, "failed to create/update history")
-		}
-	}()
+	if upsertErr := c.HistoryManager.Upsert(ctx, decision, v1alpha1.SchedulingIntentUnknown, nil, err); upsertErr != nil {
+		ctrl.LoggerFrom(ctx).Error(upsertErr, "failed to create/update history")
+		// Optionally propagate error: return fmt.Errorf("persist history: %w", upsertErr)
+	}
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/scheduling/manila/filter_weigher_pipeline_controller.go` around lines 98-104: `ProcessNewDecisionFromAPI()` currently fire-and-forgets `HistoryManager.Upsert` in a goroutine when `pipelineConf.Spec.CreateDecisions` is true, which can let the function return before the history write completes; change this to perform a synchronous `Upsert` (or call `Upsert` with a bounded context and propagate any error back to the caller) instead of launching a goroutine: call `c.HistoryManager.Upsert(ctx, decision, v1alpha1.SchedulingIntentUnknown, nil, err)`, check the returned error, and surface/log/return it appropriately so callers observe failures to persist history when `CreateDecisions` is enabled.

internal/scheduling/pods/filter_weigher_pipeline_controller.go (1)
118-126: ⚠️ Potential issue | 🟠 Major: Async upsert races with synchronous delete.

The goroutine-based `Upsert` (lines 119-123) can execute after `Delete()` in the `DeleteFunc` handler (lines 224-229), recreating a stale history record for a deleted pod. The `Delete` handler doesn’t acquire `processMu` and runs synchronously, while `ProcessNewPod` releases the mutex before the goroutine completes.

🛠️ Suggested fix: make upsert synchronous

```diff
 if pipelineConf.Spec.CreateDecisions {
-	go func() {
-		if upsertErr := c.HistoryManager.Upsert(context.Background(), decision, v1alpha1.SchedulingIntentUnknown, nil, err); upsertErr != nil {
-			ctrl.LoggerFrom(ctx).Error(upsertErr, "failed to create/update history")
-		}
-	}()
+	if upsertErr := c.HistoryManager.Upsert(ctx, decision, v1alpha1.SchedulingIntentUnknown, nil, err); upsertErr != nil {
+		ctrl.LoggerFrom(ctx).Error(upsertErr, "failed to create/update history")
+	}
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/scheduling/pods/filter_weigher_pipeline_controller.go` around lines 118-126: the async `Upsert` races with `Delete()`; remove the goroutine and call `c.HistoryManager.Upsert` synchronously when `pipelineConf.Spec.CreateDecisions` is true (i.e., replace the `go func` with a direct call using the current context and the same arguments), so that the `Upsert` completes before `ProcessNewPod` releases `processMu`; keep the existing error handling (`ctrl.LoggerFrom(ctx).Error`) and return semantics unchanged; alternatively (if a synchronous call would deadlock) ensure `DeleteFunc` also acquires `processMu` before deleting, but the preferred fix is to invoke `c.HistoryManager.Upsert` directly instead of spawning a goroutine, and apply the same fix to the other occurrence around lines 224-229.

internal/scheduling/lib/history_manager_test.go (1)
558-586: ⚠️ Potential issue | 🟡 Minor: Calling `t.Errorf` inside a goroutine is unsafe.

The test spawns a goroutine that calls `t.Errorf` without synchronization. If the goroutine outlives the test function (e.g., if the polling loop exits early or times out), this can cause a race or panic. The test should wait for the goroutine to complete before exiting.

🛠️ Proposed fix using a channel:

```diff
 // Mirrors the pattern used in pipeline controllers.
 ctx := context.Background()
+errCh := make(chan error, 1)
 go func() {
-	if err := hm.Upsert(ctx, decision, v1alpha1.SchedulingIntentUnknown, nil, nil); err != nil {
-		t.Errorf("Upsert() returned error: %v", err)
-	}
+	errCh <- hm.Upsert(ctx, decision, v1alpha1.SchedulingIntentUnknown, nil, nil)
 }()
 // Poll for history creation.
 var histories v1alpha1.HistoryList
 deadline := time.Now().Add(2 * time.Second)
 for {
 	if err := c.List(context.Background(), &histories); err != nil {
 		t.Fatalf("Failed to list histories: %v", err)
 	}
 	if len(histories.Items) > 0 {
 		break
 	}
 	if time.Now().After(deadline) {
 		t.Fatal("timed out waiting for async history creation")
 	}
 	time.Sleep(5 * time.Millisecond)
 }
+// Wait for the goroutine to complete and check for errors.
+if err := <-errCh; err != nil {
+	t.Errorf("Upsert() returned error: %v", err)
+}
+
 got := histories.Items[0].Status.Current.TargetHost
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/scheduling/lib/history_manager_test.go` around lines 558-586: the goroutine that calls `hm.Upsert` should be synchronized so the test doesn’t call `t.Errorf` from a background goroutine; replace the unsynchronized goroutine with one that reports its result over a channel (or use a `sync.WaitGroup`) and ensure the main test goroutine waits for it before returning. Specifically, create an `errCh` (or `wg`) before starting the goroutine, send any error into that channel, and after the polling loop receive from `errCh` (or `wg.Wait()`) and then call `t.Errorf`/`t.Fatalf` from the main test goroutine if an error was reported; reference `hm.Upsert` and the async goroutine around the polling of `histories.Items`.

internal/scheduling/cinder/filter_weigher_pipeline_controller.go (1)
98-104: ⚠️ Potential issue | 🟠 Major: Fire-and-forget history upsert can silently fail.

Same issue as in the Manila controller: the goroutine-based `Upsert` can fail silently while `ProcessNewDecisionFromAPI()` returns success.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/scheduling/cinder/filter_weigher_pipeline_controller.go` around lines 98 - 104, The fire-and-forget goroutine calling c.HistoryManager.Upsert when pipelineConf.Spec.CreateDecisions is true can cause silent failures; change this to perform the Upsert synchronously (remove the goroutine) and propagate or handle its error from ProcessNewDecisionFromAPI so ProcessNewDecisionFromAPI only returns success if Upsert succeeds—locate the block using pipelineConf.Spec.CreateDecisions, c.HistoryManager.Upsert, and ProcessNewDecisionFromAPI to implement the synchronous call and proper error handling/logging.
🧹 Nitpick comments (4)
internal/scheduling/machines/filter_weigher_pipeline_controller.go (1)
213-219: Delete handler should acquire mutex for consistency.

If the upsert remains async, the `DeleteFunc` should acquire `processMu` to prevent the race with in-flight upserts.

♻️ Suggested fix
```diff
 DeleteFunc: func(ctx context.Context, evt event.DeleteEvent, queue workqueue.TypedRateLimitingInterface[reconcile.Request]) {
+    c.processMu.Lock()
+    defer c.processMu.Unlock()
     machine := evt.Object.(*ironcorev1alpha1.Machine)
     if err := c.HistoryManager.Delete(ctx, v1alpha1.SchedulingDomainMachines, machine.Name); err != nil {
         log := ctrl.LoggerFrom(ctx)
         log.Error(err, "failed to delete history CRD for machine", "machine", machine.Name)
     }
 },
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/scheduling/machines/filter_weigher_pipeline_controller.go` around lines 213 - 219, The DeleteFunc handling machine deletions can race with async upserts; acquire the controller's processMu before performing the history delete to serialise against in-flight upserts. In the DeleteFunc (the anonymous function registered as DeleteFunc) add a processMu.Lock() at the start and defer processMu.Unlock() before calling c.HistoryManager.Delete(ctx, v1alpha1.SchedulingDomainMachines, machine.Name) so the delete is performed under the same mutex used by the upsert path (reference processMu, DeleteFunc, and HistoryManager.Delete to locate the code).

internal/scheduling/pods/filter_weigher_pipeline_controller.go (1)
224-230: Delete handler should acquire mutex for consistency.

If the upsert remains async, the `DeleteFunc` should acquire `processMu` to prevent the race with in-flight upserts.

♻️ Suggested fix
```diff
 DeleteFunc: func(ctx context.Context, evt event.DeleteEvent, queue workqueue.TypedRateLimitingInterface[reconcile.Request]) {
+    c.processMu.Lock()
+    defer c.processMu.Unlock()
     pod := evt.Object.(*corev1.Pod)
     if err := c.HistoryManager.Delete(ctx, v1alpha1.SchedulingDomainPods, pod.Name); err != nil {
         log := ctrl.LoggerFrom(ctx)
         log.Error(err, "failed to delete history CRD for pod", "pod", pod.Name)
     }
 },
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/scheduling/pods/filter_weigher_pipeline_controller.go` around lines 224 - 230, The DeleteFunc handler currently calls c.HistoryManager.Delete without synchronizing with in-flight async upserts; wrap the deletion in the controller's processMu lock to avoid races by calling c.processMu.Lock() at the start of DeleteFunc and defer c.processMu.Unlock() before invoking c.HistoryManager.Delete (preserving existing logging behavior on error). Ensure you acquire the same mutex used by the upsert path (processMu) and keep the rest of the DeleteFunc logic unchanged.

internal/scheduling/nova/filter_weigher_pipeline_controller.go (1)
101-132: Async history upsert with extracted metadata.

The `upsertHistory` helper extracts availability zone and scheduling intent from the request, which is good for richer history records. However, the fire-and-forget goroutine pattern (line 102) means failures are only logged and not propagated to callers.

Additionally, the goroutine uses `context.Background()` (line 129), which is intentional to decouple from the request lifecycle, but it means the upsert has no timeout and could hang indefinitely.

♻️ Consider adding a timeout
```diff
 func (c *FilterWeigherPipelineController) upsertHistory(ctx context.Context, decision *v1alpha1.Decision, pipelineErr error) {
     log := ctrl.LoggerFrom(ctx)
     var az *string
     intent := v1alpha1.SchedulingIntentUnknown
     if decision.Spec.NovaRaw != nil {
         var request api.ExternalSchedulerRequest
         err := json.Unmarshal(decision.Spec.NovaRaw.Raw, &request)
         if err != nil {
             log.Error(err, "failed to unmarshal novaRaw for history, using defaults")
         } else {
             azStr := request.Spec.Data.AvailabilityZone
             az = &azStr
             if parsedIntent, intentErr := request.GetIntent(); intentErr != nil {
                 log.Error(intentErr, "failed to get intent from nova request, using Unknown")
             } else {
                 intent = parsedIntent
             }
         }
     }
-    if upsertErr := c.HistoryManager.Upsert(context.Background(), decision, intent, az, pipelineErr); upsertErr != nil {
+    upsertCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+    defer cancel()
+    if upsertErr := c.HistoryManager.Upsert(upsertCtx, decision, intent, az, pipelineErr); upsertErr != nil {
         log.Error(upsertErr, "failed to create/update history")
     }
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/scheduling/nova/filter_weigher_pipeline_controller.go` around lines 101 - 132, The current fire-and-forget call to upsertHistory from FilterWeigherPipelineController starts a goroutine without a timeout and calls HistoryManager.Upsert with context.Background(), risking indefinite hangs and lost/ungoverned work; change the goroutine invocation to derive a cancellable context with a bounded timeout (e.g., context.WithTimeout(ctx, <reasonable duration>)), pass that context into upsertHistory (update the signature of upsertHistory to accept ctx context.Context), ensure the goroutine defers cancel(), and inside upsertHistory use the passed ctx for c.HistoryManager.Upsert instead of context.Background(); keep the call asynchronous (go ...) so callers aren’t blocked and preserve existing logging of upsert errors (or add a metric emission if desired).

internal/scheduling/lib/history_manager.go (1)
216-219: Add conflict-retry around status updates to prevent concurrent write loss.

Multiple pipeline controllers call `Upsert()` concurrently via goroutines. When the same History object is updated by concurrent calls, a TOCTOU race can occur: one goroutine's `Get()` fetches a stale resource version, then its `Status().Update()` fails with a conflict error, losing the status write. The suggested refactor with `RetryOnConflict` is the standard Kubernetes pattern for handling such races.

Proposed refactor
```diff
+    "k8s.io/client-go/util/retry"
@@
-    if updateErr := h.Client.Status().Update(ctx, history); updateErr != nil {
+    if updateErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
+        latest := &v1alpha1.History{}
+        if err := h.Client.Get(ctx, client.ObjectKey{Name: name}, latest); err != nil {
+            return err
+        }
+        latest.Status = history.Status
+        return h.Client.Status().Update(ctx, latest)
+    }); updateErr != nil {
         log.Error(updateErr, "failed to update history CRD status", "name", name)
         return updateErr
     }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/scheduling/lib/history_manager.go` around lines 216 - 219, The status update in Upsert() currently calls h.Client.Status().Update(ctx, history) directly and can fail with a conflict when concurrent goroutines modify the same History; wrap the status update in a kubernetes retry loop using clientretry.RetryOnConflict to Get() the latest History, apply the status changes to that fresh object, and then call h.Client.Status().Update until it succeeds or the retry returns an error; reference the Upsert() function, the history variable, name/name identifier and the h.Client.Status().Update call when making the change and ensure you preserve context and returned error handling.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@internal/scheduling/lib/history_manager.go`:
- Around line 136-140: Spec.AvailabilityZone is only set on History creation so
it can remain stale; update the code paths that construct or persist
v1alpha1.History (both the create branch where v1alpha1.HistorySpec{...
AvailabilityZone: az} is used and the later block referenced in the comment) to
ensure the current az is applied on upsert: when you build the HistorySpec for
either create or update, assign AvailabilityZone = az, and if loading an
existing History, compare existing.Spec.AvailabilityZone to az and patch/update
the History resource when they differ so AZ changes are persisted.
- Around line 115-126: Upsert currently dereferences decision immediately (used
to build name via getName(decision.Spec.SchedulingDomain,
decision.Spec.ResourceID)); add a nil guard at the top of HistoryManager.Upsert
to validate decision is not nil and return a clear error (or log and return) if
it is nil to avoid panics in async goroutines; follow the same defensive pattern
used by generateExplanation and ensure subsequent uses of decision (e.g.,
getName, accessing Spec) only run after the nil check.
---
Duplicate comments:
In `@internal/scheduling/cinder/filter_weigher_pipeline_controller.go`:
- Around line 98-104: The fire-and-forget goroutine calling
c.HistoryManager.Upsert when pipelineConf.Spec.CreateDecisions is true can cause
silent failures; change this to perform the Upsert synchronously (remove the
goroutine) and propagate or handle its error from ProcessNewDecisionFromAPI so
ProcessNewDecisionFromAPI only returns success if Upsert succeeds—locate the
block using pipelineConf.Spec.CreateDecisions, c.HistoryManager.Upsert, and
ProcessNewDecisionFromAPI to implement the synchronous call and proper error
handling/logging.
In `@internal/scheduling/lib/history_manager_test.go`:
- Around line 558-586: The goroutine that calls hm.Upsert should be synchronized
so the test doesn't call t.Errorf from a background goroutine; replace the
unsynchronized goroutine with one that reports its result over a channel (or use
a sync.WaitGroup) and ensure the main test goroutine waits for it before
returning. Specifically, when you call go func() { if err := hm.Upsert(...) { /*
send err on errCh */ } }(), create an errCh (or wg) before starting the
goroutine, send any error into that channel, and after the polling loop receive
from errCh (or wg.Wait()) and then call t.Errorf/t.Fatalf from the main test
goroutine if an error was reported; reference hm.Upsert and the async goroutine
around the polling of histories.Items.
In `@internal/scheduling/machines/filter_weigher_pipeline_controller.go`:
- Around line 119-126: The asynchronous goroutine calling
c.HistoryManager.Upsert when pipelineConf.Spec.CreateDecisions is true can race
with the DeleteFunc and recreate stale history; remove the goroutine and perform
the Upsert synchronously (call c.HistoryManager.Upsert directly, handle/log any
upsertErr) so the Upsert completes before returning from the function (use the
existing ctx or context.Background() as done now) — update the block guarded by
pipelineConf.Spec.CreateDecisions that references c.HistoryManager.Upsert and
ensure the function returns err only after Upsert finishes.
In `@internal/scheduling/manila/filter_weigher_pipeline_controller.go`:
- Around line 98-104: ProcessNewDecisionFromAPI() currently fire-and-forgets
HistoryManager.Upsert in a goroutine when pipelineConf.Spec.CreateDecisions is
true, which can let the function return before the history write completes;
change this to perform a synchronous Upsert (or call Upsert with a bounded
context and propagate any error back to the caller) instead of launching a
goroutine: call c.HistoryManager.Upsert(ctxOrBoundedCtx, decision,
v1alpha1.SchedulingIntentUnknown, nil, err), check the returned error and
surface/log/return it appropriately so callers observe failures to persist
history when CreateDecisions is enabled.
In `@internal/scheduling/pods/filter_weigher_pipeline_controller.go`:
- Around line 118-126: Async Upsert races with Delete: remove the goroutine and
call c.HistoryManager.Upsert synchronously when
pipelineConf.Spec.CreateDecisions is true (i.e., replace the go func with a
direct call to c.HistoryManager.Upsert using the current context and same
arguments), so that the Upsert completes before ProcessNewPod releases
processMu; keep the existing error handling (ctrl.LoggerFrom(ctx).Error) and
return semantics unchanged; alternatively (if synchronous call would deadlock)
ensure DeleteFunc also acquires processMu before deleting, but the preferred fix
is to make the Upsert in ProcessNewPod synchronous by invoking
c.HistoryManager.Upsert directly instead of spawning a goroutine.
---
Nitpick comments:
In `@internal/scheduling/lib/history_manager.go`:
- Around line 216-219: The status update in Upsert() currently calls
h.Client.Status().Update(ctx, history) directly and can fail with a conflict
when concurrent goroutines modify the same History; wrap the status update in a
kubernetes retry loop using clientretry.RetryOnConflict to Get() the latest
History, apply the status changes to that fresh object, and then call
h.Client.Status().Update until it succeeds or the retry returns an error;
reference the Upsert() function, the history variable, name/name identifier and
the h.Client.Status().Update call when making the change and ensure you preserve
context and returned error handling.
In `@internal/scheduling/machines/filter_weigher_pipeline_controller.go`:
- Around line 213-219: The DeleteFunc handling machine deletions can race with
async upserts; acquire the controller's processMu before performing the history
delete to serialise against in-flight upserts. In the DeleteFunc (the anonymous
function registered as DeleteFunc) add a processMu.Lock() at the start and defer
processMu.Unlock() before calling c.HistoryManager.Delete(ctx,
v1alpha1.SchedulingDomainMachines, machine.Name) so the delete is performed
under the same mutex used by the upsert path (reference processMu, DeleteFunc,
and HistoryManager.Delete to locate the code).
In `@internal/scheduling/nova/filter_weigher_pipeline_controller.go`:
- Around line 101-132: The current fire-and-forget call to upsertHistory from
FilterWeigherPipelineController starts a goroutine without a timeout and calls
HistoryManager.Upsert with context.Background(), risking indefinite hangs and
lost/ungoverned work; change the goroutine invocation to derive a cancellable
context with a bounded timeout (e.g., context.WithTimeout(ctx, <reasonable
duration>)), pass that context into upsertHistory (update the signature of
upsertHistory to accept ctx context.Context), ensure the goroutine defers
cancel(), and inside upsertHistory use the passed ctx for
c.HistoryManager.Upsert instead of context.Background(); keep the call
asynchronous (go ...) so callers aren’t blocked and preserve existing logging of
upsert errors (or add a metric emission if desired).
In `@internal/scheduling/pods/filter_weigher_pipeline_controller.go`:
- Around line 224-230: The DeleteFunc handler currently calls
c.HistoryManager.Delete without synchronizing with in-flight async upserts; wrap
the deletion in the controller's processMu lock to avoid races by calling
c.processMu.Lock() at the start of DeleteFunc and defer c.processMu.Unlock()
before invoking c.HistoryManager.Delete (preserving existing logging behavior on
error). Ensure you acquire the same mutex used by the upsert path (processMu)
and keep the rest of the DeleteFunc logic unchanged.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 10cae06d-47ef-40d4-b4f2-13baff932c9e
📒 Files selected for processing (10)
- api/v1alpha1/history_types.go
- api/v1alpha1/zz_generated.deepcopy.go
- helm/library/cortex/files/crds/cortex.cloud_histories.yaml
- internal/scheduling/cinder/filter_weigher_pipeline_controller.go
- internal/scheduling/lib/history_manager.go
- internal/scheduling/lib/history_manager_test.go
- internal/scheduling/machines/filter_weigher_pipeline_controller.go
- internal/scheduling/manila/filter_weigher_pipeline_controller.go
- internal/scheduling/nova/filter_weigher_pipeline_controller.go
- internal/scheduling/pods/filter_weigher_pipeline_controller.go
🚧 Files skipped from review as they are similar to previous changes (1)
- api/v1alpha1/zz_generated.deepcopy.go
…n and error handling
…try deletion for Cinder and Manila
Actionable comments posted: 1
♻️ Duplicate comments (2)
internal/scheduling/lib/history_manager.go (2)
126-137: ⚠️ Potential issue | 🟠 Major

Guard `decision` before dereferencing in `Upsert`.

At line 136, `decision.Spec...` is accessed unconditionally. A nil input will panic the reconciliation path.

Proposed fix
```diff
 func (h *HistoryManager) Upsert(
     ctx context.Context,
     decision *v1alpha1.Decision,
     intent v1alpha1.SchedulingIntent,
     az *string,
     pipelineErr error,
 ) error {
+    if decision == nil {
+        return fmt.Errorf("decision must not be nil")
+    }
     log := ctrl.LoggerFrom(ctx)
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/scheduling/lib/history_manager.go` around lines 126 - 137, The Upsert method dereferences decision (decision.Spec.ResourceID etc.) without guarding for nil; add a nil-check at the start of HistoryManager.Upsert (validate that the decision argument is non-nil) and return a clear error if nil before calling getName or touching decision.Spec, so callers won't panic when passing a nil decision; update any related callers/tests if needed to handle the returned error.
141-167: ⚠️ Potential issue | 🟠 Major

`AvailabilityZone` can go stale on existing History objects.

`Spec.AvailabilityZone` is set on create, but not reconciled on the update path. If the AZ is learned or changed later, the persisted spec remains outdated.

Proposed fix
```diff
 } else if err != nil {
     log.Error(err, "failed to get history CRD", "name", name)
     return err
 }
+
+// Keep AZ in sync when it becomes known or changes.
+if (history.Spec.AvailabilityZone == nil) != (az == nil) ||
+    (history.Spec.AvailabilityZone != nil && az != nil && *history.Spec.AvailabilityZone != *az) {
+    history.Spec.AvailabilityZone = az
+    if specErr := h.Client.Update(ctx, history); specErr != nil {
+        log.Error(specErr, "failed to update history CRD spec", "name", name)
+        return specErr
+    }
+}
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/scheduling/lib/history_manager.go` around lines 141 - 167, The current logic only sets v1alpha1.History.Spec.AvailabilityZone when creating a new History but never updates existing History objects, so stale AZs persist; modify the reconciliation in the function around the existing Get/Create block (the code that uses h.Client.Get, history, decision, and h.Client.Create) to, when an existing history is found (err == nil), compare history.Spec.AvailabilityZone with the computed az and, if they differ, update the History spec to set AvailabilityZone to az (use either h.Client.Update or a Patch to update only Spec.AvailabilityZone and handle conflicts/retries—e.g., re-get on conflict—while preserving other fields), and log success/error via log.Error as currently done for create/get failures.
🧹 Nitpick comments (1)
internal/scheduling/lib/history_manager.go (1)
181-183: Replace hard-coded history cap with a named constant.

`10` is duplicated as behavior, but only the explanation cap is currently named. A dedicated `maxHistoryEntries` constant improves maintainability.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/scheduling/lib/history_manager.go` around lines 181 - 183, Replace the hard-coded literal 10 with a named constant: declare a package-level constant maxHistoryEntries = 10 (near the existing explanation cap constant) and use maxHistoryEntries in the trimming logic that currently touches history.Status.History (the snippet in history_manager.go that slices history.Status.History). Also update any other occurrences of the same literal used for history limits to use maxHistoryEntries to keep behavior consistent and maintainable.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@internal/scheduling/lib/history_manager.go`:
- Around line 227-230: Upsert currently calls h.Client.Status().Update(ctx,
history) once and drops updates on conflict; replace this with a bounded retry
loop (e.g., maxRetries constant) that attempts Status().Update, and on an
apierrors.IsConflict(err) re-fetches the latest History object
(h.Client.Get(ctx, namespacedName, history)), reapplies the same status
modifications to the refreshed history object, and retries until success or
maxRetries exhausted; for non-conflict errors return immediately, and if retries
are exhausted return/log the final error (keep using Upsert,
h.Client.Status().Update, history, ctx, and name to locate and modify the
object).
---
Duplicate comments:
In `@internal/scheduling/lib/history_manager.go`:
- Around line 126-137: The Upsert method dereferences decision
(decision.Spec.ResourceID etc.) without guarding for nil; add a nil-check at the
start of HistoryManager.Upsert (validate that the decision argument is non-nil)
and return a clear error if nil before calling getName or touching
decision.Spec, so callers won't panic when passing a nil decision; update any
related callers/tests if needed to handle the returned error.
- Around line 141-167: The current logic only sets
v1alpha1.History.Spec.AvailabilityZone when creating a new History but never
updates existing History objects, so stale AZs persist; modify the
reconciliation in the function around the existing Get/Create block (the code
that uses h.Client.Get, history, decision, and h.Client.Create) to, when an
existing history is found (err == nil), compare history.Spec.AvailabilityZone
with the computed az and, if they differ, update the History spec to set
AvailabilityZone to az (use either h.Client.Update or a Patch to update only
Spec.AvailabilityZone and handle conflicts/retries—e.g., re-get on
conflict—while preserving other fields), and log success/error via log.Error as
currently done for create/get failures.
---
Nitpick comments:
In `@internal/scheduling/lib/history_manager.go`:
- Around line 181-183: Replace the hard-coded literal 10 with a named constant:
declare a package-level constant maxHistoryEntries = 10 (near the existing
explanation cap constant) and use maxHistoryEntries in the trimming logic that
currently touches history.Status.History (the snippet in history_manager.go that
slices history.Status.History). Also update any other occurrences of the same
literal used for history limits to use maxHistoryEntries to keep behavior
consistent and maintainable.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 5d904ac5-c9f7-4149-9fd2-33d1645bf939
📒 Files selected for processing (2)
- internal/scheduling/lib/history_manager.go
- internal/scheduling/lib/history_manager_test.go
🚧 Files skipped from review as they are similar to previous changes (1)
- internal/scheduling/lib/history_manager_test.go
♻️ Duplicate comments (2)
internal/scheduling/lib/history_manager.go (2)
152-156: ⚠️ Potential issue | 🟠 Major

`AvailabilityZone` becomes stale after initial creation.

`Spec.AvailabilityZone` is only set when creating a new History CRD. If the availability zone becomes known or changes on subsequent upserts, the existing History's `AvailabilityZone` field is never updated.

🛠️ Proposed fix to sync AZ on existing History
```diff
 } else if err != nil {
     log.Error(err, "failed to get history CRD", "name", name)
     return err
 }
+
+// Sync AvailabilityZone if it changed.
+azChanged := (history.Spec.AvailabilityZone == nil) != (az == nil) ||
+    (history.Spec.AvailabilityZone != nil && az != nil && *history.Spec.AvailabilityZone != *az)
+if azChanged {
+    history.Spec.AvailabilityZone = az
+    if updateErr := h.Client.Update(ctx, history); updateErr != nil {
+        log.Error(updateErr, "failed to update history CRD spec", "name", name)
+        return updateErr
+    }
+}
 successful := pipelineErr == nil &&
     decision.Status.Result != nil &&
     decision.Status.Result.TargetHost != nil
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/scheduling/lib/history_manager.go` around lines 152 - 156, The History CRD's Spec.AvailabilityZone is only set on creation and never updated on upsert; modify the upsert logic in internal/scheduling/lib/history_manager.go (the code that constructs v1alpha1.HistorySpec using decision.Spec.SchedulingDomain, decision.Spec.ResourceID and az) so that when an existing History is found you set/update history.Spec.AvailabilityZone = az and persist that change (call the client's Update/Patch for the existing History instead of skipping updates). Ensure the update path uses the same History object fetched and only changes the AvailabilityZone field before saving.
229-235: ⚠️ Potential issue | 🟠 Major

Missing retry logic for concurrent status update conflicts.

The `Upsert` method is called asynchronously from multiple pipeline controllers. When two goroutines fetch the same History CRD, the first `Status().Update()` succeeds while the second fails with a conflict error. Without a retry loop, the second update is permanently lost.

Consider using `retry.RetryOnConflict` from `k8s.io/client-go/util/retry` to handle resourceVersion conflicts gracefully.

🛠️ Proposed fix using RetryOnConflict
```diff
+import "k8s.io/client-go/util/retry"
+
 // In Upsert method, wrap the status update:
-if updateErr := h.Client.Status().Update(ctx, history); updateErr != nil {
-    log.Error(updateErr, "failed to update history CRD status", "name", name)
-    return updateErr
-}
+updateErr := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
+    // Re-fetch on retry to get latest resourceVersion.
+    if err := h.Client.Get(ctx, client.ObjectKey{Name: name}, history); err != nil {
+        return err
+    }
+    // Re-apply status modifications here (archive, current, conditions)...
+    return h.Client.Status().Update(ctx, history)
+})
+if updateErr != nil {
+    log.Error(updateErr, "failed to update history CRD status", "name", name)
+    return updateErr
+}
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/scheduling/lib/history_manager.go` around lines 229 - 235, The Status().Update call in Upsert can fail with resourceVersion conflicts when multiple goroutines update the same History CRD; wrap the update in a retry.RetryOnConflict loop (importing k8s.io/client-go/util/retry) so you re-fetch the History object (using h.Client.Get or the same retrieval path), apply the intended status changes to the fresh copy, and call h.Client.Status().Update until success or max retries; specifically modify the block around h.Client.Status().Update(ctx, history) in Upsert to retry on conflict and preserve the existing log/error handling when the update ultimately fails.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Duplicate comments:
In `@internal/scheduling/lib/history_manager.go`:
- Around line 152-156: The History CRD's Spec.AvailabilityZone is only set on
creation and never updated on upsert; modify the upsert logic in
internal/scheduling/lib/history_manager.go (the code that constructs
v1alpha1.HistorySpec using decision.Spec.SchedulingDomain,
decision.Spec.ResourceID and az) so that when an existing History is found you
set/update history.Spec.AvailabilityZone = az and persist that change (call the
client's Update/Patch for the existing History instead of skipping updates).
Ensure the update path uses the same History object fetched and only changes the
AvailabilityZone field before saving.
- Around line 229-235: The Status().Update call in Upsert can fail with
resourceVersion conflicts when multiple goroutines update the same History CRD;
wrap the update in a retry.RetryOnConflict loop (importing
k8s.io/client-go/util/retry) so you re-fetch the History object (using
h.Client.Get or the same retrieval path), apply the intended status changes to
the fresh copy, and call h.Client.Status().Update until success or max retries;
specifically modify the block around h.Client.Status().Update(ctx, history) in
Upsert to retry on conflict and preserve the existing log/error handling when
the update ultimately fails.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: cb53a7e1-2c12-402d-9840-c76ec7c8cff1
📒 Files selected for processing (4)
- internal/scheduling/cinder/decisions_cleanup.go
- internal/scheduling/lib/history_manager.go
- internal/scheduling/lib/history_manager_test.go
- internal/scheduling/manila/decisions_cleanup.go
🚧 Files skipped from review as they are similar to previous changes (1)
- internal/scheduling/manila/decisions_cleanup.go
♻️ Duplicate comments (1)
internal/scheduling/machines/filter_weigher_pipeline_controller.go (1)
120-121: ⚠️ Potential issue | 🟠 Major

Use request-scoped context for history upsert (not `context.Background()`).

This call runs while `processMu` is held; dropping deadlines/cancellation can stall scheduling on slow client calls. Use `ctx` here.

💡 Suggested fix
```diff
-if upsertErr := c.HistoryManager.Upsert(context.Background(), decision, v1alpha1.SchedulingIntentUnknown, nil, err); upsertErr != nil {
+if upsertErr := c.HistoryManager.Upsert(ctx, decision, v1alpha1.SchedulingIntentUnknown, nil, err); upsertErr != nil {
     ctrl.LoggerFrom(ctx).Error(upsertErr, "failed to create/update history")
 }
```

Run this to verify remaining `context.Background()` upsert calls in scheduling controllers:

```bash
#!/bin/bash
set -euo pipefail
# Show all HistoryManager.Upsert call sites with context.
rg -nP --type=go 'HistoryManager\.Upsert\(' internal/scheduling -C2
# Specifically flag Upsert calls using context.Background().
rg -nP --type=go 'HistoryManager\.Upsert\(\s*context\.Background\(\)' internal/scheduling -C2
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/scheduling/machines/filter_weigher_pipeline_controller.go` around lines 120 - 121, The Upsert call is using context.Background() while holding processMu which can drop cancellation/deadlines and stall scheduling; replace the call to c.HistoryManager.Upsert(context.Background(), ...) with c.HistoryManager.Upsert(ctx, ...) so it uses the request-scoped ctx (the same ctx used when logging via ctrl.LoggerFrom(ctx)) — update the call site in filter_weigher_pipeline_controller.go where c.HistoryManager.Upsert is invoked while processMu is held to pass ctx instead of context.Background().
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Duplicate comments:
In `@internal/scheduling/machines/filter_weigher_pipeline_controller.go`:
- Around line 120-121: The Upsert call is using context.Background() while
holding processMu which can drop cancellation/deadlines and stall scheduling;
replace the call to c.HistoryManager.Upsert(context.Background(), ...) with
c.HistoryManager.Upsert(ctx, ...) so it uses the request-scoped ctx (the same
ctx used when logging via ctrl.LoggerFrom(ctx)) — update the call site in
filter_weigher_pipeline_controller.go where c.HistoryManager.Upsert is invoked
while processMu is held to pass ctx instead of context.Background().
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 1227c0af-2dbf-4fbf-8cf5-ade897505f04
📒 Files selected for processing (5)
- `api/v1alpha1/history_types.go`
- `helm/library/cortex/files/crds/cortex.cloud_histories.yaml`
- `internal/scheduling/lib/history_manager.go`
- `internal/scheduling/machines/filter_weigher_pipeline_controller.go`
- `internal/scheduling/pods/filter_weigher_pipeline_controller.go`
🚧 Files skipped from review as they are similar to previous changes (3)
- helm/library/cortex/files/crds/cortex.cloud_histories.yaml
- internal/scheduling/lib/history_manager.go
- api/v1alpha1/history_types.go
…decision processing
…e a constant for max hosts limit
…rrent updates with retry logic
…on conflict resolution
…istory management
Pull request overview
This PR refactors the scheduling “Decision” persistence/explanation flow by introducing a new cluster-scoped History CRD and a HistoryManager that records the latest decision plus a bounded history, while keeping the Decision CRD as an in-memory/DTO concept (deprecated, no longer persisted).
Changes:
- Add `History` CRD + `HistoryManager` to upsert/delete histories and generate simplified explanations (plus emit K8s Events).
- Update all filter/weigher pipeline controllers and cleanup tasks to write/read `History` instead of persisting `Decision`.
- Remove the old explanation controller/template explainer, and extend pipeline results with per-step activations (`StepResults`).
Reviewed changes
Copilot reviewed 41 out of 41 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
| internal/scheduling/pods/filter_weigher_pipeline_controller_test.go | Update pod pipeline tests to assert History creation instead of Decision persistence. |
| internal/scheduling/pods/filter_weigher_pipeline_controller.go | Stop persisting Decisions; upsert/delete History entries keyed by domain+resourceID. |
| internal/scheduling/nova/filter_weigher_pipeline_controller_test.go | Update nova pipeline tests to assert History creation. |
| internal/scheduling/nova/filter_weigher_pipeline_controller.go | Upsert History asynchronously (incl. intent/AZ extraction) instead of patching Decision status. |
| internal/scheduling/nova/decisions_cleanup_test.go | Switch cleanup tests from Decisions to Histories. |
| internal/scheduling/nova/decisions_cleanup.go | Cleanup task now deletes orphaned Histories instead of Decisions. |
| internal/scheduling/manila/filter_weigher_pipeline_controller_test.go | Update manila pipeline tests to assert History creation. |
| internal/scheduling/manila/filter_weigher_pipeline_controller.go | Upsert History asynchronously instead of persisting/patching Decision. |
| internal/scheduling/manila/decisions_cleanup_test.go | Switch cleanup tests from Decisions to Histories. |
| internal/scheduling/manila/decisions_cleanup.go | Cleanup task now deletes orphaned Histories instead of Decisions. |
| internal/scheduling/machines/filter_weigher_pipeline_controller_test.go | Update machine pipeline tests to assert History creation. |
| internal/scheduling/machines/filter_weigher_pipeline_controller.go | Stop persisting Decisions; upsert/delete History entries for machines. |
| internal/scheduling/lib/pipeline_controller.go | Add HistoryManager to the shared base controller struct. |
| internal/scheduling/lib/history_manager_test.go | New unit tests covering HistoryManager upsert/delete and explanation generation. |
| internal/scheduling/lib/history_manager.go | New HistoryManager implementation: upsert, delete, explanation, bounded history/hosts. |
| internal/scheduling/lib/filter_weigher_pipeline_test.go | Adjust tests for changed runFilters signature. |
| internal/scheduling/lib/filter_weigher_pipeline.go | Capture filter+weigher activations into DecisionResult.StepResults. |
| internal/scheduling/explanation/types.go | Remove old explanation templating types (deleted). |
| internal/scheduling/explanation/templates.go | Remove old explanation template rendering (deleted). |
| internal/scheduling/explanation/explainer_test.go | Remove old explainer tests (deleted). |
| internal/scheduling/explanation/explainer.go | Remove old explainer implementation (deleted). |
| internal/scheduling/explanation/controller_test.go | Remove old explanation controller tests (deleted). |
| internal/scheduling/explanation/controller.go | Remove old explanation controller (deleted). |
| internal/scheduling/cinder/filter_weigher_pipeline_controller_test.go | Update cinder pipeline tests to assert History creation. |
| internal/scheduling/cinder/filter_weigher_pipeline_controller.go | Upsert History asynchronously instead of persisting/patching Decision. |
| internal/scheduling/cinder/decisions_cleanup_test.go | Switch cleanup tests from Decisions to Histories. |
| internal/scheduling/cinder/decisions_cleanup.go | Cleanup task now deletes orphaned Histories instead of Decisions. |
| helm/library/cortex/templates/rbac/role.yaml | Grant RBAC for histories + event creation/patch for HistoryManager event emission. |
| helm/library/cortex/files/crds/cortex.cloud_pipelines.yaml | Document why createDecisions flag name is kept for compatibility. |
| helm/library/cortex/files/crds/cortex.cloud_histories.yaml | Add History CRD schema and printer columns. |
| helm/bundles/cortex-pods/values.yaml | Disable explanation-controller in pods bundle. |
| helm/bundles/cortex-nova/values.yaml | Disable explanation-controller in nova bundle. |
| helm/bundles/cortex-manila/values.yaml | Disable explanation-controller in manila bundle. |
| helm/bundles/cortex-ironcore/values.yaml | Disable explanation-controller in ironcore bundle. |
| helm/bundles/cortex-cinder/values.yaml | Disable explanation-controller in cinder bundle. |
| cmd/main.go | Remove wiring for explanation-controller. |
| api/v1alpha1/zz_generated.deepcopy.go | Add generated deepcopy implementations for new History-related types. |
| api/v1alpha1/pipeline_types.go | Add compatibility note on CreateDecisions field. |
| api/v1alpha1/history_types.go | Define new History CRD Go types (SchedulingIntent, History spec/status, etc.). |
| api/external/nova/messages_test.go | Update intent type expectation to v1alpha1.SchedulingIntent. |
| api/external/nova/messages.go | Switch nova request intent parsing to return v1alpha1.SchedulingIntent. |
internal/scheduling/cinder/filter_weigher_pipeline_controller_test.go (outdated review thread, resolved)
…ts and update test expectations for history creation
Test Coverage Report 📊: 67.1%
Note
The Decision CRD is kept as-is for now because removing it entirely from the workflow between the external scheduler API and the pipeline controller would be too complex at this point. Its current use case is primarily to trigger scheduling runs and to serve as a DTO within the scheduler pipeline. We plan to replace it with something more fitting in the future, and since a similar concept will likely be needed, it makes more sense to leave it in place than to tear it out now. Of course, the Decision CRD will no longer be persisted; just consider it deprecated. The functionality originally planned for the Decision CRD has moved into the new History CRD. Once we have a clearer plan for restructuring the CRD workflow and can fully retire the now-deprecated Decision CRD, the History CRD can be renamed to Decision if appropriate.
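The create-or-update flow that replaces Decision persistence (Get the History by name; Create it when missing, otherwise update its status) can be sketched with an in-memory stand-in for the API server; all names here are illustrative, not the actual `HistoryManager` code:

```go
package main

import "fmt"

// store is a fake keyed store standing in for the Kubernetes API.
type store map[string]string

// upsert mimics the HistoryManager flow described in the walkthrough:
// a missing History is created, an existing one gets its status updated.
func upsert(s store, name, decision string) string {
	if _, ok := s[name]; !ok {
		s[name] = decision
		return "created"
	}
	s[name] = decision // the real code archives the previous decision first
	return "updated"
}

func main() {
	s := store{}
	fmt.Println(upsert(s, "nova-abc", "host-1")) // created
	fmt.Println(upsert(s, "nova-abc", "host-2")) // updated
}
```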
Note
As ugly as this state is: the flag to persist the decision in the History CRD is still called `createDecision` to avoid breaking changes.

Changes

- `HistoryManager` to explain decisions