From c814c8d636accb0ce0c964b5ba08847ea1a117c3 Mon Sep 17 00:00:00 2001 From: Nicolas Grekas Date: Fri, 20 Mar 2026 21:07:37 +0100 Subject: [PATCH] feat: background workers = non-HTTP workers with shared state refactor: address review feedback on background workers - Use `name` instead of `match` for background worker identification - Combine start + wait + lock + copy + unlock into single CGo call (go_frankenphp_worker_get_vars replaces three separate exports) - Remove lockedVarsStacks, InitLockedVarsStacks, and varsVersion - Set FRANKENPHP_WORKER_NAME for all workers (HTTP and background) - Split worker_bg_name into worker_name + is_background_worker flag - Rename httpEnabled to isBackgroundWorker on Go side - Remove name validation regex (same rules as HTTP workers) - Keep $_SERVER['argv'] for background workers (bin/console compat) Add generational cache back Review by henderkes --- background_worker.go | 383 +++++++++ background_worker_test.go | 123 +++ caddy/app.go | 32 + caddy/module.go | 53 ++ caddy/workerconfig.go | 32 +- context.go | 15 +- docs/background-workers.md | 225 +++++ frankenphp.c | 772 +++++++++++++++++- frankenphp.go | 15 + frankenphp.h | 14 +- frankenphp.stub.php | 17 +- frankenphp_arginfo.h | 18 + frankenphp_test.go | 294 +++++++ options.go | 38 +- phpmainthread.go | 4 + phpthread.go | 7 + requestoptions.go | 8 + .../background-worker-binary-entrypoint.php | 12 + testdata/background-worker-binary-safe.php | 23 + testdata/background-worker-crash-starter.php | 10 + testdata/background-worker-crash.php | 18 + testdata/background-worker-dedup.php | 10 + ...kground-worker-enum-missing-entrypoint.php | 13 + testdata/background-worker-enum-missing.php | 14 + testdata/background-worker-helper.php | 14 + testdata/background-worker-identity.php | 11 + .../background-worker-multi-entrypoint-a.php | 8 + .../background-worker-multi-entrypoint-b.php | 8 + .../background-worker-multi-entrypoint.php | 10 + testdata/background-worker-multi-file.php | 8 + testdata/background-worker-multi.php | 17 + testdata/background-worker-no-entrypoint.php | 10 + .../background-worker-restart-entrypoint.php | 19 + testdata/background-worker-restart.php | 7 + ...round-worker-set-server-var-validation.php | 23 + testdata/background-worker-start-twice.php | 10 + testdata/background-worker-start.php | 10 + .../background-worker-stop-fd-entrypoint.php | 18 + ...d-worker-stop-fd-non-background-worker.php | 10 + testdata/background-worker-stop-fd.php | 6 + ...ound-worker-type-validation-entrypoint.php | 63 ++ .../background-worker-type-validation.php | 23 + testdata/background-worker-with-argv.php | 12 + threadworker.go | 32 +- worker.go | 104 ++- 45 files changed, 2537 insertions(+), 36 deletions(-) create mode 100644 background_worker.go create mode 100644 background_worker_test.go create mode 100644 docs/background-workers.md create mode 100644 testdata/background-worker-binary-entrypoint.php create mode 100644 testdata/background-worker-binary-safe.php create mode 100644 testdata/background-worker-crash-starter.php create mode 100644 testdata/background-worker-crash.php create mode 100644 testdata/background-worker-dedup.php create mode 100644 testdata/background-worker-enum-missing-entrypoint.php create mode 100644 testdata/background-worker-enum-missing.php create mode 100644 testdata/background-worker-helper.php create mode 100644 testdata/background-worker-identity.php create mode 100644 testdata/background-worker-multi-entrypoint-a.php create mode 100644 testdata/background-worker-multi-entrypoint-b.php create mode 100644 testdata/background-worker-multi-entrypoint.php create mode 100644 testdata/background-worker-multi-file.php create mode 100644 testdata/background-worker-multi.php create mode 100644 testdata/background-worker-no-entrypoint.php create mode 100644 testdata/background-worker-restart-entrypoint.php create mode 100644 testdata/background-worker-restart.php create mode 100644 testdata/background-worker-set-server-var-validation.php create mode 100644 testdata/background-worker-start-twice.php create mode 100644 testdata/background-worker-start.php create mode 100644 testdata/background-worker-stop-fd-entrypoint.php create mode 100644 testdata/background-worker-stop-fd-non-background-worker.php create mode 100644 testdata/background-worker-stop-fd.php create mode 100644 testdata/background-worker-type-validation-entrypoint.php create mode 100644 testdata/background-worker-type-validation.php create mode 100644 testdata/background-worker-with-argv.php diff --git a/background_worker.go b/background_worker.go new file mode 100644 index 0000000000..6e10572a81 --- /dev/null +++ b/background_worker.go @@ -0,0 +1,383 @@ +package frankenphp + +// #include +// #include "frankenphp.h" +import "C" +import ( + "fmt" + "log/slog" + "sync" + "sync/atomic" + "time" + "unsafe" +) + +// BackgroundWorkerLookup maps worker names to registries, enabling multiple entrypoint files. +type BackgroundWorkerLookup struct { + byName map[string]*BackgroundWorkerRegistry + catchAll *BackgroundWorkerRegistry +} + +func NewBackgroundWorkerLookup() *BackgroundWorkerLookup { + return &BackgroundWorkerLookup{ + byName: make(map[string]*BackgroundWorkerRegistry), + } +} + +// NewBackgroundWorkerLookupWithCatchAll is a convenience constructor for tests. +func NewBackgroundWorkerLookupWithCatchAll(entrypoint string) *BackgroundWorkerLookup { + l := NewBackgroundWorkerLookup() + l.catchAll = NewBackgroundWorkerRegistry(entrypoint) + return l +} + +func (l *BackgroundWorkerLookup) AddNamed(name string, registry *BackgroundWorkerRegistry) { + l.byName[name] = registry +} + +func (l *BackgroundWorkerLookup) SetCatchAll(registry *BackgroundWorkerRegistry) { + l.catchAll = registry +} + +// Resolve returns the registry for the given name, falling back to catch-all. +func (l *BackgroundWorkerLookup) Resolve(name string) *BackgroundWorkerRegistry { + if r, ok := l.byName[name]; ok { + return r + } + return l.catchAll +} + +// StartAutoWorkers iterates unique registries and calls startAutoWorkers on each. +func (l *BackgroundWorkerLookup) StartAutoWorkers() error { + seen := make(map[*BackgroundWorkerRegistry]struct{}) + for _, r := range l.byName { + if _, ok := seen[r]; ok { + continue + } + seen[r] = struct{}{} + if err := r.startAutoWorkers(); err != nil { + return err + } + } + if l.catchAll != nil { + if _, ok := seen[l.catchAll]; !ok { + if err := l.catchAll.startAutoWorkers(); err != nil { + return err + } + } + } + return nil +} + +type backgroundWorkerState struct { + varsPtr unsafe.Pointer // *C.HashTable, persistent, managed by C + mu sync.RWMutex + varsVersion atomic.Uint64 // incremented on each set_vars call + ready chan struct{} + readyOnce sync.Once +} + +type BackgroundWorkerRegistry struct { + entrypoint string + num int // threads per background worker (0 = lazy-start with 1 thread) + maxWorkers int // max lazy-started instances (0 = unlimited) + autoStartNames []string // names to start at boot when num >= 1 + mu sync.Mutex + workers map[string]*backgroundWorkerState +} + +func NewBackgroundWorkerRegistry(entrypoint string) *BackgroundWorkerRegistry { + return &BackgroundWorkerRegistry{ + entrypoint: entrypoint, + workers: make(map[string]*backgroundWorkerState), + } +} + +func (registry *BackgroundWorkerRegistry) Entrypoint() string { + return registry.entrypoint +} + +func (registry *BackgroundWorkerRegistry) Num() int { + if registry.num <= 0 { + return 0 + } + return registry.num +} + +func (registry *BackgroundWorkerRegistry) MaxThreads() int { + if registry.num > 0 { + return registry.num + } + return 1 +} + +func (registry *BackgroundWorkerRegistry) SetNum(num int) { + registry.num = num +} + +func (registry *BackgroundWorkerRegistry) AddAutoStartNames(names ...string) { + registry.autoStartNames = append(registry.autoStartNames, names...) +} + +func (registry *BackgroundWorkerRegistry) SetMaxWorkers(max int) { + registry.maxWorkers = max +} + +func (registry *BackgroundWorkerRegistry) reserve(name string) (*backgroundWorkerState, bool, error) { + registry.mu.Lock() + defer registry.mu.Unlock() + + if bgw := registry.workers[name]; bgw != nil { + return bgw, true, nil + } + + if registry.maxWorkers > 0 && len(registry.workers) >= registry.maxWorkers { + return nil, false, fmt.Errorf("cannot start background worker %q: limit of %d reached - increase max_threads on the catch-all background worker or declare it as a named worker", name, registry.maxWorkers) + } + + bgw := &backgroundWorkerState{ + ready: make(chan struct{}), + } + registry.workers[name] = bgw + + return bgw, false, nil +} + +func (registry *BackgroundWorkerRegistry) remove(name string, bgw *backgroundWorkerState) { + registry.mu.Lock() + defer registry.mu.Unlock() + + if registry.workers[name] == bgw { + delete(registry.workers, name) + } +} + +// startAutoBackgroundWorkers finds all lookups with auto-start names and starts them. +// Called after initWorkers to auto-start named background workers with num >= 1. +func startAutoBackgroundWorkers(workerOpts []workerOpt) error { + seen := make(map[*BackgroundWorkerLookup]struct{}) + for _, w := range workerOpts { + if w.backgroundLookup != nil { + if _, ok := seen[w.backgroundLookup]; !ok { + seen[w.backgroundLookup] = struct{}{} + if err := w.backgroundLookup.StartAutoWorkers(); err != nil { + return err + } + } + } + } + return nil +} + +// startAutoWorkers starts all background workers configured with match names. +func (registry *BackgroundWorkerRegistry) startAutoWorkers() error { + for _, name := range registry.autoStartNames { + if err := startBackgroundWorkerWithRegistry(registry, name); err != nil { + return fmt.Errorf("failed to auto-start background worker %q: %w", name, err) + } + } + return nil +} + +func startBackgroundWorker(thread *phpThread, bgWorkerName string) error { + if bgWorkerName == "" { + return fmt.Errorf("background worker name must not be empty") + } + + lookup := getLookup(thread) + if lookup == nil { + return fmt.Errorf("no background worker configured in this php_server") + } + + registry := lookup.Resolve(bgWorkerName) + if registry == nil || registry.entrypoint == "" { + return fmt.Errorf("no background worker configured in this php_server") + } + + return startBackgroundWorkerWithRegistry(registry, bgWorkerName) +} + +func startBackgroundWorkerWithRegistry(registry *BackgroundWorkerRegistry, bgWorkerName string) error { + bgw, exists, err := registry.reserve(bgWorkerName) + if err != nil { + return err + } + if exists { + return nil + } + + numThreads := registry.MaxThreads() + + worker, err := newWorker(workerOpt{ + name: bgWorkerName, + fileName: registry.entrypoint, + num: numThreads, + env: PrepareEnv(nil), + watch: []string{}, + maxConsecutiveFailures: -1, + }) + if err != nil { + registry.remove(bgWorkerName, bgw) + + return fmt.Errorf("failed to create background worker: %w", err) + } + + worker.isBackgroundWorker = true + worker.backgroundWorker = bgw + worker.backgroundRegistry = registry + + for i := 0; i < numThreads; i++ { + bgWorkerThread := getInactivePHPThread() + if bgWorkerThread == nil { + if i == 0 { + registry.remove(bgWorkerName, bgw) + } + + return fmt.Errorf("no available PHP thread for background worker (increase max_threads)") + } + + scalingMu.Lock() + workers = append(workers, worker) + scalingMu.Unlock() + + convertToWorkerThread(bgWorkerThread, worker) + } + + if globalLogger.Enabled(globalCtx, slog.LevelInfo) { + globalLogger.LogAttrs(globalCtx, slog.LevelInfo, "background worker started", slog.String("name", bgWorkerName), slog.Int("threads", numThreads)) + } + + return nil +} + +func getLookup(thread *phpThread) *BackgroundWorkerLookup { + if handler, ok := thread.handler.(*workerThread); ok && handler.worker.backgroundLookup != nil { + return handler.worker.backgroundLookup + } + if fc, ok := fromContext(thread.context()); ok { + return fc.backgroundLookup + } + + return nil +} + +// go_frankenphp_worker_get_vars starts background workers if needed, waits for them +// to be ready, takes read locks, copies vars via C helper, and releases locks. +// All locking/unlocking happens within this single Go call. +// +// callerVersions/outVersions: if callerVersions is non-nil and all versions match, +// the copy is skipped entirely (returns 1). outVersions receives current versions. +// +//export go_frankenphp_worker_get_vars +func go_frankenphp_worker_get_vars(threadIndex C.uintptr_t, names **C.char, nameLens *C.size_t, nameCount C.int, timeoutMs C.int, returnValue *C.zval, callerVersions *C.uint64_t, outVersions *C.uint64_t) *C.char { + thread := phpThreads[threadIndex] + lookup := getLookup(thread) + if lookup == nil { + return C.CString("no background worker configured in this php_server") + } + + n := int(nameCount) + nameSlice := unsafe.Slice(names, n) + nameLenSlice := unsafe.Slice(nameLens, n) + + sks := make([]*backgroundWorkerState, n) + goNames := make([]string, n) + for i := 0; i < n; i++ { + goNames[i] = C.GoStringN(nameSlice[i], C.int(nameLenSlice[i])) + + // Start background worker if not already running + if err := startBackgroundWorker(thread, goNames[i]); err != nil { + return C.CString(err.Error()) + } + + registry := lookup.Resolve(goNames[i]) + if registry == nil { + return C.CString("background worker not found: " + goNames[i]) + } + registry.mu.Lock() + sks[i] = registry.workers[goNames[i]] + registry.mu.Unlock() + if sks[i] == nil { + return C.CString("background worker not found: " + goNames[i]) + } + } + + // Wait for all workers to be ready + timeout := time.Duration(timeoutMs) * time.Millisecond + timer := time.NewTimer(timeout) + defer timer.Stop() + for i, sk := range sks { + select { + case <-sk.ready: + // background worker has called set_vars + case <-timer.C: + return C.CString(fmt.Sprintf("timeout waiting for background worker: %s", goNames[i])) + } + } + + // Fast path: if all caller versions match, skip lock + copy entirely. + // Read each version once and write to outVersions for the C side to compare. + if callerVersions != nil && outVersions != nil { + callerVSlice := unsafe.Slice(callerVersions, n) + outVSlice := unsafe.Slice(outVersions, n) + allMatch := true + for i, sk := range sks { + v := sk.varsVersion.Load() + outVSlice[i] = C.uint64_t(v) + if uint64(callerVSlice[i]) != v { + allMatch = false + } + } + if allMatch { + return nil // C side sees out == caller, uses cached value + } + } + + // Take all read locks, collect pointers, copy via C helper, then release + ptrs := make([]unsafe.Pointer, n) + for i, sk := range sks { + sk.mu.RLock() + ptrs[i] = sk.varsPtr + } + + C.frankenphp_worker_copy_vars(returnValue, C.int(n), names, nameLens, (*unsafe.Pointer)(unsafe.Pointer(&ptrs[0]))) + + // Write versions while locks are still held + if outVersions != nil { + outVSlice := unsafe.Slice(outVersions, n) + for i, sk := range sks { + outVSlice[i] = C.uint64_t(sk.varsVersion.Load()) + } + } + + for _, sk := range sks { + sk.mu.RUnlock() + } + + return nil +} + +//export go_frankenphp_worker_set_vars +func go_frankenphp_worker_set_vars(threadIndex C.uintptr_t, varsPtr unsafe.Pointer, oldPtr *unsafe.Pointer) *C.char { + thread := phpThreads[threadIndex] + + handler, ok := thread.handler.(*workerThread) + if !ok || !handler.worker.isBackgroundWorker || handler.worker.backgroundWorker == nil { + return C.CString("frankenphp_worker_set_vars() can only be called from a background worker") + } + + sk := handler.worker.backgroundWorker + + sk.mu.Lock() + *oldPtr = sk.varsPtr + sk.varsPtr = varsPtr + sk.varsVersion.Add(1) + sk.mu.Unlock() + + sk.readyOnce.Do(func() { + handler.markBackgroundReady() + close(sk.ready) + }) + + return nil +} diff --git a/background_worker_test.go b/background_worker_test.go new file mode 100644 index 0000000000..b54b19515d --- /dev/null +++ b/background_worker_test.go @@ -0,0 +1,123 @@ +package frankenphp + +import ( + "testing" + "time" + + "github.com/dunglas/frankenphp/internal/state" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +type backgroundWorkerTestMetrics struct { + readyCalls int + stopCalls []StopReason +} + +func (m *backgroundWorkerTestMetrics) StartWorker(string) {} + +func (m *backgroundWorkerTestMetrics) ReadyWorker(string) { + m.readyCalls++ +} + +func (m *backgroundWorkerTestMetrics) StopWorker(_ string, reason StopReason) { + m.stopCalls = append(m.stopCalls, reason) +} + +func (m *backgroundWorkerTestMetrics) TotalWorkers(string, int) {} + +func (m *backgroundWorkerTestMetrics) TotalThreads(int) {} + +func (m *backgroundWorkerTestMetrics) StartRequest() {} + +func (m *backgroundWorkerTestMetrics) StopRequest() {} + +func (m *backgroundWorkerTestMetrics) StopWorkerRequest(string, time.Duration) {} + +func (m *backgroundWorkerTestMetrics) StartWorkerRequest(string) {} + +func (m *backgroundWorkerTestMetrics) Shutdown() {} + +func (m *backgroundWorkerTestMetrics) QueuedWorkerRequest(string) {} + +func (m *backgroundWorkerTestMetrics) DequeuedWorkerRequest(string) {} + +func (m *backgroundWorkerTestMetrics) QueuedRequest() {} + +func (m *backgroundWorkerTestMetrics) DequeuedRequest() {} + +func TestStartBackgroundWorkerFailureIsRetryable(t *testing.T) { + lookup := NewBackgroundWorkerLookupWithCatchAll(testDataPath + "/background-worker-with-argv.php") + thread := newPHPThread(0) + thread.state.Set(state.Ready) + thread.handler = &workerThread{ + thread: thread, + worker: &worker{backgroundLookup: lookup}, + } + phpThreads = []*phpThread{thread} + t.Cleanup(func() { + phpThreads = nil + }) + + registry := lookup.Resolve("retryable-background-worker") + + err := startBackgroundWorker(thread, "retryable-background-worker") + require.EqualError(t, err, "no available PHP thread for background worker (increase max_threads)") + assert.Empty(t, registry.workers) + + err = startBackgroundWorker(thread, "retryable-background-worker") + require.EqualError(t, err, "no available PHP thread for background worker (increase max_threads)") + assert.Empty(t, registry.workers) +} + +func TestBackgroundWorkerSetVarsMarksWorkerReady(t *testing.T) { + originalMetrics := metrics + testMetrics := &backgroundWorkerTestMetrics{} + metrics = testMetrics + t.Cleanup(func() { + metrics = originalMetrics + }) + + handler := &workerThread{ + thread: newPHPThread(0), + worker: &worker{name: "background-worker", fileName: "background-worker.php", maxConsecutiveFailures: -1}, + isBootingScript: true, + } + + handler.markBackgroundReady() + handler.markBackgroundReady() + + assert.False(t, handler.isBootingScript) + assert.Equal(t, 0, handler.failureCount) + assert.Equal(t, 1, testMetrics.readyCalls) +} + +func TestBackgroundWorkerBootFailureStaysBootFailureUntilReady(t *testing.T) { + originalMetrics := metrics + testMetrics := &backgroundWorkerTestMetrics{} + metrics = testMetrics + t.Cleanup(func() { + metrics = originalMetrics + }) + + handler := &workerThread{ + thread: newPHPThread(0), + worker: &worker{ + name: "background-worker", + fileName: "background-worker.php", + maxConsecutiveFailures: -1, + }, + isBootingScript: true, + } + + tearDownWorkerScript(handler, 1) + require.Len(t, testMetrics.stopCalls, 1) + assert.Equal(t, StopReason(StopReasonBootFailure), testMetrics.stopCalls[0]) + + testMetrics.stopCalls = nil + handler.isBootingScript = true + handler.markBackgroundReady() + tearDownWorkerScript(handler, 1) + require.Len(t, testMetrics.stopCalls, 1) + assert.Equal(t, StopReason(StopReasonCrash), testMetrics.stopCalls[0]) +} diff --git a/caddy/app.go b/caddy/app.go index 9242d870c6..10f1545194 100644 --- a/caddy/app.go +++ b/caddy/app.go @@ -20,6 +20,9 @@ import ( "github.com/dunglas/frankenphp/internal/fastabs" ) +// defaultMaxBackgroundWorkers is the default safety cap for catch-all background workers. +const defaultMaxBackgroundWorkers = 16 + var ( options []frankenphp.Option optionsMU sync.RWMutex @@ -155,13 +158,42 @@ func (f *FrankenPHPApp) Start() error { frankenphp.WithMaxIdleTime(f.MaxIdleTime), ) + // Reserve threads for background workers outside the HTTP scaling budget + reservedThreads := 0 for _, w := range f.Workers { + if !w.Background { + continue + } + if w.Name != "" { + // Named: reserve num threads (0 = lazy-start, no reservation) + reservedThreads += w.Num + } else { + // Catch-all: reserve up to the safety cap (default 16) + maxW := w.MaxThreads + if maxW <= 0 { + maxW = defaultMaxBackgroundWorkers + } + reservedThreads += maxW + } + } + if reservedThreads > 0 { + f.opts = append(f.opts, frankenphp.WithReservedThreads(reservedThreads)) + } + + for _, w := range f.Workers { + // Background workers are not HTTP workers - they are started lazily + // via get_vars/task_send, not registered as part of the HTTP worker pool + if w.Background { + continue + } + w.options = append(w.options, frankenphp.WithWorkerEnv(w.Env), frankenphp.WithWorkerWatchMode(w.Watch), frankenphp.WithWorkerMaxFailures(w.MaxConsecutiveFailures), frankenphp.WithWorkerMaxThreads(w.MaxThreads), frankenphp.WithWorkerRequestOptions(w.requestOptions...), + frankenphp.WithWorkerBackgroundLookup(w.backgroundLookup), ) f.opts = append(f.opts, frankenphp.WithWorkers(w.Name, repl.ReplaceKnown(w.FileName, ""), w.Num, w.options...)) diff --git a/caddy/module.go b/caddy/module.go index 2241e216e2..c9f43cb8ca 100644 --- a/caddy/module.go +++ b/caddy/module.go @@ -49,6 +49,7 @@ type FrankenPHPModule struct { resolvedDocumentRoot string preparedEnv frankenphp.PreparedEnv preparedEnvNeedsReplacement bool + backgroundLookup *frankenphp.BackgroundWorkerLookup logger *slog.Logger requestOptions []frankenphp.RequestOption } @@ -78,6 +79,10 @@ func (f *FrankenPHPModule) Provision(ctx caddy.Context) error { f.assignMercureHub(ctx) + // Build background worker lookup with per-entrypoint registries. + var bgLookup *frankenphp.BackgroundWorkerLookup + registries := make(map[string]*frankenphp.BackgroundWorkerRegistry) + loggerOpt := frankenphp.WithRequestLogger(f.logger) for i, wc := range f.Workers { // make the file path absolute from the public directory @@ -91,10 +96,53 @@ func (f *FrankenPHPModule) Provision(ctx caddy.Context) error { wc.inheritEnv(f.Env) } + if wc.Background { + if bgLookup == nil { + bgLookup = frankenphp.NewBackgroundWorkerLookup() + } + + // Get or create a registry for this entrypoint + registry, ok := registries[wc.FileName] + if !ok { + registry = frankenphp.NewBackgroundWorkerRegistry(wc.FileName) + registries[wc.FileName] = registry + } + + if wc.Name != "" { + // Named background workers: auto-start only if num >= 1 + if wc.Num > 0 { + registry.AddAutoStartNames(wc.Name) + registry.SetNum(wc.Num) + } + // Map the name to this registry in the lookup + bgLookup.AddNamed(wc.Name, registry) + } else { + // Catch-all: set safety cap + maxW := wc.MaxThreads + if maxW <= 0 { + maxW = defaultMaxBackgroundWorkers + } + registry.SetMaxWorkers(maxW) + bgLookup.SetCatchAll(registry) + } + } + wc.requestOptions = append(wc.requestOptions, loggerOpt) f.Workers[i] = wc } + // Assign the background lookup to all non-background workers + // so they can start/access background workers via get_vars/task_send + if bgLookup != nil { + f.backgroundLookup = bgLookup + for i, wc := range f.Workers { + if !wc.Background && wc.backgroundLookup == nil { + wc.backgroundLookup = bgLookup + f.Workers[i] = wc + } + } + } + workers, err := fapp.addModuleWorkers(f.Workers...) if err != nil { return err @@ -241,6 +289,7 @@ func (f *FrankenPHPModule) ServeHTTP(w http.ResponseWriter, r *http.Request, _ c opts, frankenphp.WithOriginalRequest(new(ctx.Value(caddyhttp.OriginalRequestCtxKey).(http.Request))), frankenphp.WithWorkerName(workerName), + frankenphp.WithRequestBackgroundLookup(f.backgroundLookup), )..., ) @@ -317,8 +366,12 @@ func (f *FrankenPHPModule) UnmarshalCaddyfile(d *caddyfile.Dispenser) error { } // Check if a worker with this filename already exists in this module + // Background workers are excluded (they share filenames by design) fileNames := make(map[string]struct{}, len(f.Workers)) for _, w := range f.Workers { + if w.Background { + continue + } if _, ok := fileNames[w.FileName]; ok { return fmt.Errorf(`workers in a single "php" or "php_server" block must not have duplicate filenames: %q`, w.FileName) } diff --git a/caddy/workerconfig.go b/caddy/workerconfig.go index c50f0d0688..f45e7dd1ce 100644 --- a/caddy/workerconfig.go +++ b/caddy/workerconfig.go @@ -41,6 +41,9 @@ type workerConfig struct { MatchPath []string `json:"match_path,omitempty"` // MaxConsecutiveFailures sets the maximum number of consecutive failures before panicking (defaults to 6, set to -1 to never panick) MaxConsecutiveFailures int `json:"max_consecutive_failures,omitempty"` + // Background marks this worker as a background worker (non-HTTP) + Background bool `json:"background,omitempty"` + backgroundLookup *frankenphp.BackgroundWorkerLookup options []frankenphp.WorkerOption requestOptions []frankenphp.RequestOption @@ -123,9 +126,13 @@ func unmarshalWorker(d *caddyfile.Dispenser) (workerConfig, error) { wc.Watch = append(wc.Watch, patterns...) } case "match": - // provision the path so it's identical to Caddy match rules + if wc.Background { + return wc, d.Err(`"match" is not supported for background workers, use "name" instead`) + } + args := d.RemainingArgs() + // For HTTP workers, provision as Caddy path matchers // see: https://github.com/caddyserver/caddy/blob/master/modules/caddyhttp/matchers.go - caddyMatchPath := (caddyhttp.MatchPath)(d.RemainingArgs()) + caddyMatchPath := (caddyhttp.MatchPath)(args) if err := caddyMatchPath.Provision(caddy.Context{}); err != nil { return wc, d.WrapErr(err) } @@ -145,8 +152,10 @@ func unmarshalWorker(d *caddyfile.Dispenser) (workerConfig, error) { } wc.MaxConsecutiveFailures = v + case "background": + wc.Background = true default: - return wc, wrongSubDirectiveError("worker", "name, file, num, env, watch, match, max_consecutive_failures, max_threads", v) + return wc, wrongSubDirectiveError("worker", "name, file, num, env, watch, match, max_consecutive_failures, max_threads, background", v) } } @@ -154,6 +163,18 @@ func unmarshalWorker(d *caddyfile.Dispenser) (workerConfig, error) { return wc, d.Err(`the "file" argument must be specified`) } + if wc.Background { + // Validate background worker constraints + if wc.Num > 1 { + return wc, d.Err(`"num" > 1 is not yet supported for background workers`) + } + if wc.MaxThreads > 1 { + return wc, d.Err(`"max_threads" > 1 is not yet supported for background workers`) + } + // MaxConsecutiveFailures defaults to 6 (same as HTTP workers) + // via defaultMaxConsecutiveFailures in options.go + } + if frankenphp.EmbeddedAppPath != "" && filepath.IsLocal(wc.FileName) { wc.FileName = filepath.Join(frankenphp.EmbeddedAppPath, wc.FileName) } @@ -174,6 +195,11 @@ func (wc *workerConfig) inheritEnv(env map[string]string) { } func (wc *workerConfig) matchesPath(r *http.Request, documentRoot string) bool { + // background workers don't handle HTTP requests + if wc.Background { + return false + } + // try to match against a pattern if one is assigned if len(wc.MatchPath) != 0 { return (caddyhttp.MatchPath)(wc.MatchPath).Match(r) diff --git a/context.go b/context.go index 92f3b7471c..51063ee4c0 100644 --- a/context.go +++ b/context.go @@ -16,13 +16,14 @@ import ( type frankenPHPContext struct { mercureContext - documentRoot string - splitPath []string - env PreparedEnv - logger *slog.Logger - request *http.Request - originalRequest *http.Request - worker *worker + documentRoot string + splitPath []string + env PreparedEnv + logger *slog.Logger + request *http.Request + originalRequest *http.Request + worker *worker + backgroundLookup *BackgroundWorkerLookup docURI string pathInfo string diff --git a/docs/background-workers.md b/docs/background-workers.md new file mode 100644 index 0000000000..f445c53e9c --- /dev/null +++ b/docs/background-workers.md @@ -0,0 +1,225 @@ +# Background Workers + +Background workers are long-running PHP scripts that run outside the HTTP request cycle. +They observe their environment and publish configuration that HTTP [workers](worker.md) can read in real time. + +## How It Works + +1. A background worker runs its own event loop (subscribe to Redis, watch files, poll an API...) +2. It calls `frankenphp_worker_set_vars()` to publish a snapshot of key-value pairs +3. HTTP workers call `frankenphp_worker_get_vars()` to read the latest snapshot +4. The first `get_vars()` call blocks until the background worker has published - no startup race condition + +## Configuration + +Add `worker` directives with `background` to your [`php_server` or `php` block](config.md#caddyfile-config): + +```caddyfile +example.com { + php_server { + # Named background workers - lazy-started on first get_vars() + worker /app/bin/console { background; name config-watcher } + worker /app/bin/console { background; name feature-flags } + + # Catch-all - handles any unlisted name via get_vars() + worker /app/bin/console { background } + } +} +``` + +### Named vs catch-all + +- **Named** (with `name`): the worker name is a known identifier. Lazy-started on first `get_vars()` call. Multiple named workers can share the same entrypoint file. +- **Catch-all** (no `name`): also lazy-started on first `get_vars()`. Use `max_threads` to set a safety cap on how many can be created (defaults to 16). Not declaring a catch-all forbids unlisted names. + +### Config constraints + +- `num` and `max_threads` are accepted but capped at 1 for now (pooling is a future feature). Values > 1 are rejected with a clear error. +- `max_threads` on catch-all workers sets a safety cap for lazy-started instances (defaults to 16). +- `max_consecutive_failures` defaults to 6 (same as HTTP workers). +- `env` and `watch` work the same as HTTP workers. + +### Thread reservation + +Background workers get dedicated thread slots outside the global `max_threads` budget. +They don't compete with HTTP auto-scaling. For catch-all workers, `max_threads` determines +the reservation (default 16). Named workers with `num 0` (default) are lazy-started but +still reserve 1 thread (`max_threads` defaults to `max(num, 1)`). + +Each `php_server` block has its own isolated background worker scope. + +## PHP API + +### `frankenphp_worker_get_vars(string|array $name, float $timeout = 30.0): array` + +Starts a background worker (at-most-once) and returns its published variables. + +- First call blocks until the background worker calls `set_vars()` or the timeout expires +- Subsequent calls return the latest snapshot immediately +- When `$name` is an array, all background workers start in parallel and vars are returned keyed by name: + +```php +$redis = frankenphp_worker_get_vars('redis-watcher'); +// ['MASTER_HOST' => '10.0.0.1', 'MASTER_PORT' => '6379'] + +$all = frankenphp_worker_get_vars(['redis-watcher', 'feature-flags']); +// ['redis-watcher' => [...], 'feature-flags' => [...]] +``` + +- `$name` is available as `$_SERVER['FRANKENPHP_WORKER_NAME']` (set for all workers, HTTP and background) and `$_SERVER['argv'][1]` in background workers (for CLI compatibility with `bin/console`) +- Throws `RuntimeException` on timeout, missing entrypoint, or background worker crash +- Throws `LogicException` if the vars contain an enum class or case that cannot be resolved +- Works in both worker and non-worker mode +- **Per-request cache**: within a single HTTP request, repeated calls with the same name return the same array instance if the data hasn't changed. This means `===` comparisons are O(1) (pointer equality), and no lock or copy is needed on cache hit + +### `frankenphp_worker_set_vars(array $vars): void` + +Publishes a snapshot of key-value pairs from inside a background worker. +Each call **replaces** the entire snapshot atomically. + +Supported value types: `null`, `bool`, `int`, `float`, `string`, `array` (nested), and **enums**. +Objects, resources, and references are rejected. + +- Throws `RuntimeException` if not called from a background worker context +- Throws `ValueError` if values contain objects, resources, or references + +### `frankenphp_worker_get_signaling_stream(): resource` + +Returns a readable stream used for receiving signals from FrankenPHP. +Signals are newline-terminated strings: `"stop\n"` signals shutdown or restart. +Read with `fgets()` to identify the signal type. + +Use `stream_select()` instead of `sleep()` or `usleep()` to wait between iterations: + +```php +function background_worker_should_stop(float $timeout = 0): bool +{ + static $signalingStream; + $signalingStream ??= frankenphp_worker_get_signaling_stream(); + $s = (int) $timeout; + + return match (@stream_select(...[[$signalingStream], [], [], $s, (int) (($timeout - $s) * 1e6)])) { + 0 => false, // timeout + false => true, // error (pipe closed) = stop + default => "stop\n" === fgets($signalingStream), + }; +} + +do { + // ... do work, call set_vars() ... +} while (!background_worker_should_stop(5)); +``` + +> [!WARNING] +> Avoid using `sleep()` or `usleep()` in background workers. They block at the C level and cannot be interrupted. +> A background worker using `sleep(60)` would delay shutdown or worker restart by up to 60 seconds. +> Use `stream_select()` with the signaling stream instead - it wakes up immediately when FrankenPHP needs the thread to stop. + +- Throws `RuntimeException` if not called from a background worker context + +## Example + +### Background Worker Entrypoint + +```php + run_redis_watcher(), + default => throw new \RuntimeException("Unknown background worker: $command"), +}; + +function run_redis_watcher(): void +{ + $signalingStream = frankenphp_worker_get_signaling_stream(); + $sentinel = Amp\Redis\createRedisClient('tcp://sentinel-host:26379'); + + $subscription = $sentinel->subscribe('+switch-master'); + + Amp\async(function () use ($subscription) { + foreach ($subscription as $message) { + [$name, $oldIp, $oldPort, $newIp, $newPort] = explode(' ', $message); + frankenphp_worker_set_vars([ + 'MASTER_HOST' => $newIp, + 'MASTER_PORT' => (int) $newPort, + ]); + } + }); + + $master = $sentinel->rawCommand('SENTINEL', 'get-master-addr-by-name', 'mymaster'); + frankenphp_worker_set_vars([ + 'MASTER_HOST' => $master[0], + 'MASTER_PORT' => (int) $master[1], + ]); + + Amp\EventLoop::onReadable($signalingStream, function ($id) use ($signalingStream) { + if ("stop\n" === fgets($signalingStream)) { + Amp\EventLoop::cancel($id); + } + }); + Amp\EventLoop::run(); +} +``` + +### HTTP Worker + +```php +boot(); + +while (frankenphp_handle_request(function () use ($app) { + $redis = frankenphp_worker_get_vars('redis-watcher'); + + $app->handle($_GET, $_POST, $_COOKIE, $_FILES, $_SERVER + $redis); +})) { + gc_collect_cycles(); +} +``` + +### Graceful Degradation + +```php +if (function_exists('frankenphp_worker_get_vars')) { + $config = frankenphp_worker_get_vars('config-watcher'); +} else { + $config = ['MASTER_HOST' => getenv('REDIS_HOST') ?: '127.0.0.1']; +} +``` + +## Runtime Behavior + +- Background workers get their own dedicated thread from the reserved pool; they don't reduce HTTP capacity +- Execution timeout is automatically disabled +- Shebangs (`#!/usr/bin/env php`) are silently skipped +- `SCRIPT_FILENAME` is set to the entrypoint's full path +- `$_SERVER['FRANKENPHP_WORKER_NAME']` contains the worker name (set for all workers, HTTP and background) +- `$_SERVER['FRANKENPHP_WORKER_BACKGROUND']` is `true` for background workers, `false` for HTTP workers +- Background workers also get `$_SERVER['argv']` = `[entrypoint, name]` for CLI compatibility +- Crash recovery: automatic restart with exponential backoff +- Graceful shutdown via `frankenphp_worker_get_signaling_stream()` and `stream_select()` +- Grace period: on restart/shutdown, background workers receive `"stop\n"` on the signaling stream and have 5 seconds to exit gracefully. Workers still blocked after 5 seconds are force-killed (Linux ZTS, Windows) or abandoned (macOS). +- Worker restarts stop and immediately restart background workers (same as HTTP workers) +- Use `error_log()` or `frankenphp_log()` for logging - avoid `echo` + +For advanced use cases (amphp, ReactPHP), the signaling stream can be registered directly +in the event loop - see `frankenphp_worker_get_signaling_stream()`. + +## Performance + +`get_vars` is designed to be called on every HTTP request with minimal overhead: + +- **Per-request cache**: repeated calls within the same request return the cached result. No Go call, no lock, no copy. `===` comparisons between cached results are O(1) (same `HashTable` pointer). +- **Generational check**: on cache miss, an atomic version counter is checked before taking any lock. If the data hasn't changed since the last request, the lock and copy are skipped entirely. +- **Immutable array zero-copy**: when opcache produces an `IS_ARRAY_IMMUTABLE` array (e.g. from a `const` or literal), both `set_vars` and `get_vars` skip allocation entirely - the pointer is shared directly. +- **Interned string optimization**: string keys and enum names that live in shared memory (`ZSTR_IS_INTERNED`) skip copy and free in all code paths. +- **Precomputed hashes**: when copying persistent arrays to request memory, string key hashes are preserved from the persistent copy. diff --git a/frankenphp.c b/frankenphp.c index c25a3505f3..b04170ca04 100644 --- a/frankenphp.c +++ b/frankenphp.c @@ -1,6 +1,7 @@ #include "frankenphp.h" #include #include +#include #include #include #include @@ -13,6 +14,7 @@ #include #ifdef PHP_WIN32 #include +#include #else #include #endif @@ -85,8 +87,153 @@ HashTable *main_thread_env = NULL; __thread uintptr_t thread_index; __thread bool is_worker_thread = false; +__thread char *worker_name = NULL; +__thread bool is_background_worker = false; +__thread int worker_stop_fds[2] = {-1, -1}; +__thread php_stream *worker_signaling_stream = NULL; __thread HashTable *sandboxed_env = NULL; +/* Best-effort force-kill for stuck background workers after grace period. + * - Linux ZTS: arm PHP's per-thread timer -> "max execution time" fatal + * - Windows: CancelSynchronousIo + QueueUserAPC -> interrupts I/O and sleeps + * - macOS/other: no-op (threads abandoned, exit when blocking call returns) */ +static int force_kill_num_threads = 0; +#ifdef ZEND_MAX_EXECUTION_TIMERS +static timer_t *thread_php_timers = NULL; +static bool *thread_php_timer_saved = NULL; +#elif defined(PHP_WIN32) +static HANDLE *thread_handles = NULL; +static bool *thread_handle_saved = NULL; +static void CALLBACK frankenphp_noop_apc(ULONG_PTR param) { (void)param; } +#endif + +void frankenphp_init_force_kill(int num_threads) { + force_kill_num_threads = num_threads; +#ifdef ZEND_MAX_EXECUTION_TIMERS + thread_php_timers = calloc(num_threads, sizeof(timer_t)); + thread_php_timer_saved = calloc(num_threads, sizeof(bool)); +#elif defined(PHP_WIN32) + thread_handles = calloc(num_threads, sizeof(HANDLE)); + thread_handle_saved = calloc(num_threads, sizeof(bool)); +#endif +} + +void frankenphp_save_php_timer(uintptr_t idx) { + if (idx >= (uintptr_t)force_kill_num_threads) { + return; + } +#ifdef ZEND_MAX_EXECUTION_TIMERS + if (thread_php_timers && EG(pid)) { + thread_php_timers[idx] = EG(max_execution_timer_timer); + thread_php_timer_saved[idx] = true; + } +#elif defined(PHP_WIN32) + if (thread_handles) { + DuplicateHandle(GetCurrentProcess(), GetCurrentThread(), + GetCurrentProcess(), &thread_handles[idx], 0, FALSE, + DUPLICATE_SAME_ACCESS); + thread_handle_saved[idx] = true; + } +#endif + (void)idx; +} + +void frankenphp_force_kill_thread(uintptr_t idx) { + if (idx >= (uintptr_t)force_kill_num_threads) { + return; + } +#ifdef ZEND_MAX_EXECUTION_TIMERS + if (thread_php_timers && thread_php_timer_saved[idx]) { + struct itimerspec its; + its.it_value.tv_sec = 0; + its.it_value.tv_nsec = 1; + its.it_interval.tv_sec = 0; + its.it_interval.tv_nsec = 0; + timer_settime(thread_php_timers[idx], 0, &its, NULL); + } +#elif defined(PHP_WIN32) + if (thread_handles && thread_handle_saved[idx]) { + CancelSynchronousIo(thread_handles[idx]); + QueueUserAPC((PAPCFUNC)frankenphp_noop_apc, thread_handles[idx], 0); + } +#endif + (void)idx; +} + +void frankenphp_destroy_force_kill(void) { +#ifdef ZEND_MAX_EXECUTION_TIMERS + if (thread_php_timers) { + free(thread_php_timers); + thread_php_timers = NULL; + } + if (thread_php_timer_saved) { + free(thread_php_timer_saved); + thread_php_timer_saved = NULL; + } +#elif defined(PHP_WIN32) + if (thread_handles) { + for (int i = 0; i < force_kill_num_threads; i++) { + if (thread_handle_saved && thread_handle_saved[i]) { + CloseHandle(thread_handles[i]); + } + } + free(thread_handles); + thread_handles = NULL; + } + if (thread_handle_saved) { + free(thread_handle_saved); + thread_handle_saved = NULL; + } +#endif + force_kill_num_threads = 0; +} + +/* Per-thread cache for get_vars results. + * Maps worker name (string) -> {version, cached_zval}. + * When the version matches, the cached zval is returned with a refcount bump, + * giving the same HashTable pointer -> === comparisons are O(1). */ +typedef struct { + uint64_t version; + zval value; +} bg_worker_vars_cache_entry; +__thread HashTable *worker_vars_cache = NULL; + +static void frankenphp_worker_close_stop_fds(void) { + if (worker_stop_fds[0] >= 0) { +#ifdef PHP_WIN32 + _close(worker_stop_fds[0]); +#else + close(worker_stop_fds[0]); +#endif + worker_stop_fds[0] = -1; + } + + if (worker_stop_fds[1] >= 0) { +#ifdef PHP_WIN32 + _close(worker_stop_fds[1]); +#else + close(worker_stop_fds[1]); +#endif + worker_stop_fds[1] = -1; + } +} + +static int frankenphp_worker_open_stop_pipe(void) { +#ifdef PHP_WIN32 + return _pipe(worker_stop_fds, 4096, _O_BINARY); +#else + return pipe(worker_stop_fds); +#endif +} + +static int frankenphp_worker_dup_fd(int fd) { +#ifdef PHP_WIN32 + return _dup(fd); +#else + return dup(fd); +#endif +} + void frankenphp_update_local_thread_context(bool is_worker) { is_worker_thread = is_worker; @@ -94,6 +241,76 @@ void frankenphp_update_local_thread_context(bool is_worker) { PG(ignore_user_abort) = is_worker ? 1 : original_user_abort_setting; } +static void bg_worker_vars_cache_dtor(zval *zv) { + bg_worker_vars_cache_entry *entry = Z_PTR_P(zv); + zval_ptr_dtor(&entry->value); + free(entry); +} + +static void bg_worker_vars_cache_reset(void) { + if (worker_vars_cache) { + zend_hash_destroy(worker_vars_cache); + free(worker_vars_cache); + worker_vars_cache = NULL; + } +} + +void frankenphp_set_worker_name(char *name, bool background) { + free(worker_name); + if (name) { + size_t len = strlen(name) + 1; + worker_name = malloc(len); + memcpy(worker_name, name, len); + } else { + worker_name = NULL; + } + is_background_worker = background; + if (!background) { + return; + } + worker_signaling_stream = NULL; + zend_unset_timeout(); + + /* Create a pipe for stop signaling */ + frankenphp_worker_close_stop_fds(); + if (frankenphp_worker_open_stop_pipe() != 0) { + worker_stop_fds[0] = -1; + worker_stop_fds[1] = -1; + } +} + +int frankenphp_worker_get_stop_fd_write(void) { return worker_stop_fds[1]; } + +static int bg_worker_pipe_write_impl(int fd, const char *buf, int len) { + if (fd < 0) { + return -1; + } + +#ifdef PHP_WIN32 + return _write(fd, buf, len); +#else + return (int)write(fd, buf, len); +#endif +} + +int frankenphp_worker_write_stop_fd(int fd) { + return bg_worker_pipe_write_impl(fd, "stop\n", 5); +} + +int frankenphp_worker_write_task_signal(int fd) { + return bg_worker_pipe_write_impl(fd, "task\n", 5); +} + +void frankenphp_worker_close_fd(int fd) { + if (fd >= 0) { +#ifdef PHP_WIN32 + _close(fd); +#else + close(fd); +#endif + } +} + static void frankenphp_update_request_context() { /* the server context is stored on the go side, still SG(server_context) needs * to not be NULL */ @@ -534,11 +751,14 @@ PHP_FUNCTION(frankenphp_handle_request) { Z_PARAM_FUNC(fci, fcc) ZEND_PARSE_PARAMETERS_END(); - if (!is_worker_thread) { - /* not a worker, throw an error */ + if (!is_worker_thread || is_background_worker) { zend_throw_exception( spl_ce_RuntimeException, - "frankenphp_handle_request() called while not in worker mode", 0); + is_background_worker + ? "frankenphp_handle_request() cannot be called from a background " + "worker" + : "frankenphp_handle_request() called while not in worker mode", + 0); RETURN_THROWS(); } @@ -596,6 +816,7 @@ PHP_FUNCTION(frankenphp_handle_request) { } } + bg_worker_vars_cache_reset(); frankenphp_worker_request_shutdown(); go_frankenphp_finish_worker_request(thread_index, callback_ret); if (result.r1 != NULL) { @@ -608,6 +829,506 @@ PHP_FUNCTION(frankenphp_handle_request) { RETURN_TRUE; } +/* Persistent enum storage */ +typedef struct { + zend_string *class_name; + zend_string *case_name; +} bg_worker_enum_t; + +/* Forward declarations */ +static void bg_worker_free_persistent_zval(zval *z); +static void bg_worker_request_copy_zval(zval *dst, zval *src); + +/* Check if a HashTable is an opcache immutable array - safe to share + * across threads without copying. */ +static bool bg_worker_is_immutable(HashTable *ht) { + return (GC_FLAGS(ht) & IS_ARRAY_IMMUTABLE) != 0; +} + +/* Free a stored vars pointer only if it's a persistent copy (not immutable). */ +static void bg_worker_free_stored_vars(void *ptr) { + if (ptr != NULL) { + HashTable *ht = (HashTable *)ptr; + if (!bg_worker_is_immutable(ht)) { + zval z; + ZVAL_ARR(&z, ht); + bg_worker_free_persistent_zval(&z); + } + } +} + +/* Copy or reference a stored vars pointer to request memory. + * Immutable arrays are returned as zero-copy references. */ +static void bg_worker_read_stored_vars(zval *dst, void *ptr) { + HashTable *ht = (HashTable *)ptr; + if (bg_worker_is_immutable(ht)) { + ZVAL_ARR(dst, ht); /* zero-copy: immutable = safe to share */ + } else { + zval src; + ZVAL_ARR(&src, ht); + bg_worker_request_copy_zval(dst, &src); + } +} + +/* Validate that a zval tree contains only scalars, arrays, and enums */ +static bool bg_worker_validate_zval(zval *z) { + switch (Z_TYPE_P(z)) { + case IS_NULL: + case IS_FALSE: + case IS_TRUE: + case IS_LONG: + case IS_DOUBLE: + case IS_STRING: + return true; + case IS_OBJECT: + return (Z_OBJCE_P(z)->ce_flags & ZEND_ACC_ENUM) != 0; + case IS_ARRAY: { + zval *val; + ZEND_HASH_FOREACH_VAL(Z_ARRVAL_P(z), val) { + if (!bg_worker_validate_zval(val)) + return false; + } + ZEND_HASH_FOREACH_END(); + return true; + } + default: + return false; + } +} + +/* Deep-copy a zval into persistent memory */ +static void bg_worker_persist_zval(zval *dst, zval *src) { + switch (Z_TYPE_P(src)) { + case IS_NULL: + case IS_FALSE: + case IS_TRUE: + ZVAL_COPY_VALUE(dst, src); + break; + case IS_LONG: + ZVAL_LONG(dst, Z_LVAL_P(src)); + break; + case IS_DOUBLE: + ZVAL_DOUBLE(dst, Z_DVAL_P(src)); + break; + case IS_STRING: { + zend_string *s = Z_STR_P(src); + if (ZSTR_IS_INTERNED(s)) { + ZVAL_STR(dst, s); /* interned = shared memory, no copy needed */ + } else { + ZVAL_NEW_STR(dst, zend_string_init(ZSTR_VAL(s), ZSTR_LEN(s), 1)); + } + break; + } + case IS_OBJECT: { + /* Must be an enum (validated earlier) */ + zend_class_entry *ce = Z_OBJCE_P(src); + bg_worker_enum_t *e = pemalloc(sizeof(bg_worker_enum_t), 1); + e->class_name = + ZSTR_IS_INTERNED(ce->name) + ? ce->name + : zend_string_init(ZSTR_VAL(ce->name), ZSTR_LEN(ce->name), 1); + zval *case_name_zval = zend_enum_fetch_case_name(Z_OBJ_P(src)); + zend_string *case_str = Z_STR_P(case_name_zval); + e->case_name = + ZSTR_IS_INTERNED(case_str) + ? case_str + : zend_string_init(ZSTR_VAL(case_str), ZSTR_LEN(case_str), 1); + ZVAL_PTR(dst, e); + break; + } + case IS_ARRAY: { + HashTable *src_ht = Z_ARRVAL_P(src); + HashTable *dst_ht = pemalloc(sizeof(HashTable), 1); + zend_hash_init(dst_ht, zend_hash_num_elements(src_ht), NULL, NULL, 1); + ZVAL_ARR(dst, dst_ht); + + zend_string *key; + zend_ulong idx; + zval *val; + ZEND_HASH_FOREACH_KEY_VAL(src_ht, idx, key, val) { + zval pval; + bg_worker_persist_zval(&pval, val); + if (key) { + if (ZSTR_IS_INTERNED(key)) { + zend_hash_add_new(dst_ht, key, &pval); + } else { + zend_string *pkey = zend_string_init(ZSTR_VAL(key), ZSTR_LEN(key), 1); + zend_hash_add_new(dst_ht, pkey, &pval); + zend_string_release(pkey); + } + } else { + zend_hash_index_add_new(dst_ht, idx, &pval); + } + } + ZEND_HASH_FOREACH_END(); + break; + } + default: + ZVAL_NULL(dst); + break; + } +} + +/* Deep-free a persistent zval tree */ +static void bg_worker_free_persistent_zval(zval *z) { + switch (Z_TYPE_P(z)) { + case IS_STRING: + if (!ZSTR_IS_INTERNED(Z_STR_P(z))) { + zend_string_free(Z_STR_P(z)); + } + break; + case IS_PTR: { + bg_worker_enum_t *e = (bg_worker_enum_t *)Z_PTR_P(z); + if (!ZSTR_IS_INTERNED(e->class_name)) + zend_string_free(e->class_name); + if (!ZSTR_IS_INTERNED(e->case_name)) + zend_string_free(e->case_name); + pefree(e, 1); + break; + } + case IS_ARRAY: { + zval *val; + ZEND_HASH_FOREACH_VAL(Z_ARRVAL_P(z), val) { + bg_worker_free_persistent_zval(val); + } + ZEND_HASH_FOREACH_END(); + zend_hash_destroy(Z_ARRVAL_P(z)); + pefree(Z_ARRVAL_P(z), 1); + break; + } + default: + break; + } +} + +/* Go-callable wrapper to free a persistent HashTable */ +void frankenphp_worker_free_persistent_ht(void *ptr) { + if (ptr && !bg_worker_is_immutable((HashTable *)ptr)) { + zval z; + ZVAL_ARR(&z, (HashTable *)ptr); + bg_worker_free_persistent_zval(&z); + } +} + +/* Deep-copy a persistent zval tree into request memory */ +static void bg_worker_request_copy_zval(zval *dst, zval *src) { + switch (Z_TYPE_P(src)) { + case IS_NULL: + case IS_FALSE: + case IS_TRUE: + ZVAL_COPY_VALUE(dst, src); + break; + case IS_LONG: + ZVAL_LONG(dst, Z_LVAL_P(src)); + break; + case IS_DOUBLE: + ZVAL_DOUBLE(dst, Z_DVAL_P(src)); + break; + case IS_STRING: + if (ZSTR_IS_INTERNED(Z_STR_P(src))) { + ZVAL_STR(dst, Z_STR_P(src)); + } else { + ZVAL_STRINGL(dst, Z_STRVAL_P(src), Z_STRLEN_P(src)); + } + break; + case IS_PTR: { + bg_worker_enum_t *e = (bg_worker_enum_t *)Z_PTR_P(src); + zend_class_entry *ce = zend_lookup_class(e->class_name); + if (!ce || !(ce->ce_flags & ZEND_ACC_ENUM)) { + zend_throw_exception_ex(spl_ce_LogicException, 0, + "Background worker enum class \"%s\" not found", + ZSTR_VAL(e->class_name)); + ZVAL_NULL(dst); + break; + } + zend_object *enum_obj = zend_enum_get_case_cstr(ce, ZSTR_VAL(e->case_name)); + if (!enum_obj) { + zend_throw_exception_ex( + spl_ce_LogicException, 0, + "Background worker enum case \"%s::%s\" not found", + ZSTR_VAL(e->class_name), ZSTR_VAL(e->case_name)); + ZVAL_NULL(dst); + break; + } + ZVAL_OBJ_COPY(dst, enum_obj); + break; + } + case IS_ARRAY: { + HashTable *src_ht = Z_ARRVAL_P(src); + array_init_size(dst, zend_hash_num_elements(src_ht)); + HashTable *dst_ht = Z_ARRVAL_P(dst); + + zend_string *key; + zend_ulong idx; + zval *val; + ZEND_HASH_FOREACH_KEY_VAL(src_ht, idx, key, val) { + zval rval; + bg_worker_request_copy_zval(&rval, val); + if (EG(exception)) { + zval_ptr_dtor(&rval); + break; + } + if (key) { + if (ZSTR_IS_INTERNED(key)) { + zend_hash_add_new(dst_ht, key, &rval); + } else { + zend_string *rkey = zend_string_init(ZSTR_VAL(key), ZSTR_LEN(key), 0); + ZSTR_H(rkey) = ZSTR_H(key); + zend_hash_add_new(dst_ht, rkey, &rval); + zend_string_release(rkey); + } + } else { + zend_hash_index_add_new(dst_ht, idx, &rval); + } + } + ZEND_HASH_FOREACH_END(); + break; + } + default: + ZVAL_NULL(dst); + break; + } +} + +PHP_FUNCTION(frankenphp_worker_set_vars) { + zval *vars_array = NULL; + + ZEND_PARSE_PARAMETERS_START(1, 1); + Z_PARAM_ARRAY(vars_array); + ZEND_PARSE_PARAMETERS_END(); + + HashTable *ht = Z_ARRVAL_P(vars_array); + + if (bg_worker_is_immutable(ht)) { + /* Fast path: immutable arrays are already in shared memory. + * No validation needed (immutable arrays contain only safe types). + * No deep-copy needed. Store the pointer directly. */ + void *old = NULL; + char *error = go_frankenphp_worker_set_vars(thread_index, ht, &old); + if (error) { + zend_throw_exception(spl_ce_RuntimeException, error, 0); + free(error); + RETURN_THROWS(); + } + bg_worker_free_stored_vars(old); + } else { + /* Slow path: validate, deep-copy to persistent memory */ + zval *val; + ZEND_HASH_FOREACH_VAL(ht, val) { + if (!bg_worker_validate_zval(val)) { + zend_value_error("Values must be null, scalars, arrays, or enums; " + "objects and resources are not allowed"); + RETURN_THROWS(); + } + } + ZEND_HASH_FOREACH_END(); + + zval persistent; + bg_worker_persist_zval(&persistent, vars_array); + + void *old = NULL; + char *error = + go_frankenphp_worker_set_vars(thread_index, Z_ARRVAL(persistent), &old); + if (error) { + bg_worker_free_persistent_zval(&persistent); + zend_throw_exception(spl_ce_RuntimeException, error, 0); + free(error); + RETURN_THROWS(); + } + bg_worker_free_stored_vars(old); + } +} + +/* Copy vars from persistent storage into a PHP zval. + * For count == 1: copies directly into dst. + * For count > 1: creates a keyed array in dst. + * Returns true on success, false if an exception occurred. */ +bool frankenphp_worker_copy_vars(zval *dst, int count, char **names, + size_t *name_lens, void **ptrs) { + if (count == 1) { + if (ptrs[0]) { + bg_worker_read_stored_vars(dst, ptrs[0]); + } else { + array_init(dst); + } + return !EG(exception); + } + + array_init(dst); + for (int i = 0; i < count; i++) { + zval worker_vars; + if (ptrs[i]) { + bg_worker_read_stored_vars(&worker_vars, ptrs[i]); + if (EG(exception)) { + zval_ptr_dtor(&worker_vars); + return false; + } + } else { + array_init(&worker_vars); + } + add_assoc_zval_ex(dst, names[i], name_lens[i], &worker_vars); + } + return true; +} + +PHP_FUNCTION(frankenphp_worker_get_vars) { + zval *names = NULL; + double timeout = 30.0; + + ZEND_PARSE_PARAMETERS_START(1, 2); + Z_PARAM_ZVAL(names); + Z_PARAM_OPTIONAL + Z_PARAM_DOUBLE(timeout); + ZEND_PARSE_PARAMETERS_END(); + + if (timeout < 0) { + zend_value_error("Timeout must not be negative"); + RETURN_THROWS(); + } + int timeout_ms = (int)(timeout * 1000); + + if (Z_TYPE_P(names) == IS_STRING) { + if (Z_STRLEN_P(names) == 0) { + zend_value_error("Background worker name must not be empty"); + RETURN_THROWS(); + } + + char *name_ptr = Z_STRVAL_P(names); + size_t name_len_val = Z_STRLEN_P(names); + + /* Check per-request cache */ + uint64_t caller_version = 0; + uint64_t out_version = 0; + bg_worker_vars_cache_entry *cached = NULL; + if (worker_vars_cache) { + zval *entry_zv = + zend_hash_str_find(worker_vars_cache, name_ptr, name_len_val); + if (entry_zv) { + cached = Z_PTR_P(entry_zv); + caller_version = cached->version; + } + } + + char *error = go_frankenphp_worker_get_vars( + thread_index, &name_ptr, &name_len_val, 1, timeout_ms, return_value, + cached ? &caller_version : NULL, &out_version); + if (error) { + zend_throw_exception(spl_ce_RuntimeException, error, 0); + free(error); + RETURN_THROWS(); + } + if (EG(exception)) { + RETURN_THROWS(); + } + + /* Cache hit: Go skipped the copy because version matched */ + if (cached && out_version == caller_version) { + ZVAL_COPY(return_value, &cached->value); + return; + } + + /* Cache miss: store the new result */ + if (!worker_vars_cache) { + worker_vars_cache = malloc(sizeof(HashTable)); + zend_hash_init(worker_vars_cache, 4, NULL, bg_worker_vars_cache_dtor, 0); + } + bg_worker_vars_cache_entry *entry = malloc(sizeof(*entry)); + entry->version = out_version; + ZVAL_COPY(&entry->value, return_value); + zval entry_zv; + ZVAL_PTR(&entry_zv, entry); + zend_hash_str_update(worker_vars_cache, name_ptr, name_len_val, &entry_zv); + + return; + } + + if (Z_TYPE_P(names) != IS_ARRAY) { + zend_type_error("Argument #1 ($name) must be of type string|array, %s " + "given", + zend_zval_type_name(names)); + RETURN_THROWS(); + } + + HashTable *ht = Z_ARRVAL_P(names); + zval *val; + + ZEND_HASH_FOREACH_VAL(ht, val) { + if (Z_TYPE_P(val) != IS_STRING || Z_STRLEN_P(val) == 0) { + zend_value_error("All background worker names must be non-empty strings"); + RETURN_THROWS(); + } + } + ZEND_HASH_FOREACH_END(); + + int name_count = zend_hash_num_elements(ht); + char **name_ptrs = malloc(sizeof(char *) * name_count); + size_t *name_lens_arr = malloc(sizeof(size_t) * name_count); + int idx = 0; + ZEND_HASH_FOREACH_VAL(ht, val) { + name_ptrs[idx] = Z_STRVAL_P(val); + name_lens_arr[idx] = Z_STRLEN_P(val); + idx++; + } + ZEND_HASH_FOREACH_END(); + + char *error = go_frankenphp_worker_get_vars( + thread_index, name_ptrs, name_lens_arr, name_count, timeout_ms, + return_value, NULL, NULL); + free(name_ptrs); + free(name_lens_arr); + if (error) { + zend_throw_exception(spl_ce_RuntimeException, error, 0); + free(error); + RETURN_THROWS(); + } +} + +PHP_FUNCTION(frankenphp_worker_get_signaling_stream) { + ZEND_PARSE_PARAMETERS_NONE(); + + if (!is_background_worker) { + zend_throw_exception(spl_ce_RuntimeException, + "frankenphp_worker_get_signaling_stream() can only " + "be called from a background worker", + 0); + RETURN_THROWS(); + } + + /* Return the cached stream if already created */ + if (worker_signaling_stream != NULL) { + php_stream_to_zval(worker_signaling_stream, return_value); + GC_ADDREF(Z_COUNTED_P(return_value)); + return; + } + + if (worker_stop_fds[0] < 0) { + zend_throw_exception(spl_ce_RuntimeException, + "failed to create background worker stop pipe", 0); + RETURN_THROWS(); + } + + int fd = frankenphp_worker_dup_fd(worker_stop_fds[0]); + if (fd < 0) { + zend_throw_exception(spl_ce_RuntimeException, + "failed to dup background worker stop fd", 0); + RETURN_THROWS(); + } + + php_stream *stream = php_stream_fopen_from_fd(fd, "rb", NULL); + if (!stream) { + frankenphp_worker_close_fd(fd); + zend_throw_exception(spl_ce_RuntimeException, + "failed to create stream from stop fd", 0); + RETURN_THROWS(); + } + + worker_signaling_stream = stream; + php_stream_to_zval(stream, return_value); + + /* Keep an extra ref so PHP can't destroy the stream while TLS caches it */ + GC_ADDREF(Z_COUNTED_P(return_value)); +} + PHP_FUNCTION(headers_send) { zend_long response_code = 200; @@ -1047,6 +1768,9 @@ static void *php_thread(void *arg) { #endif #endif + /* Save PHP's timer handle for best-effort force-kill after grace period */ + frankenphp_save_php_timer(thread_index); + // loop until Go signals to stop char *scriptName = NULL; while ((scriptName = go_frankenphp_before_script_execution(thread_index))) { @@ -1058,6 +1782,7 @@ static void *php_thread(void *arg) { ts_free_thread(); #endif + frankenphp_worker_close_stop_fds(); go_frankenphp_on_thread_shutdown(thread_index); return NULL; @@ -1217,6 +1942,46 @@ int frankenphp_execute_script(char *file_name) { file_handle.primary_script = 1; + if (worker_name != NULL) { + zend_is_auto_global_str("_SERVER", sizeof("_SERVER") - 1); + zval *server = &PG(http_globals)[TRACK_VARS_SERVER]; + if (server && Z_TYPE_P(server) == IS_ARRAY) { + zval name_zval; + ZVAL_STRING(&name_zval, worker_name); + zend_hash_str_update(Z_ARRVAL_P(server), "FRANKENPHP_WORKER_NAME", + sizeof("FRANKENPHP_WORKER_NAME") - 1, &name_zval); + + zval bg_zval; + ZVAL_BOOL(&bg_zval, is_background_worker); + zend_hash_str_update(Z_ARRVAL_P(server), "FRANKENPHP_WORKER_BACKGROUND", + sizeof("FRANKENPHP_WORKER_BACKGROUND") - 1, + &bg_zval); + } + } + + if (is_background_worker) { + CG(skip_shebang) = 1; + + /* Background workers run indefinitely - disable max_execution_time */ + zend_set_timeout(0, 0); + + zval *server = &PG(http_globals)[TRACK_VARS_SERVER]; + if (server && Z_TYPE_P(server) == IS_ARRAY) { + zval argv_array; + array_init(&argv_array); + add_next_index_string(&argv_array, file_name); + add_next_index_string(&argv_array, worker_name); + + zval argc_zval; + ZVAL_LONG(&argc_zval, 2); + + zend_hash_str_update(Z_ARRVAL_P(server), "argv", sizeof("argv") - 1, + &argv_array); + zend_hash_str_update(Z_ARRVAL_P(server), "argc", sizeof("argc") - 1, + &argc_zval); + } + } + zend_first_try { EG(exit_status) = 0; php_execute_script(&file_handle); @@ -1233,6 +1998,7 @@ int frankenphp_execute_script(char *file_name) { sandboxed_env = NULL; } + bg_worker_vars_cache_reset(); php_request_shutdown((void *)0); frankenphp_free_request_context(); diff --git a/frankenphp.go b/frankenphp.go index d2aaa3c7d9..e08daef000 100644 --- a/frankenphp.go +++ b/frankenphp.go @@ -48,6 +48,7 @@ var ( ErrMainThreadCreation = errors.New("error creating the main thread") ErrScriptExecution = errors.New("error during PHP script execution") ErrNotRunning = errors.New("FrankenPHP is not running. For proper configuration visit: https://frankenphp.dev/docs/config/#caddyfile-config") + ErrNotHTTPWorker = errors.New("worker is not an HTTP worker") ErrInvalidRequestPath = ErrRejected{"invalid request path", http.StatusBadRequest} ErrInvalidContentLengthHeader = ErrRejected{"invalid Content-Length header", http.StatusBadRequest} @@ -235,6 +236,15 @@ func calculateMaxThreads(opt *opt) (numWorkers int, _ error) { return numWorkers, nil } +// applyReservedThreads adds reserved background worker threads to the thread budget. +// Called after calculateMaxThreads so background threads don't compete with HTTP scaling. +func applyReservedThreads(opt *opt) { + if opt.reservedThreads > 0 { + opt.numThreads += opt.reservedThreads + opt.maxThreads += opt.reservedThreads + } +} + // Init starts the PHP runtime and the configured workers. func Init(options ...Option) error { if isRunning { @@ -286,6 +296,8 @@ func Init(options ...Option) error { return err } + applyReservedThreads(opt) + metrics.TotalThreads(opt.numThreads) config := Config() @@ -414,6 +426,9 @@ func ServeHTTP(responseWriter http.ResponseWriter, request *http.Request) error // Detect if a worker is available to handle this request if fc.worker != nil { + if fc.worker.isBackgroundWorker { + return ErrNotHTTPWorker + } return fc.worker.handleRequest(ch) } diff --git a/frankenphp.h b/frankenphp.h index f25cb85128..7742f3d672 100644 --- a/frankenphp.h +++ b/frankenphp.h @@ -171,7 +171,14 @@ bool frankenphp_new_php_thread(uintptr_t thread_index); bool frankenphp_shutdown_dummy_request(void); int frankenphp_execute_script(char *file_name); void frankenphp_update_local_thread_context(bool is_worker); - +void frankenphp_set_worker_name(char *name, bool background); +int frankenphp_worker_get_stop_fd_write(void); +int frankenphp_worker_write_stop_fd(int fd); +int frankenphp_worker_write_task_signal(int fd); +void frankenphp_worker_free_persistent_ht(void *ptr); +bool frankenphp_worker_copy_vars(zval *dst, int count, char **names, + size_t *name_lens, void **ptrs); +void frankenphp_worker_close_fd(int fd); int frankenphp_execute_script_cli(char *script, int argc, char **argv, bool eval); @@ -188,4 +195,9 @@ int frankenphp_get_current_memory_limit(); void register_extensions(zend_module_entry **m, int len); +void frankenphp_init_force_kill(int num_threads); +void frankenphp_save_php_timer(uintptr_t thread_index); +void frankenphp_force_kill_thread(uintptr_t thread_index); +void frankenphp_destroy_force_kill(void); + #endif diff --git a/frankenphp.stub.php b/frankenphp.stub.php index d6c85aa05f..fbc258fb5b 100644 --- a/frankenphp.stub.php +++ b/frankenphp.stub.php @@ -16,6 +16,19 @@ function frankenphp_handle_request(callable $callback): bool {} +/** + * @param array> $vars Nested arrays must recursively follow the same type constraints + */ +function frankenphp_worker_set_vars(array $vars): void {} + +/** + * @return array|\UnitEnum> Nested arrays recursively follow the same type constraints + */ +function frankenphp_worker_get_vars(string|array $name, float $timeout = 30.0): array {} + +/** @return resource */ +function frankenphp_worker_get_signaling_stream() {} + function headers_send(int $status = 200): int {} function frankenphp_finish_request(): bool {} @@ -50,7 +63,7 @@ function apache_response_headers(): array|bool {} function mercure_publish(string|array $topics, string $data = '', bool $private = false, ?string $id = null, ?string $type = null, ?int $retry = null): string {} /** - * @param int $level The importance or severity of a log event. The higher the level, the more important or severe the event. For more details, see: https://pkg.go.dev/log/slog#Level - * array $context Values of the array will be converted to the corresponding Go type (if supported by FrankenPHP) and added to the context of the structured logs using https://pkg.go.dev/log/slog#Attr + * @param int $level The importance or severity of a log event. The higher the level, the more important or severe the event. For more details, see: https://pkg.go.dev/log/slog#Level + * @param array $context Values of the array will be converted to the corresponding Go type (if supported by FrankenPHP) and added to the context of the structured logs using https://pkg.go.dev/log/slog#Attr */ function frankenphp_log(string $message, int $level = 0, array $context = []): void {} diff --git a/frankenphp_arginfo.h b/frankenphp_arginfo.h index 4f2707cbca..55620eed74 100644 --- a/frankenphp_arginfo.h +++ b/frankenphp_arginfo.h @@ -5,6 +5,18 @@ ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_frankenphp_handle_request, 0, 1, ZEND_ARG_TYPE_INFO(0, callback, IS_CALLABLE, 0) ZEND_END_ARG_INFO() +ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_frankenphp_worker_set_vars, 0, 1, IS_VOID, 0) + ZEND_ARG_TYPE_INFO(0, vars, IS_ARRAY, 0) +ZEND_END_ARG_INFO() + +ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_frankenphp_worker_get_vars, 0, 1, IS_ARRAY, 0) + ZEND_ARG_TYPE_MASK(0, name, MAY_BE_STRING|MAY_BE_ARRAY, NULL) + ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, timeout, IS_DOUBLE, 0, "30.0") +ZEND_END_ARG_INFO() + +ZEND_BEGIN_ARG_INFO_EX(arginfo_frankenphp_worker_get_signaling_stream, 0, 0, 0) +ZEND_END_ARG_INFO() + ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_headers_send, 0, 0, IS_LONG, 0) ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, status, IS_LONG, 0, "200") ZEND_END_ARG_INFO() @@ -43,6 +55,9 @@ ZEND_END_ARG_INFO() ZEND_FUNCTION(frankenphp_handle_request); +ZEND_FUNCTION(frankenphp_worker_set_vars); +ZEND_FUNCTION(frankenphp_worker_get_vars); +ZEND_FUNCTION(frankenphp_worker_get_signaling_stream); ZEND_FUNCTION(headers_send); ZEND_FUNCTION(frankenphp_finish_request); ZEND_FUNCTION(frankenphp_request_headers); @@ -53,6 +68,9 @@ ZEND_FUNCTION(frankenphp_log); static const zend_function_entry ext_functions[] = { ZEND_FE(frankenphp_handle_request, arginfo_frankenphp_handle_request) + ZEND_FE(frankenphp_worker_set_vars, arginfo_frankenphp_worker_set_vars) + ZEND_FE(frankenphp_worker_get_vars, arginfo_frankenphp_worker_get_vars) + ZEND_FE(frankenphp_worker_get_signaling_stream, arginfo_frankenphp_worker_get_signaling_stream) ZEND_FE(headers_send, arginfo_headers_send) ZEND_FE(frankenphp_finish_request, arginfo_frankenphp_finish_request) ZEND_FALIAS(fastcgi_finish_request, frankenphp_finish_request, arginfo_fastcgi_finish_request) diff --git a/frankenphp_test.go b/frankenphp_test.go index 47e65c490b..77d2a8daa5 100644 --- a/frankenphp_test.go +++ b/frankenphp_test.go @@ -7,6 +7,8 @@ package frankenphp_test import ( "bytes" "context" + "crypto/md5" + "encoding/hex" "errors" "flag" "fmt" @@ -46,6 +48,7 @@ type testOptions struct { realServer bool logger *slog.Logger initOpts []frankenphp.Option + workerOpts []frankenphp.WorkerOption requestOpts []frankenphp.RequestOption phpIni map[string]string } @@ -67,6 +70,7 @@ func runTest(t *testing.T, test func(func(http.ResponseWriter, *http.Request), * frankenphp.WithWorkerEnv(opts.env), frankenphp.WithWorkerWatchMode(opts.watch), } + workerOpts = append(workerOpts, opts.workerOpts...) initOpts = append(initOpts, frankenphp.WithWorkers("workerName", testDataDir+opts.workerScript, opts.nbWorkers, workerOpts...)) } initOpts = append(initOpts, opts.initOpts...) @@ -803,6 +807,296 @@ func testFileUpload(t *testing.T, opts *testOptions) { }, opts) } +func TestBackgroundWorkerGetVars(t *testing.T) { + cwd, _ := os.Getwd() + entrypoint := cwd + "/testdata/background-worker-with-argv.php" + + runTest(t, func(handler func(http.ResponseWriter, *http.Request), _ *httptest.Server, _ int) { + // get_vars blocks until the background worker calls set_vars - no polling needed + body, _ := testGet("http://example.com/background-worker-start.php", handler, t) + assert.Equal(t, "test-worker", body) + }, &testOptions{ + workerScript: "background-worker-start.php", + nbWorkers: 1, + nbParallelRequests: 1, + initOpts: []frankenphp.Option{frankenphp.WithMaxThreads(50)}, + workerOpts: []frankenphp.WorkerOption{ + frankenphp.WithWorkerBackgroundLookup(frankenphp.NewBackgroundWorkerLookupWithCatchAll(entrypoint)), + }, + }) +} + +func TestBackgroundWorkerGetVarsIdentity(t *testing.T) { + cwd, _ := os.Getwd() + entrypoint := cwd + "/testdata/background-worker-with-argv.php" + + runTest(t, func(handler func(http.ResponseWriter, *http.Request), _ *httptest.Server, _ int) { + body, _ := testGet("http://example.com/background-worker-identity.php", handler, t) + assert.Equal(t, "IDENTICAL", body) + }, &testOptions{ + workerScript: "background-worker-identity.php", + nbWorkers: 1, + nbParallelRequests: 1, + initOpts: []frankenphp.Option{frankenphp.WithMaxThreads(50)}, + workerOpts: []frankenphp.WorkerOption{ + frankenphp.WithWorkerBackgroundLookup(frankenphp.NewBackgroundWorkerLookupWithCatchAll(entrypoint)), + }, + }) +} + +func TestBackgroundWorkerAtMostOnce(t *testing.T) { + cwd, _ := os.Getwd() + entrypoint := cwd + "/testdata/background-worker-dedup.php" + + runTest(t, func(handler func(http.ResponseWriter, *http.Request), _ *httptest.Server, _ int) { + body, _ := testGet("http://example.com/background-worker-start-twice.php", handler, t) + assert.Equal(t, "dedup-worker", body) + }, &testOptions{ + workerScript: "background-worker-start-twice.php", + nbWorkers: 1, + nbParallelRequests: 1, + initOpts: []frankenphp.Option{frankenphp.WithMaxThreads(50)}, + workerOpts: []frankenphp.WorkerOption{ + frankenphp.WithWorkerBackgroundLookup(frankenphp.NewBackgroundWorkerLookupWithCatchAll(entrypoint)), + }, + }) +} + +func TestBackgroundWorkerNoEntrypoint(t *testing.T) { + runTest(t, func(handler func(http.ResponseWriter, *http.Request), _ *httptest.Server, _ int) { + body, _ := testGet("http://example.com/background-worker-no-entrypoint.php", handler, t) + assert.Equal(t, "no background worker configured in this php_server", body) + }, &testOptions{ + workerScript: "background-worker-no-entrypoint.php", + nbWorkers: 1, + nbParallelRequests: 1, + }) +} + +func TestBackgroundWorkerSetVarsValidation(t *testing.T) { + runTest(t, func(handler func(http.ResponseWriter, *http.Request), _ *httptest.Server, _ int) { + body, _ := testGet("http://example.com/background-worker-set-server-var-validation.php", handler, t) + assert.Contains(t, body, "NON_BACKGROUND:blocked") + assert.Contains(t, body, "STREAM_NON_BACKGROUND:blocked") + }, &testOptions{ + workerScript: "background-worker-set-server-var-validation.php", + nbWorkers: 1, + nbParallelRequests: 1, + }) +} + +func TestBackgroundWorkerTypeValidation(t *testing.T) { + cwd, _ := os.Getwd() + entrypoint := cwd + "/testdata/background-worker-type-validation-entrypoint.php" + + runTest(t, func(handler func(http.ResponseWriter, *http.Request), _ *httptest.Server, _ int) { + body, _ := testGet("http://example.com/background-worker-type-validation.php", handler, t) + assert.Contains(t, body, "INT_VAL:allowed") + assert.Contains(t, body, "INT_KEY:allowed") + assert.Contains(t, body, "NESTED:allowed") + assert.Contains(t, body, "OBJECT:blocked") + assert.Contains(t, body, "REFERENCE:blocked") + assert.Contains(t, body, "ENUM:allowed") + assert.Contains(t, body, "ENUM_RESTORED:match") + }, &testOptions{ + workerScript: "background-worker-type-validation.php", + nbWorkers: 1, + nbParallelRequests: 1, + initOpts: []frankenphp.Option{frankenphp.WithMaxThreads(50)}, + workerOpts: []frankenphp.WorkerOption{ + frankenphp.WithWorkerBackgroundLookup(frankenphp.NewBackgroundWorkerLookupWithCatchAll(entrypoint)), + }, + }) +} + +func TestBackgroundWorkerBinarySafe(t *testing.T) { + cwd, _ := os.Getwd() + entrypoint := cwd + "/testdata/background-worker-binary-entrypoint.php" + + runTest(t, func(handler func(http.ResponseWriter, *http.Request), _ *httptest.Server, _ int) { + body, _ := testGet("http://example.com/background-worker-binary-safe.php", handler, t) + assert.Contains(t, body, "BINARY_LEN:11") + assert.Contains(t, body, "BINARY_CONTENT:"+hex.EncodeToString([]byte("hello\x00world"))) + assert.Contains(t, body, "UTF8:héllo wörld 🚀") + assert.Contains(t, body, "EMPTY_EXISTS:yes") + assert.Contains(t, body, "EMPTY_LEN:0") + }, &testOptions{ + workerScript: "background-worker-binary-safe.php", + nbWorkers: 1, + nbParallelRequests: 1, + initOpts: []frankenphp.Option{frankenphp.WithMaxThreads(50)}, + workerOpts: []frankenphp.WorkerOption{ + frankenphp.WithWorkerBackgroundLookup(frankenphp.NewBackgroundWorkerLookupWithCatchAll(entrypoint)), + }, + }) +} + +func TestBackgroundWorkerGetVarsMultiple(t *testing.T) { + cwd, _ := os.Getwd() + entrypoint := cwd + "/testdata/background-worker-multi-entrypoint.php" + + runTest(t, func(handler func(http.ResponseWriter, *http.Request), _ *httptest.Server, _ int) { + body, _ := testGet("http://example.com/background-worker-multi.php", handler, t) + assert.Equal(t, "worker-a:NAME_WORKER_A=worker-a,worker-b:NAME_WORKER_B=worker-b", body) + }, &testOptions{ + workerScript: "background-worker-multi.php", + nbWorkers: 1, + nbParallelRequests: 1, + initOpts: []frankenphp.Option{frankenphp.WithMaxThreads(50)}, + workerOpts: []frankenphp.WorkerOption{ + frankenphp.WithWorkerBackgroundLookup(frankenphp.NewBackgroundWorkerLookupWithCatchAll(entrypoint)), + }, + }) +} + +func TestBackgroundWorkerEnumMissing(t *testing.T) { + cwd, _ := os.Getwd() + entrypoint := cwd + "/testdata/background-worker-enum-missing-entrypoint.php" + + runTest(t, func(handler func(http.ResponseWriter, *http.Request), _ *httptest.Server, _ int) { + body, _ := testGet("http://example.com/background-worker-enum-missing.php", handler, t) + assert.Contains(t, body, "LogicException:") + assert.Contains(t, body, "SidekickOnlyEnum") + }, &testOptions{ + workerScript: "background-worker-enum-missing.php", + nbWorkers: 1, + nbParallelRequests: 1, + initOpts: []frankenphp.Option{frankenphp.WithMaxThreads(50)}, + workerOpts: []frankenphp.WorkerOption{ + frankenphp.WithWorkerBackgroundLookup(frankenphp.NewBackgroundWorkerLookupWithCatchAll(entrypoint)), + }, + }) +} + +func TestBackgroundWorkerCrashRestart(t *testing.T) { + cwd, _ := os.Getwd() + entrypoint := cwd + "/testdata/background-worker-crash.php" + + runTest(t, func(handler func(http.ResponseWriter, *http.Request), _ *httptest.Server, _ int) { + // get_vars blocks - background worker crashes, restarts, then publishes + body, _ := testGet("http://example.com/background-worker-crash-starter.php", handler, t) + assert.Equal(t, "restarted", body) + }, &testOptions{ + workerScript: "background-worker-crash-starter.php", + nbWorkers: 1, + nbParallelRequests: 1, + initOpts: []frankenphp.Option{frankenphp.WithMaxThreads(50)}, + workerOpts: []frankenphp.WorkerOption{ + frankenphp.WithWorkerBackgroundLookup(frankenphp.NewBackgroundWorkerLookupWithCatchAll(entrypoint)), + }, + }) +} + +func TestBackgroundWorkerSignalingStream(t *testing.T) { + cwd, _ := os.Getwd() + entrypoint := cwd + "/testdata/background-worker-stop-fd-entrypoint.php" + + runTest(t, func(handler func(http.ResponseWriter, *http.Request), _ *httptest.Server, _ int) { + body, _ := testGet("http://example.com/background-worker-stop-fd.php", handler, t) + assert.Equal(t, "stream", body) + }, &testOptions{ + workerScript: "background-worker-stop-fd.php", + nbWorkers: 1, + nbParallelRequests: 1, + initOpts: []frankenphp.Option{frankenphp.WithMaxThreads(50)}, + workerOpts: []frankenphp.WorkerOption{ + frankenphp.WithWorkerBackgroundLookup(frankenphp.NewBackgroundWorkerLookupWithCatchAll(entrypoint)), + }, + }) +} + +func TestBackgroundWorkerStopsOnWorkerRestart(t *testing.T) { + cwd, _ := os.Getwd() + entrypoint := cwd + "/testdata/background-worker-restart-entrypoint.php" + name := fmt.Sprintf("restart-background-worker-%d", time.Now().UnixNano()) + hash := fmt.Sprintf("%x", md5.Sum([]byte(name))) + runningMarker := filepath.Join(os.TempDir(), "background-worker-restart-running-"+hash) + t.Cleanup(func() { + _ = os.Remove(runningMarker) + }) + + runTest(t, func(handler func(http.ResponseWriter, *http.Request), _ *httptest.Server, _ int) { + body, _ := testGet("http://example.com/background-worker-restart.php?name="+name, handler, t) + assert.Equal(t, "1", body) + require.FileExists(t, runningMarker) + + frankenphp.RestartWorkers() + + require.Eventually(t, func() bool { + _, err := os.Stat(runningMarker) + return errors.Is(err, os.ErrNotExist) + }, 5*time.Second, 50*time.Millisecond) + + body, _ = testGet("http://example.com/background-worker-restart.php?name="+name, handler, t) + assert.Equal(t, "2", body) + require.FileExists(t, runningMarker) + + frankenphp.RestartWorkers() + + require.Eventually(t, func() bool { + _, err := os.Stat(runningMarker) + return errors.Is(err, os.ErrNotExist) + }, 5*time.Second, 50*time.Millisecond) + }, &testOptions{ + workerScript: "background-worker-restart.php", + nbWorkers: 1, + nbParallelRequests: 1, + initOpts: []frankenphp.Option{frankenphp.WithMaxThreads(50)}, + workerOpts: []frankenphp.WorkerOption{ + frankenphp.WithWorkerBackgroundLookup(frankenphp.NewBackgroundWorkerLookupWithCatchAll(entrypoint)), + }, + }) +} + +func TestBackgroundWorkerSignalingStreamNonBackgroundWorker(t *testing.T) { + runTest(t, func(handler func(http.ResponseWriter, *http.Request), _ *httptest.Server, _ int) { + body, _ := testGet("http://example.com/background-worker-stop-fd-non-background-worker.php", handler, t) + assert.Equal(t, "thrown", body) + }, &testOptions{ + workerScript: "background-worker-stop-fd-non-background-worker.php", + nbWorkers: 1, + nbParallelRequests: 1, + }) +} + +func TestBackgroundWorkerMultipleEntrypoints(t *testing.T) { + cwd, _ := os.Getwd() + + t.Run("entrypoint-a", func(t *testing.T) { + lookup := frankenphp.NewBackgroundWorkerLookupWithCatchAll(cwd + "/testdata/background-worker-multi-entrypoint-a.php") + + runTest(t, func(handler func(http.ResponseWriter, *http.Request), _ *httptest.Server, _ int) { + body, _ := testGet("http://example.com/background-worker-multi-file.php", handler, t) + assert.Equal(t, "entrypoint-a:worker-from-a,entrypoint-a:worker-from-b", body) + }, &testOptions{ + workerScript: "background-worker-multi-file.php", + nbWorkers: 1, + nbParallelRequests: 1, + initOpts: []frankenphp.Option{frankenphp.WithMaxThreads(50)}, + workerOpts: []frankenphp.WorkerOption{ + frankenphp.WithWorkerBackgroundLookup(lookup), + }, + }) + }) + + t.Run("entrypoint-b", func(t *testing.T) { + lookup := frankenphp.NewBackgroundWorkerLookupWithCatchAll(cwd + "/testdata/background-worker-multi-entrypoint-b.php") + + runTest(t, func(handler func(http.ResponseWriter, *http.Request), _ *httptest.Server, _ int) { + body, _ := testGet("http://example.com/background-worker-multi-file.php", handler, t) + assert.Equal(t, "entrypoint-b:worker-from-a,entrypoint-b:worker-from-b", body) + }, &testOptions{ + workerScript: "background-worker-multi-file.php", + nbWorkers: 1, + nbParallelRequests: 1, + initOpts: []frankenphp.Option{frankenphp.WithMaxThreads(50)}, + workerOpts: []frankenphp.WorkerOption{ + frankenphp.WithWorkerBackgroundLookup(lookup), + }, + }) + }) +} + func ExampleServeHTTP() { if err := frankenphp.Init(); err != nil { panic(err) diff --git a/options.go b/options.go index 9ba1f916f6..63c42c7615 100644 --- a/options.go +++ b/options.go @@ -22,15 +22,16 @@ type WorkerOption func(*workerOpt) error type opt struct { hotReloadOpt - ctx context.Context - numThreads int - maxThreads int - workers []workerOpt - logger *slog.Logger - metrics Metrics - phpIni map[string]string - maxWaitTime time.Duration - maxIdleTime time.Duration + ctx context.Context + numThreads int + maxThreads int + reservedThreads int // threads reserved for background workers (outside HTTP scaling budget) + workers []workerOpt + logger *slog.Logger + metrics Metrics + phpIni map[string]string + maxWaitTime time.Duration + maxIdleTime time.Duration } type workerOpt struct { @@ -49,6 +50,7 @@ type workerOpt struct { onThreadShutdown func(int) onServerStartup func() onServerShutdown func() + backgroundLookup *BackgroundWorkerLookup } // WithContext sets the main context to use. @@ -77,6 +79,16 @@ func WithMaxThreads(maxThreads int) Option { } } +// WithReservedThreads reserves threads for background workers. +// These threads are added to numThreads and maxThreads, outside the HTTP scaling budget. +func WithReservedThreads(n int) Option { + return func(o *opt) error { + o.reservedThreads = n + + return nil + } +} + func WithMetrics(m Metrics) Option { return func(o *opt) error { o.metrics = m @@ -85,6 +97,14 @@ func WithMetrics(m Metrics) Option { } } +func WithWorkerBackgroundLookup(lookup *BackgroundWorkerLookup) WorkerOption { + return func(w *workerOpt) error { + w.backgroundLookup = lookup + + return nil + } +} + // WithWorkers configures the PHP workers to start func WithWorkers(name, fileName string, num int, options ...WorkerOption) Option { return func(o *opt) error { diff --git a/phpmainthread.go b/phpmainthread.go index ba3917e846..39ebf4b073 100644 --- a/phpmainthread.go +++ b/phpmainthread.go @@ -54,6 +54,9 @@ func initPHPThreads(numThreads int, numMaxThreads int, phpIni map[string]string) return nil, err } + // initialize force-kill support for stuck background workers + C.frankenphp_init_force_kill(C.int(mainThread.maxThreads)) + // initialize all other threads phpThreads = make([]*phpThread, mainThread.maxThreads) phpThreads[0] = initialThread @@ -95,6 +98,7 @@ func drainPHPThreads() { } doneWG.Wait() + C.frankenphp_destroy_force_kill() mainThread.state.Set(state.Done) mainThread.state.WaitFor(state.Reserved) phpThreads = nil diff --git a/phpthread.go b/phpthread.go index fdf263717c..bf3ceae057 100644 --- a/phpthread.go +++ b/phpthread.go @@ -71,6 +71,13 @@ func (thread *phpThread) shutdown() { return } + // Signal background worker stop pipe to unblock stream_select/sleep + if handler, ok := thread.handler.(*workerThread); ok && handler.worker.isBackgroundWorker { + if fd := handler.worker.backgroundStopFdWrite.Load(); fd >= 0 { + C.frankenphp_worker_write_stop_fd(C.int(fd)) + } + } + close(thread.drainChan) thread.state.WaitFor(state.Done) thread.drainChan = make(chan struct{}) diff --git a/requestoptions.go b/requestoptions.go index 42cc3cf7c0..64ac8ef579 100644 --- a/requestoptions.go +++ b/requestoptions.go @@ -164,3 +164,11 @@ func WithWorkerName(name string) RequestOption { return nil } } + +func WithRequestBackgroundLookup(lookup *BackgroundWorkerLookup) RequestOption { + return func(o *frankenPHPContext) error { + o.backgroundLookup = lookup + + return nil + } +} diff --git a/testdata/background-worker-binary-entrypoint.php b/testdata/background-worker-binary-entrypoint.php new file mode 100644 index 0000000000..4692c15f0f --- /dev/null +++ b/testdata/background-worker-binary-entrypoint.php @@ -0,0 +1,12 @@ + "hello\x00world", + 'UTF8_TEST' => "héllo wörld 🚀", + 'EMPTY_VAL' => "", +]); + +while (!background_worker_should_stop(30)) { +} diff --git a/testdata/background-worker-binary-safe.php b/testdata/background-worker-binary-safe.php new file mode 100644 index 0000000000..ca3637ff23 --- /dev/null +++ b/testdata/background-worker-binary-safe.php @@ -0,0 +1,23 @@ +getMessage(); + return; + } + + $results = []; + + $bin = $vars['BINARY_TEST'] ?? 'NOT_SET'; + $results[] = 'BINARY_LEN:' . strlen($bin); + $results[] = 'BINARY_CONTENT:' . bin2hex($bin); + + $results[] = 'UTF8:' . ($vars['UTF8_TEST'] ?? 'NOT_SET'); + + $results[] = 'EMPTY_EXISTS:' . (array_key_exists('EMPTY_VAL', $vars) ? 'yes' : 'no'); + $results[] = 'EMPTY_LEN:' . strlen($vars['EMPTY_VAL'] ?? 'NOT_SET'); + + echo implode("\n", $results); +}); diff --git a/testdata/background-worker-crash-starter.php b/testdata/background-worker-crash-starter.php new file mode 100644 index 0000000000..59ace50998 --- /dev/null +++ b/testdata/background-worker-crash-starter.php @@ -0,0 +1,10 @@ +getMessage(); + } +}); diff --git a/testdata/background-worker-crash.php b/testdata/background-worker-crash.php new file mode 100644 index 0000000000..9b59e7b69f --- /dev/null +++ b/testdata/background-worker-crash.php @@ -0,0 +1,18 @@ + 'restarted']); + +while (!background_worker_should_stop(30)) { +} + +@unlink($marker); diff --git a/testdata/background-worker-dedup.php b/testdata/background-worker-dedup.php new file mode 100644 index 0000000000..874de3b203 --- /dev/null +++ b/testdata/background-worker-dedup.php @@ -0,0 +1,10 @@ + $name]); + +while (!background_worker_should_stop(30)) { +} diff --git a/testdata/background-worker-enum-missing-entrypoint.php b/testdata/background-worker-enum-missing-entrypoint.php new file mode 100644 index 0000000000..67903909e1 --- /dev/null +++ b/testdata/background-worker-enum-missing-entrypoint.php @@ -0,0 +1,13 @@ + SidekickOnlyEnum::Foo]); + +while (!background_worker_should_stop(30)) { +} diff --git a/testdata/background-worker-enum-missing.php b/testdata/background-worker-enum-missing.php new file mode 100644 index 0000000000..7088b5b73d --- /dev/null +++ b/testdata/background-worker-enum-missing.php @@ -0,0 +1,14 @@ +getMessage(); + } catch (\Throwable $e) { + echo 'other:' . get_class($e); + } +}); diff --git a/testdata/background-worker-helper.php b/testdata/background-worker-helper.php new file mode 100644 index 0000000000..fb9960c0c8 --- /dev/null +++ b/testdata/background-worker-helper.php @@ -0,0 +1,14 @@ + false, // timeout + false => true, // error (pipe closed) = stop + default => "stop\n" === fgets($signalingStream), + }; +} diff --git a/testdata/background-worker-identity.php b/testdata/background-worker-identity.php new file mode 100644 index 0000000000..dbae8400ae --- /dev/null +++ b/testdata/background-worker-identity.php @@ -0,0 +1,11 @@ +getMessage(); + } +}); diff --git a/testdata/background-worker-multi-entrypoint-a.php b/testdata/background-worker-multi-entrypoint-a.php new file mode 100644 index 0000000000..dc886f671a --- /dev/null +++ b/testdata/background-worker-multi-entrypoint-a.php @@ -0,0 +1,8 @@ + 'entrypoint-a', 'NAME' => $_SERVER['FRANKENPHP_WORKER_NAME'] ?? 'unknown']); + +while (!background_worker_should_stop(30)) { +} diff --git a/testdata/background-worker-multi-entrypoint-b.php b/testdata/background-worker-multi-entrypoint-b.php new file mode 100644 index 0000000000..ee23b075d3 --- /dev/null +++ b/testdata/background-worker-multi-entrypoint-b.php @@ -0,0 +1,8 @@ + 'entrypoint-b', 'NAME' => $_SERVER['FRANKENPHP_WORKER_NAME'] ?? 'unknown']); + +while (!background_worker_should_stop(30)) { +} diff --git a/testdata/background-worker-multi-entrypoint.php b/testdata/background-worker-multi-entrypoint.php new file mode 100644 index 0000000000..b63af382df --- /dev/null +++ b/testdata/background-worker-multi-entrypoint.php @@ -0,0 +1,10 @@ + $name]); + +while (!background_worker_should_stop(30)) { +} diff --git a/testdata/background-worker-multi-file.php b/testdata/background-worker-multi-file.php new file mode 100644 index 0000000000..758809f9a1 --- /dev/null +++ b/testdata/background-worker-multi-file.php @@ -0,0 +1,8 @@ + $vars) { + foreach ($vars as $k => $v) { + $parts[] = "$name:$k=$v"; + } + } + echo implode(',', $parts); + } catch (\Throwable $e) { + echo 'ERROR:' . $e->getMessage(); + } +}); diff --git a/testdata/background-worker-no-entrypoint.php b/testdata/background-worker-no-entrypoint.php new file mode 100644 index 0000000000..715bc9037b --- /dev/null +++ b/testdata/background-worker-no-entrypoint.php @@ -0,0 +1,10 @@ +getMessage(); + } +}); diff --git a/testdata/background-worker-restart-entrypoint.php b/testdata/background-worker-restart-entrypoint.php new file mode 100644 index 0000000000..52783b6cfc --- /dev/null +++ b/testdata/background-worker-restart-entrypoint.php @@ -0,0 +1,19 @@ + (string) $generation]); + +try { + while (!background_worker_should_stop(30)) { + } +} finally { + @unlink($runningMarker); +} diff --git a/testdata/background-worker-restart.php b/testdata/background-worker-restart.php new file mode 100644 index 0000000000..2189573303 --- /dev/null +++ b/testdata/background-worker-restart.php @@ -0,0 +1,7 @@ + 'val']); + $results[] = 'NON_BACKGROUND:no_error'; + } catch (\RuntimeException $e) { + $results[] = 'NON_BACKGROUND:blocked'; + } + + // get_signaling_stream from non-background-worker context should throw + try { + frankenphp_worker_get_signaling_stream(); + $results[] = 'STREAM_NON_BACKGROUND:no_error'; + } catch (\RuntimeException $e) { + $results[] = 'STREAM_NON_BACKGROUND:blocked'; + } + + echo implode("\n", $results); +}); diff --git a/testdata/background-worker-start-twice.php b/testdata/background-worker-start-twice.php new file mode 100644 index 0000000000..56952a687c --- /dev/null +++ b/testdata/background-worker-start-twice.php @@ -0,0 +1,10 @@ +getMessage(); + } +}); diff --git a/testdata/background-worker-start.php b/testdata/background-worker-start.php new file mode 100644 index 0000000000..c4ea3fce1f --- /dev/null +++ b/testdata/background-worker-start.php @@ -0,0 +1,10 @@ +getMessage(); + } +}); diff --git a/testdata/background-worker-stop-fd-entrypoint.php b/testdata/background-worker-stop-fd-entrypoint.php new file mode 100644 index 0000000000..40bd3cfe99 --- /dev/null +++ b/testdata/background-worker-stop-fd-entrypoint.php @@ -0,0 +1,18 @@ + get_resource_type($stream), +]); + +$r = [$stream]; +$w = $e = []; +stream_select($r, $w, $e, 30); + +$signal = fgets($stream); + +frankenphp_worker_set_vars([ + 'STREAM_TYPE' => get_resource_type($stream), + 'SIGNAL' => $signal, +]); diff --git a/testdata/background-worker-stop-fd-non-background-worker.php b/testdata/background-worker-stop-fd-non-background-worker.php new file mode 100644 index 0000000000..50af635dd9 --- /dev/null +++ b/testdata/background-worker-stop-fd-non-background-worker.php @@ -0,0 +1,10 @@ + 123]); + $results[] = 'INT_VAL:allowed'; +} catch (\Throwable $e) { + $results[] = 'INT_VAL:blocked'; +} + +// int keys allowed +try { + frankenphp_worker_set_vars([0 => 'val']); + $results[] = 'INT_KEY:allowed'; +} catch (\Throwable $e) { + $results[] = 'INT_KEY:blocked'; +} + +// nested arrays allowed +try { + frankenphp_worker_set_vars(['nested' => ['a' => 1, 'b' => [true, null]]]); + $results[] = 'NESTED:allowed'; +} catch (\Throwable $e) { + $results[] = 'NESTED:blocked'; +} + +// objects rejected +try { + frankenphp_worker_set_vars(['KEY' => new \stdClass()]); + $results[] = 'OBJECT:allowed'; +} catch (\ValueError $e) { + $results[] = 'OBJECT:blocked'; +} + +// references rejected +try { + $ref = 'hello'; + frankenphp_worker_set_vars(['KEY' => &$ref]); + $results[] = 'REFERENCE:allowed'; +} catch (\ValueError $e) { + $results[] = 'REFERENCE:blocked'; +} + +// enums allowed - final set_vars with all results +try { + $results[] = 'ENUM:allowed'; // if we get here, the call below will confirm it + frankenphp_worker_set_vars(['status' => TestStatus::Active, 'RESULTS' => implode(',', $results)]); +} catch (\Throwable $e) { + $results[array_key_last($results)] = 'ENUM:blocked'; + frankenphp_worker_set_vars(['RESULTS' => implode(',', $results)]); +} + +while (!background_worker_should_stop(30)) { +} diff --git a/testdata/background-worker-type-validation.php b/testdata/background-worker-type-validation.php new file mode 100644 index 0000000000..7ed892f3bb --- /dev/null +++ b/testdata/background-worker-type-validation.php @@ -0,0 +1,23 @@ + $name]); + +while (!background_worker_should_stop(30)) { +} diff --git a/threadworker.go b/threadworker.go index a0984afab7..28975c7436 100644 --- a/threadworker.go +++ b/threadworker.go @@ -101,10 +101,15 @@ func (handler *workerThread) name() string { func setupWorkerScript(handler *workerThread, worker *worker) { metrics.StartWorker(worker.name) - // Create a dummy request to set up the worker + opts := append([]RequestOption(nil), worker.requestOptions...) + C.frankenphp_set_worker_name(handler.thread.pinCString(worker.name), C._Bool(worker.isBackgroundWorker)) + if worker.isBackgroundWorker { + worker.backgroundStopFdWrite.Store(int32(C.frankenphp_worker_get_stop_fd_write())) + } + fc, err := newDummyContext( filepath.Base(worker.fileName), - worker.requestOptions..., + opts..., ) if err != nil { panic(err) @@ -120,6 +125,21 @@ func setupWorkerScript(handler *workerThread, worker *worker) { if globalLogger.Enabled(ctx, slog.LevelDebug) { globalLogger.LogAttrs(ctx, slog.LevelDebug, "starting", slog.String("worker", worker.name), slog.Int("thread", handler.thread.threadIndex)) } + + if worker.isBackgroundWorker { + handler.thread.state.Set(state.Ready) + fc.scriptFilename = worker.fileName + } +} + +func (handler *workerThread) markBackgroundReady() { + if !handler.isBootingScript { + return + } + + handler.failureCount = 0 + handler.isBootingScript = false + metrics.ReadyWorker(handler.worker.name) } func tearDownWorkerScript(handler *workerThread, exitStatus int) { @@ -127,6 +147,12 @@ func tearDownWorkerScript(handler *workerThread, exitStatus int) { handler.dummyFrankenPHPContext = nil handler.dummyContext = nil + // Invalidate the cached stop fd - the C thread-local pipe is closed in frankenphp.c + // Prevents stale fd from being written to during the restart window + if worker.isBackgroundWorker { + worker.backgroundStopFdWrite.Store(-1) + } + // if the worker request is not nil, the script might have crashed // make sure to close the worker request context if handler.workerFrankenPHPContext != nil { @@ -296,7 +322,7 @@ func go_frankenphp_finish_worker_request(threadIndex C.uintptr_t, retval *C.zval thread.handler.(*workerThread).workerFrankenPHPContext = nil thread.handler.(*workerThread).workerContext = nil - if globalLogger.Enabled(ctx, slog.LevelDebug) { + if globalLogger.Enabled(ctx, slog.LevelDebug) && !thread.handler.(*workerThread).worker.isBackgroundWorker { if fc.request == nil { fc.logger.LogAttrs(ctx, slog.LevelDebug, "request handling finished", slog.String("worker", fc.worker.name), slog.Int("thread", thread.threadIndex)) } else { diff --git a/worker.go b/worker.go index c97cc4a3a7..3a9b4f4cb0 100644 --- a/worker.go +++ b/worker.go @@ -16,6 +16,10 @@ import ( "github.com/dunglas/frankenphp/internal/state" ) +// backgroundWorkerGracePeriod is the time background workers have to stop +// gracefully after receiving the stop signal before being force-killed. +const backgroundWorkerGracePeriod = 5 * time.Second + // represents a worker script and can have many threads assigned to it type worker struct { mercureContext @@ -33,6 +37,11 @@ type worker struct { onThreadReady func(int) onThreadShutdown func(int) queuedRequests atomic.Int32 + isBackgroundWorker bool + backgroundLookup *BackgroundWorkerLookup + backgroundRegistry *BackgroundWorkerRegistry + backgroundWorker *backgroundWorkerState + backgroundStopFdWrite atomic.Int32 // write end of the stop pipe, -1 if not set } var ( @@ -95,6 +104,11 @@ func initWorkers(opt []workerOpt) error { startupFailChan = nil } + // Auto-start named background workers with num >= 1 + if err := startAutoBackgroundWorkers(opt); err != nil { + return err + } + return nil } @@ -148,8 +162,12 @@ func newWorker(o workerOpt) (*worker, error) { maxConsecutiveFailures: o.maxConsecutiveFailures, onThreadReady: o.onThreadReady, onThreadShutdown: o.onThreadShutdown, + isBackgroundWorker: false, + backgroundLookup: o.backgroundLookup, } + w.backgroundStopFdWrite.Store(-1) + w.configureMercure(&o) w.requestOptions = append( @@ -167,6 +185,9 @@ func newWorker(o workerOpt) (*worker, error) { // EXPERIMENTAL: DrainWorkers finishes all worker scripts before a graceful shutdown func DrainWorkers() { + scalingMu.Lock() + defer scalingMu.Unlock() + _ = drainWorkerThreads() } @@ -174,21 +195,35 @@ func drainWorkerThreads() []*phpThread { var ( ready sync.WaitGroup drainedThreads []*phpThread + bgThreads []*phpThread + bgWorkers []*worker ) for _, worker := range workers { worker.threadMutex.RLock() - ready.Add(len(worker.threads)) + threads := append([]*phpThread(nil), worker.threads...) + worker.threadMutex.RUnlock() - for _, thread := range worker.threads { - if !thread.state.RequestSafeStateChange(state.Restarting) { - ready.Done() + for _, thread := range threads { + if worker.isBackgroundWorker { + // Signal background workers to stop via the signaling stream + if !thread.state.RequestSafeStateChange(state.ShuttingDown) { + continue + } + if fd := worker.backgroundStopFdWrite.Load(); fd >= 0 { + C.frankenphp_worker_write_stop_fd(C.int(fd)) + } + close(thread.drainChan) + bgThreads = append(bgThreads, thread) + bgWorkers = append(bgWorkers, worker) + continue + } - // no state change allowed == thread is shutting down - // we'll proceed to restart all other threads anyway + if !thread.state.RequestSafeStateChange(state.Restarting) { continue } + ready.Add(1) close(thread.drainChan) drainedThreads = append(drainedThreads, thread) @@ -197,12 +232,65 @@ func drainWorkerThreads() []*phpThread { ready.Done() }(thread) } - - worker.threadMutex.RUnlock() } ready.Wait() + // Wait for background workers with a grace period. + // Well-written workers check the signaling stream and stop promptly. + // Stuck workers (e.g., blocking C calls) are abandoned after the timeout; + // new threads are created on restart, and the old thread exits when the + // blocking call eventually returns. + if len(bgThreads) > 0 { + bgDone := make(chan struct{}) + go func() { + for _, thread := range bgThreads { + thread.state.WaitFor(state.Done) + } + close(bgDone) + }() + + select { + case <-bgDone: + // all stopped gracefully + case <-time.After(backgroundWorkerGracePeriod): + // Best-effort force-kill: arm PHP's max_execution_time timer on + // stuck threads. Linux ZTS: arms PHP's timer. Windows: interrupts + // I/O and alertable waits. Other platforms: no-op. + // Safe because after 5s, stuck threads are guaranteed to be in C code. + for _, thread := range bgThreads { + if !thread.state.Is(state.Done) { + C.frankenphp_force_kill_thread(C.uintptr_t(thread.threadIndex)) + } + } + globalLogger.Warn("background workers did not stop within grace period, force-killing stuck threads") + } + + // Clean up registry entries for stopped workers + stopped := make(map[*worker]struct{}, len(bgWorkers)) + for _, w := range bgWorkers { + if w.backgroundRegistry != nil && w.backgroundWorker != nil { + w.backgroundRegistry.remove(w.name, w.backgroundWorker) + } + stopped[w] = struct{}{} + } + filtered := workers[:0] + for _, w := range workers { + if _, ok := stopped[w]; !ok { + filtered = append(filtered, w) + } + } + workers = filtered + + // Reset drained background threads for restart + for _, thread := range bgThreads { + thread.drainChan = make(chan struct{}) + if mainThread.state.Is(state.Ready) { + thread.state.Set(state.Reserved) + } + } + } + return drainedThreads }