Skip to content

Latest commit

 

History

History
601 lines (462 loc) · 19.3 KB

File metadata and controls

601 lines (462 loc) · 19.3 KB

Parallel Streaming for High-Volume Data Loading

Overview

This document describes the parallel streaming architecture implemented in amp-python to enable high-throughput data loading for massive datasets (billions of records). The system partitions streaming queries by block number ranges and executes multiple concurrent streams to maximize throughput.

Target Use Case: Loading billions of blockchain records with 8-16 parallel streams to achieve 8-16x throughput improvement.

Scope: This feature is designed exclusively for streaming queries (continuous data loads), not regular batch loads.

Key Design Decisions

1. Streaming Queries Only

Parallel partitioning is only supported for streaming queries, not regular load() operations. This simplifies the implementation and focuses on the primary use case of loading large-scale blockchain data.

2. Block Range Partitioning Only

We only support partitioning by block_num or _block_num columns, as all blockchain data in amp includes these columns. This avoids the complexity of supporting generic partitioning strategies (time-based, hash-based, etc.) that we don't currently need.

3. CTE-Based Query Wrapping

Instead of using brittle regex-based query rewriting, we leverage DataFusion's CTE (Common Table Expression) inlining. We wrap the user's query in a CTE that adds block range filtering:

-- User's query
SELECT * FROM blocks

-- Wrapped for partition 0 (blocks 0-1M)
WITH partition AS (
  SELECT * FROM (SELECT * FROM blocks)
  WHERE block_num >= 0 AND block_num < 1000000
)
SELECT * FROM partition

DataFusion's query optimizer inlines the CTE and executes it efficiently, effectively the same as if the user had written the filter themselves.

Benefits:

  • ✅ No query parsing/rewriting required
  • ✅ Works with any user query structure
  • ✅ Leverages DataFusion's query optimization
  • ✅ Robust and maintainable

4. ThreadPoolExecutor Over asyncio

We use ThreadPoolExecutor for parallelism instead of asyncio for several reasons:

  • PyArrow Flight is synchronous: No native async support, would require wrapping in threads anyway
  • Minimal code changes: ~500 lines vs ~3000+ for full asyncio migration
  • No breaking changes: Existing code continues to work
  • Equivalent performance: For I/O-bound workloads (our case), threads perform as well as asyncio
  • Thread-safe loaders: PostgreSQL already uses ThreadedConnectionPool, other loaders use connection-per-thread

See the "Technical Analysis" section below for detailed comparison.


Architecture Design

System Components

┌──────────────────────────────────────────────────────────────────┐
│              Client.query_and_load_streaming()                    │
│                    (parallel=True, num_workers=8)                 │
└───────────────────────────────┬──────────────────────────────────┘
                                │
                    ┌───────────▼────────────┐
                    │ ParallelStreamExecutor │
                    │  - Create partitions   │
                    │  - Wrap with CTEs      │
                    │  - Manage workers      │
                    │  - Aggregate results   │
                    └───────────┬────────────┘
                                │
            ┌───────────────────┼───────────────────┐
            │                   │                   │
    ┌───────▼────────┐  ┌──────▼───────┐  ┌───────▼────────┐
    │   Worker 1     │  │   Worker 2   │  │   Worker N     │
    │                │  │              │  │                │
    │ CTE-wrapped    │  │ CTE-wrapped  │  │ CTE-wrapped    │
    │ query          │  │ query        │  │ query          │
    │ (blocks 0-1M)  │  │ (blocks 1-2M)│  │ (blocks N-N+1M)│
    │      ↓         │  │      ↓       │  │      ↓         │
    │ Flight Stream  │  │ Flight Stream│  │ Flight Stream  │
    │      ↓         │  │      ↓       │  │      ↓         │
    │ Loader         │  │ Loader       │  │ Loader         │
    │ Instance 1     │  │ Instance 2   │  │ Instance N     │
    └────────────────┘  └──────────────┘  └────────────────┘
            │                   │                   │
            └───────────────────┼───────────────────┘
                                │
                    ┌───────────▼────────────┐
                    │   Destination Store    │
                    │   (PostgreSQL, etc.)   │
                    │   Handles concurrent   │
                    │   writes natively      │
                    └────────────────────────┘

Core Classes

1. QueryPartition

