Source code for ecmwf_models.era5.download

# -*- coding: utf-8 -*-
"""
Module to download ERA5 from terminal in netcdf and grib format.
"""
import warnings
import os
import logging
from datetime import datetime, time, timedelta
import shutil
import cdsapi
import numpy as np
import pandas as pd

from c3s_sm.misc import read_summary_yml

from repurpose.process import parallel_process
from repurpose.misc import delete_empty_directories

from ecmwf_models.utils import (
    lookup,
    update_image_summary_file,
    default_variables,
    split_array,
    check_api_ready,
    get_first_last_image_date
)
from ecmwf_models.extract import (
    save_ncs_from_nc,
    save_gribs_from_grib,
    unzip_nc,
)


[docs]def split_chunk(timestamps, n_vars, n_hsteps, max_req_size=1000, reduce=False, daily_request=False): """ Split the passed time stamps into chunks for a valid request. One chunk can at most hold data for one month or one day, but cannot be larger than the maximum request size. Parameters ---------- timestamps: pd.DatetimeIndex List of daily timestamps to split into chunks n_vars: int Number of variables in each request. max_req_size: int, optional (default: 1000) Maximum size of a request that the CDS API can handle reduce: bool, optional (default: False) Return only the start and end of each subperiod instead of all time stamps. daily_request: bool, optional (default: False) Only submit daily requests, otherwise monthly requests are allowed (if the max_req_size is not reached). Returns ------- chunks: list List of start and end dates that contain a chunk that the API can handle. """ n = int(max_req_size / n_vars / n_hsteps) def yield_chunk(): for _, chunk_year in timestamps.groupby(timestamps.year).items(): for _, chunk_month in chunk_year.groupby(chunk_year.month).items(): if daily_request: for _, chunk_day in chunk_month.groupby( chunk_month.day).items(): yield chunk_day else: yield chunk_month # each chunk contains either time stamps for one month, or for less, # if the request of one month would be too large. all_chunks = [] for chunk in yield_chunk(): if len(chunk) > n: chunks = split_array(chunk, n) else: chunks = np.array([chunk]) for chunk in chunks: if reduce: all_chunks.append(np.array([chunk[0], chunk[-1]])) else: all_chunks.append(chunk) return all_chunks
[docs]def download_era5( c, years, months, days, h_steps, variables, target, grb=False, bbox=None, product="era5", dry_run=False, cds_kwds={}, ): """ Download era5 reanalysis data for single levels of a defined time span Parameters ---------- c : cdsapi.Client Client to pass the request to years : list Years for which data is downloaded ,e.g. [2017, 2018] months : list Months for which data is downloaded, e.g. [4, 8, 12] days : list Days for which data is downloaded (range(31)=All days) e.g. [10, 20, 31] h_steps: list List of full hours to download data at the selected dates e.g [0, 12] variables : list, optional (default: None) List of variables to pass to the client, if None are passed, the default variables will be downloaded. target : str File name, where the data is stored. grb : bool, optional (default: False) Download data in grib format instead of netcdf bbox: Tuple[int,int,int,int], optional (default: None) Bounding box of the area to download (min_lon, min_lat, max_lon, max_lat) - wgs84. None will download global images. product : str ERA5 data product to download, either era5 or era5-land dry_run: bool, optional (default: False) Do not download anything, this is just used for testing the functionality cds_kwds: dict, optional Additional arguments to be passed to the CDS API retrieve request. Returns --------- success : bool Return True after downloading finished """ if dry_run: return request = { "data_format": "grib" if grb else "netcdf", "download_format": "zip", "variable": variables, "year": [str(y) for y in years], "month": [str(m).zfill(2) for m in months], "day": [str(d).zfill(2) for d in days], "time": [time(h, 0).strftime("%H:%M") for h in h_steps], } if bbox is not None: # maxlat, minlon, minlat, maxlon request["area"] = [bbox[3], bbox[0], bbox[1], bbox[2]] request.update(cds_kwds) if product == "era5": request["product_type"] = ["reanalysis"] c.retrieve("reanalysis-era5-single-levels", request, target) elif product == "era5-land": c.retrieve("reanalysis-era5-land", request, target) else: raise ValueError( product, "Unknown product, choose either 'era5' or 'era5-land'") return True
[docs]class CDSStatusTracker: """ Track the status of the CDS download by using the CDS callback functions """ statuscode_ok = 0 statuscode_error = -1 statuscode_unavailable = 10 statuscode_terms_not_accepted = 11 def __init__(self, logger=logging.getLogger()): self.download_statuscode = self.statuscode_ok self.logger = logger
[docs] def handle_error_function(self, *args, **kwargs): if len(args) == 1: error_str = str(args[0]) if '\n' in error_str: message_prefix, message_body = error_str.split('\n', 1) else: message_prefix = error_str message_body = '' else: message_prefix = args[0] message_body = args[1] if self.download_statuscode != self.statuscode_unavailable: if (message_prefix.startswith("Reason:") and message_body == "Request returned no data"): self.download_statuscode = self.statuscode_unavailable elif ("Not all the required licences have been accepted" in message_body): self.download_statuscode = self.statuscode_terms_not_accepted else: self.download_statuscode = self.statuscode_error self.logger.error(*args, **kwargs)
[docs]def download_and_move( target_path, startdate, enddate, product="era5", variables=None, keep_original=False, h_steps=(0, 6, 12, 18), grb=False, bbox=None, dry_run=False, grid=None, remap_method="bil", cds_kwds=None, stepsize="month", n_max_request=1000, keep_prelim=True, cds_token=None, ) -> int: """ Downloads the data from the ECMWF servers and moves them to the target path. This is done in 30 day increments between start and end date. The files are then extracted into separate grib files per parameter and stored in yearly folders under the target_path. Parameters ---------- target_path : str Path where the files are stored to startdate: datetime first date to download enddate: datetime last date to download product : str, optional (default: era5) Either era5 or era5-land variables : list, optional (default: None) Name of variables to download, see the documentation for all variable names. If None is chosen, then the 'default' variables are downloaded. keep_original: bool (default: False) If True, keep the original downloaded data stack as received from CDS after slicing individual time stamps. h_steps: tuple, optional (default: (0, 6, 12, 18)) List of full hours to download data for at the selected dates e.g [0, 12] would download at 0:00 and 12:00. Only full hours are possible. grb: bool, optional (default: False) Download data as grib files instead of netcdf. Note that downloading in grib format, does not allow on-the-fly resampling (`grid` argument) bbox: Tuple[int,int,int,int], optional (default: None) Bounding box of the area to download (min_lon, min_lat, max_lon, max_lat) - wgs84. None will download global images. dry_run: bool Do not download anything, this is just used for testing the functions grid : dict, optional (default: None) A grid on which to remap the data using CDO. This must be a dictionary using CDO's grid description format, e.g.:: grid = { "gridtype": "lonlat", "xsize": 720, "ysize": 360, "xfirst": -179.75, "yfirst": 89.75, "xinc": 0.5, "yinc": -0.5, } Default is to use no regridding. To use this option, it is necessary that CDO is installed. remap_method : str, optional (dafault: 'bil') Method to be used for regridding. Available methods are: - "bil": bilinear (default) - "bic": bicubic - "nn": nearest neighbour - "dis": distance weighted - "con": 1st order conservative remapping - "con2": 2nd order conservative remapping - "laf": largest area fraction remapping cds_kwds: dict, optional (default: None) Additional keyword arguments to be passed to the CDS API request. This might be useful in the future, when new server-side options are added which are not yet directly supported by this package. n_max_request : int, optional (default: 1000) Maximum size that a request can have to be processed by CDS. At the moment of writing this is 1000 (N_timstamps * N_variables in a request) but as this is a server side settings, it can change. keep_prelim: bool, optional (default: True) Keep preliminary data from ERA5T under a different file name. These data are not yet final and might change if an issue is detected. If False is chosen, then the preliminary data will be discarded and not stored. cds_token: str, optional (default: None) To identify with the CDS. Required if no .cdsapirc file exists in the home directory (see documentation). You can find your token/key on your CDS user profile page. Alternatively, the CDSAPI_KEY environment variable can be set manually instead of passing the token here. Returns ------- status_code: int Status code summary from all requests: 0 : All Downloaded data ok -1 : Error in at least one request -10 : No data available for requested time period """ h_steps = list(h_steps) product = product.lower() cds_kwds = cds_kwds or dict() if variables is None: variables = default_variables(product=product) else: # find the dl_names variables = lookup(name=product, variables=variables) variables = variables["dl_name"].values.tolist() # this logger name is also used by CDS API, don't change it logger = logging.getLogger('cdsapi') if dry_run: warnings.warn("Dry run does not create connection to CDS") c = None cds_status_tracker = None else: if cds_token is not None: os.environ["CDSAPI_KEY"] = cds_token check_api_ready() os.makedirs(target_path, exist_ok=True) cds_status_tracker = CDSStatusTracker(logger=logger) c = cdsapi.Client( error_callback=cds_status_tracker.handle_error_function) timestamps = pd.DatetimeIndex( np.array([ datetime(t.year, t.month, t.day) for t in pd.date_range(startdate, enddate, freq='D') ])) req_periods = split_chunk( timestamps, n_vars=len(variables), n_hsteps=len(h_steps), max_req_size=n_max_request, reduce=True, daily_request=True if stepsize == "day" else False) logger.info(f"Request is split into {len(req_periods)} chunks") logger.info(f"Target directory {target_path}") downloaded_data_path = os.path.join(target_path, "temp_downloaded") if not os.path.exists(downloaded_data_path): os.mkdir(downloaded_data_path) def _download(curr_start, curr_end): curr_start = pd.to_datetime(curr_start).to_pydatetime() curr_end = pd.to_datetime(curr_end).to_pydatetime() status_code = -1 fname = "{start}_{end}.{ext}".format( start=curr_start.strftime("%Y%m%d"), end=curr_end.strftime("%Y%m%d"), ext="zip" if grb is False else "grb") dl_file = os.path.join(downloaded_data_path, fname) finished, i = False, 0 while (not finished) and (i < 5): # try max 5 times try: finished = download_era5( c, years=[curr_start.year], months=[curr_start.month], days=range(curr_start.day, curr_end.day + 1), h_steps=h_steps, variables=variables, grb=grb, bbox=bbox, product=product, target=dl_file, dry_run=dry_run, cds_kwds=cds_kwds, ) status_code = 0 break except Exception as e: # noqa: E722 c.error(e) # If no data is available or terms were not accepted we don't # need to retry if (cds_status_tracker.download_statuscode in [ CDSStatusTracker.statuscode_unavailable, CDSStatusTracker.statuscode_terms_not_accepted ]): status_code = -10 break # delete the partly downloaded data and retry if os.path.isfile(dl_file): os.remove(dl_file) finished = False i += 1 continue if status_code == 0 and os.path.exists(dl_file): if grb: save_gribs_from_grib( dl_file, target_path, product_name=product.upper(), keep_original=keep_original, keep_prelim=keep_prelim) else: # Extract and merge nc files from zip dl_file_new = dl_file.replace('.zip', '.nc') unzip_nc(dl_file, dl_file_new) dl_file = dl_file_new save_ncs_from_nc( dl_file, target_path, product_name=product.upper(), grid=grid, remap_method=remap_method, keep_original=keep_original, keep_prelim=keep_prelim) return status_code # Since we download month/month or day/day we need to # collect all the status codes to return a valid # status code for the whole time period all_status_codes = parallel_process( _download, ITER_KWARGS={ 'curr_start': [p[0] for p in req_periods], 'curr_end': [p[1] for p in req_periods] }, logger_name='cdsapi', loglevel='DEBUG', n_proc=1, backend='multiprocessing') # remove temporary files if not keep_original: shutil.rmtree(downloaded_data_path) if grid is not None: gridpath = os.path.join(target_path, "grid.txt") if os.path.exists(gridpath): os.unlink(gridpath) weightspath = os.path.join(target_path, "remap_weights.nc") if os.path.exists(weightspath): os.unlink(weightspath) delete_empty_directories(target_path) dl_settings = { 'product': product, 'variables': variables, 'keep_original': keep_original, 'h_steps': h_steps, 'grb': grb, 'bbox': bbox, 'grid': grid, 'remap_method': remap_method, 'cds_kwds': cds_kwds, 'stepsize': stepsize, 'n_max_request': n_max_request, 'keep_prelim': keep_prelim, } update_image_summary_file(target_path, dl_settings) handlers = logger.handlers[:] for handler in handlers: logger.removeHandler(handler) handler.close() handlers.clear() # if any of the sub-periods was successful we want the function to return 0 consolidated_status_code = max(all_status_codes) return consolidated_status_code
[docs]def download_record_extension(path, dry_run=False, cds_token=None): """ Uses information from an existing record to download additional data from CDS. Parameters ---------- path: str Path where the image data to extend is stored. Must also contain a `summary.yml` file. dry_run: bool, optional Do not download anything, this is just used for testing the functions cds_token: str, optional (default: None) To identify with the CDS. Required if no `.cdsapirc` file exists in the home directory (see documentation). You can find your token/key on your CDS user profile page. Alternatively, the CDSAPI_KEY environment variable can be set manually instead of passing the token here. Returns ------- status_code: int Status code summary from all requests: 0 : All Downloaded data ok -1 : Error in at least one request -10 : No data available for requested time period """ props = read_summary_yml(path) last_img = get_first_last_image_date(path) startdate = pd.to_datetime(last_img).to_pydatetime() + timedelta(days=1) enddate = (pd.to_datetime(datetime.now().date()).to_pydatetime() - timedelta(days=1)) # yesterday logging.info(f"Downloading record extension from {startdate} to {enddate}") logging.info(f"Additional settings {props['download_settings']}") return download_and_move( path, startdate=startdate, enddate=enddate, cds_token=cds_token, dry_run=dry_run, **props['download_settings'] )