From b276fcdee681284396342f672bba545556430c28 Mon Sep 17 00:00:00 2001 From: Koen Vossen Date: Tue, 7 Apr 2026 14:24:36 +0200 Subject: [PATCH] Add max_concurrency to Source for per-source parallelism control Sources can set max_concurrency to limit how many tasks run in parallel. Useful for rate-limited APIs. Defaults to None (system concurrency). --- ingestify/application/loader.py | 5 ++++- ingestify/domain/models/source.py | 4 ++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/ingestify/application/loader.py b/ingestify/application/loader.py index 4d4752e..095b5b7 100644 --- a/ingestify/application/loader.py +++ b/ingestify/application/loader.py @@ -253,7 +253,10 @@ def run(self, selectors, dry_run: bool = False): selector=selector, ) - with TaskExecutor(dry_run=dry_run) as task_executor: + with TaskExecutor( + dry_run=dry_run, + processes=ingestion_plan.source.max_concurrency, + ) as task_executor: for ingestion_job_summary in ingestion_job.execute( self.store, task_executor=task_executor ): diff --git a/ingestify/domain/models/source.py b/ingestify/domain/models/source.py index f1bb94c..405b4f9 100644 --- a/ingestify/domain/models/source.py +++ b/ingestify/domain/models/source.py @@ -7,6 +7,10 @@ class Source(ABC): + # Override in subclass to limit how many tasks run in parallel. + # None means use the default (system concurrency). + max_concurrency: Optional[int] = None + def __init__(self, name: str, **kwargs): self.name = name