Skip to content

Commit ef643e8

Browse files
worstell and ampcode-com committed
feat(metrics): add scheduler and git operation OTel metrics
Add OpenTelemetry metrics for operational visibility into the job scheduler and git strategy: Scheduler metrics: - cachew.scheduler.queue_depth: pending jobs gauge - cachew.scheduler.active_workers: running workers gauge - cachew.scheduler.active_clones: clone-specific concurrency gauge - cachew.scheduler.jobs_total: counter by job type and status - cachew.scheduler.job_duration_seconds: histogram by job type and status Git operation metrics: - cachew.git.operations_total: counter by operation/upstream/status - cachew.git.operation_duration_seconds: histogram for clone/fetch/snapshot - cachew.git.requests_total: HTTP request counter by type All metrics flow through the existing OTel/Prometheus pipeline and are scraped on :9102/metrics automatically. Co-authored-by: Amp <amp@ampcode.com> Amp-Thread-ID: https://ampcode.com/threads/T-019d404e-21ec-723a-b211-c619925dd12e
1 parent 4d2963e commit ef643e8

5 files changed

Lines changed: 222 additions & 42 deletions

File tree

internal/jobscheduler/jobs.go

Lines changed: 48 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ import (
99
"time"
1010

1111
"github.com/alecthomas/errors"
12+
"go.opentelemetry.io/otel/attribute"
13+
"go.opentelemetry.io/otel/metric"
1214

1315
"github.com/block/cachew/internal/logging"
1416
)
@@ -80,6 +82,7 @@ type RootScheduler struct {
8082
maxCloneConcurrency int
8183
cancel context.CancelFunc
8284
store ScheduleStore
85+
metrics *schedulerMetrics
8386
}
8487

8588
var _ Scheduler = &RootScheduler{}
@@ -111,11 +114,16 @@ func New(ctx context.Context, config Config) (*RootScheduler, error) {
111114
// Default: reserve at least half the workers for non-clone jobs.
112115
maxClones = max(1, config.Concurrency/2)
113116
}
117+
m, err := newSchedulerMetrics()
118+
if err != nil {
119+
return nil, errors.Wrap(err, "create scheduler metrics")
120+
}
114121
q := &RootScheduler{
115122
workAvailable: make(chan bool, 1024),
116123
active: make(map[string]string),
117124
maxCloneConcurrency: maxClones,
118125
store: store,
126+
metrics: m,
119127
}
120128
ctx, cancel := context.WithCancel(ctx)
121129
q.cancel = cancel
@@ -141,8 +149,9 @@ func (q *RootScheduler) WithQueuePrefix(prefix string) Scheduler {
141149

142150
func (q *RootScheduler) Submit(queue, id string, run func(ctx context.Context) error) {
143151
q.lock.Lock()
144-
defer q.lock.Unlock()
145152
q.queue = append(q.queue, queueJob{queue: queue, id: id, run: run})
153+
q.metrics.queueDepth.Record(context.Background(), int64(len(q.queue)))
154+
q.lock.Unlock()
146155
q.workAvailable <- true
147156
}
148157

@@ -201,26 +210,52 @@ func (q *RootScheduler) worker(ctx context.Context, id int) {
201210
if !ok {
202211
continue
203212
}
213+
jobAttrs := attribute.String("job.type", jobType(job.id))
204214
start := time.Now()
205215
logger.InfoContext(ctx, "Starting job", "job", job)
206-
if err := job.run(ctx); err != nil {
207-
logger.ErrorContext(ctx, "Job failed", "job", job, "error", err, "elapsed", time.Since(start))
216+
err := job.run(ctx)
217+
elapsed := time.Since(start)
218+
status := "success"
219+
if err != nil {
220+
status = "error"
221+
logger.ErrorContext(ctx, "Job failed", "job", job, "error", err, "elapsed", elapsed)
208222
} else {
209-
logger.InfoContext(ctx, "Job completed", "job", job, "elapsed", time.Since(start))
223+
logger.InfoContext(ctx, "Job completed", "job", job, "elapsed", elapsed)
210224
}
225+
q.metrics.jobsTotal.Add(ctx, 1, metric.WithAttributes(jobAttrs, attribute.String("status", status)))
226+
q.metrics.jobDuration.Record(ctx, elapsed.Seconds(), metric.WithAttributes(jobAttrs, attribute.String("status", status)))
211227
q.markQueueInactive(job.queue)
212228
q.workAvailable <- true
213229
}
214230
}
215231
}
216232

