diff --git a/ingestify/application/loader.py b/ingestify/application/loader.py index 4d4752e..095b5b7 100644 --- a/ingestify/application/loader.py +++ b/ingestify/application/loader.py @@ -253,7 +253,10 @@ def run(self, selectors, dry_run: bool = False): selector=selector, ) - with TaskExecutor(dry_run=dry_run) as task_executor: + with TaskExecutor( + dry_run=dry_run, + processes=ingestion_plan.source.max_concurrency, + ) as task_executor: for ingestion_job_summary in ingestion_job.execute( self.store, task_executor=task_executor ): diff --git a/ingestify/domain/models/source.py b/ingestify/domain/models/source.py index f1bb94c..405b4f9 100644 --- a/ingestify/domain/models/source.py +++ b/ingestify/domain/models/source.py @@ -7,6 +7,10 @@ class Source(ABC): + # Override in subclass to limit how many tasks run in parallel. + # None means use the default (system concurrency). + max_concurrency: Optional[int] = None + def __init__(self, name: str, **kwargs): self.name = name