Evaluating AxonFlow in production? We're opening limited Design Partner slots.
Free 30-minute architecture and incident-readiness review, priority issue triage, roadmap input, and early feature access.
Apply here or email design-partners@getaxonflow.com.
No commitment required. We reply within 48 hours.
AxonFlow Feedback Week (Feb 5–12, 2026) — We're shipping 3 improvements from user feedback.
Share feedback or email hello@getaxonflow.com for private feedback.
Enterprise-grade Go SDK for AxonFlow AI governance platform. Add invisible AI governance to your applications with production-ready features including retry logic, caching, fail-open strategy, and debug mode.
This SDK is a client library for interacting with a running AxonFlow control plane. It is used from application or agent code to send execution context, policies, and requests at runtime.
A deployed AxonFlow platform (self-hosted or cloud) is required for end-to-end AI governance. SDKs alone are not sufficient—the platform and SDKs are designed to be used together.
If you're new to AxonFlow, this short video shows how the control plane and SDKs work together in a real production setup:
go get github.com/getaxonflow/axonflow-sdk-go/v3package main
import (
"fmt"
"log"
"os"
"github.com/getaxonflow/axonflow-sdk-go/v3"
)
func main() {
// Simple initialization with OAuth2 credentials
client := axonflow.NewClient(axonflow.AxonFlowConfig{
Endpoint: "https://staging-eu.getaxonflow.com",
ClientID: os.Getenv("AXONFLOW_CLIENT_ID"),
ClientSecret: os.Getenv("AXONFLOW_CLIENT_SECRET"),
})
// Execute a governed query
resp, err := client.ProxyLLMCall(
"user-token",
"What is the capital of France?",
"chat",
map[string]interface{}{},
)
if err != nil {
log.Fatalf("Query failed: %v", err)
}
if resp.Blocked {
log.Printf("Request blocked: %s", resp.BlockReason)
return
}
fmt.Printf("Result: %s\n", resp.Data)
}import (
"time"
"os"
"github.com/getaxonflow/axonflow-sdk-go/v3"
)
// Full configuration with all features
client := axonflow.NewClient(axonflow.AxonFlowConfig{
Endpoint: "https://staging-eu.getaxonflow.com",
ClientID: os.Getenv("AXONFLOW_CLIENT_ID"),
ClientSecret: os.Getenv("AXONFLOW_CLIENT_SECRET"),
Mode: "production", // or "sandbox"
Debug: true, // Enable debug logging
Timeout: 60 * time.Second,
// Retry configuration (exponential backoff)
Retry: axonflow.RetryConfig{
Enabled: true,
MaxAttempts: 3,
InitialDelay: 1 * time.Second,
},
// Cache configuration (in-memory with TTL)
Cache: axonflow.CacheConfig{
Enabled: true,
TTL: 60 * time.Second,
},
})Connect to a self-hosted AxonFlow instance running via docker-compose:
package main
import (
"fmt"
"log"
"github.com/getaxonflow/axonflow-sdk-go/v3"
)
func main() {
// Self-hosted (localhost) - no license key needed!
client := axonflow.NewClient(axonflow.AxonFlowConfig{
Endpoint: "http://localhost:8081",
// That's it - no authentication required for localhost
})
// Use normally - same features as production
resp, err := client.ProxyLLMCall(
"user-token",
"Test with self-hosted AxonFlow",
"chat",
map[string]interface{}{},
)
if err != nil {
log.Fatalf("Query failed: %v", err)
}
fmt.Printf("Result: %s\n", resp.Data)
}Self-hosted deployment:
# Clone and start AxonFlow
git clone https://github.com/getaxonflow/axonflow.git
cd axonflow
export OPENAI_API_KEY=sk-your-key-here
docker-compose up
# Go SDK connects to http://localhost:8081 - no license needed!Features:
- âś… Full AxonFlow features without license
- âś… Perfect for local development and testing
- âś… Same API as production
- âś… Automatically detects localhost and skips authentication
// Quick sandbox client for testing
client := axonflow.Sandbox("demo-client", "demo-secret")
resp, err := client.ProxyLLMCall(
"",
"Test query with sensitive data: SSN 123-45-6789",
"chat",
map[string]interface{}{},
)
// In sandbox, this will be blocked/redacted
if resp.Blocked {
fmt.Printf("Blocked: %s\n", resp.BlockReason)
}Automatic retry on transient failures with exponential backoff:
client := axonflow.NewClient(axonflow.AxonFlowConfig{
Endpoint: "https://staging-eu.getaxonflow.com",
ClientID: "your-client-id",
ClientSecret: "your-secret",
Retry: axonflow.RetryConfig{
Enabled: true,
MaxAttempts: 3, // Retry up to 3 times
InitialDelay: 1 * time.Second, // 1s, 2s, 4s backoff
},
})
// Automatically retries on 5xx errors or network failures
resp, err := client.ProxyLLMCall(...)Reduce latency and load with intelligent caching:
client := axonflow.NewClient(axonflow.AxonFlowConfig{
Endpoint: "https://staging-eu.getaxonflow.com",
ClientID: "your-client-id",
ClientSecret: "your-secret",
Cache: axonflow.CacheConfig{
Enabled: true,
TTL: 60 * time.Second, // Cache for 60 seconds
},
})
// First call: hits AxonFlow
resp1, _ := client.ProxyLLMCall("token", "query", "chat", nil)
// Second call (within 60s): served from cache
resp2, _ := client.ProxyLLMCall("token", "query", "chat", nil)Never block your users if AxonFlow is unavailable:
client := axonflow.NewClient(axonflow.AxonFlowConfig{
Endpoint: "https://staging-eu.getaxonflow.com",
ClientID: "your-client-id",
ClientSecret: "your-secret",
Mode: "production", // Fail-open in production
Debug: true,
})
// If AxonFlow is unavailable, request proceeds with warning
resp, err := client.ProxyLLMCall(...)
// err == nil, resp.Success == true, resp.Error contains warningWrap your LLM clients with automatic AxonFlow governance using the interceptors package:
import (
"context"
"github.com/sashabaranov/go-openai"
"github.com/getaxonflow/axonflow-sdk-go/v3"
"github.com/getaxonflow/axonflow-sdk-go/v3/interceptors"
)
// Initialize AxonFlow client
axonflowClient := axonflow.NewClient(axonflow.AxonFlowConfig{
Endpoint: "https://staging-eu.getaxonflow.com",
ClientID: os.Getenv("AXONFLOW_CLIENT_ID"),
ClientSecret: os.Getenv("AXONFLOW_CLIENT_SECRET"),
})
// Create an adapter for the OpenAI client
openaiClient := openai.NewClient(os.Getenv("OPENAI_API_KEY"))
// Use the function wrapper for direct usage
wrappedFn := interceptors.WrapOpenAIFunc(
func(ctx context.Context, req interceptors.ChatCompletionRequest) (interceptors.ChatCompletionResponse, error) {
// Convert to go-openai types and call
goReq := openai.ChatCompletionRequest{
Model: req.Model,
Messages: convertMessages(req.Messages),
}
resp, err := openaiClient.CreateChatCompletion(ctx, goReq)
if err != nil {
return interceptors.ChatCompletionResponse{}, err
}
return convertResponse(resp), nil
},
axonflowClient,
"user-token",
)
// Use wrapped function - governance happens automatically
resp, err := wrappedFn(ctx, interceptors.ChatCompletionRequest{
Model: "gpt-4",
Messages: []interceptors.ChatMessage{
{Role: "user", Content: "Hello, world!"},
},
})
if err != nil {
if interceptors.IsPolicyViolationError(err) {
pve, _ := interceptors.GetPolicyViolation(err)
log.Printf("Blocked: %s (policies: %v)", pve.BlockReason, pve.Policies)
}
}import (
"context"
"github.com/getaxonflow/axonflow-sdk-go/v3"
"github.com/getaxonflow/axonflow-sdk-go/v3/interceptors"
)
// Create Anthropic interceptor
wrappedFn := interceptors.WrapAnthropicFunc(
yourAnthropicCreateFn,
axonflowClient,
"user-token",
)
// Use wrapped function
resp, err := wrappedFn(ctx, interceptors.AnthropicMessageRequest{
Model: "claude-3-sonnet-20240229",
MaxTokens: 1024,
Messages: []interceptors.AnthropicMessage{
interceptors.CreateUserMessage("Hello, Claude!"),
},
})For more flexibility, implement the OpenAIChatCompleter or AnthropicMessageCreator interfaces:
// Implement the interface
type MyOpenAIClient struct {
// your fields
}
func (c *MyOpenAIClient) CreateChatCompletion(ctx context.Context, req interceptors.ChatCompletionRequest) (interceptors.ChatCompletionResponse, error) {
// your implementation
}
// Wrap the client
wrapped := interceptors.WrapOpenAIClient(&MyOpenAIClient{}, axonflowClient, "user-token")
// Use wrapped client
resp, err := wrapped.CreateChatCompletion(ctx, req)Integrate with external data sources using AxonFlow's MCP (Model Context Protocol) connectors:
connectors, err := client.ListConnectors()
if err != nil {
log.Fatalf("Failed to list connectors: %v", err)
}
for _, conn := range connectors {
fmt.Printf("Connector: %s (%s)\n", conn.Name, conn.Type)
fmt.Printf(" Description: %s\n", conn.Description)
fmt.Printf(" Installed: %v\n", conn.Installed)
}err := client.InstallConnector(axonflow.ConnectorInstallRequest{
ConnectorID: "amadeus-travel",
Name: "amadeus-prod",
TenantID: "your-tenant-id",
Options: map[string]interface{}{
"environment": "production",
},
Credentials: map[string]string{
"api_key": "your-amadeus-key",
"api_secret": "your-amadeus-secret",
},
})
if err != nil {
log.Fatalf("Failed to install connector: %v", err)
}
fmt.Println("Connector installed successfully!")// Query the Amadeus connector for flight information
resp, err := client.QueryConnector(
"user-session-token", // User token for authentication and audit trail
"amadeus-prod",
"Find flights from Paris to Amsterdam on Dec 15",
map[string]interface{}{
"origin": "CDG",
"destination": "AMS",
"date": "2025-12-15",
},
)
if err != nil {
log.Fatalf("Connector query failed: %v", err)
}
if resp.Success {
fmt.Printf("Flight data: %v\n", resp.Data)
} else {
fmt.Printf("Query failed: %s\n", resp.Error)
}AxonFlow now supports 7 production-ready connectors:
Query Salesforce data using SOQL:
// Query Salesforce contacts
resp, err := client.QueryConnector(
"user-session-token", // User token for authentication and audit trail
"salesforce-crm",
"Find all contacts for account Acme Corp",
map[string]interface{}{
"soql": "SELECT Id, Name, Email, Phone FROM Contact WHERE AccountId = '001xx000003DHP0'",
},
)
if err != nil {
log.Fatalf("Salesforce query failed: %v", err)
}
fmt.Printf("Found %d contacts\n", len(resp.Data.([]interface{})))Authentication: OAuth 2.0 password grant (configured in AxonFlow dashboard)
Execute analytics queries on Snowflake:
// Query Snowflake for sales analytics
resp, err := client.QueryConnector(
"user-session-token", // User token for authentication and audit trail
"snowflake-warehouse",
"Get monthly revenue for last 12 months",
map[string]interface{}{
"sql": `SELECT DATE_TRUNC('month', order_date) as month,
COUNT(*) as orders,
SUM(amount) as revenue
FROM orders
WHERE order_date >= DATEADD(month, -12, CURRENT_DATE())
GROUP BY month
ORDER BY month`,
},
)
if err != nil {
log.Fatalf("Snowflake query failed: %v", err)
}
fmt.Printf("Revenue data: %v\n", resp.Data)Authentication: Key-pair JWT authentication (configured in AxonFlow dashboard)
Send notifications and alerts to Slack channels:
// Send Slack notification
resp, err := client.QueryConnector(
"user-session-token", // User token for authentication and audit trail
"slack-workspace",
"Send deployment notification to #engineering channel",
map[string]interface{}{
"channel": "#engineering",
"text": "🚀 Deployment complete! All systems operational.",
"blocks": []map[string]interface{}{
{
"type": "section",
"text": map[string]string{
"type": "mrkdwn",
"text": "*Deployment Status*\nâś… All systems operational",
},
},
},
},
)
if err != nil {
log.Fatalf("Slack notification failed: %v", err)
}
fmt.Printf("Message sent: %v\n", resp.Success)Authentication: OAuth 2.0 bot token (configured in AxonFlow dashboard)
| Connector | Type | Use Case |
|---|---|---|
| PostgreSQL | Database | Relational data access |
| Redis | Cache | Distributed rate limiting |
| Slack | Communication | Team notifications |
| Salesforce | CRM | Customer data, SOQL queries |
| Snowflake | Data Warehouse | Analytics, reporting |
| Amadeus GDS | Travel | Flight/hotel booking |
| Cassandra | NoSQL | Distributed database |
For complete connector documentation, see https://docs.getaxonflow.com/mcp
Prevent large-scale data extraction with automatic row and byte limits:
// Query with exfiltration limits (default: 10K rows, 10MB)
response, err := client.QueryConnector("postgres", "SELECT * FROM customers", nil)
if err != nil {
log.Fatal(err)
}
// Check exfiltration info
if response.PolicyInfo.ExfiltrationCheck.Exceeded {
log.Printf("Data limit exceeded: %s", response.PolicyInfo.ExfiltrationCheck.LimitType)
// LimitType: "rows" or "bytes"
}
// Configure limits via environment:
// MCP_MAX_ROWS_PER_QUERY=1000
// MCP_MAX_BYTES_PER_QUERY=5242880Enable Orchestrator-based policy evaluation for rate limiting, budget controls, and more:
// Response includes dynamic policy info when enabled
response, err := client.QueryConnector("postgres", "SELECT id FROM users", nil)
if err != nil {
log.Fatal(err)
}
// Check dynamic policy evaluation results
dynamicInfo := response.PolicyInfo.DynamicPolicyInfo
if dynamicInfo.OrchestratorReachable {
log.Printf("Policies evaluated: %d", dynamicInfo.PoliciesEvaluated)
for _, policy := range dynamicInfo.MatchedPolicies {
log.Printf(" %s: %s", policy.PolicyName, policy.Action)
}
}
// Enable via environment:
// MCP_DYNAMIC_POLICIES_ENABLED=trueGenerate and execute complex multi-step plans using AI agent orchestration:
// Generate a travel planning workflow
plan, err := client.GeneratePlan(
"Plan a 3-day trip to Paris with moderate budget",
"travel", // Domain hint (optional)
)
if err != nil {
log.Fatalf("Plan generation failed: %v", err)
}
fmt.Printf("Generated plan %s with %d steps\n", plan.PlanID, len(plan.Steps))
fmt.Printf("Complexity: %d, Parallel: %v\n", plan.Complexity, plan.Parallel)
for i, step := range plan.Steps {
fmt.Printf(" Step %d: %s (%s)\n", i+1, step.Name, step.Type)
fmt.Printf(" Description: %s\n", step.Description)
fmt.Printf(" Agent: %s\n", step.Agent)
}// Execute the generated plan
execResp, err := client.ExecutePlan(plan.PlanID)
if err != nil {
log.Fatalf("Plan execution failed: %v", err)
}
fmt.Printf("Plan Status: %s\n", execResp.Status)
fmt.Printf("Duration: %s\n", execResp.Duration)
if execResp.Status == "completed" {
fmt.Printf("Result:\n%s\n", execResp.Result)
} else if execResp.Status == "failed" {
fmt.Printf("Error: %s\n", execResp.Error)
}// For long-running plans, check status periodically
status, err := client.GetPlanStatus(plan.PlanID)
if err != nil {
log.Fatalf("Failed to get plan status: %v", err)
}
fmt.Printf("Plan Status: %s\n", status.Status)Check if AxonFlow Agent is available:
err := client.HealthCheck()
if err != nil {
log.Printf("AxonFlow Agent is unhealthy: %v", err)
} else {
log.Println("AxonFlow Agent is healthy")
}For applications running in AWS VPC, use the private endpoint for lower latency:
client := axonflow.NewClient(axonflow.AxonFlowConfig{
Endpoint: "https://vpc-private-endpoint.getaxonflow.com:8443", // VPC private endpoint
ClientID: "your-client-id",
ClientSecret: "your-secret",
Mode: "production",
})Network Latency Characteristics:
- Public endpoint: Higher latency (internet routing overhead)
- VPC private endpoint: Lower latency (intra-VPC routing)
resp, err := client.ProxyLLMCall(...)
if err != nil {
// Network errors, timeouts, or AxonFlow unavailability
log.Printf("Request failed: %v", err)
return
}
if resp.Blocked {
// Policy violation - request blocked by governance rules
log.Printf("Request blocked: %s", resp.BlockReason)
log.Printf("Policies evaluated: %v", resp.PolicyInfo.PoliciesEvaluated)
return
}
if !resp.Success {
// Request succeeded but returned error from downstream
log.Printf("Query failed: %s", resp.Error)
return
}
// Success - use resp.Data or resp.Result
fmt.Printf("Result: %v\n", resp.Data)-
Environment Variables: Never hardcode credentials
import "os" client := axonflow.NewClient(axonflow.AxonFlowConfig{ Endpoint: os.Getenv("AXONFLOW_AGENT_URL"), ClientID: os.Getenv("AXONFLOW_CLIENT_ID"), ClientSecret: os.Getenv("AXONFLOW_CLIENT_SECRET"), })
-
Fail-Open in Production: Use
Mode: "production"to fail-open if AxonFlow is unavailable -
Enable Caching: Reduce latency for repeated queries
-
Enable Retry: Handle transient failures automatically
-
Debug in Development: Use
Debug: trueduring development, disable in production -
Health Checks: Monitor AxonFlow availability with periodic health checks
-
Secure Storage: Store credentials in environment variables or secrets management systems (AWS Secrets Manager, HashiCorp Vault, etc.)
| Field | Type | Default | Description |
|---|---|---|---|
Endpoint |
string |
Required | AxonFlow Agent endpoint URL |
ClientID |
string |
Required | OAuth2 client ID for authentication |
ClientSecret |
string |
Required | OAuth2 client secret for authentication |
Mode |
string |
"production" |
"production" or "sandbox" |
Debug |
bool |
false |
Enable debug logging |
Timeout |
time.Duration |
60s |
Request timeout |
Retry.Enabled |
bool |
true |
Enable retry logic |
Retry.MaxAttempts |
int |
3 |
Maximum retry attempts |
Retry.InitialDelay |
time.Duration |
1s |
Initial retry delay (exponential backoff) |
Cache.Enabled |
bool |
true |
Enable caching |
Cache.TTL |
time.Duration |
60s |
Cache time-to-live |
Note: For self-hosted (localhost) deployments, ClientID and ClientSecret are optional.
If you're using older authentication methods (LicenseKey or API keys), migrate to OAuth2 client credentials:
Before (v2.x):
client := axonflow.NewClient(axonflow.AxonFlowConfig{
Endpoint: "https://staging-eu.getaxonflow.com",
LicenseKey: os.Getenv("AXONFLOW_LICENSE_KEY"),
})After (v3.x):
client := axonflow.NewClient(axonflow.AxonFlowConfig{
Endpoint: "https://staging-eu.getaxonflow.com",
ClientID: os.Getenv("AXONFLOW_CLIENT_ID"),
ClientSecret: os.Getenv("AXONFLOW_CLIENT_SECRET"),
})How to get credentials:
- Contact AxonFlow support at dev@getaxonflow.com
- Credentials are provided as part of your AxonFlow subscription
- Store credentials securely in environment variables or secrets management systems
Self-hosted users: No credentials required for localhost endpoints.
Complete working examples for all features are available in the examples folder.
// PII Detection - Automatically detect sensitive data
result, _ := client.GetPolicyApprovedContext("user", "SSN: 123-45-6789", nil, nil)
// result.RequiresRedaction = true (SSN detected)
// SQL Injection Detection - Block malicious queries
result, _ := client.GetPolicyApprovedContext("user", "SELECT * FROM users; DROP TABLE users;", nil, nil)
// result.Approved = false, result.BlockReason = "SQL injection detected"
// Static Policies - List and manage built-in policies
policies, _ := client.ListPolicies()
// Returns: [{Name: "pii-detection", Enabled: true}, {Name: "sql-injection", Enabled: true}, ...]
// Dynamic Policies - Create runtime policies
err := client.CreateDynamicPolicy(axonflow.DynamicPolicyRequest{
Name: "block-competitor-queries",
Conditions: `{"contains": ["competitor", "pricing"]}`,
Action: "block",
})
// MCP Connectors - Query external data sources
resp, _ := client.QueryConnector("user-token", "postgres-db", "SELECT name, email FROM customers", nil)
// resp.Data contains query results with PII automatically redacted
// Multi-Agent Planning - Orchestrate complex workflows
plan, _ := client.GeneratePlan("Research AI governance regulations and summarize", "legal")
result, _ := client.ExecutePlan(plan.PlanID)
// Audit Logging - Track all LLM interactions
logs, _ := client.ListAuditLogs(axonflow.AuditLogFilter{UserID: "user-123", Limit: 100})
// Execution Replay - Debug past executions
executions, _ := client.ListExecutions(axonflow.ExecutionFilter{Status: "failed"})These features require an AxonFlow Enterprise license:
// Code Governance - Automated PR reviews with AI
prResult, _ := client.ReviewPullRequest(axonflow.PRReviewRequest{
RepoOwner: "your-org",
RepoName: "your-repo",
PRNumber: 123,
CheckTypes: []string{"security", "style", "performance"},
})
// Cost Controls - Budget management for LLM usage
budget, _ := client.GetBudget("team-engineering")
// Returns: {Limit: 1000.00, Used: 234.56, Remaining: 765.44}
// MCP Policy Enforcement - Automatic PII redaction in connector responses
resp, _ := client.QueryConnector("user", "postgres", "SELECT * FROM customers", nil)
// resp.PolicyInfo.Redacted = true
// resp.PolicyInfo.RedactedFields = ["ssn", "credit_card"]For enterprise features, contact sales@getaxonflow.com.
- Documentation: https://docs.getaxonflow.com
- Issues: https://github.com/getaxonflow/axonflow-sdk-go/issues
- Email: dev@getaxonflow.com
If you are evaluating AxonFlow in a company setting and cannot open a public issue, you can share feedback or blockers confidentially here: Anonymous evaluation feedback form
No email required. Optional contact if you want a response.
MIT

