From 9024569effe150c3e178c2c8a5b7712686cc3e6a Mon Sep 17 00:00:00 2001 From: Lukas Bindreiter Date: Mon, 11 May 2026 15:59:42 +0200 Subject: [PATCH] Query telemetry go client --- .../go/workflows/ConfigureConsoleLogging.mdx | 48 +++++++++ api-reference/go/workflows/Jobs.QueryLogs.mdx | 71 ++++++++++++++ .../go/workflows/Jobs.QuerySpans.mdx | 71 ++++++++++++++ api-reference/go/workflows/WithSpan.mdx | 58 +++++++++++ api-reference/go/workflows/WithSpanResult.mdx | 60 ++++++++++++ docs.json | 5 + .../observability/integrations/axiom.mdx | 82 ++++++++++++++++ .../integrations/open-telemetry.mdx | 65 ++++++++++++ workflows/observability/introduction.mdx | 3 +- workflows/observability/logging.mdx | 98 ++++++++++++++++++- workflows/observability/query.mdx | 71 +++++++++++++- workflows/observability/tracing.mdx | 67 ++++++++++++- 12 files changed, 690 insertions(+), 9 deletions(-) create mode 100644 api-reference/go/workflows/ConfigureConsoleLogging.mdx create mode 100644 api-reference/go/workflows/Jobs.QueryLogs.mdx create mode 100644 api-reference/go/workflows/Jobs.QuerySpans.mdx create mode 100644 api-reference/go/workflows/WithSpan.mdx create mode 100644 api-reference/go/workflows/WithSpanResult.mdx diff --git a/api-reference/go/workflows/ConfigureConsoleLogging.mdx b/api-reference/go/workflows/ConfigureConsoleLogging.mdx new file mode 100644 index 0000000..f52378b --- /dev/null +++ b/api-reference/go/workflows/ConfigureConsoleLogging.mdx @@ -0,0 +1,48 @@ +--- +title: workflows.ConfigureConsoleLogging +sidebarTitle: ConfigureConsoleLogging +icon: rectangle-terminal +--- + +```go +func ConfigureConsoleLogging(level slog.Level) +``` + +Configure the default `slog` logger to write workflow logs to standard output at the given level. + +The console handler composes with the Tilebox log exporter installed by `workflows.NewClient()`, so logs can be sent to Tilebox and printed locally. Calling `ConfigureConsoleLogging` more than once does not add duplicate console handlers. + +## Parameters + + + The minimum log level to print to the console. + + +## Returns + +Nothing. + + +```go Go +import ( + "context" + "log/slog" + + "github.com/tilebox/tilebox-go/workflows/v1" +) + +func main() { + ctx := context.Background() + workflows.ConfigureConsoleLogging(slog.LevelDebug) + + client := workflows.NewClient() + runner, err := client.NewTaskRunner(ctx) + if err != nil { + slog.ErrorContext(ctx, "failed to create task runner", slog.Any("error", err)) + return + } + + runner.Run(ctx) +} +``` + diff --git a/api-reference/go/workflows/Jobs.QueryLogs.mdx b/api-reference/go/workflows/Jobs.QueryLogs.mdx new file mode 100644 index 0000000..30f9f05 --- /dev/null +++ b/api-reference/go/workflows/Jobs.QueryLogs.mdx @@ -0,0 +1,71 @@ +--- +title: Client.Jobs.QueryLogs +sidebarTitle: Jobs.QueryLogs +icon: rectangle-terminal +--- + +```go +func (*JobClient) QueryLogs( + ctx context.Context, + jobID uuid.UUID, + options ...workflows.TelemetryQueryOption, +) iter.Seq2[*workflows.LogRecord, error] +``` + +Query log records emitted while running a job. + +The logs are lazily loaded and returned as a sequence of log records. Use [Collect](/api-reference/go/workflows/Collect) to transform the sequence into a slice. + +## Parameters + + + The ID of the job to query logs for. + + + Options for querying logs. + + +## Options + + + Sort logs by time. Use `workflows.Ascending` for oldest first or `workflows.Descending` for newest first. + + + Limit the number of log records returned. + + +## Returns + +A sequence of log records. Each record includes `Time`, `Level`, `Body`, and structured attributes. + + +```go Go +import ( + "fmt" + "log/slog" + "time" + + "github.com/google/uuid" + "github.com/tilebox/tilebox-go/workflows/v1" +) + +jobID := uuid.MustParse("019e07b1-916b-0630-f3ba-f1c33235d174") + +for record, err := range client.Jobs.QueryLogs( + ctx, + jobID, + workflows.WithSortDirection(workflows.Ascending), +) { + if err != nil { + slog.ErrorContext(ctx, "failed to query job logs", slog.Any("error", err)) + return + } + + fmt.Printf("%s %-5s %s\n", + record.Time.Format(time.RFC3339), + record.Level, + record.Body, + ) +} +``` + diff --git a/api-reference/go/workflows/Jobs.QuerySpans.mdx b/api-reference/go/workflows/Jobs.QuerySpans.mdx new file mode 100644 index 0000000..f511e72 --- /dev/null +++ b/api-reference/go/workflows/Jobs.QuerySpans.mdx @@ -0,0 +1,71 @@ +--- +title: Client.Jobs.QuerySpans +sidebarTitle: Jobs.QuerySpans +icon: chart-gantt +--- + +```go +func (*JobClient) QuerySpans( + ctx context.Context, + jobID uuid.UUID, + options ...workflows.TelemetryQueryOption, +) iter.Seq2[*workflows.Span, error] +``` + +Query spans emitted while running a job. + +The spans are lazily loaded and returned as a sequence of spans. Use [Collect](/api-reference/go/workflows/Collect) to transform the sequence into a slice. + +## Parameters + + + The ID of the job to query spans for. + + + Options for querying spans. + + +## Options + + + Sort spans by start time. Use `workflows.Ascending` for oldest first or `workflows.Descending` for newest first. + + + Limit the number of spans returned. + + +## Returns + +A sequence of spans. Each span includes `StartTime`, `Name`, `StatusCode`, `Attributes`, and `Duration()`. + + +```go Go +import ( + "fmt" + "log/slog" + "time" + + "github.com/google/uuid" + "github.com/tilebox/tilebox-go/workflows/v1" +) + +jobID := uuid.MustParse("019e07b1-916b-0630-f3ba-f1c33235d174") + +for span, err := range client.Jobs.QuerySpans( + ctx, + jobID, + workflows.WithSortDirection(workflows.Ascending), +) { + if err != nil { + slog.ErrorContext(ctx, "failed to query job spans", slog.Any("error", err)) + return + } + + fmt.Printf("%s %-40s %s\n", + span.StartTime.Format(time.RFC3339), + span.Name, + span.Duration(), + ) +} +``` + diff --git a/api-reference/go/workflows/WithSpan.mdx b/api-reference/go/workflows/WithSpan.mdx new file mode 100644 index 0000000..b7fcbbd --- /dev/null +++ b/api-reference/go/workflows/WithSpan.mdx @@ -0,0 +1,58 @@ +--- +title: workflows.WithSpan +sidebarTitle: WithSpan +icon: chart-gantt +--- + +```go +func WithSpan( + ctx context.Context, + name string, + f func(ctx context.Context) error, +) error +``` + +Wrap a function with a [tracing span](/workflows/observability/tracing) using the current task runner's tracer. + +Use `WithSpan` inside a task `Execute` method. If the context is not a task execution context, the function runs without creating a span. + +## Parameters + + + The task execution context. + + + The name of the span. + + + The function to wrap. + + +## Returns + +The error returned by `f`, if any. + + +```go Go +import ( + "context" + "fmt" + + "github.com/tilebox/tilebox-go/workflows/v1" +) + +type ProcessScene struct{} + +func (t *ProcessScene) Execute(ctx context.Context) error { + err := workflows.WithSpan(ctx, "write-output", func(ctx context.Context) error { + // Write output here. + return nil + }) + if err != nil { + return fmt.Errorf("failed to write output: %w", err) + } + + return nil +} +``` + diff --git a/api-reference/go/workflows/WithSpanResult.mdx b/api-reference/go/workflows/WithSpanResult.mdx new file mode 100644 index 0000000..5dc8a44 --- /dev/null +++ b/api-reference/go/workflows/WithSpanResult.mdx @@ -0,0 +1,60 @@ +--- +title: workflows.WithSpanResult +sidebarTitle: WithSpanResult +icon: chart-gantt +--- + +```go +func WithSpanResult[Result any]( + ctx context.Context, + name string, + f func(ctx context.Context) (Result, error), +) (Result, error) +``` + +Wrap a function with a [tracing span](/workflows/observability/tracing) using the current task runner's tracer and return the function result. + +Use `WithSpanResult` inside a task `Execute` method. If the context is not a task execution context, the function runs without creating a span. + +## Parameters + + + The task execution context. + + + The name of the span. + + + The function to wrap. + + +## Returns + +The result and error returned by `f`. + + +```go Go +import ( + "context" + "fmt" + "log/slog" + + "github.com/tilebox/tilebox-go/workflows/v1" +) + +type ProcessScene struct{} + +func (t *ProcessScene) Execute(ctx context.Context) error { + pixels, err := workflows.WithSpanResult(ctx, "compute-index", func(ctx context.Context) (int, error) { + // Compute an index here. + return 42, nil + }) + if err != nil { + return fmt.Errorf("failed to compute index: %w", err) + } + + slog.InfoContext(ctx, "index computed", slog.Int("pixels", pixels)) + return nil +} +``` + diff --git a/docs.json b/docs.json index b1cb0fb..ecbe42a 100644 --- a/docs.json +++ b/docs.json @@ -254,6 +254,9 @@ "api-reference/go/workflows/GetCurrentCluster", "api-reference/go/workflows/SubmitSubtask", "api-reference/go/workflows/SubmitSubtasks", + "api-reference/go/workflows/ConfigureConsoleLogging", + "api-reference/go/workflows/WithSpan", + "api-reference/go/workflows/WithSpanResult", "api-reference/go/workflows/WithTaskSpan", "api-reference/go/workflows/WithTaskSpanResult", "api-reference/go/workflows/NewTaskRunner", @@ -268,6 +271,8 @@ "api-reference/go/workflows/Jobs.Get", "api-reference/go/workflows/Jobs.Retry", "api-reference/go/workflows/Jobs.Cancel", + "api-reference/go/workflows/Jobs.QueryLogs", + "api-reference/go/workflows/Jobs.QuerySpans", "api-reference/go/workflows/Jobs.Query", "api-reference/go/workflows/Collect" ] diff --git a/workflows/observability/integrations/axiom.mdx b/workflows/observability/integrations/axiom.mdx index 667d5fd..eea1e70 100644 --- a/workflows/observability/integrations/axiom.mdx +++ b/workflows/observability/integrations/axiom.mdx @@ -16,6 +16,7 @@ Built-in Tilebox Console observability does not require Axiom. Axiom export is o Create Axiom datasets for logs and traces and an API key with ingest permissions. Then configure export when the runner process starts. + ```python Python from tilebox.workflows import Client from tilebox.workflows.observability.logging import configure_otel_logging_axiom @@ -38,6 +39,66 @@ client = Client(name="sentinel-2-runner") runner = client.runner(tasks=[ProcessScene]) runner.run_forever() ``` +```go Go +package main + +import ( + "context" + "log/slog" + + "github.com/tilebox/tilebox-go/observability" + "github.com/tilebox/tilebox-go/observability/logger" + "github.com/tilebox/tilebox-go/observability/tracer" + "github.com/tilebox/tilebox-go/workflows/v1" + "go.opentelemetry.io/otel" +) + +type ProcessScene struct{} + +func (t *ProcessScene) Execute(ctx context.Context) error { + slog.InfoContext(ctx, "processing scene") + return nil +} + +func main() { + ctx := context.Background() + service := &observability.Service{Name: "sentinel-2-runner"} + apiKey := "" + + traceProvider, shutdownTracer, err := tracer.NewAxiomProvider(ctx, service, "workflow-traces", apiKey) + if err != nil { + slog.ErrorContext(ctx, "failed to configure Axiom tracing", slog.Any("error", err)) + return + } + defer shutdownTracer(ctx) + otel.SetTracerProvider(traceProvider) + + logHandler, shutdownLogger, err := logger.NewAxiomHandler(ctx, service, "workflow-logs", apiKey, + logger.WithLevel(slog.LevelInfo), + ) + if err != nil { + slog.ErrorContext(ctx, "failed to configure Axiom logging", slog.Any("error", err)) + return + } + defer shutdownLogger(ctx) + slog.SetDefault(logger.New(logHandler)) + + client := workflows.NewClient() + runner, err := client.NewTaskRunner(ctx) + if err != nil { + slog.ErrorContext(ctx, "failed to create task runner", slog.Any("error", err)) + return + } + + if err := runner.RegisterTasks(&ProcessScene{}); err != nil { + slog.ErrorContext(ctx, "failed to register tasks", slog.Any("error", err)) + return + } + + runner.Run(ctx) +} +``` + ## Environment variables @@ -49,10 +110,31 @@ You can omit credentials from code by setting environment variables: | `AXIOM_LOGS_DATASET` | `configure_otel_logging_axiom()` | | `AXIOM_TRACES_DATASET` | `configure_otel_tracing_axiom()` | + ```python Python configure_otel_tracing_axiom(service="sentinel-2-runner") configure_otel_logging_axiom(service="sentinel-2-runner") ``` +```go Go +traceProvider, shutdownTracer, err := tracer.NewAxiomProviderFromEnv(ctx, service) +if err != nil { + slog.ErrorContext(ctx, "failed to configure Axiom tracing", slog.Any("error", err)) + return +} +defer shutdownTracer(ctx) +otel.SetTracerProvider(traceProvider) + +logHandler, shutdownLogger, err := logger.NewAxiomHandlerFromEnv(ctx, service, + logger.WithLevel(slog.LevelInfo), +) +if err != nil { + slog.ErrorContext(ctx, "failed to configure Axiom logging", slog.Any("error", err)) + return +} +defer shutdownLogger(ctx) +slog.SetDefault(logger.New(logHandler)) +``` + ## Existing Axiom screenshots diff --git a/workflows/observability/integrations/open-telemetry.mdx b/workflows/observability/integrations/open-telemetry.mdx index 4b50da9..4fc3ab5 100644 --- a/workflows/observability/integrations/open-telemetry.mdx +++ b/workflows/observability/integrations/open-telemetry.mdx @@ -10,6 +10,7 @@ Tilebox uses OpenTelemetry data models for workflow telemetry. Built-in Tilebox Call the configuration functions when the runner process starts, before creating the client or runner. + ```python Python from tilebox.workflows import Client from tilebox.workflows.observability.logging import configure_otel_logging @@ -32,6 +33,70 @@ client = Client(name="sentinel-2-runner") runner = client.runner(tasks=[ProcessScene]) runner.run_forever() ``` +```go Go +package main + +import ( + "context" + "log/slog" + + "github.com/tilebox/tilebox-go/observability" + "github.com/tilebox/tilebox-go/observability/logger" + "github.com/tilebox/tilebox-go/observability/tracer" + "github.com/tilebox/tilebox-go/workflows/v1" + "go.opentelemetry.io/otel" +) + +type ProcessScene struct{} + +func (t *ProcessScene) Execute(ctx context.Context) error { + slog.InfoContext(ctx, "processing scene") + return nil +} + +func main() { + ctx := context.Background() + service := &observability.Service{Name: "sentinel-2-runner"} + + traceProvider, shutdownTracer, err := tracer.NewOtelProvider(ctx, service, + tracer.WithEndpointURL("http://localhost:4318/v1/traces"), + tracer.WithHeaders(map[string]string{"Authorization": "Bearer "}), + ) + if err != nil { + slog.ErrorContext(ctx, "failed to configure tracing", slog.Any("error", err)) + return + } + defer shutdownTracer(ctx) + otel.SetTracerProvider(traceProvider) + + logHandler, shutdownLogger, err := logger.NewOtelHandler(ctx, service, + logger.WithEndpointURL("http://localhost:4318/v1/logs"), + logger.WithHeaders(map[string]string{"Authorization": "Bearer "}), + logger.WithLevel(slog.LevelInfo), + ) + if err != nil { + slog.ErrorContext(ctx, "failed to configure logging", slog.Any("error", err)) + return + } + defer shutdownLogger(ctx) + slog.SetDefault(logger.New(logHandler)) + + client := workflows.NewClient() + runner, err := client.NewTaskRunner(ctx) + if err != nil { + slog.ErrorContext(ctx, "failed to create task runner", slog.Any("error", err)) + return + } + + if err := runner.RegisterTasks(&ProcessScene{}); err != nil { + slog.ErrorContext(ctx, "failed to register tasks", slog.Any("error", err)) + return + } + + runner.Run(ctx) +} +``` + If the endpoint does not include `/v1/traces` or `/v1/logs`, the Python SDK adds the correct path automatically. diff --git a/workflows/observability/introduction.mdx b/workflows/observability/introduction.mdx index dbe5b3c..a863b24 100644 --- a/workflows/observability/introduction.mdx +++ b/workflows/observability/introduction.mdx @@ -87,7 +87,6 @@ import ( "fmt" "log/slog" - "github.com/tilebox/tilebox-go" "github.com/tilebox/tilebox-go/workflows/v1" "github.com/tilebox/tilebox-go/workflows/v1/subtask" ) @@ -99,7 +98,7 @@ type ProcessScene struct { func (t *ProcessScene) Execute(ctx context.Context) error { slog.InfoContext(ctx, "processing scene", slog.String("scene_id", t.SceneID)) - return tilebox.WithSpan(ctx, "plan-subtasks", func(ctx context.Context) error { + return workflows.WithSpan(ctx, "plan-subtasks", func(ctx context.Context) error { thumbnail, err := workflows.SubmitSubtask(ctx, &BuildThumbnail{SceneID: t.SceneID}) if err != nil { return fmt.Errorf("failed to submit thumbnail subtask: %w", err) diff --git a/workflows/observability/logging.mdx b/workflows/observability/logging.mdx index 89fdf1a..afac772 100644 --- a/workflows/observability/logging.mdx +++ b/workflows/observability/logging.mdx @@ -65,6 +65,7 @@ Logs are also added as events on the active trace span, so a trace view can show Built-in Tilebox export does not require configuration. For local development, add a console handler to print Tilebox workflow logs to standard output. + ```python Python import logging @@ -79,13 +80,51 @@ client = Client() runner = client.runner(tasks=[ProcessScene]) runner.run_forever() ``` +```go Go +package main + +import ( + "context" + "log/slog" + + "github.com/tilebox/tilebox-go/workflows/v1" +) + +type ProcessScene struct{} + +func (t *ProcessScene) Execute(ctx context.Context) error { + slog.InfoContext(ctx, "processing scene") + return nil +} + +func main() { + ctx := context.Background() + workflows.ConfigureConsoleLogging(slog.LevelDebug) + + client := workflows.NewClient() + runner, err := client.NewTaskRunner(ctx) + if err != nil { + slog.ErrorContext(ctx, "failed to create task runner", slog.Any("error", err)) + return + } + + if err := runner.RegisterTasks(&ProcessScene{}); err != nil { + slog.ErrorContext(ctx, "failed to register tasks", slog.Any("error", err)) + return + } + + runner.Run(ctx) +} +``` + -`configure_console_logging()` is process-wide for Tilebox workflow loggers. Use it for local runs and debugging distributed runners. +`configure_console_logging()` and `workflows.ConfigureConsoleLogging()` are process-wide for Tilebox workflow loggers. Use them for local runs and debugging distributed runners. ## Configure the client log level Use `Client.configure_logging()` to choose which task and runner logs a client exports to Tilebox. + ```python Python import logging @@ -96,13 +135,30 @@ client = Client(name="sentinel-2-runner") # Export task logs at DEBUG and internal runner logs at INFO. client.configure_logging(level=logging.DEBUG, runner_level=logging.INFO) ``` +```go Go +package main + +import ( + "log/slog" -The `level` argument applies to logs emitted with `context.logger`. The optional `runner_level` argument applies to internal task runner logs. If `runner_level` is omitted, it uses the same value as `level`. + "github.com/tilebox/tilebox-go/workflows/v1" +) + +func main() { + workflows.ConfigureConsoleLogging(slog.LevelDebug) + client := workflows.NewClient() + _ = client +} +``` + + +The Python `level` argument applies to logs emitted with `context.logger`. The optional `runner_level` argument applies to internal task runner logs. If `runner_level` is omitted, it uses the same value as `level`. In Go, `workflows.ConfigureConsoleLogging()` sets the local console log level, and `workflows.NewClient()` configures Tilebox workflow log export. ## Query logs -You can retrieve logs for a job through the jobs client and convert the result to a pandas DataFrame. +You can retrieve logs for a job through the jobs client. Python results can also be converted to a pandas DataFrame. + ```python Python from tilebox.workflows import Client @@ -115,6 +171,42 @@ for record in logs: df = logs.to_pandas() ``` +```go Go +package main + +import ( + "context" + "fmt" + "log/slog" + "time" + + "github.com/google/uuid" + "github.com/tilebox/tilebox-go/workflows/v1" +) + +func main() { + ctx := context.Background() + client := workflows.NewClient() + jobID := uuid.MustParse("019e07b1-916b-0630-f3ba-f1c33235d174") + + logs, err := workflows.Collect( + client.Jobs.QueryLogs(ctx, jobID, workflows.WithSortDirection(workflows.Ascending)), + ) + if err != nil { + slog.ErrorContext(ctx, "failed to query job logs", slog.Any("error", err)) + return + } + + for _, record := range logs { + fmt.Printf("%s %-5s %s\n", + record.Time.Format(time.RFC3339Nano), + record.Level, + record.Body, + ) + } +} +``` + See [Query telemetry](/workflows/observability/query) for the log and span query APIs. diff --git a/workflows/observability/query.mdx b/workflows/observability/query.mdx index 4a7a337..b7f6802 100644 --- a/workflows/observability/query.mdx +++ b/workflows/observability/query.mdx @@ -1,6 +1,6 @@ --- title: Query telemetry -description: Query workflow logs and spans for a job from Python and convert the results to pandas DataFrames. +description: Query workflow logs and spans for a job from Python or Go. icon: magnifying-glass-chart --- @@ -10,6 +10,7 @@ Tilebox stores logs and spans for each workflow job. Use the jobs client to quer `query_logs()` returns a `LogRecords` list. Pagination is handled automatically. + ```python Python from tilebox.workflows import Client @@ -22,6 +23,40 @@ for record in logs: print(record.time, record.severity_text, record.body) print(record.attributes) ``` +```go Go +package main + +import ( + "context" + "fmt" + "log/slog" + "time" + + "github.com/google/uuid" + "github.com/tilebox/tilebox-go/workflows/v1" +) + +func main() { + ctx := context.Background() + client := workflows.NewClient() + jobID := uuid.MustParse("019e07b1-916b-0630-f3ba-f1c33235d174") + + for record, err := range client.Jobs.QueryLogs( + ctx, + jobID, + workflows.WithSortDirection(workflows.Ascending), + ) { + if err != nil { + slog.ErrorContext(ctx, "failed to query job logs", slog.Any("error", err)) + return + } + + fmt.Println(record.Time.Format(time.RFC3339Nano), record.Level, record.Body) + fmt.Println(record.Attributes) + } +} +``` + Each log record includes: @@ -50,6 +85,7 @@ logs_df[["time", "severity_text", "body"]] `query_spans()` returns a `Spans` list. Pagination is handled automatically. + ```python Python spans = client.jobs().query_spans(job.id) @@ -57,6 +93,39 @@ for span in spans: print(span.name, span.status_code, span.duration) print(span.attributes) ``` +```go Go +package main + +import ( + "context" + "fmt" + "log/slog" + + "github.com/google/uuid" + "github.com/tilebox/tilebox-go/workflows/v1" +) + +func main() { + ctx := context.Background() + client := workflows.NewClient() + jobID := uuid.MustParse("019e07b1-916b-0630-f3ba-f1c33235d174") + + for span, err := range client.Jobs.QuerySpans( + ctx, + jobID, + workflows.WithSortDirection(workflows.Ascending), + ) { + if err != nil { + slog.ErrorContext(ctx, "failed to query job spans", slog.Any("error", err)) + return + } + + fmt.Println(span.Name, span.StatusCode, span.Duration()) + fmt.Println(span.Attributes) + } +} +``` + Each span includes: diff --git a/workflows/observability/tracing.mdx b/workflows/observability/tracing.mdx index 539ed68..7429ae4 100644 --- a/workflows/observability/tracing.mdx +++ b/workflows/observability/tracing.mdx @@ -39,13 +39,13 @@ package tasks import ( "context" - "github.com/tilebox/tilebox-go" + "github.com/tilebox/tilebox-go/workflows/v1" ) type ProcessScene struct{} func (t *ProcessScene) Execute(ctx context.Context) error { - return tilebox.WithSpan(ctx, "compute-index", func(ctx context.Context) error { + return workflows.WithSpan(ctx, "compute-index", func(ctx context.Context) error { // perform expensive computation return nil }) @@ -61,6 +61,7 @@ If a task raises an exception, Tilebox records the exception on the task span an For finer-grained error reporting, record errors on your custom spans before re-raising them. + ```python Python class ProcessScene(Task): scene_id: str @@ -74,11 +75,38 @@ class ProcessScene(Task): span.record_exception(error) raise ``` +```go Go +package tasks + +import ( + "context" + "fmt" + + "github.com/tilebox/tilebox-go/workflows/v1" +) + +type ProcessScene struct{} + +func (t *ProcessScene) Execute(ctx context.Context) error { + return workflows.WithSpan(ctx, "publish-output", func(ctx context.Context) error { + if err := publishOutput(); err != nil { + return fmt.Errorf("failed to publish output: %w", err) + } + return nil + }) +} + +func publishOutput() error { + return nil +} +``` + ## Query spans -You can retrieve spans for a job through the jobs client and convert the result to a pandas DataFrame. +You can retrieve spans for a job through the jobs client. Python results can also be converted to a pandas DataFrame. + ```python Python from tilebox.workflows import Client @@ -91,6 +119,39 @@ for span in spans: df = spans.to_pandas() ``` +```go Go +package main + +import ( + "context" + "fmt" + "log/slog" + "time" + + "github.com/google/uuid" + "github.com/tilebox/tilebox-go/workflows/v1" +) + +func main() { + ctx := context.Background() + client := workflows.NewClient() + jobID := uuid.MustParse("019e07b1-916b-0630-f3ba-f1c33235d174") + + for span, err := range client.Jobs.QuerySpans(ctx, jobID) { + if err != nil { + slog.ErrorContext(ctx, "failed to query job spans", slog.Any("error", err)) + return + } + + fmt.Printf("%s %-40s %s\n", + span.StartTime.Format(time.RFC3339Nano), + span.Name, + span.Duration(), + ) + } +} +``` + See [Query telemetry](/workflows/observability/query) for the log and span query APIs.