Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 49 additions & 4 deletions internal/jobscheduler/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"time"

"github.com/alecthomas/errors"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"

"github.com/block/cachew/internal/logging"
)
Expand Down Expand Up @@ -80,6 +82,7 @@ type RootScheduler struct {
maxCloneConcurrency int
cancel context.CancelFunc
store ScheduleStore
metrics *schedulerMetrics
}

var _ Scheduler = &RootScheduler{}
Expand Down Expand Up @@ -111,11 +114,16 @@ func New(ctx context.Context, config Config) (*RootScheduler, error) {
// Default: reserve at least half the workers for non-clone jobs.
maxClones = max(1, config.Concurrency/2)
}
m, err := newSchedulerMetrics()
if err != nil {
return nil, errors.Wrap(err, "create scheduler metrics")
}
q := &RootScheduler{
workAvailable: make(chan bool, 1024),
active: make(map[string]string),
maxCloneConcurrency: maxClones,
store: store,
metrics: m,
}
ctx, cancel := context.WithCancel(ctx)
q.cancel = cancel
Expand All @@ -141,8 +149,9 @@ func (q *RootScheduler) WithQueuePrefix(prefix string) Scheduler {

// Submit enqueues a job on the named queue and wakes a worker.
//
// The rendered block contained both a deferred unlock and an explicit unlock,
// which would panic with "unlock of unlocked mutex" on return; this version
// unlocks exactly once. Gauges are recorded via recordGaugesLocked for
// consistency with markQueueInactive/takeNextJob, and the wake-up signal is
// sent after releasing the lock so the channel send never blocks under it.
func (q *RootScheduler) Submit(queue, id string, run func(ctx context.Context) error) {
	q.lock.Lock()
	q.queue = append(q.queue, queueJob{queue: queue, id: id, run: run})
	q.recordGaugesLocked()
	q.lock.Unlock()
	q.workAvailable <- true
}

Expand Down Expand Up @@ -201,26 +210,53 @@ func (q *RootScheduler) worker(ctx context.Context, id int) {
if !ok {
continue
}
jobAttrs := attribute.String("job.type", jobType(job.id))
start := time.Now()
logger.InfoContext(ctx, "Starting job", "job", job)
if err := job.run(ctx); err != nil {
logger.ErrorContext(ctx, "Job failed", "job", job, "error", err, "elapsed", time.Since(start))
err := job.run(ctx)
elapsed := time.Since(start)
status := "success"
if err != nil {
status = "error"
logger.ErrorContext(ctx, "Job failed", "job", job, "error", err, "elapsed", elapsed)
} else {
logger.InfoContext(ctx, "Job completed", "job", job, "elapsed", time.Since(start))
logger.InfoContext(ctx, "Job completed", "job", job, "elapsed", elapsed)
}
statusAttr := attribute.String("status", status)
q.metrics.jobsTotal.Add(ctx, 1, metric.WithAttributes(jobAttrs, statusAttr))
q.metrics.jobDuration.Record(ctx, elapsed.Seconds(), metric.WithAttributes(jobAttrs, statusAttr))
q.markQueueInactive(job.queue)
q.workAvailable <- true
}
}
}

// jobType extracts a normalised job type from the job ID for metric labels.
// Job IDs are classified by suffix; anything unrecognised is labelled "other"
// so the metric label set stays bounded.
func jobType(id string) string {
	switch {
	case strings.HasSuffix(id, "clone"), strings.HasSuffix(id, "deferred-mirror-restore"):
		// Deferred mirror restores are clone-equivalent work, so they share
		// the "clone" label.
		return "clone"
	case strings.HasSuffix(id, "fetch"):
		return "fetch"
	case strings.HasSuffix(id, "snapshot-periodic"):
		// This suffix also matches "mirror-snapshot-periodic", so the
		// previously separate check for it was dead code and is dropped.
		return "snapshot"
	case strings.HasSuffix(id, "repack-periodic"):
		return "repack"
	default:
		return "other"
	}
}

