
Amp Admin Client Guide

The Amp Admin Client provides Python bindings for the Amp Admin API, enabling you to register datasets, deploy jobs, and manage your Amp infrastructure programmatically.

Installation

The admin client is included in the amp package:

pip install amp

Or with uv:

uv add amp

Quick Start

Basic Client Setup

from amp import Client

# Initialize client with both query and admin capabilities
client = Client(
    query_url="grpc://localhost:8815",  # Flight SQL endpoint
    admin_url="http://localhost:8080",   # Admin API endpoint
    auth_token="your-auth-token"         # Optional authentication
)

Register a Dataset

# Define your dataset manifest
manifest = {
    'kind': 'manifest',
    'dependencies': {
        'eth': '_/eth_firehose@1.0.0'
    },
    'tables': {
        'blocks': {
            'input': {'sql': 'SELECT * FROM eth.blocks'},
            'schema': {'arrow': {'fields': [...]}},
            'network': 'mainnet'
        }
    },
    'functions': {}
}

# Register the dataset
client.datasets.register(
    namespace='_',
    name='my_dataset',
    version='1.0.0',
    manifest=manifest
)

Deploy and Monitor

# Deploy the dataset
deploy_response = client.datasets.deploy(
    namespace='_',
    name='my_dataset',
    version='1.0.0',
    parallelism=4,
    end_block='latest'
)

# Wait for completion
job = client.jobs.wait_for_completion(
    deploy_response.job_id,
    poll_interval=5.0,
    timeout=3600.0
)

print(f"Job completed with status: {job.status}")

Core Concepts

Manifests

A manifest is a JSON document that defines a dataset's structure, dependencies, tables, and functions. Manifests include:

  • dependencies: References to other datasets this dataset depends on
  • tables: SQL transformations and output schemas
  • functions: Custom Python/SQL functions (optional)
  • network: Blockchain network identifier (set per table)

Datasets and Versions

Datasets are versioned using semantic versioning (e.g., 1.0.0). Each version has:

  • A unique manifest
  • Immutable registration
  • Independent deployment history
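Note that semantic versions order by numeric component, not lexically. A small helper (hypothetical, not part of the client) illustrates the ordering:

```python
def latest_version(versions: list[str]) -> str:
    """Pick the highest semantic version by comparing (major, minor, patch) numerically."""
    return max(versions, key=lambda v: tuple(int(part) for part in v.split('.')))
```

A plain string comparison would wrongly rank 1.10.0 below 1.2.0; the tuple comparison gets it right.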

Jobs

Jobs represent long-running operations like dataset deployments. Jobs have states:

  • Pending: Queued for execution
  • Running: Currently executing
  • Completed: Successfully finished
  • Failed: Encountered an error
  • Cancelled: Stopped by user
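Of these, only Pending and Running are non-terminal. A small helper (hypothetical, not part of the client) makes that distinction explicit for polling loops:

```python
# Terminal states, per the list above: a job in one of these will not change again.
TERMINAL_STATES = {'Completed', 'Failed', 'Cancelled'}

def is_terminal(status: str) -> bool:
    """Return True once a job has finished, failed, or been cancelled."""
    return status in TERMINAL_STATES
```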

Client Configuration

Unified Client

The Client class provides both query and admin functionality:

from amp import Client

# Full configuration
client = Client(
    query_url="grpc://localhost:8815",
    admin_url="http://localhost:8080",
    auth_token="your-token"
)

# Query operations (Flight SQL)
df = client.sql("SELECT * FROM eth.blocks LIMIT 10").to_pandas()

# Admin operations (HTTP API)
datasets = client.datasets.list_all()

Admin-Only Client

If you only need admin functionality:

from amp.admin import AdminClient

admin = AdminClient(
    base_url="http://localhost:8080",
    auth_token="your-token"
)

# Access admin operations
admin.datasets.list_all()
admin.jobs.get(123)

Backward Compatibility

The legacy url parameter still works for Flight SQL:

# This still works
client = Client(url="grpc://localhost:8815")
client.sql("SELECT * FROM eth.blocks")

Environment Variables

You can configure the client using environment variables:

export AMP_QUERY_URL="grpc://localhost:8815"
export AMP_ADMIN_URL="http://localhost:8080"
export AMP_AUTH_TOKEN="your-token"
import os
from amp import Client

client = Client(
    query_url=os.getenv('AMP_QUERY_URL'),
    admin_url=os.getenv('AMP_ADMIN_URL'),
    auth_token=os.getenv('AMP_AUTH_TOKEN')
)

Dataset Operations

Registering Datasets

# Simple registration
client.datasets.register(
    namespace='_',
    name='eth_blocks',
    version='1.0.0',
    manifest=manifest
)

# Registration without explicit version (server assigns)
client.datasets.register(
    namespace='_',
    name='eth_blocks',
    manifest=manifest
)

Listing Datasets

# List all datasets
response = client.datasets.list_all()

for dataset in response.datasets:
    print(f"{dataset.namespace}/{dataset.name}@{dataset.latest_version}")
    print(f"  Available versions: {dataset.versions}")

Getting Dataset Versions

# Get all versions of a dataset
versions_response = client.datasets.get_versions('_', 'eth_blocks')

print(f"Latest: {versions_response.special_tags.latest}")
print(f"Dev: {versions_response.special_tags.dev}")

for version_info in versions_response.versions:
    print(f"  {version_info.version} - {version_info.manifest_hash}")

Getting Version Details

# Get specific version info
version = client.datasets.get_version('_', 'eth_blocks', '1.0.0')
print(f"Manifest hash: {version.manifest_hash}")
print(f"Created: {version.created_at}")

Getting Manifests

# Retrieve the manifest for a version
manifest = client.datasets.get_manifest('_', 'eth_blocks', '1.0.0')

print(f"Tables: {list(manifest['tables'].keys())}")
print(f"Dependencies: {manifest['dependencies']}")

Deploying Datasets

# Deploy with options
deploy_response = client.datasets.deploy(
    namespace='_',
    name='eth_blocks',
    version='1.0.0',
    parallelism=8,           # Number of parallel workers
    end_block='latest'       # Stop at latest block (vs continuous)
)

print(f"Started job: {deploy_response.job_id}")

Deleting Datasets

# Delete all versions of a dataset
client.datasets.delete('_', 'old_dataset')

Job Management

Getting Job Status

# Get job by ID
job = client.jobs.get(123)

print(f"Status: {job.status}")
print(f"Node: {job.node_id}")
print(f"Descriptor: {job.descriptor}")

Listing Jobs

# List jobs with pagination
response = client.jobs.list(limit=50)

for job in response.jobs:
    print(f"Job {job.id}: {job.status}")

# Continue pagination if needed
if response.next_cursor:
    next_page = client.jobs.list(
        limit=50,
        last_job_id=response.next_cursor
    )

Waiting for Completion

# Block until job completes or times out
try:
    final_job = client.jobs.wait_for_completion(
        job_id=123,
        poll_interval=5.0,    # Check every 5 seconds
        timeout=3600.0        # Give up after 1 hour
    )

    if final_job.status == 'Completed':
        print("Job succeeded!")
    elif final_job.status == 'Failed':
        print("Job failed!")

except TimeoutError as e:
    print(f"Job did not complete in time: {e}")

Stopping Jobs

# Stop a running job
client.jobs.stop(123)

Deleting Jobs

# Delete a single job
client.jobs.delete(123)

# Delete multiple jobs
client.jobs.delete_many([123, 124, 125])

Schema Validation

The schema client validates SQL queries and returns their output schemas without execution:

# Validate a query and get its schema
schema_response = client.schema.get_output_schema(
    sql_query='SELECT block_num, hash, timestamp FROM eth.blocks WHERE block_num > 1000000',
    is_sql_dataset=True
)

# Inspect the Arrow schema
print(schema_response.schema)

This is particularly useful for:

  • Validating queries before registration
  • Understanding output structure
  • Generating correct Arrow schemas for manifests
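For instance, a validated schema can be dropped straight into a manifest table entry. The build_table_entry helper below is a sketch (not part of the client), assuming schema_response.schema is the plain Arrow-schema dict that belongs under the table's schema key:

```python
def build_table_entry(sql_query: str, arrow_schema: dict, network: str) -> dict:
    """Assemble a manifest table entry from a query and its validated Arrow schema."""
    return {
        'input': {'sql': sql_query},
        'schema': {'arrow': arrow_schema},
        'network': network,
    }

# Usage sketch, reusing the call from above:
# schema_response = client.schema.get_output_schema(sql_query, is_sql_dataset=True)
# manifest['tables']['blocks'] = build_table_entry(sql_query, schema_response.schema, 'mainnet')
```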

Manifest Generation

The QueryBuilder provides a fluent API for generating manifests from SQL queries:

Basic Manifest Generation

# Build a query
query = client.sql("SELECT block_num, hash FROM eth.blocks")

# Add dependencies
query = query.with_dependency('eth', '_/eth_firehose@1.0.0')

# Generate manifest
manifest = query.to_manifest(
    table_name='blocks',
    network='mainnet'
)

print(manifest)
# {
#     'kind': 'manifest',
#     'dependencies': {'eth': '_/eth_firehose@1.0.0'},
#     'tables': {
#         'blocks': {
#             'input': {'sql': 'SELECT block_num, hash FROM eth.blocks'},
#             'schema': {'arrow': {...}},  # Auto-fetched
#             'network': 'mainnet'
#         }
#     },
#     'functions': {}
# }

One-Line Registration and Deployment

The most powerful pattern combines query building, manifest generation, registration, and deployment:

# Build, register, and deploy in one chain
job = (
    client.sql("SELECT block_num, hash FROM eth.blocks")
    .with_dependency('eth', '_/eth_firehose@1.0.0')
    .register_as(
        namespace='_',
        name='eth_blocks_simple',
        version='1.0.0',
        table_name='blocks',
        network='mainnet'
    )
    .deploy(
        end_block='latest',
        parallelism=4,
        wait=True  # Block until completion
    )
)

print(f"Deployment completed: {job.status}")

Multiple Dependencies

manifest = (
    client.sql("""
        SELECT
            t.token_address,
            t.amount,
            m.name,
            m.symbol
        FROM erc20_transfers t
        JOIN token_metadata m ON t.token_address = m.address
    """)
    .with_dependency('erc20_transfers', '_/erc20_transfers@1.0.0')
    .with_dependency('token_metadata', '_/token_metadata@1.0.0')
    .to_manifest('enriched_transfers', 'mainnet')
)

Deployment Workflows

Development Workflow

# 1. Develop query locally
query = client.sql("""
    SELECT
        block_num,
        COUNT(*) as tx_count
    FROM eth.transactions
    GROUP BY block_num
""")

# Test the query
df = query.to_pandas()
print(df.head())

# 2. Register as dataset
query = query.with_dependency('eth', '_/eth_firehose@1.0.0')

client.datasets.register(
    namespace='_',
    name='tx_counts',
    version='0.1.0',
    manifest=query.to_manifest('tx_counts', 'mainnet')
)

# 3. Deploy to limited range for testing
deploy_resp = client.datasets.deploy(
    namespace='_',
    name='tx_counts',
    version='0.1.0',
    end_block='10000',  # Test on first 10k blocks
    parallelism=2
)

# 4. Monitor
job = client.jobs.wait_for_completion(deploy_resp.job_id, timeout=600)

if job.status == 'Completed':
    print("Test deployment successful!")

    # 5. Deploy full version
    prod_deploy = client.datasets.deploy(
        namespace='_',
        name='tx_counts',
        version='0.1.0',
        end_block='latest',
        parallelism=8
    )

Production Workflow

# Register production version
context = (
    client.sql("SELECT * FROM processed_data")
    .with_dependency('raw', '_/raw_data@2.0.0')
    .register_as('_', 'processed_data', '2.0.0', 'data', 'mainnet')
)

# Deploy without waiting
deploy_resp = context.deploy(
    end_block='latest',
    parallelism=16,
    wait=False
)

print(f"Started production deployment: {deploy_resp.job_id}")

# Monitor separately (e.g., in a monitoring service)
import time

def monitor_job(job_id):
    while True:
        job = client.jobs.get(job_id)

        if job.status in ['Completed', 'Failed', 'Cancelled']:
            return job

        print(f"Job {job_id} status: {job.status}")
        time.sleep(30)

final_job = monitor_job(deploy_resp.job_id)

Continuous Deployment

# Deploy continuous processing (no end_block)
deploy_resp = client.datasets.deploy(
    namespace='_',
    name='realtime_data',
    version='1.0.0',
    parallelism=4
    # end_block=None means continuous
)

# Job will run indefinitely, processing new blocks as they arrive
print(f"Continuous deployment started: {deploy_resp.job_id}")

# Stop later when needed
client.jobs.stop(deploy_resp.job_id)

Error Handling

The admin client provides typed exceptions for different error scenarios:

Error Types

from amp.admin.errors import (
    AdminAPIError,           # Base exception
    DatasetNotFoundError,
    InvalidManifestError,
    JobNotFoundError,
    DependencyValidationError,
    InternalServerError,
)

Handling Errors

try:
    client.datasets.register('_', 'my_dataset', '1.0.0', manifest)

except InvalidManifestError as e:
    print(f"Manifest validation failed: {e.message}")
    print(f"Error code: {e.error_code}")

except DependencyValidationError as e:
    print(f"Dependency issue: {e.message}")

except AdminAPIError as e:
    print(f"API error: {e.error_code} - {e.message}")
    print(f"HTTP status: {e.status_code}")

Robust Deployment

def robust_deploy(client, namespace, name, version, **deploy_options):
    """Deploy with comprehensive error handling."""
    try:
        # Check if dataset exists
        try:
            version_info = client.datasets.get_version(namespace, name, version)
            print(f"Found existing version: {version_info.manifest_hash}")
        except DatasetNotFoundError:
            raise ValueError(f"Dataset {namespace}/{name}@{version} not registered")

        # Deploy
        deploy_resp = client.datasets.deploy(
            namespace, name, version, **deploy_options
        )

        # Wait for completion
        job = client.jobs.wait_for_completion(
            deploy_resp.job_id,
            poll_interval=5.0,
            timeout=3600.0
        )

        if job.status == 'Completed':
            print(f"Deployment successful: job {job.id}")
            return job
        else:
            raise RuntimeError(f"Job failed with status: {job.status}")

    except TimeoutError:
        print("Deployment timeout - job may still be running")
        raise

    except AdminAPIError as e:
        print(f"API error during deployment: {e.message}")
        raise

# Usage
job = robust_deploy(
    client,
    namespace='_',
    name='my_dataset',
    version='1.0.0',
    parallelism=4,
    end_block='latest'
)

Best Practices

1. Use Context Managers

with Client(query_url=..., admin_url=..., auth_token=...) as client:
    # Client will automatically close connections
    client.datasets.register(...)

2. Validate Schemas Early

# Validate before registration
schema = client.schema.get_output_schema(sql_query, True)
print(f"Query will produce {len(schema.schema['fields'])} columns")

3. Version Your Datasets

# Use semantic versioning
# - Major: Breaking schema changes
# - Minor: Backward-compatible additions
# - Patch: Bug fixes

client.datasets.register('_', 'my_dataset', '1.0.0', manifest_v1)
client.datasets.register('_', 'my_dataset', '1.1.0', manifest_v1_1)  # Added columns
client.datasets.register('_', 'my_dataset', '2.0.0', manifest_v2)    # Breaking change

4. Monitor Long-Running Jobs

# Don't block main thread for long deployments
deploy_resp = client.datasets.deploy(..., wait=False)

# Monitor asynchronously
import threading

def monitor():
    job = client.jobs.wait_for_completion(deploy_resp.job_id)
    print(f"Job finished: {job.status}")

thread = threading.Thread(target=monitor)
thread.start()

5. Handle Dependencies Correctly

# Always specify full dependency references
query = (
    client.sql("SELECT * FROM base.data")
    .with_dependency('base', '_/base_dataset@1.0.0')  # Include version!
)

# Not: .with_dependency('base', 'base_dataset')  # ❌ Missing namespace/version

Next Steps