diff --git a/changes/3547.misc.md b/changes/3547.misc.md new file mode 100644 index 0000000000..e9f6a7cd0a --- /dev/null +++ b/changes/3547.misc.md @@ -0,0 +1,4 @@ +Moved concurrency-limiting functionality to store classes. The global configuration object no longer +controls concurrency limits. Concurrency limits, if applicable, must now be specified when constructing a store. +Users who have subclassed the `Store` class should implement concurrency-limiting logic if appropriate for +their storage backend. \ No newline at end of file diff --git a/docs/user-guide/config.md b/docs/user-guide/config.md index 044f170c02..63574295e9 100644 --- a/docs/user-guide/config.md +++ b/docs/user-guide/config.md @@ -32,7 +32,7 @@ Configuration options include the following: - Whether empty chunks are written to storage `array.write_empty_chunks` - Enable experimental rectilinear chunks `array.rectilinear_chunks` - Whether missing chunks are filled with the array's fill value on read `array.read_missing_chunks` (default `True`). Set to `False` to raise a [`ChunkNotFoundError`][zarr.errors.ChunkNotFoundError] instead. -- Async and threading options, e.g. `async.concurrency` and `threading.max_workers` +- Threading options, e.g. `threading.max_workers`. - Selections of implementations of codecs, codec pipelines and buffers - Enabling GPU support with `zarr.config.enable_gpu()`. See GPU support for more. diff --git a/docs/user-guide/performance.md b/docs/user-guide/performance.md index 6d3ec063d2..d05d0149ce 100644 --- a/docs/user-guide/performance.md +++ b/docs/user-guide/performance.md @@ -190,32 +190,7 @@ scenarios. ### Concurrent I/O operations -Zarr uses asynchronous I/O internally to enable concurrent reads and writes across multiple chunks. -The level of concurrency is controlled by the `async.concurrency` configuration setting, which -determines the maximum number of concurrent I/O operations. -The default value is 10, which is a conservative value. 
You may get improved performance by tuning -the concurrency limit. You can adjust this value based on your specific needs: - -```python -import zarr - -# Set concurrency for the current session -zarr.config.set({'async.concurrency': 128}) - -# Or use environment variable -# export ZARR_ASYNC_CONCURRENCY=128 -``` - -Higher concurrency values can improve throughput when: -- Working with remote storage (e.g., S3, GCS) where network latency is high -- Reading/writing many small chunks in parallel -- The storage backend can handle many concurrent requests - -Lower concurrency values may be beneficial when: -- Working with local storage with limited I/O bandwidth -- Memory is constrained (each concurrent operation requires buffer space) -- Using Zarr within a parallel computing framework (see below) +For latency-sensitive storage backends like HTTP and cloud object storage, Zarr uses asynchronous I/O internally to enable concurrent reads and writes across multiple chunks. Zarr does not impose its own concurrency limits — storage backends are expected to manage their own concurrency constraints (e.g., connection pool sizes, rate limits). If you need to limit concurrency for a particular backend, configure it at the storage layer (e.g., via fsspec or obstore options). ### Thread pool size (`threading.max_workers`) @@ -241,30 +216,21 @@ concurrently. ### Using Zarr with Dask -[Dask](https://www.dask.org/) is a popular parallel computing library that works well with Zarr for processing large arrays. When using Zarr with Dask, it's important to consider the interaction between Dask's thread pool and Zarr's concurrency settings. +[Dask](https://www.dask.org/) is a popular parallel computing library that works well with Zarr for processing large arrays. When using Zarr with Dask, it's important to consider the interaction between Dask's thread pool and Zarr's internal thread pool. 
-**Important**: When using many Dask threads, you may need to reduce both Zarr's `async.concurrency` and `threading.max_workers` settings to avoid creating too many concurrent operations. The total number of concurrent I/O operations can be roughly estimated as: - -``` -total_concurrency ≈ dask_threads × zarr_async_concurrency -``` - -For example, if you're running Dask with 10 threads and Zarr's default concurrency of 64, you could potentially have up to 640 concurrent operations, which may overwhelm your storage system or cause memory issues. - -**Recommendation**: When using Dask with many threads, configure Zarr's concurrency settings: +**Recommendation**: When using Dask with many threads, reduce Zarr's internal thread pool to avoid thread contention: ```python import zarr import dask.array as da -# If using Dask with many threads (e.g., 8-16), reduce Zarr's concurrency settings +# Limit Zarr's internal thread pool zarr.config.set({ - 'async.concurrency': 4, # Limit concurrent async operations 'threading.max_workers': 4, # Limit Zarr's internal thread pool }) # Open Zarr array -z = zarr.open_array('data/large_array.zarr', mode='r') +z = zarr.open_array("data/large_array.zarr", mode='r') # Create Dask array from Zarr array arr = da.from_array(z, chunks=z.chunks) @@ -273,13 +239,6 @@ arr = da.from_array(z, chunks=z.chunks) result = arr.mean(axis=0).compute() ``` -**Configuration guidelines for Dask workloads**: - -- `async.concurrency`: Controls the maximum number of concurrent async I/O operations. Start with a lower value (e.g., 4-8) when using many Dask threads. -- `threading.max_workers`: Controls Zarr's internal thread pool size for blocking operations (defaults to CPU count). Reduce this to avoid thread contention with Dask's scheduler. - -You may need to experiment with different values to find the optimal balance for your workload. 
Monitor your system's resource usage and adjust these settings based on whether your storage system or CPU is the bottleneck. - ### Thread safety and process safety Zarr arrays are designed to be thread-safe for concurrent reads and writes from multiple threads within the same process. However, proper synchronization is required when writing to overlapping regions from multiple threads. diff --git a/src/zarr/abc/codec.py b/src/zarr/abc/codec.py index eed2119aff..5c0a584086 100644 --- a/src/zarr/abc/codec.py +++ b/src/zarr/abc/codec.py @@ -1,5 +1,6 @@ from __future__ import annotations +import asyncio from abc import abstractmethod from collections.abc import Mapping from typing import TYPE_CHECKING, Literal, Protocol, TypeGuard, runtime_checkable @@ -8,8 +9,7 @@ from zarr.abc.metadata import Metadata from zarr.core.buffer import Buffer, NDBuffer -from zarr.core.common import NamedConfig, concurrent_map -from zarr.core.config import config +from zarr.core.common import NamedConfig if TYPE_CHECKING: from collections.abc import Awaitable, Callable, Iterable @@ -246,11 +246,7 @@ async def decode_partial( ------- Iterable[NDBuffer | None] """ - return await concurrent_map( - list(batch_info), - self._decode_partial_single, - config.get("async.concurrency"), - ) + return await asyncio.gather(*[self._decode_partial_single(*info) for info in batch_info]) class ArrayBytesCodecPartialEncodeMixin: @@ -283,11 +279,7 @@ async def encode_partial( The ByteSetter is used to write the necessary bytes and fetch bytes for existing chunk data. The chunk spec contains information about the chunk. 
""" - await concurrent_map( - list(batch_info), - self._encode_partial_single, - config.get("async.concurrency"), - ) + await asyncio.gather(*[self._encode_partial_single(*info) for info in batch_info]) class CodecPipeline: @@ -490,11 +482,7 @@ async def _batching_helper[CI: CodecInput, CO: CodecOutput]( func: Callable[[CI, ArraySpec], Awaitable[CO | None]], batch_info: Iterable[tuple[CI | None, ArraySpec]], ) -> list[CO | None]: - return await concurrent_map( - list(batch_info), - _noop_for_none(func), - config.get("async.concurrency"), - ) + return await asyncio.gather(*[_noop_for_none(func)(chunk, spec) for chunk, spec in batch_info]) def _noop_for_none[CI: CodecInput, CO: CodecOutput]( diff --git a/src/zarr/abc/store.py b/src/zarr/abc/store.py index 600df17ee5..e37ee2fc9e 100644 --- a/src/zarr/abc/store.py +++ b/src/zarr/abc/store.py @@ -679,13 +679,8 @@ async def getsize_prefix(self, prefix: str) -> int: # improve tail latency and might reduce memory pressure (since not all keys # would be in memory at once). 
- # avoid circular import - from zarr.core.common import concurrent_map - from zarr.core.config import config - - keys = [(x,) async for x in self.list_prefix(prefix)] - limit = config.get("async.concurrency") - sizes = await concurrent_map(keys, self.getsize, limit=limit) + keys = [x async for x in self.list_prefix(prefix)] + sizes = await asyncio.gather(*[self.getsize(key) for key in keys]) return sum(sizes) diff --git a/src/zarr/core/array.py b/src/zarr/core/array.py index 4736805b9d..b9a4842268 100644 --- a/src/zarr/core/array.py +++ b/src/zarr/core/array.py @@ -1,5 +1,6 @@ from __future__ import annotations +import asyncio import json import warnings from asyncio import gather @@ -20,7 +21,6 @@ import numpy as np from typing_extensions import deprecated -import zarr from zarr.abc.codec import ArrayArrayCodec, ArrayBytesCodec, BytesBytesCodec, Codec from zarr.abc.numcodec import Numcodec, _is_numcodec from zarr.codecs._v2 import V2Codec @@ -63,7 +63,6 @@ _default_zarr_format, _warn_order_kwarg, ceildiv, - concurrent_map, parse_shapelike, product, ) @@ -4661,28 +4660,26 @@ async def from_array( if write_data: if isinstance(data, Array): - async def _copy_array_region( - chunk_coords: tuple[int, ...] 
| slice, _data: AnyArray - ) -> None: + async def _copy_array_region(chunk_coords: tuple[slice, ...], _data: AnyArray) -> None: arr = await _data.async_array.getitem(chunk_coords) await result.setitem(chunk_coords, arr) # Stream data from the source array to the new array - await concurrent_map( - [(region, data) for region in result._iter_shard_regions()], - _copy_array_region, - zarr.core.config.config.get("async.concurrency"), + # Store handles concurrency limiting internally + await asyncio.gather( + *[_copy_array_region(region, data) for region in result._iter_shard_regions()] ) else: - async def _copy_arraylike_region(chunk_coords: slice, _data: NDArrayLike) -> None: - await result.setitem(chunk_coords, _data[chunk_coords]) + async def _copy_arraylike_region( + chunk_coords: tuple[slice, ...], _data: npt.ArrayLike + ) -> None: + await result.setitem(chunk_coords, _data[chunk_coords]) # type: ignore[call-overload, index] # Stream data from the source array to the new array - await concurrent_map( - [(region, data) for region in result._iter_shard_regions()], - _copy_arraylike_region, - zarr.core.config.config.get("async.concurrency"), + # Store handles concurrency limiting internally + await asyncio.gather( + *[_copy_arraylike_region(region, data) for region in result._iter_shard_regions()] ) return result @@ -6303,13 +6300,12 @@ async def _resize( async def _delete_key(key: str) -> None: await (array.store_path / key).delete() - await concurrent_map( - [ - (array.metadata.encode_chunk_key(chunk_coords),) + # Store handles concurrency limiting internally + await asyncio.gather( + *[ + _delete_key(array.metadata.encode_chunk_key(chunk_coords)) for chunk_coords in old_chunk_coords.difference(new_chunk_coords) - ], - _delete_key, - zarr_config.get("async.concurrency"), + ] ) # Write new metadata diff --git a/src/zarr/core/codec_pipeline.py b/src/zarr/core/codec_pipeline.py index 4cecc3a6d1..f97b11386b 100644 --- a/src/zarr/core/codec_pipeline.py +++ 
b/src/zarr/core/codec_pipeline.py @@ -1,5 +1,6 @@ from __future__ import annotations +import asyncio from dataclasses import dataclass, field from itertools import islice, pairwise from typing import TYPE_CHECKING, Any @@ -16,7 +17,6 @@ GetResult, SupportsSyncCodec, ) -from zarr.core.common import concurrent_map from zarr.core.config import config from zarr.core.indexing import SelectorTuple, is_scalar from zarr.errors import ZarrUserWarning @@ -378,13 +378,12 @@ async def read_batch( results.append(GetResult(status="missing")) else: batch_info_list = list(batch_info) - chunk_bytes_batch = await concurrent_map( - [ - (byte_getter, array_spec.prototype) + # Store handles concurrency limiting internally + chunk_bytes_batch = await asyncio.gather( + *[ + byte_getter.get(array_spec.prototype) for byte_getter, array_spec, *_ in batch_info_list - ], - lambda byte_getter, prototype: byte_getter.get(prototype), - config.get("async.concurrency"), + ] ) chunk_array_batch = await self.decode_batch( [ @@ -485,16 +484,15 @@ async def _read_key( return await byte_setter.get(prototype=prototype) chunk_bytes_batch: Iterable[Buffer | None] - chunk_bytes_batch = await concurrent_map( - [ - ( + # Store handles concurrency limiting internally + chunk_bytes_batch = await asyncio.gather( + *[ + _read_key( None if is_complete_chunk else byte_setter, chunk_spec.prototype, ) for byte_setter, chunk_spec, chunk_selection, _, is_complete_chunk in batch_info - ], - _read_key, - config.get("async.concurrency"), + ] ) chunk_array_decoded = await self.decode_batch( [ @@ -552,15 +550,14 @@ async def _write_key(byte_setter: ByteSetter, chunk_bytes: Buffer | None) -> Non else: await byte_setter.set(chunk_bytes) - await concurrent_map( - [ - (byte_setter, chunk_bytes) + # Store handles concurrency limiting internally + await asyncio.gather( + *[ + _write_key(byte_setter, chunk_bytes) for chunk_bytes, (byte_setter, *_) in zip( chunk_bytes_batch, batch_info, strict=False ) - ], - _write_key, - 
config.get("async.concurrency"), + ] ) async def decode( @@ -587,13 +584,12 @@ async def read( out: NDBuffer, drop_axes: tuple[int, ...] = (), ) -> tuple[GetResult, ...]: - batch_results = await concurrent_map( - [ - (single_batch_info, out, drop_axes) + # Process mini-batches concurrently - stores handle I/O concurrency internally + batch_results = await asyncio.gather( + *[ + self.read_batch(single_batch_info, out, drop_axes) for single_batch_info in batched(batch_info, self.batch_size) - ], - self.read_batch, - config.get("async.concurrency"), + ] ) results: list[GetResult] = [] for batch in batch_results: @@ -606,13 +602,12 @@ async def write( value: NDBuffer, drop_axes: tuple[int, ...] = (), ) -> None: - await concurrent_map( - [ - (single_batch_info, value, drop_axes) + # Process mini-batches concurrently - stores handle I/O concurrency internally + await asyncio.gather( + *[ + self.write_batch(single_batch_info, value, drop_axes) for single_batch_info in batched(batch_info, self.batch_size) - ], - self.write_batch, - config.get("async.concurrency"), + ] ) diff --git a/src/zarr/core/common.py b/src/zarr/core/common.py index a16257df7c..df53d4dda5 100644 --- a/src/zarr/core/common.py +++ b/src/zarr/core/common.py @@ -1,13 +1,11 @@ from __future__ import annotations -import asyncio import functools import math import operator import warnings from collections.abc import Iterable, Mapping, Sequence from enum import Enum -from itertools import starmap from typing import ( TYPE_CHECKING, Any, @@ -26,7 +24,7 @@ from zarr.errors import ZarrRuntimeWarning if TYPE_CHECKING: - from collections.abc import Awaitable, Callable, Iterator + from collections.abc import Iterator ZARR_JSON = "zarr.json" @@ -92,24 +90,6 @@ def ceildiv(a: float, b: float) -> int: return math.ceil(a / b) -async def concurrent_map[T: tuple[Any, ...], V]( - items: Iterable[T], - func: Callable[..., Awaitable[V]], - limit: int | None = None, -) -> list[V]: - if limit is None: - return await 
asyncio.gather(*list(starmap(func, items))) - - else: - sem = asyncio.Semaphore(limit) - - async def run(item: tuple[Any]) -> V: - async with sem: - return await func(*item) - - return await asyncio.gather(*[asyncio.ensure_future(run(item)) for item in items]) - - def enum_names[E: Enum](enum: type[E]) -> Iterator[str]: for item in enum: yield item.name diff --git a/src/zarr/core/config.py b/src/zarr/core/config.py index 7dcbc78e31..f801eb208d 100644 --- a/src/zarr/core/config.py +++ b/src/zarr/core/config.py @@ -100,7 +100,7 @@ def enable_gpu(self) -> ConfigSet: "target_shard_size_bytes": None, "rectilinear_chunks": False, }, - "async": {"concurrency": 10, "timeout": None}, + "async": {"timeout": None}, "threading": {"max_workers": None}, "json_indent": 2, "codec_pipeline": { diff --git a/src/zarr/core/group.py b/src/zarr/core/group.py index b810041e7b..a988581df9 100644 --- a/src/zarr/core/group.py +++ b/src/zarr/core/group.py @@ -1439,13 +1439,10 @@ async def _members( ) raise ValueError(msg) - # enforce a concurrency limit by passing a semaphore to all the recursive functions - semaphore = asyncio.Semaphore(config.get("async.concurrency")) async for member in _iter_members_deep( self, max_depth=max_depth, skip_keys=skip_keys, - semaphore=semaphore, use_consolidated_for_children=use_consolidated_for_children, ): yield member @@ -3348,14 +3345,11 @@ async def create_nodes( The created nodes in the order they are created. """ - # Note: the only way to alter this value is via the config. 
If that's undesirable for some reason, - # then we should consider adding a keyword argument this this function - semaphore = asyncio.Semaphore(config.get("async.concurrency")) create_tasks: list[Coroutine[None, None, str]] = [] for key, value in nodes.items(): # make the key absolute - create_tasks.extend(_persist_metadata(store, key, value, semaphore=semaphore)) + create_tasks.extend(_persist_metadata(store, key, value)) created_object_keys = [] @@ -3501,28 +3495,16 @@ def _ensure_consistent_zarr_format( ) -async def _getitem_semaphore( - node: AsyncGroup, key: str, semaphore: asyncio.Semaphore | None -) -> AnyAsyncArray | AsyncGroup: +async def _getitem(node: AsyncGroup, key: str) -> AnyAsyncArray | AsyncGroup: """ - Wrap Group.getitem with an optional semaphore. - - If the semaphore parameter is an - asyncio.Semaphore instance, then the getitem operation is performed inside an async context - manager provided by that semaphore. If the semaphore parameter is None, then getitem is invoked - without a context manager. + Fetch a child node from a group by key. """ - if semaphore is not None: - async with semaphore: - return await node.getitem(key) - else: - return await node.getitem(key) + return await node.getitem(key) async def _iter_members( node: AsyncGroup, skip_keys: tuple[str, ...], - semaphore: asyncio.Semaphore | None, ) -> AsyncGenerator[tuple[str, AnyAsyncArray | AsyncGroup], None]: """ Iterate over the arrays and groups contained in a group. @@ -3533,8 +3515,6 @@ async def _iter_members( The group to traverse. skip_keys : tuple[str, ...] A tuple of keys to skip when iterating over the possible members of the group. - semaphore : asyncio.Semaphore | None - An optional semaphore to use for concurrency control. 
Yields ------ @@ -3545,10 +3525,7 @@ async def _iter_members( keys = [key async for key in node.store.list_dir(node.path)] keys_filtered = tuple(filter(lambda v: v not in skip_keys, keys)) - node_tasks = tuple( - asyncio.create_task(_getitem_semaphore(node, key, semaphore), name=key) - for key in keys_filtered - ) + node_tasks = tuple(asyncio.create_task(_getitem(node, key), name=key) for key in keys_filtered) for fetched_node_coro in asyncio.as_completed(node_tasks): try: @@ -3575,7 +3552,6 @@ async def _iter_members_deep( *, max_depth: int | None, skip_keys: tuple[str, ...], - semaphore: asyncio.Semaphore | None = None, use_consolidated_for_children: bool = True, ) -> AsyncGenerator[tuple[str, AnyAsyncArray | AsyncGroup], None]: """ @@ -3590,8 +3566,6 @@ async def _iter_members_deep( The maximum depth of recursion. skip_keys : tuple[str, ...] A tuple of keys to skip when iterating over the possible members of the group. - semaphore : asyncio.Semaphore | None - An optional semaphore to use for concurrency control. use_consolidated_for_children : bool, default True Whether to use the consolidated metadata of child groups loaded from the store. 
Note that this only affects groups loaded from the @@ -3610,7 +3584,7 @@ async def _iter_members_deep( new_depth = None else: new_depth = max_depth - 1 - async for name, node in _iter_members(group, skip_keys=skip_keys, semaphore=semaphore): + async for name, node in _iter_members(group, skip_keys=skip_keys): is_group = isinstance(node, AsyncGroup) if ( is_group @@ -3624,9 +3598,7 @@ async def _iter_members_deep( yield name, node if is_group and do_recursion: node = cast("AsyncGroup", node) - to_recurse[name] = _iter_members_deep( - node, max_depth=new_depth, skip_keys=skip_keys, semaphore=semaphore - ) + to_recurse[name] = _iter_members_deep(node, max_depth=new_depth, skip_keys=skip_keys) for prefix, subgroup_iter in to_recurse.items(): async for name, node in subgroup_iter: @@ -3836,9 +3808,7 @@ async def get_node(store: Store, path: str, zarr_format: ZarrFormat) -> AnyAsync raise ValueError(f"Unexpected zarr format: {zarr_format}") # pragma: no cover -async def _set_return_key( - *, store: Store, key: str, value: Buffer, semaphore: asyncio.Semaphore | None = None -) -> str: +async def _set_return_key(*, store: Store, key: str, value: Buffer) -> str: """ Write a value to storage at the given key. The key is returned. Useful when saving values via routines that return results in execution order, @@ -3853,15 +3823,8 @@ async def _set_return_key( The key to save the value to. value : Buffer The value to save. - semaphore : asyncio.Semaphore | None - An optional semaphore to use to limit the number of concurrent writes. 
""" - - if semaphore is not None: - async with semaphore: - await store.set(key, value) - else: - await store.set(key, value) + await store.set(key, value) return key @@ -3869,7 +3832,6 @@ def _persist_metadata( store: Store, path: str, metadata: ArrayV2Metadata | ArrayV3Metadata | GroupMetadata, - semaphore: asyncio.Semaphore | None = None, ) -> tuple[Coroutine[None, None, str], ...]: """ Prepare to save a metadata document to storage, returning a tuple of coroutines that must be awaited. @@ -3877,7 +3839,7 @@ def _persist_metadata( to_save = metadata.to_buffer_dict(default_buffer_prototype()) return tuple( - _set_return_key(store=store, key=_join_paths([path, key]), value=value, semaphore=semaphore) + _set_return_key(store=store, key=_join_paths([path, key]), value=value) for key, value in to_save.items() ) diff --git a/src/zarr/core/sync.py b/src/zarr/core/sync.py index 7bcb0bf034..3acae6e5a4 100644 --- a/src/zarr/core/sync.py +++ b/src/zarr/core/sync.py @@ -13,7 +13,7 @@ from zarr.core.config import config if TYPE_CHECKING: - from collections.abc import AsyncIterator, Awaitable, Callable, Coroutine + from collections.abc import AsyncIterator, Coroutine from typing import Any logger = logging.getLogger(__name__) @@ -210,17 +210,3 @@ async def iter_to_list() -> list[T]: return [item async for item in async_iterator] return self._sync(iter_to_list()) - - -async def _with_semaphore[T]( - func: Callable[[], Awaitable[T]], semaphore: asyncio.Semaphore | None = None -) -> T: - """ - Await the result of invoking the no-argument-callable ``func`` within the context manager - provided by a Semaphore, if one is provided. Otherwise, just await the result of invoking - ``func``. 
- """ - if semaphore is None: - return await func() - async with semaphore: - return await func() diff --git a/src/zarr/storage/_fsspec.py b/src/zarr/storage/_fsspec.py index f9e4ed375d..88bf6807ae 100644 --- a/src/zarr/storage/_fsspec.py +++ b/src/zarr/storage/_fsspec.py @@ -121,6 +121,7 @@ class FsspecStore(Store): def __init__( self, fs: AsyncFileSystem, + *, read_only: bool = False, path: str = "/", allowed_exceptions: tuple[type[Exception], ...] = ALLOWED_EXCEPTIONS, diff --git a/src/zarr/storage/_local.py b/src/zarr/storage/_local.py index 96f1e61746..b2b557391e 100644 --- a/src/zarr/storage/_local.py +++ b/src/zarr/storage/_local.py @@ -19,12 +19,12 @@ ) from zarr.core.buffer import Buffer from zarr.core.buffer.core import default_buffer_prototype -from zarr.core.common import AccessModeLiteral, concurrent_map if TYPE_CHECKING: from collections.abc import AsyncIterator, Iterable, Iterator from zarr.core.buffer import BufferPrototype + from zarr.core.common import AccessModeLiteral def _get(path: Path, prototype: BufferPrototype, byte_range: ByteRequest | None) -> Buffer: @@ -110,14 +110,19 @@ class LocalStore(Store): root: Path - def __init__(self, root: Path | str, *, read_only: bool = False) -> None: - super().__init__(read_only=read_only) + def __init__( + self, + root: Path | str, + *, + read_only: bool = False, + ) -> None: if isinstance(root, str): root = Path(root) if not isinstance(root, Path): raise TypeError( f"'root' must be a string or Path instance. Got an instance of {type(root)} instead." 
) + super().__init__(read_only=read_only) self.root = root def with_read_only(self, read_only: bool = False) -> Self: @@ -262,12 +267,9 @@ async def get_partial_values( key_ranges: Iterable[tuple[str, ByteRequest | None]], ) -> list[Buffer | None]: # docstring inherited - args = [] - for key, byte_range in key_ranges: - assert isinstance(key, str) - path = self.root / key - args.append((_get, path, prototype, byte_range)) - return await concurrent_map(args, asyncio.to_thread, limit=None) # TODO: fix limit + return await asyncio.gather( + *[self.get(key, prototype, byte_range) for key, byte_range in key_ranges] + ) async def set(self, key: str, value: Buffer) -> None: # docstring inherited diff --git a/src/zarr/storage/_memory.py b/src/zarr/storage/_memory.py index 1194894b9d..a958530e1d 100644 --- a/src/zarr/storage/_memory.py +++ b/src/zarr/storage/_memory.py @@ -1,12 +1,12 @@ from __future__ import annotations +import asyncio from logging import getLogger from typing import TYPE_CHECKING, Any, Self from zarr.abc.store import ByteRequest, Store from zarr.core.buffer import Buffer, gpu from zarr.core.buffer.core import default_buffer_prototype -from zarr.core.common import concurrent_map from zarr.storage._utils import _normalize_byte_range_index if TYPE_CHECKING: @@ -145,12 +145,10 @@ async def get_partial_values( key_ranges: Iterable[tuple[str, ByteRequest | None]], ) -> list[Buffer | None]: # docstring inherited - - # All the key-ranges arguments goes with the same prototype - async def _get(key: str, byte_range: ByteRequest | None) -> Buffer | None: - return await self.get(key, prototype=prototype, byte_range=byte_range) - - return await concurrent_map(key_ranges, _get, limit=None) + # In-memory operations are fast and don't need concurrency limiting + return await asyncio.gather( + *[self.get(key, prototype, byte_range) for key, byte_range in key_ranges] + ) async def exists(self, key: str) -> bool: # docstring inherited diff --git 
a/src/zarr/storage/_obstore.py b/src/zarr/storage/_obstore.py index ffea523f9f..dba77b0357 100644 --- a/src/zarr/storage/_obstore.py +++ b/src/zarr/storage/_obstore.py @@ -6,7 +6,7 @@ from collections import defaultdict from itertools import chain from operator import itemgetter -from typing import TYPE_CHECKING, Self, TypedDict +from typing import TYPE_CHECKING, Self from zarr.abc.store import ( ByteRequest, @@ -15,15 +15,13 @@ Store, SuffixByteRequest, ) -from zarr.core.common import concurrent_map -from zarr.core.config import config from zarr.storage._utils import _relativize_path if TYPE_CHECKING: from collections.abc import AsyncGenerator, Coroutine, Iterable, Sequence from typing import Any - from obstore import ListResult, ListStream, ObjectMeta, OffsetRange, SuffixRange + from obstore import ListResult, ListStream, ObjectMeta from obstore.store import ObjectStore as _UpstreamObjectStore from zarr.core.buffer import Buffer, BufferPrototype @@ -66,7 +64,12 @@ def __eq__(self, value: object) -> bool: return self.store == value.store # type: ignore[no-any-return] - def __init__(self, store: T_Store, *, read_only: bool = False) -> None: + def __init__( + self, + store: T_Store, + *, + read_only: bool = False, + ) -> None: if not store.__class__.__module__.startswith("obstore"): raise TypeError(f"expected ObjectStore class, got {store!r}") super().__init__(read_only=read_only) @@ -101,41 +104,7 @@ async def get( import obstore as obs try: - if byte_range is None: - resp = await obs.get_async(self.store, key) - return prototype.buffer.from_bytes(await resp.bytes_async()) # type: ignore[arg-type] - elif isinstance(byte_range, RangeByteRequest): - bytes = await obs.get_range_async( - self.store, key, start=byte_range.start, end=byte_range.end - ) - return prototype.buffer.from_bytes(bytes) # type: ignore[arg-type] - elif isinstance(byte_range, OffsetByteRequest): - resp = await obs.get_async( - self.store, key, options={"range": {"offset": byte_range.offset}} - ) - 
return prototype.buffer.from_bytes(await resp.bytes_async())  # type: ignore[arg-type]
-            elif isinstance(byte_range, SuffixByteRequest):
-                # some object stores (Azure) don't support suffix requests. In this
-                # case, our workaround is to first get the length of the object and then
-                # manually request the byte range at the end.
-                try:
-                    resp = await obs.get_async(
-                        self.store, key, options={"range": {"suffix": byte_range.suffix}}
-                    )
-                    return prototype.buffer.from_bytes(await resp.bytes_async())  # type: ignore[arg-type]
-                except obs.exceptions.NotSupportedError:
-                    head_resp = await obs.head_async(self.store, key)
-                    file_size = head_resp["size"]
-                    suffix_len = byte_range.suffix
-                    buffer = await obs.get_range_async(
-                        self.store,
-                        key,
-                        start=file_size - suffix_len,
-                        length=suffix_len,
-                    )
-                    return prototype.buffer.from_bytes(buffer)  # type: ignore[arg-type]
-            else:
-                raise ValueError(f"Unexpected byte_range, got {byte_range}")
+            return await self._get_impl(key, prototype, byte_range, obs)
         except _ALLOWED_EXCEPTIONS:
             return None
@@ -145,7 +114,80 @@ async def get_partial_values(
         self,
         prototype: BufferPrototype,
         key_ranges: Iterable[tuple[str, ByteRequest | None]],
     ) -> list[Buffer | None]:
         # docstring inherited
-        return await _get_partial_values(self.store, prototype=prototype, key_ranges=key_ranges)
+        # We override to batch RangeByteRequests per-file using get_ranges_async for performance
+        import obstore as obs
+
+        key_ranges = list(key_ranges)
+        # Group bounded range requests by path for batched fetching
+        per_file_bounded: dict[str, list[tuple[int, RangeByteRequest]]] = defaultdict(list)
+        other_requests: list[tuple[int, str, ByteRequest | None]] = []
+
+        for idx, (path, byte_range) in enumerate(key_ranges):
+            if isinstance(byte_range, RangeByteRequest):
+                per_file_bounded[path].append((idx, byte_range))
+            else:
+                other_requests.append((idx, path, byte_range))
+
+        buffers: list[Buffer | None] = [None] * len(key_ranges)
+
+        async def _fetch_ranges(path: str, requests: list[tuple[int, RangeByteRequest]]) -> None:
+            """Batch multiple range requests for the same file using get_ranges_async."""
+            starts = [r.start for _, r in requests]
+            ends = [r.end for _, r in requests]
+            responses = await obs.get_ranges_async(self.store, path=path, starts=starts, ends=ends)
+            for (idx, _), response in zip(requests, responses, strict=True):
+                buffers[idx] = prototype.buffer.from_bytes(response)  # type: ignore[arg-type]
+
+        async def _fetch_one(idx: int, path: str, byte_range: ByteRequest | None) -> None:
+            """Fetch a single request that cannot be batched (full-object, offset, or suffix)."""
+            with contextlib.suppress(*_ALLOWED_EXCEPTIONS):
+                buffers[idx] = await self._get_impl(path, prototype, byte_range, obs)
+
+        futs: list[Coroutine[Any, Any, None]] = []
+        for path, requests in per_file_bounded.items():
+            futs.append(_fetch_ranges(path, requests))
+        for idx, path, byte_range in other_requests:
+            futs.append(_fetch_one(idx, path, byte_range))
+
+        await asyncio.gather(*futs)
+        return buffers
+
+    async def _get_impl(
+        self, key: str, prototype: BufferPrototype, byte_range: ByteRequest | None, obs: Any
+    ) -> Buffer:
+        """Single-key fetch implementation, shared by ``get`` and ``get_partial_values``."""
+        if byte_range is None:
+            resp = await obs.get_async(self.store, key)
+            return prototype.buffer.from_bytes(await resp.bytes_async())
+        elif isinstance(byte_range, RangeByteRequest):
+            data = await obs.get_range_async(
+                self.store, key, start=byte_range.start, end=byte_range.end
+            )
+            return prototype.buffer.from_bytes(data)
+        elif isinstance(byte_range, OffsetByteRequest):
+            resp = await obs.get_async(
+                self.store, key, options={"range": {"offset": byte_range.offset}}
+            )
+            return prototype.buffer.from_bytes(await resp.bytes_async())
+        elif isinstance(byte_range, SuffixByteRequest):
+            try:
+                resp = await obs.get_async(
+                    self.store, key, options={"range": {"suffix": byte_range.suffix}}
+                )
+                return prototype.buffer.from_bytes(await resp.bytes_async())
+            except obs.exceptions.NotSupportedError:
+                head_resp = await obs.head_async(self.store, key)
+                file_size = head_resp["size"]
+                suffix_len = byte_range.suffix
+                buffer = await obs.get_range_async(
+                    self.store,
+                    key,
+                    start=file_size - suffix_len,
+                    length=suffix_len,
+                )
+                return prototype.buffer.from_bytes(buffer)
+        else:
+            raise ValueError(f"Unexpected byte_range, got {byte_range}")
 
     async def exists(self, key: str) -> bool:
         # docstring inherited
@@ -208,8 +250,12 @@ async def delete_dir(self, prefix: str) -> None:
             prefix += "/"
         metas = await obs.list(self.store, prefix).collect_async()
-        keys = [(m["path"],) for m in metas]
-        await concurrent_map(keys, self.delete, limit=config.get("async.concurrency"))
+
+        async def _delete_one(path: str) -> None:
+            with contextlib.suppress(FileNotFoundError):
+                await obs.delete_async(self.store, path)
+
+        await asyncio.gather(*[_delete_one(m["path"]) for m in metas])
 
     @property
     def supports_listing(self) -> bool:
@@ -268,242 +314,3 @@ async def _transform_list_dir(
         list_result["common_prefixes"], map(itemgetter("path"), list_result["objects"])
     ):
         yield _relativize_path(path=path, prefix=prefix)
-
-
-class _BoundedRequest(TypedDict):
-    """Range request with a known start and end byte.
-
-    These requests can be multiplexed natively on the Rust side with
-    `obstore.get_ranges_async`.
-    """
-
-    original_request_index: int
-    """The positional index in the original key_ranges input"""
-
-    start: int
-    """Start byte offset."""
-
-    end: int
-    """End byte offset."""
-
-
-class _OtherRequest(TypedDict):
-    """Offset or suffix range requests.
-
-    These requests cannot be concurrent on the Rust side, and each need their own call
-    to `obstore.get_async`, passing in the `range` parameter.
- """ - - original_request_index: int - """The positional index in the original key_ranges input""" - - path: str - """The path to request from.""" - - range: OffsetRange | None - # Note: suffix requests are handled separately because some object stores (Azure) - # don't support them - """The range request type.""" - - -class _SuffixRequest(TypedDict): - """Offset or suffix range requests. - - These requests cannot be concurrent on the Rust side, and each need their own call - to `obstore.get_async`, passing in the `range` parameter. - """ - - original_request_index: int - """The positional index in the original key_ranges input""" - - path: str - """The path to request from.""" - - range: SuffixRange - """The suffix range.""" - - -class _Response(TypedDict): - """A response buffer associated with the original index that it should be restored to.""" - - original_request_index: int - """The positional index in the original key_ranges input""" - - buffer: Buffer - """The buffer returned from obstore's range request.""" - - -async def _make_bounded_requests( - store: _UpstreamObjectStore, - path: str, - requests: list[_BoundedRequest], - prototype: BufferPrototype, - semaphore: asyncio.Semaphore, -) -> list[_Response]: - """Make all bounded requests for a specific file. - - `obstore.get_ranges_async` allows for making concurrent requests for multiple ranges - within a single file, and will e.g. merge concurrent requests. This only uses one - single Python coroutine. 
- """ - import obstore as obs - - starts = [r["start"] for r in requests] - ends = [r["end"] for r in requests] - async with semaphore: - responses = await obs.get_ranges_async(store, path=path, starts=starts, ends=ends) - - buffer_responses: list[_Response] = [] - for request, response in zip(requests, responses, strict=True): - buffer_responses.append( - { - "original_request_index": request["original_request_index"], - "buffer": prototype.buffer.from_bytes(response), # type: ignore[arg-type] - } - ) - - return buffer_responses - - -async def _make_other_request( - store: _UpstreamObjectStore, - request: _OtherRequest, - prototype: BufferPrototype, - semaphore: asyncio.Semaphore, -) -> list[_Response]: - """Make offset or full-file requests. - - We return a `list[_Response]` for symmetry with `_make_bounded_requests` so that all - futures can be gathered together. - """ - import obstore as obs - - async with semaphore: - if request["range"] is None: - resp = await obs.get_async(store, request["path"]) - else: - resp = await obs.get_async(store, request["path"], options={"range": request["range"]}) - buffer = await resp.bytes_async() - - return [ - { - "original_request_index": request["original_request_index"], - "buffer": prototype.buffer.from_bytes(buffer), # type: ignore[arg-type] - } - ] - - -async def _make_suffix_request( - store: _UpstreamObjectStore, - request: _SuffixRequest, - prototype: BufferPrototype, - semaphore: asyncio.Semaphore, -) -> list[_Response]: - """Make suffix requests. - - This is separated out from `_make_other_request` because some object stores (Azure) - don't support suffix requests. In this case, our workaround is to first get the - length of the object and then manually request the byte range at the end. - - We return a `list[_Response]` for symmetry with `_make_bounded_requests` so that all - futures can be gathered together. 
- """ - import obstore as obs - - async with semaphore: - try: - resp = await obs.get_async(store, request["path"], options={"range": request["range"]}) - buffer = await resp.bytes_async() - except obs.exceptions.NotSupportedError: - head_resp = await obs.head_async(store, request["path"]) - file_size = head_resp["size"] - suffix_len = request["range"]["suffix"] - buffer = await obs.get_range_async( - store, - request["path"], - start=file_size - suffix_len, - length=suffix_len, - ) - - return [ - { - "original_request_index": request["original_request_index"], - "buffer": prototype.buffer.from_bytes(buffer), # type: ignore[arg-type] - } - ] - - -async def _get_partial_values( - store: _UpstreamObjectStore, - prototype: BufferPrototype, - key_ranges: Iterable[tuple[str, ByteRequest | None]], -) -> list[Buffer | None]: - """Make multiple range requests. - - ObjectStore has a `get_ranges` method that will additionally merge nearby ranges, - but it's _per_ file. So we need to split these key_ranges into **per-file** key - ranges, and then reassemble the results in the original order. - - We separate into different requests: - - - One call to `obstore.get_ranges_async` **per target file** - - One call to `obstore.get_async` for each other request. 
- """ - key_ranges = list(key_ranges) - per_file_bounded_requests: dict[str, list[_BoundedRequest]] = defaultdict(list) - other_requests: list[_OtherRequest] = [] - suffix_requests: list[_SuffixRequest] = [] - - for idx, (path, byte_range) in enumerate(key_ranges): - if byte_range is None: - other_requests.append( - { - "original_request_index": idx, - "path": path, - "range": None, - } - ) - elif isinstance(byte_range, RangeByteRequest): - per_file_bounded_requests[path].append( - {"original_request_index": idx, "start": byte_range.start, "end": byte_range.end} - ) - elif isinstance(byte_range, OffsetByteRequest): - other_requests.append( - { - "original_request_index": idx, - "path": path, - "range": {"offset": byte_range.offset}, - } - ) - elif isinstance(byte_range, SuffixByteRequest): - suffix_requests.append( - { - "original_request_index": idx, - "path": path, - "range": {"suffix": byte_range.suffix}, - } - ) - else: - raise ValueError(f"Unsupported range input: {byte_range}") - - semaphore = asyncio.Semaphore(config.get("async.concurrency")) - - futs: list[Coroutine[Any, Any, list[_Response]]] = [] - for path, bounded_ranges in per_file_bounded_requests.items(): - futs.append( - _make_bounded_requests(store, path, bounded_ranges, prototype, semaphore=semaphore) - ) - - for request in other_requests: - futs.append(_make_other_request(store, request, prototype, semaphore=semaphore)) # noqa: PERF401 - - for suffix_request in suffix_requests: - futs.append(_make_suffix_request(store, suffix_request, prototype, semaphore=semaphore)) # noqa: PERF401 - - buffers: list[Buffer | None] = [None] * len(key_ranges) - - for responses in await asyncio.gather(*futs): - for resp in responses: - buffers[resp["original_request_index"]] = resp["buffer"] - - return buffers diff --git a/src/zarr/storage/_utils.py b/src/zarr/storage/_utils.py index 8939ead30b..a7e0b9cf13 100644 --- a/src/zarr/storage/_utils.py +++ b/src/zarr/storage/_utils.py @@ -3,15 +3,6 @@ import importlib 
 import re
 from pathlib import Path
-
-if importlib.util.find_spec("upath"):
-    from upath.core import UPath
-else:
-
-    class UPath:  # type: ignore[no-redef]
-        pass
-
-
 from typing import TYPE_CHECKING
 
 from zarr.abc.store import OffsetByteRequest, RangeByteRequest, SuffixByteRequest
@@ -23,6 +14,14 @@ class UPath:  # type: ignore[no-redef]
     from zarr.core.buffer import Buffer
 
 
+if importlib.util.find_spec("upath"):
+    from upath.core import UPath
+else:
+
+    class UPath:  # type: ignore[no-redef]
+        pass
+
+
 def normalize_path(path: str | bytes | Path | None) -> str:
     if path is None:
         result = ""
diff --git a/tests/test_common.py b/tests/test_common.py
index 0dedde1d6b..09cb6df2f8 100644
--- a/tests/test_common.py
+++ b/tests/test_common.py
@@ -31,10 +31,6 @@ def test_access_modes() -> None:
     assert set(ANY_ACCESS_MODE) == set(get_args(AccessModeLiteral))
 
 
-# todo: test
-def test_concurrent_map() -> None: ...
-
-
 # todo: test
 def test_to_thread() -> None: ...
diff --git a/tests/test_config.py b/tests/test_config.py
index 4e293e968f..cce736f4dd 100644
--- a/tests/test_config.py
+++ b/tests/test_config.py
@@ -57,7 +57,7 @@ def test_config_defaults_set() -> None:
             "target_shard_size_bytes": None,
             "rectilinear_chunks": False,
         },
-        "async": {"concurrency": 10, "timeout": None},
+        "async": {"timeout": None},
         "threading": {"max_workers": None},
         "json_indent": 2,
         "codec_pipeline": {
@@ -103,7 +103,6 @@
         ]
     )
     assert config.get("array.order") == "C"
-    assert config.get("async.concurrency") == 10
     assert config.get("async.timeout") is None
     assert config.get("codec_pipeline.batch_size") == 1
     assert config.get("json_indent") == 2
@@ -111,7 +110,7 @@
     ("key", "old_val", "new_val"),
-    [("array.order", "C", "F"), ("async.concurrency", 10, 128), ("json_indent", 2, 0)],
+    [("array.order", "C", "F"), ("json_indent", 2, 0)],
 )
 def test_config_defaults_can_be_overridden(key: str, old_val: Any, new_val: Any) -> None:
     assert config.get(key) == old_val
diff --git a/tests/test_group.py b/tests/test_group.py
index e53b0b9ea0..23ea8645f8 100644
--- a/tests/test_group.py
+++ b/tests/test_group.py
@@ -23,7 +23,6 @@
 from zarr.core import sync_group
 from zarr.core._info import GroupInfo
 from zarr.core.buffer import default_buffer_prototype
-from zarr.core.config import config as zarr_config
 from zarr.core.dtype.common import unpack_dtype_json
 from zarr.core.dtype.npy.int import UInt8
 from zarr.core.group import (
@@ -1736,29 +1735,6 @@ async def test_create_nodes(
     assert node_spec == {k: v.metadata for k, v in observed_nodes.items()}
 
 
-@pytest.mark.parametrize("store", ["memory"], indirect=True)
-def test_create_nodes_concurrency_limit(store: MemoryStore) -> None:
-    """
-    Test that the execution time of create_nodes can be constrained by the async concurrency
-    configuration setting.
-    """
-    set_latency = 0.02
-    num_groups = 10
-    groups = {str(idx): GroupMetadata() for idx in range(num_groups)}
-
-    latency_store = LatencyStore(store, set_latency=set_latency)
-
-    # check how long it takes to iterate over the groups
-    # if create_nodes is sensitive to IO latency,
-    # this should take (num_groups * get_latency) seconds
-    # otherwise, it should take only marginally more than get_latency seconds
-    with zarr_config.set({"async.concurrency": 1}):
-        start = time.time()
-        _ = tuple(sync_group.create_nodes(store=latency_store, nodes=groups))
-    elapsed = time.time() - start
-    assert elapsed > num_groups * set_latency
-
-
 @pytest.mark.parametrize(
     ("a_func", "b_func"),
     [
@@ -2248,38 +2224,6 @@ def test_group_members_performance(store: Store) -> None:
     assert elapsed < (num_groups * get_latency)
 
 
-@pytest.mark.parametrize("store", ["memory"], indirect=True)
-def test_group_members_concurrency_limit(store: MemoryStore) -> None:
-    """
-    Test that the execution time of Group.members can be constrained by the async concurrency
-    configuration setting.
- """ - get_latency = 0.02 - - # use the input store to create some groups - group_create = zarr.group(store=store) - num_groups = 10 - - # Create some groups - for i in range(num_groups): - group_create.create_group(f"group{i}") - - latency_store = LatencyStore(store, get_latency=get_latency) - # create a group with some latency on get operations - group_read = zarr.group(store=latency_store) - - # check how long it takes to iterate over the groups - # if .members is sensitive to IO latency, - # this should take (num_groups * get_latency) seconds - # otherwise, it should take only marginally more than get_latency seconds - with zarr_config.set({"async.concurrency": 1}): - start = time.time() - _ = group_read.members() - elapsed = time.time() - start - - assert elapsed > num_groups * get_latency - - @pytest.mark.parametrize("option", ["array", "group", "invalid"]) def test_build_metadata_v3(option: Literal["array", "group", "invalid"]) -> None: """