// markQueueInactive clears the active-job entry for a queue once its job
// finishes, adjusting the clone counter when the finished job was a clone,
// and refreshes the gauge metrics.
func (q *RootScheduler) markQueueInactive(queue string) {
	q.lock.Lock()
	defer q.lock.Unlock()

	wasClone := isCloneJob(q.active[queue])
	delete(q.active, queue)
	if wasClone {
		q.activeClones--
	}
	q.recordGaugesLocked()
}

// isCloneJob returns true for job IDs that represent long-running clone operations
Expand All @@ -246,7 +282,16 @@ func (q *RootScheduler) takeNextJob() (queueJob, bool) {
if isCloneJob(job.id) {
q.activeClones++
}
q.recordGaugesLocked()
return job, true
}
return queueJob{}, false
}

// recordGaugesLocked publishes the current queue-depth, active-worker, and
// active-clone gauge readings. Must be called with q.lock held.
func (q *RootScheduler) recordGaugesLocked() {
	ctx := context.Background()
	readings := []struct {
		gauge metric.Int64Gauge
		value int64
	}{
		{q.metrics.queueDepth, int64(len(q.queue))},
		{q.metrics.activeWorkers, int64(len(q.active))},
		{q.metrics.activeClones, int64(q.activeClones)},
	}
	for _, r := range readings {
		r.gauge.Record(ctx, r.value)
	}
}
53 changes: 53 additions & 0 deletions internal/jobscheduler/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package jobscheduler

import (
"github.com/alecthomas/errors"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/metric"
)

// schedulerMetrics bundles the OpenTelemetry instruments published by the
// job scheduler under the "cachew.scheduler" meter.
type schedulerMetrics struct {
	queueDepth    metric.Int64Gauge       // jobs waiting in the scheduler queue
	activeWorkers metric.Int64Gauge       // workers currently executing a job
	activeClones  metric.Int64Gauge       // clone jobs currently executing
	jobsTotal     metric.Int64Counter     // completed jobs (labelled by job type and status at the call sites)
	jobDuration   metric.Float64Histogram // job wall-clock duration in seconds
}

// newSchedulerMetrics constructs every instrument used by the scheduler from
// the global "cachew.scheduler" meter, failing fast if any instrument cannot
// be created.
func newSchedulerMetrics() (*schedulerMetrics, error) {
	meter := otel.Meter("cachew.scheduler")

	queueDepth, err := meter.Int64Gauge("cachew.scheduler.queue_depth",
		metric.WithDescription("Number of jobs waiting in the scheduler queue"),
		metric.WithUnit("{jobs}"))
	if err != nil {
		return nil, errors.Wrap(err, "create queue_depth gauge")
	}

	activeWorkers, err := meter.Int64Gauge("cachew.scheduler.active_workers",
		metric.WithDescription("Number of workers currently executing jobs"),
		metric.WithUnit("{workers}"))
	if err != nil {
		return nil, errors.Wrap(err, "create active_workers gauge")
	}

	activeClones, err := meter.Int64Gauge("cachew.scheduler.active_clones",
		metric.WithDescription("Number of clone jobs currently executing"),
		metric.WithUnit("{jobs}"))
	if err != nil {
		return nil, errors.Wrap(err, "create active_clones gauge")
	}

	jobsTotal, err := meter.Int64Counter("cachew.scheduler.jobs_total",
		metric.WithDescription("Total number of completed scheduler jobs"),
		metric.WithUnit("{jobs}"))
	if err != nil {
		return nil, errors.Wrap(err, "create jobs_total counter")
	}

	jobDuration, err := meter.Float64Histogram("cachew.scheduler.job_duration_seconds",
		metric.WithDescription("Duration of scheduler jobs in seconds"),
		metric.WithUnit("s"))
	if err != nil {
		return nil, errors.Wrap(err, "create job_duration histogram")
	}

	return &schedulerMetrics{
		queueDepth:    queueDepth,
		activeWorkers: activeWorkers,
		activeClones:  activeClones,
		jobsTotal:     jobsTotal,
		jobDuration:   jobDuration,
	}, nil
}
98 changes: 60 additions & 38 deletions internal/strategy/git/git.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type Strategy struct {
snapshotSpools sync.Map // keyed by upstream URL, values are *snapshotSpoolEntry
coldSnapshotMu sync.Map // keyed by upstream URL, values are *coldSnapshotEntry
deferredRestoreOnce sync.Map // keyed by upstream URL, ensures at most one deferred restore per repo
metrics *gitMetrics
}

