Skip to content

Commit 4d4b4bc

Browse files
committed
Add esgpull downloader
1 parent 6ed11ca commit 4d4b4bc

7 files changed

Lines changed: 545 additions & 213 deletions

File tree

climateset/download/cmip6_downloader.py

Lines changed: 76 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@
44
CMIP6DownloaderConfig,
55
create_cmip6_downloader_config_from_file,
66
)
7+
from climateset.download.esgpull_utils import (
8+
esgpull_search_and_download_esgf_model_single_var,
9+
)
710
from climateset.download.utils import search_and_download_esgf_model_single_var
811
from climateset.utils import create_logger
912

@@ -80,7 +83,79 @@ def download_from_model_single_var(
8083
self.logger.info(f"Download results: {results_list}")
8184

8285

86+
class CMIP6DownloaderV2(AbstractDownloader):
    """
    CMIP6 downloader backed by the esgpull search-and-download helper.

    Iterates over the models, variables and experiments listed in the configuration and delegates every
    combination to ``esgpull_search_and_download_esgf_model_single_var``.
    """

    def __init__(self, config: CMIP6DownloaderConfig, distrib: bool = True):
        """
        Store the configuration and the ``distrib`` flag.

        Args:
            config: Downloader configuration. This class reads ``models``, ``variables``, ``experiments``,
                ``project``, ``ensemble_members``, ``max_ensemble_members`` and ``data_dir`` from it.
            distrib: Forwarded unchanged as ``distrib`` to the esgpull helper on every call.
        """
        # NOTE(review): AbstractDownloader.__init__ is not invoked here - confirm the base class needs no setup.
        self.logger = LOGGER
        self.distrib = distrib
        self.config = config

    def download(self):
        """
        Download every model / variable / experiment combination found in the configuration.

        Loops over ``config.models``, ``config.variables`` and ``config.experiments`` (in that nesting order)
        and calls :meth:`download_from_model_single_var` once per combination, always with ``config.project``.
        Frequency, grid label and version are left at that method's defaults.

        NOTE(review): the on-disk layout of the downloaded files is decided by the esgpull helper, not by this
        class. The legacy docstring advertised
        ``CMIPx/model_id/ensemble_member/experiment/variable/nominal_resolution/frequency/year.nc`` - confirm
        whether that still holds for the esgpull code path.
        """
        for model in self.config.models:
            self.logger.info(f"Downloading data for model: [{model}]")
            for variable in self.config.variables:
                self.logger.info(f"Downloading data for variable: [{variable}]")
                for experiment in self.config.experiments:
                    self.logger.info(f"Downloading data for experiment: [{experiment}]")
                    self.download_from_model_single_var(
                        model=model, project=self.config.project, variable=variable, experiment=experiment
                    )

    def download_from_model_single_var(
        self,
        model: str,
        variable: str,
        experiment: str,
        project: str = CMIP6,
        default_frequency: str = "mon",
        preferred_version: str = "latest",
        default_grid_label: str = "gn",
    ):
        """
        Download a single variable-experiment pair of one model's output through the esgpull helper.

        Ensemble member selection, the target data directory and the ``distrib`` flag are taken from the
        instance (``self.config`` / ``self.distrib``); everything else comes from the arguments. The helper's
        result is only logged, nothing is returned.

        Args:
            model: The model ID
            variable: variable ID
            experiment: experiment ID
            project: umbrella project id e.g. CMIPx
            default_frequency: default frequency to download
            preferred_version: data upload version; ``"latest"`` asks the helper for the newest version
            default_grid_label: default gridding method in which the data is provided
        """
        results_list = esgpull_search_and_download_esgf_model_single_var(
            model=model,
            variable=variable,
            experiment=experiment,
            project=project,
            default_frequency=default_frequency,
            default_grid_label=default_grid_label,
            preferred_version=preferred_version,
            ensemble_members=self.config.ensemble_members,
            max_ensemble_members=self.config.max_ensemble_members,
            data_dir=self.config.data_dir,
            distrib=self.distrib,
            # Same keyword the EsgpullDownloader wrappers pass to this helper, so that the helper's
            # messages end up in this downloader's log.
            logger=self.logger,
        )
        self.logger.info(f"Download results: {results_list}")
156+
157+
83158
def cmip6_download_from_config(config):
    """
    Build a CMIP6 downloader configuration from ``config`` and run the esgpull-based download.

    Args:
        config: Passed as-is to ``create_cmip6_downloader_config_from_file``.
    """
    downloader_config = create_cmip6_downloader_config_from_file(config)
    CMIP6DownloaderV2(config=downloader_config).download()
Lines changed: 45 additions & 195 deletions
Original file line numberDiff line numberDiff line change
@@ -1,131 +1,16 @@
1-
import asyncio
2-
import logging
3-
import shutil
41
from pathlib import Path
52

6-
from esgpull.models import Options, Query, Selection
7-
83
from climateset.download.abstract_downloader import AbstractDownloader
9-
from climateset.download.constraints import CMIP6Constraints, Input4MIPsConstraints
10-
from climateset.download.utils import isolated_esgpull_context
4+
from climateset.download.esgpull_utils import (
5+
esgpull_search_and_download_esgf_biomass_single_var,
6+
esgpull_search_and_download_esgf_model_single_var,
7+
esgpull_search_and_download_esgf_raw_single_var,
8+
)
119
from climateset.utils import create_logger
1210

13-
# Configure esgpull Selection to accept additional custom facets
14-
Selection.configure("target_mip", "version", replace=False)
15-
1611
LOGGER = create_logger(__name__)
1712

1813

19-
def _download_and_move_files(esg, files, dest_dir: Path | str, logger: logging.Logger) -> list[Path]:
    """
    Download ``files`` through esgpull and move every ``*.nc`` file from the esgpull data cache to ``dest_dir``.

    Args:
        esg: esgpull handle. Used here for ``esg.db.add``, the awaitable ``esg.download`` and
            ``esg.config.paths.data``.
        files: Files to register and download. When empty, nothing is downloaded or moved.
        dest_dir: Target directory (``Path`` or ``str``); created if missing.
        logger: Logger receiving progress and error messages.

    Returns:
        Destination paths of the moved files, ordered by their path inside the cache. Empty when ``files`` is
        empty or the cache directory does not exist.
    """
    if not files:
        logger.info("No files to download.")
        return []

    logger.info(f"Adding {len(files)} files to esgpull DB...")
    # Add tracked files to the isolated internal DB queue
    esg.db.add(*files)

    async def _run_download():
        logger.info("Starting esg.download async...")
        return await esg.download(files, show_progress=False)

    logger.info("Executing asyncio.run(_run_download())...")
    # Execute async native download
    downloaded, errors = asyncio.run(_run_download())
    logger.info(f"Download complete. Downloaded: {len(downloaded)}, Errors: {len(errors)}")

    # Download errors are reported but do not abort the transfer of whatever did arrive.
    for err in errors:
        logger.error(f"Download error: {err}")

    # Transfer from cache to strictly formatted project tree
    dest_dir = Path(dest_dir)
    dest_dir.mkdir(parents=True, exist_ok=True)

    data_cache_dir = esg.config.paths.data
    if not data_cache_dir.exists():
        return []

    moved_files = []
    # Snapshot (and sort) the listing before moving anything: the files are moved out of the very tree
    # that rglob walks lazily, and sorting makes the returned order independent of the filesystem.
    for nc_file in sorted(data_cache_dir.rglob("*.nc")):
        dest_file = dest_dir / nc_file.name
        logger.info(f"Moving {nc_file.name} to {dest_dir}")
        shutil.move(str(nc_file), str(dest_file))
        moved_files.append(dest_file)

    return moved_files
59-
60-
61-
def _apply_facet_fallback(esg, query: Query, facet_name: str, preferred_value: str | None, logger: logging.Logger):
    """
    Constrain ``query`` on ``facet_name``, preferring ``preferred_value`` and otherwise the first available value.

    The available values are the keys of the ``facet_name`` entry in the first element returned by
    ``esg.context.hints``. When that entry is missing or empty, only a warning is logged and ``query`` is left
    untouched.
    """
    hints = esg.context.hints(query, file=False, facets=[facet_name])
    if not (hints and facet_name in hints[0] and hints[0][facet_name]):
        logger.warning(f"No {facet_name} found.")
        return

    candidates = list(hints[0][facet_name].keys())
    logger.info(f"Available {facet_name}: {candidates}")

    if preferred_value and preferred_value in candidates:
        logger.info(f"Choosing {facet_name}: {preferred_value}")
        query.selection[facet_name] = [preferred_value]
        return

    # Only complain when the caller actually asked for something specific.
    if preferred_value:
        logger.warning(f"Preferred {facet_name} '{preferred_value}' not available.")
    fallback = candidates[0]
    logger.info(f"Choosing {facet_name} {fallback} instead.")
    query.selection[facet_name] = [fallback]
79-
80-
81-
def _apply_version_fallback(esg, query: Query, preferred_version: str | None, logger: logging.Logger):
    """
    Constrain ``query`` to ``preferred_version``, falling back to the newest available version.

    Args:
        esg: esgpull handle; only ``esg.context.hints`` is used.
        query: Query to modify in place (``query.options`` or ``query.selection["version"]``).
        preferred_version: ``"latest"`` / ``None`` switch the query to ``latest=True``; any other value is
            looked up among the versions reported by the hints.
        logger: Logger receiving the decisions taken.
    """
    if preferred_version is None or preferred_version == "latest":
        # Use latest=True in options. Since Options is an Enum-backed mapped model,
        # we can recreate it preserving the existing distrib option
        is_distrib = query.options.distrib.name == "true"
        query.options = Options(distrib=is_distrib, latest=True)
        logger.info("Choosing latest version.")
        return

    hints = esg.context.hints(query, file=False, facets=["version"])
    if not (hints and "version" in hints[0] and hints[0]["version"]):
        # Previously silent; the query is still left without a version constraint.
        logger.warning("No version found.")
        return

    available_versions = list(hints[0]["version"].keys())
    if preferred_version in available_versions:
        query.selection["version"] = [preferred_version]
        return

    def _recency(version: str):
        # Compare date-like versions ("20190429", "v20190429") numerically; anything else sorts first.
        digits = version.lstrip("v")
        return (int(digits) if digits.isdigit() else -1, version)

    # The order of the hint keys is not guaranteed to be chronological, so pick the newest explicitly
    # instead of trusting index 0 - the message below promises "latest".
    newest = max(available_versions, key=_recency)
    logger.warning(f"Preferred version {preferred_version} does not exist. Resuming with latest {newest}")
    query.selection["version"] = [newest]
99-
100-
101-
def _apply_variants_filter(
    esg,
    query: Query,
    max_ensemble_members: int,
    ensemble_members: list[str],
    logger: logging.Logger,
) -> list[str]:
    """
    Select the ensemble members (``variant_label`` values) to download for ``query``.

    Args:
        esg: esgpull handle; only ``esg.context.hints`` is used.
        query: Query whose available ``variant_label`` values are looked up. It is not modified here.
        max_ensemble_members: Upper bound applied when no explicit ``ensemble_members`` are given.
        ensemble_members: Explicitly requested members; when non-empty it takes precedence over
            ``max_ensemble_members``.
        logger: Logger receiving the decisions taken.

    Returns:
        The requested members that are available, in the order requested and without duplicates; or, when
        nothing was requested, the first ``max_ensemble_members`` available variants. Empty when the hints
        report no variants.
    """
    hints = esg.context.hints(query, file=False, facets=["variant_label"])
    if not hints or "variant_label" not in hints[0] or not hints[0]["variant_label"]:
        return []

    variants = list(hints[0]["variant_label"].keys())
    logger.info(f"Available variants : {variants}\nLength : {len(variants)}")

    if ensemble_members:
        logger.info(f"Desired list of ensemble members given: {ensemble_members}")
        available = set(variants)
        # dict.fromkeys de-duplicates while keeping the caller's order; a plain set intersection
        # would return the members in arbitrary order.
        requested = list(dict.fromkeys(ensemble_members))
        missing = [member for member in requested if member not in available]
        if missing:
            logger.warning(f"Requested ensemble members not available: {missing}")
        return [member for member in requested if member in available]

    if max_ensemble_members > len(variants):
        logger.info("Less ensemble members available than maximum number desired. Including all variants.")
        return variants

    logger.info(
        f"{len(variants)} ensemble members available, desired (max {max_ensemble_members}). Choosing only the first {max_ensemble_members}."
    )
    return variants[:max_ensemble_members]
127-
128-
12914
class EsgpullDownloader(AbstractDownloader):
13015
def __init__(self, config=None, distrib: bool = False):
13116
self.config = config
@@ -139,7 +24,7 @@ def download(self):
13924
# so for now `download` can just invoke search to satisfy the interface.
14025
pass
14126

142-
def search_and_download_esgf_raw_single_var(
27+
def esgpull_search_and_download_esgf_raw_single_var(
14328
self,
14429
variable: str,
14530
institution_id: str,
@@ -149,33 +34,19 @@ def search_and_download_esgf_raw_single_var(
14934
preferred_version: str,
15035
data_dir: Path | str,
15136
):
152-
with isolated_esgpull_context(data_dir) as esg:
153-
initial_constraints = Input4MIPsConstraints(
154-
project=project, institution_id=institution_id, variable=variable
155-
).to_esgpull_query()
156-
157-
query = Query(selection=initial_constraints)
158-
query.options.distrib = self.distrib
159-
160-
_apply_facet_fallback(esg, query, "grid_label", default_grid_label, self.logger)
161-
_apply_facet_fallback(esg, query, "frequency", default_frequency, self.logger)
162-
163-
# Esgpull handles multi-values natively. Fetch targets if any.
164-
hints = esg.context.hints(query, file=False, facets=["target_mip"])
165-
if hints and "target_mip" in hints[0] and hints[0]["target_mip"]:
166-
target_mips = list(hints[0]["target_mip"].keys())
167-
self.logger.info(f"Available target mips: {target_mips}")
168-
query.selection["target_mip"] = target_mips
169-
170-
_apply_version_fallback(esg, query, preferred_version, self.logger)
171-
172-
files = esg.context.search(query, file=True)
173-
self.logger.info(f"Result len: {len(files)}")
174-
175-
dest_dir = Path(data_dir) / f"{project}/raw_input_vars/{institution_id}/{variable}"
176-
return _download_and_move_files(esg, files, dest_dir, self.logger)
37+
return esgpull_search_and_download_esgf_raw_single_var(
38+
variable=variable,
39+
institution_id=institution_id,
40+
project=project,
41+
default_grid_label=default_grid_label,
42+
default_frequency=default_frequency,
43+
preferred_version=preferred_version,
44+
data_dir=data_dir,
45+
distrib=self.distrib,
46+
logger=self.logger,
47+
)
17748

178-
def search_and_download_esgf_biomass_single_var(
49+
def esgpull_search_and_download_esgf_biomass_single_var(
17950
self,
18051
variable: str,
18152
variable_id: str,
@@ -186,28 +57,20 @@ def search_and_download_esgf_biomass_single_var(
18657
preferred_version: str,
18758
data_dir: Path | str,
18859
):
189-
with isolated_esgpull_context(data_dir) as esg:
190-
initial_constraints = Input4MIPsConstraints(
191-
project=project,
192-
institution_id=institution_id,
193-
variable=variable,
194-
variable_id=variable_id,
195-
).to_esgpull_query()
196-
197-
query = Query(selection=initial_constraints)
198-
query.options.distrib = self.distrib
199-
200-
_apply_facet_fallback(esg, query, "grid_label", default_grid_label, self.logger)
201-
_apply_facet_fallback(esg, query, "frequency", default_frequency, self.logger)
202-
_apply_version_fallback(esg, query, preferred_version, self.logger)
203-
204-
files = esg.context.search(query, file=True)
205-
self.logger.info(f"Result len: {len(files)}")
206-
207-
dest_dir = Path(data_dir) / f"{project}/meta_vars/{institution_id}/{variable}"
208-
return _download_and_move_files(esg, files, dest_dir, self.logger)
60+
return esgpull_search_and_download_esgf_biomass_single_var(
61+
variable=variable,
62+
variable_id=variable_id,
63+
institution_id=institution_id,
64+
project=project,
65+
default_grid_label=default_grid_label,
66+
default_frequency=default_frequency,
67+
preferred_version=preferred_version,
68+
data_dir=data_dir,
69+
distrib=self.distrib,
70+
logger=self.logger,
71+
)
20972

210-
def search_and_download_esgf_model_single_var(
73+
def esgpull_search_and_download_esgf_model_single_var(
21174
self,
21275
model: str,
21376
variable: str,
@@ -220,30 +83,17 @@ def search_and_download_esgf_model_single_var(
22083
ensemble_members: list[str],
22184
data_dir: Path | str,
22285
):
223-
with isolated_esgpull_context(data_dir) as esg:
224-
cmip_constraints = CMIP6Constraints(
225-
project=project, experiment_id=experiment, source_id=model, variable=variable
226-
).to_esgpull_query()
227-
228-
query = Query(selection=cmip_constraints)
229-
query.options.distrib = self.distrib
230-
231-
_apply_facet_fallback(esg, query, "frequency", default_frequency, self.logger)
232-
_apply_facet_fallback(esg, query, "grid_label", default_grid_label, self.logger)
233-
234-
ensemble_member_final_list = _apply_variants_filter(
235-
esg, query, max_ensemble_members, ensemble_members, self.logger
236-
)
237-
if not ensemble_member_final_list:
238-
self.logger.info("No items were found for this request.")
239-
return None
240-
241-
# Esgpull supports multi-value list queries seamlessly
242-
query.selection["variant_label"] = ensemble_member_final_list
243-
_apply_version_fallback(esg, query, preferred_version, self.logger)
244-
245-
files = esg.context.search(query, file=True)
246-
self.logger.info(f"Result len {len(files)}")
247-
248-
dest_dir = Path(data_dir) / f"{project}/{model}/{variable}"
249-
return _download_and_move_files(esg, files, dest_dir, self.logger)
86+
return esgpull_search_and_download_esgf_model_single_var(
87+
model=model,
88+
variable=variable,
89+
experiment=experiment,
90+
project=project,
91+
default_grid_label=default_grid_label,
92+
default_frequency=default_frequency,
93+
preferred_version=preferred_version,
94+
max_ensemble_members=max_ensemble_members,
95+
ensemble_members=ensemble_members,
96+
data_dir=data_dir,
97+
distrib=self.distrib,
98+
logger=self.logger,
99+
)

0 commit comments

Comments
 (0)