Skip to content

Commit 36613f4

Browse files
authored
Merge pull request #923 from davidhassell/parallel-read
Enable parallel netCDF dataset reads
2 parents 05d37d7 + 040cf0e commit 36613f4

22 files changed

Lines changed: 124 additions & 211 deletions

Changelog.rst

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,13 @@ Version NEXTVERSION
33

44
**2026-??-??**
55

6+
* New default backend for netCDF-4 in `cf.read` that allows parallel
7+
reading: ``h5netcdf-pyfive``
8+
(https://github.com/NCAS-CMS/cf-python/issues/912)
9+
* New optional backend for netCDF-3 in `cf.read` that allows parallel
10+
reading: ``netcdf_file``
11+
(https://github.com/NCAS-CMS/cf-python/issues/912)
12+
* Changed dependency: ``cfdm>=1.13.1.0, <1.13.2.0``
613
* Fix for subspacing with cyclic `cf.wi` and `cf.wo` arguments
714
(https://github.com/NCAS-CMS/cf-python/issues/887)
815

cf/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,10 +153,12 @@
153153
H5netcdfArray,
154154
NetCDFArray,
155155
NetCDF4Array,
156+
PyfiveArray,
156157
PointTopologyArray,
157158
RaggedContiguousArray,
158159
RaggedIndexedArray,
159160
RaggedIndexedContiguousArray,
161+
ScipyNetcdfFileArray,
160162
SubsampledArray,
161163
UMArray,
162164
ZarrArray,

cf/cfimplementation.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,11 @@
3535
H5netcdfArray,
3636
NetCDF4Array,
3737
PointTopologyArray,
38+
PyfiveArray,
3839
RaggedContiguousArray,
3940
RaggedIndexedArray,
4041
RaggedIndexedContiguousArray,
42+
ScipyNetcdfFileArray,
4143
SubsampledArray,
4244
ZarrArray,
4345
)
@@ -147,7 +149,9 @@ def set_construct(self, parent, construct, axes=None, copy=True, **kwargs):
147149
GatheredArray=GatheredArray,
148150
H5netcdfArray=H5netcdfArray,
149151
NetCDF4Array=NetCDF4Array,
152+
ScipyNetcdfFileArray=ScipyNetcdfFileArray,
150153
PointTopologyArray=PointTopologyArray,
154+
PyfiveArray=PyfiveArray,
151155
Quantization=Quantization,
152156
RaggedContiguousArray=RaggedContiguousArray,
153157
RaggedIndexedArray=RaggedIndexedArray,
@@ -204,7 +208,9 @@ def implementation():
204208
'GatheredArray': cf.data.array.gatheredarray.GatheredArray,
205209
'H5netcdfArray': cf.data.array.h5netcdfarray.H5netcdfArray,
206210
'NetCDF4Array': cf.data.array.netcdf4array.NetCDF4Array,
211+
'ScipyNetcdfFileArray': cf.data.array.scipynetcdffilearray.ScipyNetcdfFileArray,
207212
'PointTopologyArray': <class 'cf.data.array.pointtopologyarray.PointTopologyArray'>,
213+
'PyfiveArray': cf.data.array.pyfivearray.PyfiveArray,
208214
'Quantization': cf.quantization.Quantization,
209215
'RaggedContiguousArray': cf.data.array.raggedcontiguousarray.RaggedContiguousArray,
210216
'RaggedIndexedArray': cf.data.array.raggedindexedarray.RaggedIndexedArray,

cf/data/array/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@
66
from .h5netcdfarray import H5netcdfArray
77
from .netcdfarray import NetCDFArray
88
from .netcdf4array import NetCDF4Array
9+
from .scipynetcdffilearray import ScipyNetcdfFileArray
910
from .pointtopologyarray import PointTopologyArray
11+
from .pyfivearray import PyfiveArray
1012
from .raggedcontiguousarray import RaggedContiguousArray
1113
from .raggedindexedarray import RaggedIndexedArray
1214
from .raggedindexedcontiguousarray import RaggedIndexedContiguousArray

cf/data/array/h5netcdfarray.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ class H5netcdfArray(
99
Container,
1010
cfdm.H5netcdfArray,
1111
):
12-
"""A netCDF array accessed with `h5netcdf`.
12+
"""A netCDF array accessed with `h5netcdf` using the `h5py` backend.
1313
1414
.. versionadded:: 3.16.3
1515

cf/data/array/pyfivearray.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
import cfdm
2+
3+
from ...mixin_container import Container
4+
from .mixin import ActiveStorageMixin
5+
6+
7+
class PyfiveArray(
8+
ActiveStorageMixin,
9+
Container,
10+
cfdm.PyfiveArray,
11+
):
12+
"""A netCDF array accessed with `pyfive`.
13+
14+
.. versionadded:: NEXTVERSION
15+
16+
"""
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
import cfdm
2+
3+
from ...mixin_container import Container
4+
5+
6+
class ScipyNetcdfFileArray(
7+
Container,
8+
cfdm.ScipyNetcdfFileArray,
9+
):
10+
"""A netCDF-3 array accessed with `scipy.io.netcdf_file`.
11+
12+
.. versionadded:: NEXTVERSION
13+
14+
"""

cf/data/collapse/collapse_active.py

Lines changed: 20 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -186,35 +186,35 @@ def active_chunk_function(method, *args, **kwargs):
186186
# with `cf.active_storage(True)`
187187
from activestorage import Active
188188

189-
filename = x.get_filename()
190-
address = x.get_address()
191-
max_requests = active_storage_max_requests()
189+
info = is_log_level_info(logger)
190+
191+
max_requests = active_storage_max_requests().value
192+
storage_options = None
193+
address = None
194+
dataset = x.get_variable(None)
195+
if dataset is None:
196+
# Dateaset is a string, not a variable object.
197+
storage_options = x.get_storage_options()
198+
address = x.get_address()
199+
dataset = x.get_filename()
200+
192201
active_kwargs = {
193-
"uri": "/".join(filename.split("/")[3:]),
202+
"dataset": dataset,
194203
"ncvar": address,
195-
"storage_options": x.get_storage_options(),
204+
"axis": axis,
205+
"storage_options": storage_options,
196206
"active_storage_url": url,
197-
"storage_type": "s3",
198207
"max_threads": max_requests,
199208
}
200-
# WARNING: The "uri", "storage_options", and "storage_type" keys
201-
# of the `active_kwargs` dictionary are currently
202-
# formatted according to the whims of the `Active` class
203-
# (i.e. the pyfive branch of PyActiveStorage). Future
204-
# versions of `Active` will have a better API, that will
205-
# require improvements to `active_kwargs`.
206209

207210
index = x.index()
208-
209-
details = (
210-
f"{method!r} (file={filename}, address={address}, url={url}, "
211-
f"Dask chunk={index})"
212-
)
213-
214-
info = is_log_level_info(logger)
215211
if info:
216212
# Do some detailed logging
217213
start = time.time()
214+
details = (
215+
f"{method!r} (dataset={dataset!r}, ncvar={address}, "
216+
f"Dask chunk={index})"
217+
)
218218
logger.info(
219219
f"STARTED active storage {details}: {datetime.datetime.now()}"
220220
) # pragma: no cover
@@ -227,8 +227,7 @@ def active_chunk_function(method, *args, **kwargs):
227227
# reduction on the remote server
228228
#
229229
# WARNING: The `_version` API of `Active` is likely to change from
230-
# the current version (i.e. the pyfive branch of
231-
# PyActiveStorage)
230+
# the current version
232231
active._version = 2
233232

234233
# ----------------------------------------------------------------

cf/data/utils.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -423,10 +423,12 @@ def collapse(
423423
kwargs["ddof"] = ddof
424424

425425
# The applicable chunk function will have its own call to
426-
# 'cfdm_to_memory', so we can set '_force_to_memory=False'. Also,
427-
# setting _force_to_memory=False will ensure that any active
428-
# storage operations are not compromised.
429-
dx = d.to_dask_array(_force_to_memory=False)
426+
# 'cfdm_to_memory', so we can set '_force_to_memory=False'.
427+
# Setting _force_to_memory=False will also ensure that any active
428+
# storage operations are not compromised. We can set
429+
# _force_mask_hardness=False because collapse operations do not
430+
# need to ever unset masked values.
431+
dx = d.to_dask_array(_force_mask_hardness=False, _force_to_memory=False)
430432
dx = func(dx, **kwargs)
431433
d._set_dask(dx)
432434

cf/flags.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -454,5 +454,5 @@ def sort(self):
454454

455455
for attr in ("_flag_values", "_flag_meanings", "_flag_masks"):
456456
if hasattr(self, attr):
457-
array = getattr(self, attr).view()
458-
array[...] = array[indices]
457+
array = getattr(self, attr)[indices]
458+
setattr(self, attr, array)

0 commit comments

Comments
 (0)