func New(
Expand Down Expand Up @@ -108,6 +109,11 @@ func New(
return nil, errors.Wrap(err, "failed to create scheduler")
}

m, err := newGitMetrics()
if err != nil {
return nil, errors.Wrap(err, "create git metrics")
}

s := &Strategy{
config: config,
cache: cache,
Expand All @@ -117,47 +123,11 @@ func New(
scheduler: scheduler.WithQueuePrefix("git"),
spools: make(map[string]*RepoSpools),
tokenManager: tokenManager,
metrics: m,
}
s.config.ServerURL = strings.TrimRight(config.ServerURL, "/")

existing, err := s.cloneManager.DiscoverExisting(ctx)
if err != nil {
logger.WarnContext(ctx, "Failed to discover existing clones", "error", err)
}
for _, repo := range existing {
logger.InfoContext(ctx, "Running startup fetch for existing repo", "upstream", repo.UpstreamURL())

preRefs, err := repo.GetLocalRefs(ctx)
if err != nil {
logger.WarnContext(ctx, "Failed to get pre-fetch refs for existing repo", "upstream", repo.UpstreamURL(),
"error", err)
}

start := time.Now()
if err := repo.FetchLenient(ctx, gitclone.CloneTimeout); err != nil {
logger.ErrorContext(ctx, "Startup fetch failed for existing repo", "upstream", repo.UpstreamURL(), "error", err,
"duration", time.Since(start))
continue
}
logger.InfoContext(ctx, "Startup fetch completed for existing repo", "upstream", repo.UpstreamURL(),
"duration", time.Since(start))

postRefs, err := repo.GetLocalRefs(ctx)
if err != nil {
logger.WarnContext(ctx, "Failed to get post-fetch refs for existing repo", "upstream", repo.UpstreamURL(),
"error", err)
} else {
maps.DeleteFunc(postRefs, func(k, v string) bool { return preRefs[k] == v })
logger.InfoContext(ctx, "Post-fetch changed refs for existing repo", "upstream", repo.UpstreamURL(), "refs", postRefs)
}

if s.config.SnapshotInterval > 0 {
s.scheduleSnapshotJobs(repo)
}
if s.config.RepackInterval > 0 {
s.scheduleRepackJobs(repo)
}
}
s.warmExistingRepos(ctx)

s.proxy = &httputil.ReverseProxy{
Director: func(req *http.Request) {
Expand Down Expand Up @@ -198,6 +168,48 @@ func New(

var _ strategy.Strategy = (*Strategy)(nil)

// warmExistingRepos refreshes clones that already exist on disk: each repo
// gets a lenient startup fetch, a log line listing which refs changed, and —
// when the corresponding intervals are configured — periodic snapshot and
// repack jobs. Discovery failures are logged but non-fatal; whatever was
// discovered is still warmed.
func (s *Strategy) warmExistingRepos(ctx context.Context) {
	logger := logging.FromContext(ctx)

	existing, err := s.cloneManager.DiscoverExisting(ctx)
	if err != nil {
		logger.WarnContext(ctx, "Failed to discover existing clones", "error", err)
	}

	for _, repo := range existing {
		upstream := repo.UpstreamURL()
		logger.InfoContext(ctx, "Running startup fetch for existing repo", "upstream", upstream)

		// Snapshot the refs before fetching so the changed set can be
		// reported afterwards.
		before, err := repo.GetLocalRefs(ctx)
		if err != nil {
			logger.WarnContext(ctx, "Failed to get pre-fetch refs for existing repo", "upstream", upstream,
				"error", err)
		}

		begin := time.Now()
		if err := repo.FetchLenient(ctx, gitclone.CloneTimeout); err != nil {
			logger.ErrorContext(ctx, "Startup fetch failed for existing repo", "upstream", upstream, "error", err,
				"duration", time.Since(begin))
			continue
		}
		logger.InfoContext(ctx, "Startup fetch completed for existing repo", "upstream", upstream,
			"duration", time.Since(begin))

		after, err := repo.GetLocalRefs(ctx)
		if err != nil {
			logger.WarnContext(ctx, "Failed to get post-fetch refs for existing repo", "upstream", upstream,
				"error", err)
		} else {
			// Keep only refs whose value changed across the fetch.
			maps.DeleteFunc(after, func(ref, hash string) bool { return before[ref] == hash })
			logger.InfoContext(ctx, "Post-fetch changed refs for existing repo", "upstream", upstream, "refs", after)
		}

		if s.config.SnapshotInterval > 0 {
			s.scheduleSnapshotJobs(repo)
		}
		if s.config.RepackInterval > 0 {
			s.scheduleRepackJobs(repo)
		}
	}
}

// SetHTTPTransport overrides the HTTP transport used for upstream requests.
// This is intended for testing.
func (s *Strategy) SetHTTPTransport(t http.RoundTripper) {
Expand All @@ -217,11 +229,13 @@ func (s *Strategy) handleRequest(w http.ResponseWriter, r *http.Request) {
logger.DebugContext(ctx, "Git request", "method", r.Method, "host", host, "path", pathValue)

if strings.HasSuffix(pathValue, "/snapshot.tar.zst") {
s.metrics.recordRequest(ctx, "snapshot")
s.handleSnapshotRequest(w, r, host, pathValue)
return
}

if strings.HasSuffix(pathValue, "/snapshot.bundle") {
s.metrics.recordRequest(ctx, "bundle")
s.handleBundleRequest(w, r, host, pathValue)
return
}
Expand All @@ -230,11 +244,14 @@ func (s *Strategy) handleRequest(w http.ResponseWriter, r *http.Request) {
isReceivePack := service == "git-receive-pack" || strings.HasSuffix(pathValue, "/git-receive-pack")

if isReceivePack {
s.metrics.recordRequest(ctx, "receive-pack")
logger.DebugContext(ctx, "Forwarding write operation to upstream")
s.forwardToUpstream(w, r, host, pathValue)
return
}

s.metrics.recordRequest(ctx, "upload-pack")

repoPath := ExtractRepoPath(pathValue)
upstreamURL := "https://" + host + "/" + repoPath

Expand Down Expand Up @@ -514,6 +531,7 @@ func (s *Strategy) startClone(ctx context.Context, repo *gitclone.Repository) {

logger.InfoContext(ctx, "Starting clone", "upstream", upstream, "path", repo.Path())

cloneStart := time.Now()
err := repo.Clone(ctx)

// Clean up spools regardless of clone success or failure, so that subsequent
Expand All @@ -523,11 +541,13 @@ func (s *Strategy) startClone(ctx context.Context, repo *gitclone.Repository) {
}

if err != nil {
s.metrics.recordOperation(ctx, "clone", "error", time.Since(cloneStart))
logger.ErrorContext(ctx, "Clone failed", "upstream", upstream, "error", err)
repo.ResetToEmpty()
return
}

s.metrics.recordOperation(ctx, "clone", "success", time.Since(cloneStart))
logger.InfoContext(ctx, "Clone completed", "upstream", upstream, "path", repo.Path())

if s.config.SnapshotInterval > 0 {
Expand Down Expand Up @@ -601,9 +621,11 @@ func (s *Strategy) doFetch(ctx context.Context, repo *gitclone.Repository) error

start := time.Now()
if err := repo.Fetch(ctx); err != nil {
s.metrics.recordOperation(ctx, "fetch", "error", time.Since(start))
logger.ErrorContext(ctx, "Fetch failed", "upstream", repo.UpstreamURL(), "duration", time.Since(start), "error", err)
return errors.Errorf("fetch failed: %w", err)
}
s.metrics.recordOperation(ctx, "fetch", "success", time.Since(start))
logger.InfoContext(ctx, "Fetch completed", "upstream", repo.UpstreamURL(), "duration", time.Since(start))
return nil
}
Loading