From 6f6b9e0c808b919c27c58318cab3858ea01d9f98 Mon Sep 17 00:00:00 2001 From: Elizabeth Worstell Date: Mon, 30 Mar 2026 13:24:44 -0700 Subject: [PATCH] feat(scheduler): add priority queue support Add a priority-queues config option to the scheduler that accepts a list of queue name prefixes. Jobs whose queue matches a priority prefix are dequeued before non-priority jobs, while maintaining FIFO order within each tier. This allows operators to ensure that known important repositories (e.g., monorepos) are never starved by a flood of cold clone jobs for less critical repos. The queue name is the upstream URL, so configuration is straightforward: scheduler { priority-queues = ["https://github.com/org/monorepo"] } Co-authored-by: Amp Amp-Thread-ID: https://ampcode.com/threads/T-019d404e-21ec-723a-b211-c619925dd12e --- internal/jobscheduler/jobs.go | 61 ++++++++++++++++++++++-------- internal/jobscheduler/jobs_test.go | 60 +++++++++++++++++++++++++++++ 2 files changed, 106 insertions(+), 15 deletions(-) diff --git a/internal/jobscheduler/jobs.go b/internal/jobscheduler/jobs.go index a87d550..b697210 100644 --- a/internal/jobscheduler/jobs.go +++ b/internal/jobscheduler/jobs.go @@ -16,9 +16,10 @@ import ( ) type Config struct { - Concurrency int `hcl:"concurrency" help:"The maximum number of concurrent jobs to run (0 means number of cores)." default:"4"` - MaxCloneConcurrency int `hcl:"max-clone-concurrency" help:"Maximum number of concurrent clone jobs. Remaining worker slots are reserved for fetch/repack/snapshot jobs. 0 means no limit." default:"0"` - SchedulerDB string `hcl:"scheduler-db" help:"Path to the scheduler state database." default:"${CACHEW_STATE}/scheduler.db"` + Concurrency int `hcl:"concurrency" help:"The maximum number of concurrent jobs to run (0 means number of cores)." default:"4"` + MaxCloneConcurrency int `hcl:"max-clone-concurrency" help:"Maximum number of concurrent clone jobs. Remaining worker slots are reserved for fetch/repack/snapshot jobs. 0 means no limit." default:"0"` + SchedulerDB string `hcl:"scheduler-db" help:"Path to the scheduler state database." default:"${CACHEW_STATE}/scheduler.db"` + PriorityQueues []string `hcl:"priority-queues,optional" help:"Queue name prefixes that should be dequeued before other jobs. Matches if the queue starts with any prefix."` } type queueJob struct { @@ -80,6 +81,7 @@ type RootScheduler struct { active map[string]string // queue -> job id activeClones int maxCloneConcurrency int + priorityQueues []string cancel context.CancelFunc store ScheduleStore metrics *schedulerMetrics @@ -122,6 +124,7 @@ func New(ctx context.Context, config Config) (*RootScheduler, error) { workAvailable: make(chan bool, 1024), active: make(map[string]string), maxCloneConcurrency: maxClones, + priorityQueues: config.PriorityQueues, store: store, metrics: m, } @@ -265,27 +268,55 @@ func isCloneJob(id string) bool { return strings.HasSuffix(id, "clone") || strings.HasSuffix(id, "deferred-mirror-restore") } -// Take the next job for any queue that is not already running a job. +// takeNextJob selects the next eligible job. Priority queue jobs are preferred over non-priority jobs; +// within the same priority tier, jobs are dequeued in submission order (FIFO). func (q *RootScheduler) takeNextJob() (queueJob, bool) { q.lock.Lock() defer q.lock.Unlock() + idx := -1 for i, job := range q.queue { - if _, active := q.active[job.queue]; active { + if !q.isEligibleLocked(job) { continue } - if q.maxCloneConcurrency > 0 && isCloneJob(job.id) && q.activeClones >= q.maxCloneConcurrency { - continue + if q.isPriority(job.queue) { + idx = i + break + } + if idx == -1 { + idx = i } - q.queue = append(q.queue[:i], q.queue[i+1:]...) - q.workAvailable <- true - q.active[job.queue] = job.id - if isCloneJob(job.id) { - q.activeClones++ + } + if idx == -1 { + return queueJob{}, false + } + job := q.queue[idx] + q.queue = append(q.queue[:idx], q.queue[idx+1:]...) + q.workAvailable <- true + q.active[job.queue] = job.id + if isCloneJob(job.id) { + q.activeClones++ + } + q.recordGaugesLocked() + return job, true +} + +func (q *RootScheduler) isEligibleLocked(job queueJob) bool { + if _, active := q.active[job.queue]; active { + return false + } + if q.maxCloneConcurrency > 0 && isCloneJob(job.id) && q.activeClones >= q.maxCloneConcurrency { + return false + } + return true +} + +func (q *RootScheduler) isPriority(queue string) bool { + for _, prefix := range q.priorityQueues { + if strings.HasPrefix(queue, prefix) { + return true } - q.recordGaugesLocked() - return job, true } - return queueJob{}, false + return false } // recordGaugesLocked updates gauge metrics. Must be called with q.lock held. diff --git a/internal/jobscheduler/jobs_test.go b/internal/jobscheduler/jobs_test.go index 6db5034..97fc51c 100644 --- a/internal/jobscheduler/jobs_test.go +++ b/internal/jobscheduler/jobs_test.go @@ -460,6 +460,66 @@ func FuzzJobScheduler(f *testing.F) { }) } +func TestJobSchedulerPriorityQueues(t *testing.T) { + _, ctx := logging.Configure(context.Background(), logging.Config{Level: slog.LevelError}) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + // Single worker so jobs execute sequentially, making order deterministic. + scheduler := newTestScheduler(ctx, t, jobscheduler.Config{ + Concurrency: 1, + PriorityQueues: []string{"https://github.com/important/"}, + }) + + var ( + mu sync.Mutex + order []string + ) + + // Block the single worker so all subsequent submits queue up. + blockerStarted := make(chan struct{}) + blocker := make(chan struct{}) + scheduler.Submit("blocker", "block", func(_ context.Context) error { + close(blockerStarted) + <-blocker + return nil + }) + <-blockerStarted + + // Submit non-priority first, then priority. Priority should execute first after the blocker. + scheduler.Submit("https://github.com/random/repo", "job", func(_ context.Context) error { + mu.Lock() + order = append(order, "random") + mu.Unlock() + return nil + }) + scheduler.Submit("https://github.com/important/monorepo", "job", func(_ context.Context) error { + mu.Lock() + order = append(order, "important") + mu.Unlock() + return nil + }) + scheduler.Submit("https://github.com/other/thing", "job", func(_ context.Context) error { + mu.Lock() + order = append(order, "other") + mu.Unlock() + return nil + }) + + // Release the blocker. + close(blocker) + + eventually(t, 2*time.Second, func() bool { + mu.Lock() + defer mu.Unlock() + return len(order) == 3 + }, "all jobs should complete") + + mu.Lock() + defer mu.Unlock() + assert.Equal(t, "important", order[0], "priority job should execute first, got: %v", order) +} + func TestJobSchedulerCloneConcurrencyLimit(t *testing.T) { _, ctx := logging.Configure(context.Background(), logging.Config{Level: slog.LevelError}) ctx, cancel := context.WithCancel(ctx)