233+
// jobType extracts a normalised job type from the job ID for metric labels.
//
// Job IDs are matched on well-known suffixes; anything unrecognised is
// bucketed as "other" so metric label cardinality stays bounded.
func jobType(id string) string {
	switch {
	case strings.HasSuffix(id, "clone"), strings.HasSuffix(id, "deferred-mirror-restore"):
		// Deferred mirror restores are clone-shaped work and share the label.
		return "clone"
	case strings.HasSuffix(id, "fetch"):
		return "fetch"
	case strings.HasSuffix(id, "snapshot-periodic"):
		// Also covers "mirror-snapshot-periodic": that ID shares this suffix,
		// so the previous explicit second check was unreachable dead code.
		return "snapshot"
	case strings.HasSuffix(id, "repack-periodic"):
		return "repack"
	default:
		return "other"
	}
}
250+
217251
// markQueueInactive removes the queue's running-job entry, releasing a
// clone slot if the finished job was a clone, and refreshes the gauges.
func (q *RootScheduler) markQueueInactive(queue string) {
	q.lock.Lock()
	defer q.lock.Unlock()

	// Snapshot the job ID before dropping the map entry so we can tell
	// whether a clone slot is being freed.
	finished := q.active[queue]
	delete(q.active, queue)
	if isCloneJob(finished) {
		q.activeClones--
	}
	q.recordGaugesLocked()
}
225260

226261
// isCloneJob returns true for job IDs that represent long-running clone operations
@@ -246,7 +281,16 @@ func (q *RootScheduler) takeNextJob() (queueJob, bool) {
246281
if isCloneJob(job.id) {
247282
q.activeClones++
248283
}
284+
q.recordGaugesLocked()
249285
return job, true
250286
}
251287
return queueJob{}, false
252288
}
289+
290+
// recordGaugesLocked updates gauge metrics. Must be called with q.lock held.
291+
func (q *RootScheduler) recordGaugesLocked() {
292+
ctx := context.Background()
293+
q.metrics.queueDepth.Record(ctx, int64(len(q.queue)))
294+
q.metrics.activeWorkers.Record(ctx, int64(len(q.active)))
295+
q.metrics.activeClones.Record(ctx, int64(q.activeClones))
296+
}

internal/jobscheduler/metrics.go

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package jobscheduler
2+
3+
import (
4+
"github.com/alecthomas/errors"
5+
"go.opentelemetry.io/otel"
6+
"go.opentelemetry.io/otel/metric"
7+
)
8+
9+
type schedulerMetrics struct {
10+
queueDepth metric.Int64Gauge
11+
activeWorkers metric.Int64Gauge
12+
activeClones metric.Int64Gauge
13+
jobsTotal metric.Int64Counter
14+
jobDuration metric.Float64Histogram
15+
}
16+
17+
func newSchedulerMetrics() (*schedulerMetrics, error) {
18+
meter := otel.Meter("cachew.scheduler")
19+
m := &schedulerMetrics{}
20+
var err error
21+
22+
if m.queueDepth, err = meter.Int64Gauge("cachew.scheduler.queue_depth",
23+
metric.WithDescription("Number of jobs waiting in the scheduler queue"),
24+
metric.WithUnit("{jobs}")); err != nil {
25+
return nil, errors.Wrap(err, "create queue_depth gauge")
26+
}
27+
28+
if m.activeWorkers, err = meter.Int64Gauge("cachew.scheduler.active_workers",
29+
metric.WithDescription("Number of workers currently executing jobs"),
30+
metric.WithUnit("{workers}")); err != nil {
31+
return nil, errors.Wrap(err, "create active_workers gauge")
32+
}
33+
34+
if m.activeClones, err = meter.Int64Gauge("cachew.scheduler.active_clones",
35+
metric.WithDescription("Number of clone jobs currently executing"),
36+
metric.WithUnit("{jobs}")); err != nil {
37+
return nil, errors.Wrap(err, "create active_clones gauge")
38+
}
39+
40+
if m.jobsTotal, err = meter.Int64Counter("cachew.scheduler.jobs_total",
41+
metric.WithDescription("Total number of completed scheduler jobs"),
42+
metric.WithUnit("{jobs}")); err != nil {
43+
return nil, errors.Wrap(err, "create jobs_total counter")
44+
}
45+
46+
if m.jobDuration, err = meter.Float64Histogram("cachew.scheduler.job_duration_seconds",
47+
metric.WithDescription("Duration of scheduler jobs in seconds"),
48+
metric.WithUnit("s")); err != nil {
49+
return nil, errors.Wrap(err, "create job_duration histogram")
50+
}
51+
52+
return m, nil
53+
}

