diff --git a/internal/jobscheduler/jobs.go b/internal/jobscheduler/jobs.go index 86945d1..a87d550 100644 --- a/internal/jobscheduler/jobs.go +++ b/internal/jobscheduler/jobs.go @@ -9,6 +9,8 @@ import ( "time" "github.com/alecthomas/errors" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" "github.com/block/cachew/internal/logging" ) @@ -80,6 +82,7 @@ type RootScheduler struct { maxCloneConcurrency int cancel context.CancelFunc store ScheduleStore + metrics *schedulerMetrics } var _ Scheduler = &RootScheduler{} @@ -111,11 +114,16 @@ func New(ctx context.Context, config Config) (*RootScheduler, error) { // Default: reserve at least half the workers for non-clone jobs. maxClones = max(1, config.Concurrency/2) } + m, err := newSchedulerMetrics() + if err != nil { + return nil, errors.Wrap(err, "create scheduler metrics") + } q := &RootScheduler{ workAvailable: make(chan bool, 1024), active: make(map[string]string), maxCloneConcurrency: maxClones, store: store, + metrics: m, } ctx, cancel := context.WithCancel(ctx) q.cancel = cancel @@ -141,8 +149,9 @@ func (q *RootScheduler) WithQueuePrefix(prefix string) Scheduler { func (q *RootScheduler) Submit(queue, id string, run func(ctx context.Context) error) { q.lock.Lock() - defer q.lock.Unlock() q.queue = append(q.queue, queueJob{queue: queue, id: id, run: run}) + q.recordGaugesLocked() + q.lock.Unlock() q.workAvailable <- true } @@ -201,19 +210,45 @@ func (q *RootScheduler) worker(ctx context.Context, id int) { if !ok { continue } + jobAttrs := attribute.String("job.type", jobType(job.id)) start := time.Now() logger.InfoContext(ctx, "Starting job", "job", job) - if err := job.run(ctx); err != nil { - logger.ErrorContext(ctx, "Job failed", "job", job, "error", err, "elapsed", time.Since(start)) + err := job.run(ctx) + elapsed := time.Since(start) + status := "success" + if err != nil { + status = "error" + logger.ErrorContext(ctx, "Job failed", "job", 
job, "error", err, "elapsed", elapsed) } else { - logger.InfoContext(ctx, "Job completed", "job", job, "elapsed", time.Since(start)) + logger.InfoContext(ctx, "Job completed", "job", job, "elapsed", elapsed) } + statusAttr := attribute.String("status", status) + q.metrics.jobsTotal.Add(ctx, 1, metric.WithAttributes(jobAttrs, statusAttr)) + q.metrics.jobDuration.Record(ctx, elapsed.Seconds(), metric.WithAttributes(jobAttrs, statusAttr)) q.markQueueInactive(job.queue) q.workAvailable <- true } } } +// jobType extracts a normalised job type from the job ID for metric labels. +func jobType(id string) string { + switch { + case strings.HasSuffix(id, "clone"): + return "clone" + case strings.HasSuffix(id, "deferred-mirror-restore"): + return "clone" + case strings.HasSuffix(id, "fetch"): + return "fetch" + case strings.HasSuffix(id, "snapshot-periodic"): + return "snapshot" + case strings.HasSuffix(id, "repack-periodic"): + return "repack" + default: + return "other" + } +} + func (q *RootScheduler) markQueueInactive(queue string) { q.lock.Lock() defer q.lock.Unlock() @@ -221,6 +256,7 @@ func (q *RootScheduler) markQueueInactive(queue string) { q.activeClones-- } delete(q.active, queue) + q.recordGaugesLocked() } // isCloneJob returns true for job IDs that represent long-running clone operations @@ -246,7 +282,16 @@ func (q *RootScheduler) takeNextJob() (queueJob, bool) { if isCloneJob(job.id) { q.activeClones++ } + q.recordGaugesLocked() return job, true } return queueJob{}, false } + +// recordGaugesLocked updates gauge metrics. Must be called with q.lock held. 
+func (q *RootScheduler) recordGaugesLocked() { + ctx := context.Background() + q.metrics.queueDepth.Record(ctx, int64(len(q.queue))) + q.metrics.activeWorkers.Record(ctx, int64(len(q.active))) + q.metrics.activeClones.Record(ctx, int64(q.activeClones)) +} diff --git a/internal/jobscheduler/metrics.go b/internal/jobscheduler/metrics.go new file mode 100644 index 0000000..f853206 --- /dev/null +++ b/internal/jobscheduler/metrics.go @@ -0,0 +1,53 @@ +package jobscheduler + +import ( + "github.com/alecthomas/errors" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/metric" +) + +type schedulerMetrics struct { + queueDepth metric.Int64Gauge + activeWorkers metric.Int64Gauge + activeClones metric.Int64Gauge + jobsTotal metric.Int64Counter + jobDuration metric.Float64Histogram +} + +func newSchedulerMetrics() (*schedulerMetrics, error) { + meter := otel.Meter("cachew.scheduler") + m := &schedulerMetrics{} + var err error + + if m.queueDepth, err = meter.Int64Gauge("cachew.scheduler.queue_depth", + metric.WithDescription("Number of jobs waiting in the scheduler queue"), + metric.WithUnit("{jobs}")); err != nil { + return nil, errors.Wrap(err, "create queue_depth gauge") + } + + if m.activeWorkers, err = meter.Int64Gauge("cachew.scheduler.active_workers", + metric.WithDescription("Number of workers currently executing jobs"), + metric.WithUnit("{workers}")); err != nil { + return nil, errors.Wrap(err, "create active_workers gauge") + } + + if m.activeClones, err = meter.Int64Gauge("cachew.scheduler.active_clones", + metric.WithDescription("Number of clone jobs currently executing"), + metric.WithUnit("{jobs}")); err != nil { + return nil, errors.Wrap(err, "create active_clones gauge") + } + + if m.jobsTotal, err = meter.Int64Counter("cachew.scheduler.jobs_total", + metric.WithDescription("Total number of completed scheduler jobs"), + metric.WithUnit("{jobs}")); err != nil { + return nil, errors.Wrap(err, "create jobs_total counter") + } + + if m.jobDuration, err = 
meter.Float64Histogram("cachew.scheduler.job_duration_seconds", + metric.WithDescription("Duration of scheduler jobs in seconds"), + metric.WithUnit("s")); err != nil { + return nil, errors.Wrap(err, "create job_duration histogram") + } + + return m, nil +} diff --git a/internal/strategy/git/git.go b/internal/strategy/git/git.go index 80776b6..5e9e306 100644 --- a/internal/strategy/git/git.go +++ b/internal/strategy/git/git.go @@ -58,6 +58,7 @@ type Strategy struct { snapshotSpools sync.Map // keyed by upstream URL, values are *snapshotSpoolEntry coldSnapshotMu sync.Map // keyed by upstream URL, values are *coldSnapshotEntry deferredRestoreOnce sync.Map // keyed by upstream URL, ensures at most one deferred restore per repo + metrics *gitMetrics } func New( @@ -108,6 +109,11 @@ func New( return nil, errors.Wrap(err, "failed to create scheduler") } + m, err := newGitMetrics() + if err != nil { + return nil, errors.Wrap(err, "create git metrics") + } + s := &Strategy{ config: config, cache: cache, @@ -117,47 +123,11 @@ func New( scheduler: scheduler.WithQueuePrefix("git"), spools: make(map[string]*RepoSpools), tokenManager: tokenManager, + metrics: m, } s.config.ServerURL = strings.TrimRight(config.ServerURL, "/") - existing, err := s.cloneManager.DiscoverExisting(ctx) - if err != nil { - logger.WarnContext(ctx, "Failed to discover existing clones", "error", err) - } - for _, repo := range existing { - logger.InfoContext(ctx, "Running startup fetch for existing repo", "upstream", repo.UpstreamURL()) - - preRefs, err := repo.GetLocalRefs(ctx) - if err != nil { - logger.WarnContext(ctx, "Failed to get pre-fetch refs for existing repo", "upstream", repo.UpstreamURL(), - "error", err) - } - - start := time.Now() - if err := repo.FetchLenient(ctx, gitclone.CloneTimeout); err != nil { - logger.ErrorContext(ctx, "Startup fetch failed for existing repo", "upstream", repo.UpstreamURL(), "error", err, - "duration", time.Since(start)) - continue - } - logger.InfoContext(ctx, 
"Startup fetch completed for existing repo", "upstream", repo.UpstreamURL(), - "duration", time.Since(start)) - - postRefs, err := repo.GetLocalRefs(ctx) - if err != nil { - logger.WarnContext(ctx, "Failed to get post-fetch refs for existing repo", "upstream", repo.UpstreamURL(), - "error", err) - } else { - maps.DeleteFunc(postRefs, func(k, v string) bool { return preRefs[k] == v }) - logger.InfoContext(ctx, "Post-fetch changed refs for existing repo", "upstream", repo.UpstreamURL(), "refs", postRefs) - } - - if s.config.SnapshotInterval > 0 { - s.scheduleSnapshotJobs(repo) - } - if s.config.RepackInterval > 0 { - s.scheduleRepackJobs(repo) - } - } + s.warmExistingRepos(ctx) s.proxy = &httputil.ReverseProxy{ Director: func(req *http.Request) { @@ -198,6 +168,48 @@ func New( var _ strategy.Strategy = (*Strategy)(nil) +func (s *Strategy) warmExistingRepos(ctx context.Context) { + logger := logging.FromContext(ctx) + existing, err := s.cloneManager.DiscoverExisting(ctx) + if err != nil { + logger.WarnContext(ctx, "Failed to discover existing clones", "error", err) + } + for _, repo := range existing { + logger.InfoContext(ctx, "Running startup fetch for existing repo", "upstream", repo.UpstreamURL()) + + preRefs, err := repo.GetLocalRefs(ctx) + if err != nil { + logger.WarnContext(ctx, "Failed to get pre-fetch refs for existing repo", "upstream", repo.UpstreamURL(), + "error", err) + } + + start := time.Now() + if err := repo.FetchLenient(ctx, gitclone.CloneTimeout); err != nil { + logger.ErrorContext(ctx, "Startup fetch failed for existing repo", "upstream", repo.UpstreamURL(), "error", err, + "duration", time.Since(start)) + continue + } + logger.InfoContext(ctx, "Startup fetch completed for existing repo", "upstream", repo.UpstreamURL(), + "duration", time.Since(start)) + + postRefs, err := repo.GetLocalRefs(ctx) + if err != nil { + logger.WarnContext(ctx, "Failed to get post-fetch refs for existing repo", "upstream", repo.UpstreamURL(), + "error", err) + } else { 
+ maps.DeleteFunc(postRefs, func(k, v string) bool { return preRefs[k] == v }) + logger.InfoContext(ctx, "Post-fetch changed refs for existing repo", "upstream", repo.UpstreamURL(), "refs", postRefs) + } + + if s.config.SnapshotInterval > 0 { + s.scheduleSnapshotJobs(repo) + } + if s.config.RepackInterval > 0 { + s.scheduleRepackJobs(repo) + } + } +} + // SetHTTPTransport overrides the HTTP transport used for upstream requests. // This is intended for testing. func (s *Strategy) SetHTTPTransport(t http.RoundTripper) { @@ -217,11 +229,13 @@ func (s *Strategy) handleRequest(w http.ResponseWriter, r *http.Request) { logger.DebugContext(ctx, "Git request", "method", r.Method, "host", host, "path", pathValue) if strings.HasSuffix(pathValue, "/snapshot.tar.zst") { + s.metrics.recordRequest(ctx, "snapshot") s.handleSnapshotRequest(w, r, host, pathValue) return } if strings.HasSuffix(pathValue, "/snapshot.bundle") { + s.metrics.recordRequest(ctx, "bundle") s.handleBundleRequest(w, r, host, pathValue) return } @@ -230,11 +244,14 @@ func (s *Strategy) handleRequest(w http.ResponseWriter, r *http.Request) { isReceivePack := service == "git-receive-pack" || strings.HasSuffix(pathValue, "/git-receive-pack") if isReceivePack { + s.metrics.recordRequest(ctx, "receive-pack") logger.DebugContext(ctx, "Forwarding write operation to upstream") s.forwardToUpstream(w, r, host, pathValue) return } + s.metrics.recordRequest(ctx, "upload-pack") + repoPath := ExtractRepoPath(pathValue) upstreamURL := "https://" + host + "/" + repoPath @@ -514,6 +531,7 @@ func (s *Strategy) startClone(ctx context.Context, repo *gitclone.Repository) { logger.InfoContext(ctx, "Starting clone", "upstream", upstream, "path", repo.Path()) + cloneStart := time.Now() err := repo.Clone(ctx) // Clean up spools regardless of clone success or failure, so that subsequent @@ -523,11 +541,13 @@ func (s *Strategy) startClone(ctx context.Context, repo *gitclone.Repository) { } if err != nil { + 
s.metrics.recordOperation(ctx, "clone", "error", time.Since(cloneStart)) logger.ErrorContext(ctx, "Clone failed", "upstream", upstream, "error", err) repo.ResetToEmpty() return } + s.metrics.recordOperation(ctx, "clone", "success", time.Since(cloneStart)) logger.InfoContext(ctx, "Clone completed", "upstream", upstream, "path", repo.Path()) if s.config.SnapshotInterval > 0 { @@ -601,9 +621,11 @@ func (s *Strategy) doFetch(ctx context.Context, repo *gitclone.Repository) error start := time.Now() if err := repo.Fetch(ctx); err != nil { + s.metrics.recordOperation(ctx, "fetch", "error", time.Since(start)) logger.ErrorContext(ctx, "Fetch failed", "upstream", repo.UpstreamURL(), "duration", time.Since(start), "error", err) return errors.Errorf("fetch failed: %w", err) } + s.metrics.recordOperation(ctx, "fetch", "success", time.Since(start)) logger.InfoContext(ctx, "Fetch completed", "upstream", repo.UpstreamURL(), "duration", time.Since(start)) return nil } diff --git a/internal/strategy/git/metrics.go b/internal/strategy/git/metrics.go new file mode 100644 index 0000000..8fb58c7 --- /dev/null +++ b/internal/strategy/git/metrics.go @@ -0,0 +1,57 @@ +package git + +import ( + "context" + "time" + + "github.com/alecthomas/errors" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" +) + +type gitMetrics struct { + operationDuration metric.Float64Histogram + operationTotal metric.Int64Counter + requestTotal metric.Int64Counter +} + +func newGitMetrics() (*gitMetrics, error) { + meter := otel.Meter("cachew.git") + m := &gitMetrics{} + var err error + + if m.operationDuration, err = meter.Float64Histogram("cachew.git.operation_duration_seconds", + metric.WithDescription("Duration of git operations (clone, fetch, repack, snapshot)"), + metric.WithUnit("s")); err != nil { + return nil, errors.Wrap(err, "create operation_duration histogram") + } + + if m.operationTotal, err = meter.Int64Counter("cachew.git.operations_total", + 
metric.WithDescription("Total number of git operations"), + metric.WithUnit("{operations}")); err != nil { + return nil, errors.Wrap(err, "create operations_total counter") + } + + if m.requestTotal, err = meter.Int64Counter("cachew.git.requests_total", + metric.WithDescription("Total number of git HTTP requests by type"), + metric.WithUnit("{requests}")); err != nil { + return nil, errors.Wrap(err, "create requests_total counter") + } + + return m, nil +} + +// recordOperation records the duration and outcome of a git operation (clone, fetch, repack, snapshot). +func (m *gitMetrics) recordOperation(ctx context.Context, operation, status string, duration time.Duration) { + attrs := metric.WithAttributes( + attribute.String("operation", operation), + attribute.String("status", status), + ) + m.operationTotal.Add(ctx, 1, attrs) + m.operationDuration.Record(ctx, duration.Seconds(), attrs) +} + +func (m *gitMetrics) recordRequest(ctx context.Context, requestType string) { + m.requestTotal.Add(ctx, 1, metric.WithAttributes(attribute.String("type", requestType))) +} diff --git a/internal/strategy/git/snapshot.go b/internal/strategy/git/snapshot.go index 3e3013b..63be9c7 100644 --- a/internal/strategy/git/snapshot.go +++ b/internal/strategy/git/snapshot.go @@ -81,6 +81,7 @@ func (s *Strategy) cloneForSnapshot(ctx context.Context, repo *gitclone.Reposito func (s *Strategy) generateAndUploadSnapshot(ctx context.Context, repo *gitclone.Repository) error { logger := logging.FromContext(ctx) upstream := repo.UpstreamURL() + start := time.Now() logger.InfoContext(ctx, "Snapshot generation started", "upstream", upstream) @@ -126,9 +127,11 @@ func (s *Strategy) generateAndUploadSnapshot(ctx context.Context, repo *gitclone logger.WarnContext(ctx, "Failed to clean up snapshot dir", "error", rmErr) } if err != nil { + s.metrics.recordOperation(ctx, "snapshot", "error", time.Since(start)) return errors.Wrap(err, "create snapshot") } + s.metrics.recordOperation(ctx, "snapshot", 
"success", time.Since(start)) logger.InfoContext(ctx, "Snapshot generation completed", "upstream", upstream) return nil }