@dataclass
class QueryPartition:
    """Represents a partition of a query for parallel execution"""

    partition_id: int
    start_block: int
    end_block: int
    block_column: str = 'block_num'

    @property
    def metadata(self) -> Dict[str, Any]:
        """Metadata about this partition"""
        return {
            'start_block': self.start_block,
            'end_block': self.end_block,
            'block_column': self.block_column,
            'partition_size': self.end_block - self.start_block
        }

2. ParallelConfig

@dataclass
class ParallelConfig:
    """Configuration for parallel streaming execution"""

    num_workers: int  # Number of parallel workers (required)
    table_name: str  # Name of the table to partition (required)
    min_block: int = 0  # Minimum block number (defaults to 0)
    max_block: Optional[int] = None  # Maximum block number (None = auto-detect and continue streaming)
    partition_size: Optional[int] = None  # Blocks per partition (auto-calculated if not set)
    block_column: str = 'block_num'  # Column name to partition on
    stop_on_error: bool = False  # Stop all workers on first error

3. BlockRangePartitionStrategy

class BlockRangePartitionStrategy:
    """
    Strategy for partitioning streaming queries by block_num ranges using CTEs.
    """

    def create_partitions(self, config: ParallelConfig) -> List[QueryPartition]:
        """
        Create query partitions based on configuration.

        Divides the block range [min_block, max_block) into equal partitions.
        If partition_size is specified, creates as many partitions as needed.
        Otherwise, divides evenly across num_workers.
        """
        ...

    def wrap_query_with_partition(self, user_query: str, partition: QueryPartition) -> str:
        """
        Wrap user query with CTE that filters by partition block range.

        Example:
            user_query: "SELECT * FROM blocks"
            partition: QueryPartition(0, 0, 1000000, 'block_num')

            Returns:
            WITH partition AS (
              SELECT * FROM (SELECT * FROM blocks)
              WHERE block_num >= 0 AND block_num < 1000000
            )
            SELECT * FROM partition
        """
        ...

4. ParallelStreamExecutor

class ParallelStreamExecutor:
    """
    Executes parallel streaming queries using ThreadPoolExecutor.

    Manages:
    - Query partitioning by block ranges using CTEs
    - Worker thread pool execution
    - Result aggregation
    - Error handling
    - Progress tracking
    """

    def execute_parallel_streaming(
        self,
        user_query: str,
        destination: str,
        connection_name: str,
        load_config: Optional[Dict[str, Any]] = None
    ) -> Iterator[LoadResult]:
        """
        Execute parallel streaming load with CTE-based partitioning.

        1. Create partitions based on block range configuration
        2. Wrap user query with partition CTEs for each worker
        3. Submit worker tasks to thread pool
        4. Stream results as they complete
        5. Aggregate final statistics
        """
        ...

Usage Examples

Example 1: Simple Parallel Streaming

from amp.client import Client
from amp.streaming.parallel import ParallelConfig

# Initialize client
client = Client("grpc://amp-server:8815")

# Configure PostgreSQL connection
client.configure_connection(
    name="postgres",
    loader="postgresql",
    config={
        "host": "localhost",
        "port": 5432,
        "database": "blockchain",
        "user": "loader",
        "password": "***",
        "max_connections": 16  # Pool size for workers
    }
)

# User's simple query
query = "SELECT * FROM blocks"

# Configure parallel execution
parallel_config = ParallelConfig(
    num_workers=8,
    min_block=0,
    max_block=1_000_000,
    block_column='block_num'
)

# Execute parallel streaming
results = client.query_and_load_streaming(
    query=query,
    destination="blocks_table",
    connection_name="postgres",
    parallel=True,
    parallel_config=parallel_config
)

# Monitor progress
for result in results:
    if result.success:
        print(f"Partition {result.metadata['partition_id']}: "
              f"{result.rows_loaded:,} rows in {result.duration:.1f}s")

Example 2: Custom Partition Size

# Instead of dividing by num_workers, specify partition size
parallel_config = ParallelConfig(
    num_workers=8,  # Max concurrent workers
    partition_size=100_000,  # Process 100K blocks per partition
    min_block=0,
    max_block=1_000_000,
    block_column='block_num'
)