internal/strategy/git/git.go

Lines changed: 60 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ type Strategy struct {
5858
snapshotSpools sync.Map // keyed by upstream URL, values are *snapshotSpoolEntry
5959
coldSnapshotMu sync.Map // keyed by upstream URL, values are *coldSnapshotEntry
6060
deferredRestoreOnce sync.Map // keyed by upstream URL, ensures at most one deferred restore per repo
61+
metrics *gitMetrics
6162
}
6263

6364
func New(
@@ -108,6 +109,11 @@ func New(
108109
return nil, errors.Wrap(err, "failed to create scheduler")
109110
}
110111

112+
m, err := newGitMetrics()
113+
if err != nil {
114+
return nil, errors.Wrap(err, "create git metrics")
115+
}
116+
111117
s := &Strategy{
112118
config: config,
113119
cache: cache,
@@ -117,47 +123,11 @@ func New(
117123
scheduler: scheduler.WithQueuePrefix("git"),
118124
spools: make(map[string]*RepoSpools),
119125
tokenManager: tokenManager,
126+
metrics: m,
120127
}
121128
s.config.ServerURL = strings.TrimRight(config.ServerURL, "/")
122129

123-
existing, err := s.cloneManager.DiscoverExisting(ctx)
124-
if err != nil {
125-
logger.WarnContext(ctx, "Failed to discover existing clones", "error", err)
126-
}
127-
for _, repo := range existing {
128-
logger.InfoContext(ctx, "Running startup fetch for existing repo", "upstream", repo.UpstreamURL())
129-
130-
preRefs, err := repo.GetLocalRefs(ctx)
131-
if err != nil {
132-
logger.WarnContext(ctx, "Failed to get pre-fetch refs for existing repo", "upstream", repo.UpstreamURL(),
133-
"error", err)
134-
}
135-
136-
start := time.Now()
137-
if err := repo.FetchLenient(ctx, gitclone.CloneTimeout); err != nil {
138-
logger.ErrorContext(ctx, "Startup fetch failed for existing repo", "upstream", repo.UpstreamURL(), "error", err,
139-
"duration", time.Since(start))
140-
continue
141-
}
142-
logger.InfoContext(ctx, "Startup fetch completed for existing repo", "upstream", repo.UpstreamURL(),
143-
"duration", time.Since(start))
144-
145-
postRefs, err := repo.GetLocalRefs(ctx)
146-
if err != nil {
147-
logger.WarnContext(ctx, "Failed to get post-fetch refs for existing repo", "upstream", repo.UpstreamURL(),
148-
"error", err)
149-
} else {
150-
maps.DeleteFunc(postRefs, func(k, v string) bool { return preRefs[k] == v })
151-
logger.InfoContext(ctx, "Post-fetch changed refs for existing repo", "upstream", repo.UpstreamURL(), "refs", postRefs)
152-
}
153-
154-
if s.config.SnapshotInterval > 0 {
155-
s.scheduleSnapshotJobs(repo)
156-
}
157-
if s.config.RepackInterval > 0 {
158-
s.scheduleRepackJobs(repo)
159-
}
160-
}
130+
s.warmExistingRepos(ctx)
161131

162132
s.proxy = &httputil.ReverseProxy{
163133
Director: func(req *http.Request) {
@@ -198,6 +168,48 @@ func New(
198168

199169
var _ strategy.Strategy = (*Strategy)(nil)
200170

171+
func (s *Strategy) warmExistingRepos(ctx context.Context) {
172+
logger := logging.FromContext(ctx)
173+
existing, err := s.cloneManager.DiscoverExisting(ctx)
174+
if err != nil {
175+
logger.WarnContext(ctx, "Failed to discover existing clones", "error", err)
176+
}
177+
for _, repo := range existing {
178+
logger.InfoContext(ctx, "Running startup fetch for existing repo", "upstream", repo.UpstreamURL())
179+
180+
preRefs, err := repo.GetLocalRefs(ctx)
181+
if err != nil {
182+
logger.WarnContext(ctx, "Failed to get pre-fetch refs for existing repo", "upstream", repo.UpstreamURL(),
183+
"error", err)
184+
}
185+
186+
start := time.Now()
187+
if err := repo.FetchLenient(ctx, gitclone.CloneTimeout); err != nil {
188+
logger.ErrorContext(ctx, "Startup fetch failed for existing repo", "upstream", repo.UpstreamURL(), "error", err,
189+
"duration", time.Since(start))
190+
continue
191+
}
192+
logger.InfoContext(ctx, "Startup fetch completed for existing repo", "upstream", repo.UpstreamURL(),
193+
"duration", time.Since(start))
194+
195+
postRefs, err := repo.GetLocalRefs(ctx)
196+
if err != nil {
197+
logger.WarnContext(ctx, "Failed to get post-fetch refs for existing repo", "upstream", repo.UpstreamURL(),
198+
"error", err)
199+
} else {
200+
maps.DeleteFunc(postRefs, func(k, v string) bool { return preRefs[k] == v })
201+
logger.InfoContext(ctx, "Post-fetch changed refs for existing repo", "upstream", repo.UpstreamURL(), "refs", postRefs)
202+
}
203+
204+
if s.config.SnapshotInterval > 0 {
205+
s.scheduleSnapshotJobs(repo)
206+
}
207+
if s.config.RepackInterval > 0 {
208+
s.scheduleRepackJobs(repo)
209+
}
210+
}
211+
}
212+
201213
// SetHTTPTransport overrides the HTTP transport used for upstream requests.
202214
// This is intended for testing.
203215
func (s *Strategy) SetHTTPTransport(t http.RoundTripper) {
@@ -217,11 +229,13 @@ func (s *Strategy) handleRequest(w http.ResponseWriter, r *http.Request) {
217229
logger.DebugContext(ctx, "Git request", "method", r.Method, "host", host, "path", pathValue)
218230

219231
if strings.HasSuffix(pathValue, "/snapshot.tar.zst") {
232+
s.metrics.recordRequest(ctx, "snapshot")
220233
s.handleSnapshotRequest(w, r, host, pathValue)
221234
return
222235
}
223236

224237
if strings.HasSuffix(pathValue, "/snapshot.bundle") {
238+
s.metrics.recordRequest(ctx, "bundle")
225239
s.handleBundleRequest(w, r, host, pathValue)
226240
return
227241
}
@@ -230,11 +244,14 @@ func (s *Strategy) handleRequest(w http.ResponseWriter, r *http.Request) {
230244
isReceivePack := service == "git-receive-pack" || strings.HasSuffix(pathValue, "/git-receive-pack")
231245

232246
if isReceivePack {
247+
s.metrics.recordRequest(ctx, "receive-pack")
233248
logger.DebugContext(ctx, "Forwarding write operation to upstream")
234249
s.forwardToUpstream(w, r, host, pathValue)
235250
return
236251
}
237252

253+
s.metrics.recordRequest(ctx, "upload-pack")
254+
238255
repoPath := ExtractRepoPath(pathValue)
239256
upstreamURL := "https://" + host + "/" + repoPath
240257

@@ -514,6 +531,7 @@ func (s *Strategy) startClone(ctx context.Context, repo *gitclone.Repository) {
514531

515532
logger.InfoContext(ctx, "Starting clone", "upstream", upstream, "path", repo.Path())
516533

534+
cloneStart := time.Now()
517535
err := repo.Clone(ctx)
518536

519537
// Clean up spools regardless of clone success or failure, so that subsequent
@@ -523,11 +541,13 @@ func (s *Strategy) startClone(ctx context.Context, repo *gitclone.Repository) {
523541
}
524542

525543
if err != nil {
544+
s.metrics.recordOperation(ctx, "clone", upstream, "error", time.Since(cloneStart))
526545
logger.ErrorContext(ctx, "Clone failed", "upstream", upstream, "error", err)
527546
repo.ResetToEmpty()
528547
return
529548
}
530549

550+
s.metrics.recordOperation(ctx, "clone", upstream, "success", time.Since(cloneStart))
531551
logger.InfoContext(ctx, "Clone completed", "upstream", upstream, "path", repo.Path())
532552

533553
if s.config.SnapshotInterval > 0 {
@@ -601,9 +621,11 @@ func (s *Strategy) doFetch(ctx context.Context, repo *gitclone.Repository) error
601621

602622
start := time.Now()
603623
if err := repo.Fetch(ctx); err != nil {
624+
s.metrics.recordOperation(ctx, "fetch", repo.UpstreamURL(), "error", time.Since(start))
604625
logger.ErrorContext(ctx, "Fetch failed", "upstream", repo.UpstreamURL(), "duration", time.Since(start), "error", err)
605626
return errors.Errorf("fetch failed: %w", err)
606627
}
628+
s.metrics.recordOperation(ctx, "fetch", repo.UpstreamURL(), "success", time.Since(start))
607629
logger.InfoContext(ctx, "Fetch completed", "upstream", repo.UpstreamURL(), "duration", time.Since(start))
608630
return nil
609631
}

0 commit comments

Comments
 (0)