Complete API reference for the Amp Admin Client.
Main client providing both Flight SQL query operations and admin operations.
Module: amp.client
Client(
    url: Optional[str] = None,
    query_url: Optional[str] = None,
    admin_url: Optional[str] = None,
    auth_token: Optional[str] = None
)
Parameters:
url (str, optional): Legacy parameter for the Flight SQL URL. If provided and query_url is not, this value is used for query_url.
query_url (str, optional): Query endpoint URL via Flight SQL (e.g., "grpc://localhost:8815").
admin_url (str, optional): Admin API HTTP endpoint URL (e.g., "http://localhost:8080").
auth_token (str, optional): Authentication token for Admin API requests.
Raises:
ValueError: When accessing admin properties without configuring admin_url.
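The url fallback and the admin_url check can be pictured with a small stand-alone sketch. This is not the library's implementation: the class ClientConfig and the method require_admin_url are hypothetical names that only mirror the behavior described above.

```python
from typing import Optional


class ClientConfig:
    """Hypothetical stand-in that mirrors the documented URL handling."""

    def __init__(
        self,
        url: Optional[str] = None,
        query_url: Optional[str] = None,
        admin_url: Optional[str] = None,
    ) -> None:
        # The legacy `url` is used only when `query_url` is not given.
        self.query_url = query_url if query_url is not None else url
        self.admin_url = admin_url

    def require_admin_url(self) -> str:
        # Admin properties cannot work without an admin endpoint.
        if self.admin_url is None:
            raise ValueError("admin_url was not configured")
        return self.admin_url


legacy = ClientConfig(url="grpc://localhost:8815")
print(legacy.query_url)  # grpc://localhost:8815
```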
Example:
from amp import Client
# Full configuration
client = Client(
    query_url="grpc://localhost:8815",
    admin_url="http://localhost:8080",
    auth_token="my-token"
)
# Query-only (backward compatible)
client = Client(url="grpc://localhost:8815")

Access the DatasetsClient for dataset operations.
Returns: DatasetsClient
Raises: ValueError if admin_url was not configured.
Access the JobsClient for job operations.
Returns: JobsClient
Raises: ValueError if admin_url was not configured.
Access the SchemaClient for schema operations.
Returns: SchemaClient
Raises: ValueError if admin_url was not configured.
Create a QueryBuilder for the given SQL query.
Parameters:
sql(str): SQL query string.
Returns: QueryBuilder instance.
Example:
qb = client.sql("SELECT * FROM eth.blocks LIMIT 10")
df = qb.to_pandas()

Low-level HTTP client for the Admin API. Typically you'll use the unified Client class instead.
Module: amp.admin.client
AdminClient(
    base_url: str,
    auth_token: Optional[str] = None
)
Parameters:
base_url (str): Base URL for the Admin API (e.g., "http://localhost:8080").
auth_token (str, optional): Authentication token. If provided, adds an Authorization: Bearer <token> header.
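The auth_token rule amounts to adding one header when a token is present. A minimal sketch, assuming nothing about how AdminClient builds requests internally; build_auth_headers is a hypothetical helper, not part of amp.admin:

```python
from typing import Dict, Optional


def build_auth_headers(auth_token: Optional[str] = None) -> Dict[str, str]:
    """Hypothetical helper: return the headers implied by the auth_token rule."""
    headers: Dict[str, str] = {}
    if auth_token:
        # Documented behavior: a token adds "Authorization: Bearer <token>".
        headers["Authorization"] = f"Bearer {auth_token}"
    return headers


print(build_auth_headers("my-token"))
```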
Example:
from amp.admin import AdminClient
admin = AdminClient(
    base_url="http://localhost:8080",
    auth_token="my-token"
)

Access the DatasetsClient.
Returns: DatasetsClient
Access the JobsClient.
Returns: JobsClient
Access the SchemaClient.
Returns: SchemaClient
Close the HTTP client connection.
Example:
admin.close()

AdminClient can be used as a context manager:
with AdminClient("http://localhost:8080") as admin:
    admin.datasets.list_all()
# Connection automatically closed

Client for dataset registration, deployment, and management operations.
Module: amp.admin.datasets
Register a new dataset or dataset version.
register(
    namespace: str,
    name: str,
    version: Optional[str],
    manifest: dict
) -> None
Parameters:
namespace (str): Dataset namespace (e.g., "_").
name (str): Dataset name.
version (str, optional): Semantic version string (e.g., "1.0.0"). If not provided, the server assigns a version.
manifest (dict): Dataset manifest dictionary.
Raises:
InvalidManifestError: If manifest validation fails.
DependencyValidationError: If referenced dependencies don't exist.
Example:
manifest = {
    'kind': 'manifest',
    'dependencies': {'eth': '_/eth_firehose@1.0.0'},
    'tables': {'blocks': {...}},
    'functions': {}
}
client.datasets.register('_', 'my_dataset', '1.0.0', manifest)

Deploy a registered dataset version.
deploy(
    namespace: str,
    name: str,
    version: str,
    parallelism: Optional[int] = None,
    end_block: Optional[str] = None
) -> models.DeployResponse
Parameters:
namespace (str): Dataset namespace.
name (str): Dataset name.
version (str): Version to deploy.
parallelism (int, optional): Number of parallel workers.
end_block (str, optional): Block to stop at (e.g., "latest", "1000000"). If not provided, the job runs continuously.
Returns: DeployResponse with job_id field.
Raises:
DatasetNotFoundError: If dataset/version doesn't exist.
Example:
response = client.datasets.deploy(
    '_', 'my_dataset', '1.0.0',
    parallelism=4,
    end_block='latest'
)
print(f"Job ID: {response.job_id}")

List all registered datasets.
list_all() -> models.ListDatasetsResponse
Returns: ListDatasetsResponse containing a list of DatasetSummary objects.
Example:
response = client.datasets.list_all()
for dataset in response.datasets:
    print(f"{dataset.namespace}/{dataset.name}@{dataset.latest_version}")

Get all versions of a dataset.
get_versions(
    namespace: str,
    name: str
) -> models.VersionsResponse
Parameters:
namespace (str): Dataset namespace.
name (str): Dataset name.
Returns: VersionsResponse with a versions list and a special_tags object (SpecialTags).
Raises:
DatasetNotFoundError: If dataset doesn't exist.
Example:
response = client.datasets.get_versions('_', 'eth_blocks')
print(f"Latest: {response.special_tags.latest}")
for version in response.versions:
    print(f" {version.version}")

Get details of a specific dataset version.
get_version(
    namespace: str,
    name: str,
    version: str
) -> models.VersionInfo
Parameters:
namespace (str): Dataset namespace.
name (str): Dataset name.
version (str): Version string.
Returns: VersionInfo with version metadata.
Raises:
DatasetNotFoundError: If dataset or version doesn't exist.
Example:
info = client.datasets.get_version('_', 'eth_blocks', '1.0.0')
print(f"Manifest hash: {info.manifest_hash}")
print(f"Created: {info.created_at}")

Retrieve the manifest for a specific dataset version.
get_manifest(
    namespace: str,
    name: str,
    version: str
) -> dict
Parameters:
namespace (str): Dataset namespace.
name (str): Dataset name.
version (str): Version string.
Returns: Manifest dictionary.
Raises:
DatasetNotFoundError: If dataset or version doesn't exist.
Example:
manifest = client.datasets.get_manifest('_', 'eth_blocks', '1.0.0')
print(f"Tables: {list(manifest['tables'].keys())}")

Delete a dataset and all its versions.
delete(
    namespace: str,
    name: str
) -> None
Parameters:
namespace (str): Dataset namespace.
name (str): Dataset name.
Raises:
DatasetNotFoundError: If dataset doesn't exist.
Example:
client.datasets.delete('_', 'old_dataset')

Client for job monitoring and management.
Module: amp.admin.jobs
Get details of a specific job.
get(job_id: int) -> models.JobInfo
Parameters:
job_id (int): Job ID.
Returns: JobInfo with job details.
Raises:
JobNotFoundError: If job doesn't exist.
Example:
job = client.jobs.get(123)
print(f"Status: {job.status}")
print(f"Node: {job.node_id}")

List jobs with pagination.
list(
    limit: int = 100,
    last_job_id: Optional[int] = None
) -> models.ListJobsResponse
Parameters:
limit (int, optional): Maximum number of jobs to return. Default: 100.
last_job_id (int, optional): Cursor for pagination. Returns jobs after this ID.
Returns: ListJobsResponse with jobs list and optional next_cursor.
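Walking every page means feeding next_cursor back in as last_job_id until no cursor is returned. A minimal sketch of that loop: iter_all_jobs is a hypothetical helper, and fake_list is a two-line stand-in for client.jobs.list so the block runs without a server.

```python
from types import SimpleNamespace
from typing import Any, Callable, Iterator, Optional


def iter_all_jobs(list_jobs: Callable[..., Any], page_size: int = 100) -> Iterator[Any]:
    """Yield every job by following next_cursor until it is empty."""
    cursor: Optional[int] = None
    while True:
        page = list_jobs(limit=page_size, last_job_id=cursor)
        yield from page.jobs
        cursor = page.next_cursor
        if cursor is None:
            break


# Fake backend holding jobs 1..5, standing in for client.jobs.list (illustration only).
def fake_list(limit: int = 100, last_job_id: Optional[int] = None) -> SimpleNamespace:
    ids = [i for i in range(1, 6) if last_job_id is None or i > last_job_id][:limit]
    more = bool(ids) and ids[-1] < 5
    jobs = [SimpleNamespace(id=i, status="Completed") for i in ids]
    return SimpleNamespace(jobs=jobs, next_cursor=ids[-1] if more else None)


print([job.id for job in iter_all_jobs(fake_list, page_size=2)])  # [1, 2, 3, 4, 5]
```

With a configured client, the same helper would be called as iter_all_jobs(client.jobs.list).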
Example:
# First page
response = client.jobs.list(limit=50)
for job in response.jobs:
    print(f"Job {job.id}: {job.status}")
# Next page
if response.next_cursor:
    next_page = client.jobs.list(limit=50, last_job_id=response.next_cursor)

Poll job status until completion or timeout.
wait_for_completion(
    job_id: int,
    poll_interval: float = 5.0,
    timeout: float = 3600.0
) -> models.JobInfo
Parameters:
job_id (int): Job ID to monitor.
poll_interval (float, optional): Seconds between status checks. Default: 5.0.
timeout (float, optional): Maximum seconds to wait. Default: 3600.0 (1 hour).
Returns: JobInfo with final job status.
Raises:
TimeoutError: If job doesn't complete within timeout.
JobNotFoundError: If job doesn't exist.
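The polling behavior can be sketched as a loop around a status lookup. This is an illustration under stated assumptions, not the library's code: wait_until_done is a hypothetical name, the terminal-status set is assumed from the JobInfo status values, and a fake lookup replaces client.jobs.get.

```python
import time
from types import SimpleNamespace
from typing import Any, Callable

# Assumption: these three JobInfo statuses are treated as terminal.
TERMINAL_STATUSES = {"Completed", "Failed", "Cancelled"}


def wait_until_done(
    get_job: Callable[[int], Any],
    job_id: int,
    poll_interval: float = 5.0,
    timeout: float = 3600.0,
) -> Any:
    """Poll get_job(job_id) until a terminal status or the deadline passes."""
    deadline = time.monotonic() + timeout
    while True:
        job = get_job(job_id)
        if job.status in TERMINAL_STATUSES:
            return job
        # Give up if the next poll would land past the deadline.
        if time.monotonic() + poll_interval > deadline:
            raise TimeoutError(f"job {job_id} still {job.status} after {timeout}s")
        time.sleep(poll_interval)


# Fake job that completes on the third status check (illustration only).
_statuses = iter(["Pending", "Running", "Completed"])
final = wait_until_done(
    lambda job_id: SimpleNamespace(id=job_id, status=next(_statuses)),
    job_id=123,
    poll_interval=0.01,
    timeout=1.0,
)
print(final.status)  # Completed
```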
Example:
try:
    job = client.jobs.wait_for_completion(
        job_id=123,
        poll_interval=5.0,
        timeout=1800.0  # 30 minutes
    )
    if job.status == 'Completed':
        print("Success!")
    else:
        print(f"Job ended with status: {job.status}")
except TimeoutError:
    print("Job timed out")

Stop a running job.
stop(job_id: int) -> None
Parameters:
job_id (int): Job ID to stop.
Raises:
JobNotFoundError: If job doesn't exist.
Example:
client.jobs.stop(123)

Delete a single job.
delete(job_id: int) -> None
Parameters:
job_id (int): Job ID to delete.
Raises:
JobNotFoundError: If job doesn't exist.
Example:
client.jobs.delete(123)

Delete multiple jobs.
delete_many(job_ids: list[int]) -> None
Parameters:
job_ids (list[int]): List of job IDs to delete.
Example:
client.jobs.delete_many([123, 124, 125])

Client for SQL query validation and schema inference.
Module: amp.admin.schema
Validate SQL query and get its output Arrow schema without executing it.
get_output_schema(
    sql_query: str,
    is_sql_dataset: bool = True
) -> models.OutputSchemaResponse
Parameters:
sql_query (str): SQL query to analyze.
is_sql_dataset (bool, optional): Whether this is for a SQL dataset. Default: True.
Returns: OutputSchemaResponse with Arrow schema.
Raises:
GetOutputSchemaError: If schema analysis fails.
DependencyValidationError: If query references invalid dependencies.
Example:
response = client.schema.get_output_schema(
    'SELECT block_num, hash FROM eth.blocks WHERE block_num > 1000000',
    is_sql_dataset=True
)
print(response.schema)  # Arrow schema dict

All data models are Pydantic v2 models defined in amp.admin.models.

DatasetSummary
Summary information about a dataset.
Fields:
namespace (str): Dataset namespace
name (str): Dataset name
latest_version (str): Latest version string
versions (list[str]): All available versions

VersionInfo
Detailed information about a dataset version.
Fields:
version (str): Version string
manifest_hash (str): Hash of the manifest
created_at (str): ISO timestamp
updated_at (str): ISO timestamp
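Because created_at and updated_at arrive as strings, they need parsing before any date arithmetic. A minimal sketch, assuming the server emits ISO 8601 text that the standard library can read; parse_iso_timestamp is a hypothetical helper and the sample value is made up.

```python
from datetime import datetime, timezone


def parse_iso_timestamp(value: str) -> datetime:
    """Parse an ISO 8601 string; a trailing 'Z' is read as UTC."""
    # datetime.fromisoformat only accepts 'Z' from Python 3.11 on, so normalize it.
    if value.endswith("Z"):
        value = value[:-1] + "+00:00"
    return datetime.fromisoformat(value)


created = parse_iso_timestamp("2024-01-02T03:04:05Z")  # made-up sample value
print(created.astimezone(timezone.utc).isoformat())  # 2024-01-02T03:04:05+00:00
```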

VersionsResponse
Response containing all versions of a dataset.
Fields:
namespace (str): Dataset namespace
name (str): Dataset name
versions (list[VersionInfo]): List of version details
special_tags (SpecialTags): Special version tags

SpecialTags
Special version tags for a dataset.
Fields:
latest (str): Latest stable version
dev (str, optional): Development version

JobInfo
Information about a job.
Fields:
id (int): Job ID
status (str): Job status ("Pending", "Running", "Completed", "Failed", "Cancelled")
descriptor (dict): Job configuration
node_id (str, optional): Worker node ID

ListJobsResponse
Response from listing jobs.
Fields:
jobs (list[JobInfo]): List of jobs
next_cursor (int, optional): Cursor for next page

ListDatasetsResponse
Response from listing datasets.
Fields:
datasets (list[DatasetSummary]): List of dataset summaries

DeployResponse
Response from deploying a dataset.
Fields:
job_id (int): ID of the created job

OutputSchemaResponse
Response containing Arrow schema for a query.
Fields:
schema (dict): Arrow schema dictionary

Request to register a dataset.
Fields:
namespace (str): Dataset namespace
name (str): Dataset name
version (str, optional): Version string
manifest (dict): Dataset manifest

Request to get output schema for a query.
Fields:
sql_query (str): SQL query
is_sql_dataset (bool): Whether this is for a SQL dataset

All error classes are defined in amp.admin.errors.
AdminAPIError
Base exception for all Admin API errors.
Attributes:
error_code (str): Error code from API
message (str): Human-readable error message
status_code (int): HTTP status code
Example:
try:
    client.datasets.register(...)
except AdminAPIError as e:
    print(f"Error: {e.error_code} - {e.message}")
    print(f"HTTP Status: {e.status_code}")

All specific errors inherit from AdminAPIError:
DatasetNotFoundError: Dataset or version not found (404)
InvalidManifestError: Manifest validation failed (400)
JobNotFoundError: Job not found (404)
DependencyValidationError: Invalid dependency reference (400)
GetOutputSchemaError: Schema analysis failed (400)
InvalidDependencyError: Malformed dependency specification (400)
InternalServerError: Server error (500)
BadGatewayError: Gateway error (502)
ServiceUnavailableError: Service unavailable (503)
GatewayTimeoutError: Gateway timeout (504)
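Since every error carries status_code, callers can branch on it to decide whether a retry is worthwhile. The sketch below is one possible policy, not library behavior: StubAdminAPIError is a local stand-in with the three documented attributes, is_retryable is a hypothetical helper, the error_code strings are invented, and treating 502/503/504 as transient is an assumption.

```python
class StubAdminAPIError(Exception):
    """Local stand-in with the three documented attributes (not the real class)."""

    def __init__(self, error_code: str, message: str, status_code: int) -> None:
        super().__init__(message)
        self.error_code = error_code
        self.message = message
        self.status_code = status_code


def is_retryable(error: StubAdminAPIError) -> bool:
    # Assumption: gateway and availability errors are transient; 4xx and 500 are not.
    return error.status_code in {502, 503, 504}


# The error_code values below are invented for illustration.
print(is_retryable(StubAdminAPIError("SERVICE_UNAVAILABLE", "try later", 503)))  # True
print(is_retryable(StubAdminAPIError("DATASET_NOT_FOUND", "no such dataset", 404)))  # False
```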
Usage:
from amp.admin.errors import DatasetNotFoundError, InvalidManifestError
try:
    client.datasets.get_version('_', 'nonexistent', '1.0.0')
except DatasetNotFoundError:
    print("Dataset not found")
try:
    client.datasets.register('_', 'bad', '1.0.0', {})
except InvalidManifestError as e:
    print(f"Manifest invalid: {e.message}")

Fluent API for building SQL queries and generating manifests.
Module: amp.client
Add a dependency to the query.
with_dependency(alias: str, reference: str) -> QueryBuilder
Parameters:
alias (str): Dependency alias used in SQL (e.g., "eth")
reference (str): Full dependency reference (e.g., "_/eth_firehose@1.0.0")
Returns: Self for chaining.
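The example reference "_/eth_firehose@1.0.0" suggests a namespace/name@version layout. That layout is inferred from the examples in this reference rather than stated by it, so the parser below is only a sketch for checking references locally; DependencyRef and parse_reference are hypothetical names.

```python
from typing import NamedTuple


class DependencyRef(NamedTuple):
    namespace: str
    name: str
    version: str


def parse_reference(reference: str) -> DependencyRef:
    """Split 'namespace/name@version' into its parts (assumed layout)."""
    path, at, version = reference.partition("@")
    namespace, slash, name = path.partition("/")
    # Every separator and every part must be present.
    if not (at and slash and namespace and name and version):
        raise ValueError(f"expected 'namespace/name@version', got {reference!r}")
    return DependencyRef(namespace, name, version)


ref = parse_reference("_/eth_firehose@1.0.0")
print(ref.namespace, ref.name, ref.version)
```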
Example:
qb = (
    client.sql("SELECT * FROM eth.blocks")
    .with_dependency('eth', '_/eth_firehose@1.0.0')
)

Generate a dataset manifest from the query.
to_manifest(table_name: str, network: str = 'mainnet') -> dict
Parameters:
table_name (str): Name for the output table
network (str, optional): Network identifier. Default: "mainnet".
Returns: Manifest dictionary.
Example:
manifest = (
    client.sql("SELECT * FROM eth.blocks")
    .with_dependency('eth', '_/eth_firehose@1.0.0')
    .to_manifest('blocks', 'mainnet')
)

Register the query as a dataset and return a deployment context.
register_as(
    namespace: str,
    name: str,
    version: str,
    table_name: str,
    network: str = 'mainnet'
) -> DeploymentContext
Parameters:
namespace (str): Dataset namespace
name (str): Dataset name
version (str): Version string
table_name (str): Output table name
network (str, optional): Network identifier. Default: "mainnet".
Returns: DeploymentContext for chaining deployment.
Example:
job = (
    client.sql("SELECT * FROM eth.blocks")
    .with_dependency('eth', '_/eth_firehose@1.0.0')
    .register_as('_', 'my_dataset', '1.0.0', 'blocks')
    .deploy(parallelism=4, wait=True)
)

Context for deploying a registered dataset with fluent API.
Module: amp.admin.deployment
Deploy the registered dataset.
deploy(
    end_block: Optional[str] = None,
    parallelism: Optional[int] = None,
    wait: bool = False,
    poll_interval: float = 5.0,
    timeout: float = 3600.0
) -> models.JobInfo
Parameters:
end_block (str, optional): Block to stop at. If None, the job runs continuously.
parallelism (int, optional): Number of parallel workers.
wait (bool, optional): If True, blocks until the job completes. Default: False.
poll_interval (float, optional): Seconds between polls if waiting. Default: 5.0.
timeout (float, optional): Maximum seconds to wait if waiting. Default: 3600.0.
Returns: JobInfo. With wait=False this is the initial job info; with wait=True it is the final job info.
Raises:
TimeoutError: If waiting and job doesn't complete within timeout.
Example:
# Deploy and return immediately
context = client.sql(...).register_as(...)
job = context.deploy(parallelism=4)
print(f"Started job {job.id}")
# Deploy and wait for completion
job = context.deploy(parallelism=4, wait=True, timeout=1800)
print(f"Completed with status: {job.status}")

Putting it all together:
from amp import Client
from amp.admin.errors import AdminAPIError, TimeoutError
# Initialize client
client = Client(
    query_url="grpc://localhost:8815",
    admin_url="http://localhost:8080",
    auth_token="my-token"
)
try:
    # Build and test query
    query = client.sql("""
        SELECT block_num, hash, timestamp
        FROM eth.blocks
        WHERE block_num > 1000000
    """)
    # Test locally
    df = query.to_pandas()
    print(f"Query returns {len(df)} rows")
    # Validate schema
    schema = client.schema.get_output_schema(query.query, True)
    print(f"Schema: {schema.schema}")
    # Register and deploy
    job = (
        query
        .with_dependency('eth', '_/eth_firehose@1.0.0')
        .register_as('_', 'eth_blocks_filtered', '1.0.0', 'blocks', 'mainnet')
        .deploy(
            end_block='latest',
            parallelism=4,
            wait=True,
            timeout=1800.0
        )
    )
    print(f"Deployment completed: {job.status}")
except AdminAPIError as e:
    print(f"API Error: {e.error_code} - {e.message}")
except TimeoutError:
    print("Deployment timed out")
finally:
    client.close()