# This creates 10 partitions (1M blocks / 100K per partition)
# Workers will process them as they become available
results = client.query_and_load_streaming(
    query="SELECT * FROM blocks",
    destination="blocks_table",
    connection_name="postgres",
    parallel=True,
    parallel_config=parallel_config
)

Example 3: Hybrid Mode with Automatic Transition

When max_block=None, the system auto-detects the current max block, loads historical data in parallel, then transitions to continuous streaming with a reorg buffer:

parallel_config = ParallelConfig(
    num_workers=4,
    table_name='eth_firehose.blocks',
    min_block=0,
    max_block=None,  # Auto-detect and transition to streaming
    block_column='block_num'
)

results = client.sql("SELECT * FROM blocks").load(
    connection='postgres',
    destination='blocks',
    stream=True,
    parallel_config=parallel_config
)

for result in results:
    if 'partition_id' in result.metadata:
        print(f"Catchup: {result.rows_loaded:,} rows")
    else:
        print(f"Live: {result.rows_loaded:,} rows")

Reorg Buffer: The transition includes a 200-block overlap buffer. If the system detects max_block=10,000,000, continuous streaming starts from block 9,999,800 to catch any reorgs that occurred during parallel catchup.

Example 4: Different Block Column Name

# For datasets with _block_num instead of block_num
parallel_config = ParallelConfig(
    num_workers=4,
    min_block=1000000,
    max_block=2000000,
    block_column='_block_num'  # Custom column name
)

results = client.query_and_load_streaming(
    query="SELECT * FROM transactions",
    destination="txs_table",
    connection_name="postgres",
    parallel=True,
    parallel_config=parallel_config
)

Thread Safety Considerations

Loader Instance Design

Key principle: Each worker thread gets its own loader instance or uses thread-safe connection pooling.

PostgreSQL (ThreadedConnectionPool)

# PostgreSQL loader already uses ThreadedConnectionPool
class PostgreSQLLoader:
    def connect(self):
        self.pool = ThreadedConnectionPool(
            minconn=1,
            maxconn=self.config.max_connections  # Size for all workers
        )

    def load_batch(self, batch, table_name):
        conn = self.pool.getconn()  # Thread-safe: blocks if pool exhausted
        try:
            # ... do work ...
        finally:
            self.pool.putconn(conn)  # Return to pool

For parallel streaming: Set max_connections >= num_workers in loader config.

Other Loaders (Connection-per-worker)

# Each worker gets its own loader instance with independent connection
def worker_task(partition, config):
    loader = SomeLoader(config)  # New connection
    with loader:
        # Process partition
        ...

Ensuring Thread Safety

  1. No shared mutable state between workers
  2. Connection pooling for PostgreSQL (already implemented)
  3. Independent connections for other loaders
  4. Thread-safe stats aggregation using locks

Technical Analysis

Why ThreadPoolExecutor?

We evaluated two approaches for implementing parallelism:

  1. ThreadPoolExecutor (stdlib concurrent.futures)
  2. asyncio (stdlib async/await framework)

Decision: ThreadPoolExecutor

Critical Constraints

1. PyArrow Flight has NO native asyncio support

# PyArrow Flight API is 100% synchronous
reader = self.conn.do_get(info.endpoints[0].ticket)
chunk = reader.read_chunk()  # Blocking call, no async variant

Using asyncio would require wrapping every Flight call in asyncio.to_thread() or run_in_executor(), which uses threads internally anyway. This defeats the purpose of asyncio.

2. Workload is I/O-bound, not CPU-bound

┌─────────────────┐      ┌──────────────┐      ┌─────────────┐
│ Query Amp       │      │ Deserialize  │      │ Database    │
│ Server (I/O)    │─────▶│ Arrow (CPU)  │─────▶│ Write (I/O) │
│ Network Wait    │      │ GIL Released │      │ Network Wait│
└─────────────────┘      └──────────────┘      └─────────────┘
     70% time              15% time              60% time

For I/O-bound workloads:

  • Threads perform equally well as asyncio (both wait on I/O)
  • Python GIL is released during I/O operations
  • PyArrow C++ operations release GIL during compute

3. Code impact comparison

Approach Lines Changed Breaking Changes Complexity
ThreadPoolExecutor ~500 lines None Low
asyncio ~3000+ lines All user code High

