This guide explains how to use the parallel streaming feature in amp-python to efficiently load large historical datasets and seamlessly transition to live streaming.
Parallel streaming allows you to:
- Parallelize historical data loading across multiple workers for faster backfills
- Automatically transition from parallel catchup to continuous streaming
- Configure partition sizes to optimize for your data characteristics
- Resume from specific block heights when restarting
Load a specific block range using multiple workers:
from amp.client import Client
from amp.streaming.parallel import ParallelConfig
# Connect to Amp server
client = Client("grpc://your-amp-server:80")
# Configure PostgreSQL connection
client.configure_connection(
name='my_postgres',
loader='postgresql',
config={
'host': 'localhost',
'database': 'blockchain_data',
'user': 'postgres',
'password': 'password'
}
)
# Configure parallel execution
parallel_config = ParallelConfig(
num_workers=4, # Use 4 parallel workers
table_name='eth_firehose.blocks', # Source table in Amp
min_block=0, # Start from block 0
max_block=1000000, # Load up to block 1M
block_column='block_num' # Column to partition on
)
# Execute parallel streaming query
query = "SELECT * FROM eth_firehose.blocks"
results = client.sql(query).load(
connection='my_postgres',
destination='blocks',
stream=True, # Enable streaming mode
parallel_config=parallel_config # Enable parallel execution
)
# Process results from all partitions
total_rows = 0
for result in results:
if result.success:
partition_id = result.metadata.get('partition_id', 'N/A')
print(f"Partition {partition_id}: {result.rows_loaded:,} rows loaded")
total_rows += result.rows_loaded
else:
print(f"Error: {result.error}")
print(f"\nTotal: {total_rows:,} rows loaded")Automatically detect the current block height, catch up in parallel, then transition to continuous streaming:
# Configure hybrid mode (max_block=None enables auto-detection)
parallel_config = ParallelConfig(
num_workers=4,
table_name='eth_firehose.blocks',
min_block=0,
max_block=None, # Auto-detect current max and transition to streaming
block_column='block_num'
)
# Execute hybrid streaming
results = client.sql(query).load(
connection='my_postgres',
destination='blocks',
stream=True,
parallel_config=parallel_config
)
# Process both parallel and continuous results
parallel_complete = False
for result in results:
if 'partition_id' in result.metadata:
# Parallel catchup phase
print(f"Catchup partition {result.metadata['partition_id']}: {result.rows_loaded:,} rows")
else:
# Continuous streaming phase
if not parallel_complete:
print("\n🎯 Parallel catchup complete! Now streaming live data...")
parallel_complete = True
print(f"Live batch: {result.rows_loaded:,} rows")@dataclass
class ParallelConfig:
# Required
num_workers: int # Number of parallel workers (recommend 2-8)
table_name: str # Table name in Amp server
block_column: str = 'block_num' # Column to partition on
# Optional - Block range
min_block: Optional[int] = None # Start block (default: 0)
max_block: Optional[int] = None # End block (None = auto-detect for hybrid mode)
# Optional - Partitioning
partition_size: Optional[int] = None # Blocks per partition (auto-calculated if not set)
# Optional - Performance tuning
batch_size: int = 10000 # Rows per batch within each partitionThe optimal number of workers depends on:
- Network bandwidth: More workers = more concurrent connections
- Database write capacity: Target database must handle parallel writes
- Data characteristics: Sparse data may benefit from fewer, larger partitions
Recommendations:
- Small datasets (<1M blocks): 2-4 workers
- Medium datasets (1M-10M blocks): 4-8 workers
- Large datasets (>10M blocks): 8+ workers (monitor database load)
By default, partition size is calculated as: (max_block - min_block) / num_workers
You can override this for finer control:
# Load 10M blocks with 100k blocks per partition
parallel_config = ParallelConfig(
num_workers=8,
table_name='eth_firehose.blocks',
min_block=0,
max_block=10_000_000,
partition_size=100_000, # Custom partition size
block_column='block_num'
)This creates 100 partitions (10M / 100k), processed by 8 workers concurrently.
Load a specific historical range and exit when complete:
parallel_config = ParallelConfig(
num_workers=8,
table_name='eth_firehose.blocks',
min_block=0,
max_block=20_000_000, # Specific end block
block_column='block_num'
)
results = client.sql(query).load(
connection='my_postgres',
destination='blocks',
stream=True,
parallel_config=parallel_config
)
# Consume all results
for result in results:
pass # Results are automatically loaded to database
print("Historical backfill complete!")Resume a previously interrupted load:
# Assume we previously loaded up to block 5M
checkpoint_block = 5_000_000
parallel_config = ParallelConfig(
num_workers=8,
table_name='eth_firehose.blocks',
min_block=checkpoint_block, # Resume from checkpoint
max_block=20_000_000,
block_column='block_num'
)Start near current block height and stream indefinitely:
# Only load recent history, then stream live
parallel_config = ParallelConfig(
num_workers=4,
table_name='eth_firehose.blocks',
min_block=20_000_000, # Start from recent block
max_block=None, # Auto-detect and transition to live streaming
block_column='block_num'
)
results = client.sql(query).load(
connection='my_postgres',
destination='blocks',
stream=True,
parallel_config=parallel_config
)
# Run indefinitely (Ctrl+C to stop)
try:
for result in results:
if result.success:
print(f"Loaded {result.rows_loaded:,} rows")
except KeyboardInterrupt:
print("\nStopped by user")Note on Reorg Buffer: When transitioning from parallel catchup to continuous streaming, the system automatically starts continuous streaming from detected_max_block - reorg_buffer (default: 200 blocks). This overlap ensures that any reorgs that occurred during the parallel catchup phase are detected and handled properly. With reorg detection enabled, duplicate blocks are automatically handled correctly. The reorg_buffer can be customized via ParallelConfig(reorg_buffer=N).
Currently, parallel streaming has the following limitations:
-
Block-based partitioning only: Only supports partitioning by block number columns (
block_numor_block_num). Tables without block numbers cannot use parallel execution. -
Schema detection requires data: Pre-flight schema detection requires at least 1 row in the source table. Empty tables will skip pre-flight creation and let workers handle it.
-
Static partitioning: Partitions are created upfront based on the block range. The system does not support dynamic repartitioning during execution.
-
Thread-level parallelism: Uses Python threads (ThreadPoolExecutor), not processes. For CPU-bound transformations, performance may be limited by the GIL.
-
Single table queries: The partitioning strategy works best with queries against a single table. Complex joins or unions may require careful query structuring.
-
Reorg buffer configuration: The
reorg_bufferparameter (default: 200 blocks) is configurable but applies uniformly. Per-chain customization requires separateParallelConfiginstances.
Expected speedup with parallel loading:
| Workers | Speedup | Notes |
|---|---|---|
| 1 | 1x | Baseline (sequential) |
| 2 | 1.8-1.9x | Good for modest datasets |
| 4 | 3.5-3.8x | Optimal for most use cases |
| 8 | 6-7x | Best for large datasets |
| 16+ | 8-12x | Diminishing returns, increased overhead |
Actual speedup depends on:
- Network latency between client, Amp server, and target database
- Target database write throughput
- Data density per block
Memory consumption is proportional to:
num_workers * batch_size * row_size
Example: With 8 workers, 10k batch size, and 1KB rows:
8 * 10,000 * 1KB ≈ 80MBpeak memory
Memory is released after each batch is written to the target database.
Use Parallel Streaming When:
- Loading historical data (>100k blocks)
- Initial backfills or catchups
- Target database can handle concurrent writes
- Network bandwidth is not the bottleneck
Use Sequential Streaming When:
- Near real-time (within last ~100 blocks)
- Target system has write concurrency limits
- Data is very sparse (few records per block)
- Memory constrained environments
When using hybrid mode (max_block=None), the system automatically handles the transition from parallel catchup to continuous streaming with a built-in reorg buffer:
Timeline Example:
─────────────────────────────────────────────────────────────────────
T0: System detects current max block = 10,000,000
T1: Parallel workers start loading blocks 0 → 10,000,000
T2: Parallel workers complete (takes ~5 minutes)
(Meanwhile, new blocks 10,000,001-10,000,050 have arrived)
T3: Continuous streaming starts from block 9,999,800 (10M - 200)
↓
Blocks 9,999,800 → 10,000,000 loaded TWICE (parallel + streaming)
└─ Reorg detection handles any inconsistencies
└─ Database upsert/merge handles duplicates
Result: Zero data gaps, all reorgs caught ✓
─────────────────────────────────────────────────────────────────────
Why 200 blocks (default)?
- Ethereum average reorg depth: 1-5 blocks
- 200 blocks = ~40 minutes of history
- Provides safety margin for deep reorgs that occurred during catchup
- Small performance cost (200 blocks re-loaded) vs high data integrity value
Customizing the Buffer:
The reorg buffer is fully configurable via ParallelConfig:
parallel_config = ParallelConfig(
num_workers=4,
table_name='eth_firehose.blocks',
min_block=0,
max_block=None, # Hybrid mode
reorg_buffer=500, # Increase for networks with deeper reorgs (e.g., testnets)
block_column='block_num'
)For advanced use cases, you can combine parallel loading with additional WHERE clause filters:
# Only load specific event types in parallel
query = """
SELECT * FROM eth_firehose.logs
WHERE topic0 = '0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef'
"""
parallel_config = ParallelConfig(
num_workers=4,
table_name='eth_firehose.logs',
min_block=0,
max_block=20_000_000,
block_column='block_num'
)
# The parallel executor will add block range filters to your WHERE clause
results = client.sql(query).load(
connection='my_postgres',
destination='transfer_events',
stream=True,
parallel_config=parallel_config
)Each worker streams data to the same destination table, so ensure your loader configuration supports concurrent writes:
PostgreSQL: Use connection pooling (automatically configured)
config = {
'host': 'localhost',
'database': 'blockchain_data',
'max_connections': 20, # Ensure enough connections for all workers
'batch_size': 10000
}Redis: Supports concurrent writes by default
config = {
'host': 'localhost',
'port': 6379
}DeltaLake: Use appropriate table isolation level
config = {
'table_path': './data/blocks',
'partition_by': ['block_num'],
'optimize_after_write': False # Optimize once after all workers complete
}Enable INFO level logging to monitor progress:
import logging
logging.basicConfig(level=logging.INFO)
# You'll see output like:
# INFO: Worker 0 processing blocks 0 to 250000
# INFO: Worker 1 processing blocks 250000 to 500000
# INFO: Partition 0 completed: 1,234,567 rows in 45.2sEach LoadResult includes metadata about the partition:
for result in results:
if 'partition_id' in result.metadata:
print(f"Partition: {result.metadata['partition_id']}")
print(f"Block range: {result.metadata.get('block_range', 'N/A')}")
print(f"Duration: {result.duration:.2f}s")
print(f"Throughput: {result.ops_per_second:.0f} rows/s")If a worker encounters an error, it will:
- Return a LoadResult with
success=Falseanderrormessage - Not retry automatically (to avoid infinite loops)
- Allow other partitions to continue
failed_partitions = []
for result in results:
if not result.success:
failed_partitions.append(result.metadata.get('partition_id'))
print(f"Partition {result.metadata['partition_id']} failed: {result.error}")
if failed_partitions:
print(f"\nFailed partitions: {failed_partitions}")
# Implement retry logic as neededIf the Amp server connection fails:
- All workers will fail with a connection error
- The iterator will yield error results and terminate
Press Ctrl+C to stop streaming:
try:
for result in results:
# Process results...
pass
except KeyboardInterrupt:
print("\nShutdown requested, waiting for workers to finish current batches...")
# Workers will complete current partitions and exit gracefullyIssue: Workers appear stuck after loading first batch
Cause: Query has SETTINGS stream = true which makes workers wait for new data
Solution: Don't include SETTINGS stream = true in the query when using parallel_config - the parallel executor handles this automatically
Issue: Process crashes with OOM
Cause: Too many workers or batch size too large
Solution: Reduce num_workers or batch_size:
parallel_config = ParallelConfig(
num_workers=4, # Reduced from 8
table_name='eth_firehose.blocks',
min_block=0,
max_block=1_000_000,
block_column='block_num'
)
# Also reduce batch_size in load() call
results = client.sql(query).load(
connection='my_postgres',
destination='blocks',
stream=True,
parallel_config=parallel_config,
batch_size=5000 # Reduced from 10000
)Issue: OperationalError: connection pool exhausted
Cause: Not enough database connections for number of workers
Solution: Increase max_connections in loader config:
config = {
'host': 'localhost',
'database': 'blockchain_data',
'user': 'postgres',
'password': 'password',
'max_connections': num_workers * 3 # At least 3x num_workers
}Issue: Some workers finish much faster than others Cause: Data is not evenly distributed across block ranges Solution: Use smaller partition sizes to create more partitions:
parallel_config = ParallelConfig(
num_workers=4,
table_name='eth_firehose.blocks',
min_block=0,
max_block=1_000_000,
partition_size=50_000, # Creates 20 partitions instead of 4
block_column='block_num'
)This allows workers to dynamically pick up new partitions as they finish.
- Streaming Guide - Sequential streaming documentation
- Loader Configuration - Target database configuration
- Performance Benchmarks - Performance test results