-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.go
More file actions
99 lines (85 loc) · 2.6 KB
/
main.go
File metadata and controls
99 lines (85 loc) · 2.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package main
import (
"context"
"fmt"
"log"
"os"
"os/signal"
"syscall"
"github.com/withObsrvr/flowctl-sdk/pkg/processor"
flowctlv1 "github.com/withObsrvr/flow-proto/go/gen/flowctl/v1"
)
// main configures the example processor, registers its event handler, starts
// it, and then blocks until SIGINT or SIGTERM arrives, at which point the
// processor is stopped.
//
// Environment variables read:
//   - FLOWCTL_ENDPOINT: when non-empty, flowctl integration is enabled and
//     this value is used as its endpoint.
//   - SERVICE_ID: service ID used for flowctl integration; defaults to
//     "example-processor-1". Only consulted when FLOWCTL_ENDPOINT is set.
func main() {
	// Create processor with configuration
	config := processor.DefaultConfig()
	config.ID = "example-processor"
	config.Name = "Example Processor"
	config.Description = "A simple example processor"
	config.Version = "1.0.0"
	config.Endpoint = ":50052"

	// Enable flowctl integration if endpoint is provided
	flowctlEndpoint := getEnv("FLOWCTL_ENDPOINT", "")
	if flowctlEndpoint != "" {
		config.FlowctlConfig.Enabled = true
		config.FlowctlConfig.Endpoint = flowctlEndpoint
		config.FlowctlConfig.ServiceID = getEnv("SERVICE_ID", "example-processor-1")
	}

	proc, err := processor.New(config)
	if err != nil {
		log.Fatalf("Failed to create processor: %v", err)
	}

	// Suffix appended to every payload; converted once instead of per event.
	suffix := []byte(" - processed")

	// Register processing handler
	err = proc.OnProcess(
		// Handler function
		func(ctx context.Context, event *flowctlv1.Event) (*flowctlv1.Event, error) {
			// Build the output payload in a freshly allocated slice. Appending
			// directly to event.Payload could write into the input's backing
			// array when it has spare capacity, so the output would share
			// storage with the incoming event.
			processedPayload := make([]byte, 0, len(event.Payload)+len(suffix))
			processedPayload = append(processedPayload, event.Payload...)
			processedPayload = append(processedPayload, suffix...)

			// Create output event
			outputEvent := &flowctlv1.Event{
				Id:      fmt.Sprintf("%s-processed", event.Id),
				Type:    "example.processed.event",
				Payload: processedPayload,
				// Room for the copied entries plus the two keys added below.
				Metadata:          make(map[string]string, len(event.Metadata)+2),
				SourceComponentId: config.ID,
				ContentType:       event.ContentType,
			}

			// Copy original metadata and add processing metadata. The
			// processing keys are written last, so they take precedence over
			// same-named keys in the input.
			for k, v := range event.Metadata {
				outputEvent.Metadata[k] = v
			}
			outputEvent.Metadata["processor_id"] = config.ID
			outputEvent.Metadata["original_type"] = event.Type

			return outputEvent, nil
		},
		// Input types
		[]string{"example.event"},
		// Output types
		[]string{"example.processed.event"},
	)
	if err != nil {
		log.Fatalf("Failed to register handler: %v", err)
	}

	// Register for termination signals before starting, so a signal that
	// arrives during startup is queued (buffer of 1) and still leads to
	// proc.Stop instead of terminating the process with the default action.
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
	defer signal.Stop(sigCh)

	// Start the processor
	// NOTE(review): Start is assumed to return once the processor is running,
	// since main waits for a signal afterwards — confirm against the SDK.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	if err := proc.Start(ctx); err != nil {
		log.Fatalf("Failed to start processor: %v", err)
	}

	// Wait for interrupt signal
	<-sigCh

	// Stop the processor gracefully
	fmt.Println("Shutting down processor...")
	if err := proc.Stop(); err != nil {
		log.Fatalf("Failed to stop processor: %v", err)
	}
}
// getEnv looks up the environment variable named by key and returns its
// value. When the variable is unset or set to the empty string, defaultValue
// is returned instead.
func getEnv(key, defaultValue string) string {
	if v := os.Getenv(key); v != "" {
		return v
	}
	return defaultValue
}