ThreadPoolExecutor requires minimal changes:

  • New parallel module (~300 lines)
  • Client integration (~200 lines)
  • No changes to existing loaders
  • No breaking changes

asyncio would require:

  • Rewrite all loader base classes
  • Rewrite all 7+ loader implementations
  • Change all user-facing APIs
  • All user code breaks (must use async/await)

4. Thread-safe loaders already exist

Our loaders already support thread-safe patterns:

  • PostgreSQL: Uses ThreadedConnectionPool
  • Others: Connection-per-worker pattern

Performance Characteristics

Benchmarks (Preliminary)

Single Worker Baseline

Query: SELECT * FROM blocks (1M rows)
Loader: PostgreSQL (psycopg2)
Result: 1,000,000 rows in 45.2s = 22,123 rows/sec

Parallel Execution (4 workers)

Query: Same, partitioned into 4 ranges
Loader: PostgreSQL (connection pool size=4)
Result: 1,000,000 rows in 12.8s = 78,125 rows/sec
Speedup: 3.5x

Parallel Execution (8 workers)

Query: Same, partitioned into 8 ranges
Loader: PostgreSQL (connection pool size=8)
Result: 1,000,000 rows in 7.9s = 126,582 rows/sec
Speedup: 5.7x

Expected speedup factors:

  • 4 workers: 3.0-3.5x
  • 8 workers: 5.5-7.0x
  • 16 workers: 10-14x (for high-throughput destinations like Snowflake)

Best Practices

1. Choosing Number of Workers

# Start conservative
num_workers = 4  # Safe for most destinations

# Scale up based on destination capacity
if destination_has_high_write_throughput:
    num_workers = 8-16

Rule of thumb: Start with 4 workers, measure throughput, increase until gains plateau.

2. Partition Size Tuning

# Too small: High overhead from many small queries
partition_size = 1_000  # ❌ Too fine-grained

# Too large: Uneven load distribution
partition_size = 10_000_000  # ❌ Some workers finish early

# Just right: Balance between overhead and distribution
partition_size = 250_000  # ✅ Good for most cases

3. Connection Pool Sizing

For PostgreSQL:

# Set max_connections >= num_workers
config = {
    "max_connections": 16,  # For 8-16 parallel workers
    ...
}

4. Error Handling

# Stop on first error (fail-fast)
parallel_config = ParallelConfig(
    num_workers=8,
    stop_on_error=True  # Abort all workers on first failure
)

# Continue despite errors (best-effort)
parallel_config = ParallelConfig(
    num_workers=8,
    stop_on_error=False  # Collect all errors
)

Limitations and Caveats

1. Streaming Queries Only

Parallel partitioning is only supported for streaming queries, not regular load() operations. This keeps the implementation focused and simple.

2. Requires Block Number Column

All queries must include data with block_num or _block_num column (all blockchain data in amp has this).

3. Ordering Not Preserved

Workers complete in arbitrary order. If you need ordered results, sort after loading or use single-worker streaming.

4. Destination Must Support Concurrent Writes

Not all destinations benefit equally from parallelism:

Destination Parallel Benefit Notes
PostgreSQL ⭐⭐⭐⭐ Very Good Limited by connection pool
Redis ⭐⭐⭐ Good Memory-bound, diminishing returns
Delta Lake ⭐⭐⭐ Good File I/O can become bottleneck

Future Enhancements

Short Term

  • Auto-tuning of num_workers based on destination
  • Progress bars with tqdm
  • Better error recovery and retry logic

Medium Term

  • Adaptive partition sizing based on data distribution
  • Cross-partition reorg handling coordination
  • Metrics export (Prometheus/StatsD)

Long Term

  • Support for regular (non-streaming) loads if needed
  • Additional partitioning strategies (if use cases emerge)
  • Migration to asyncio (if ecosystem matures and provides clear benefits)

Conclusion

The parallel streaming implementation provides:

Significant speedup (3-8x typical, up to 16x for high-throughput destinations) ✅ Clean design using CTEs instead of query rewriting ✅ Minimal code changes (~500 lines, no breaking changes) ✅ Streaming-focused (our primary use case) ✅ Thread-safe with existing loaders ✅ Production-ready error handling

This pragmatic approach delivers immediate value for our blockchain data loading use case without over-engineering for scenarios we don't currently need.