Source code for ecmwf_models.interface

# The MIT License (MIT)
#
# Copyright (c) 2019, TU Wien, Department of Geodesy and Geoinformation
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

import warnings
import os
from pygeobase.io_base import ImageBase, MultiTemporalImageBase
from pygeobase.object_base import Image
import numpy as np
from datetime import timedelta

from pygeogrids.netcdf import load_grid
from pynetcf.time_series import GriddedNcOrthoMultiTs
from ecmwf_models.grid import (
    ERA_RegularImgGrid,
    get_grid_resolution,
    ERA_IrregularImgGrid,
)
from ecmwf_models.utils import lookup
import glob

import xarray as xr

try:
    import pygrib
except ImportError:
    warnings.warn("pygrib has not been imported")
"""
Base classes for reading downloaded ERA netcdf and grib images and stacks
"""


[docs]class ERANcImg(ImageBase): """ Reader for a single ERA netcdf file. Parameters ---------- filename: str Path to the image file to read. product : str 'era5' or 'era5-land' or 'eraint' parameter: list or str, optional (default: ['swvl1', 'swvl2']) Name of parameters to read from the image file. subgrid: pygeogrids.CellGrid, optional (default: None) Read only data for points of this grid and not global values. mask_seapoints : bool, optional (default: False) Read the land-sea mask to mask points over water and set them to nan. This option needs the 'lsm' parameter to be in the file! array_1D: bool, optional (default: False) Read data as list, instead of 2D array, used for reshuffling. mode : str, optional (default: 'r') Mode in which to open the file, changing this can cause data loss. This argument should not be changed! """ def __init__( self, filename, product, parameter=["swvl1", "swvl2"], subgrid=None, mask_seapoints=False, array_1D=False, mode="r", ): super(ERANcImg, self).__init__(filename, mode=mode) if type(parameter) == str: parameter = [parameter] # look up short names self.parameter = lookup(product, parameter)["short_name"].values self.mask_seapoints = mask_seapoints self.array_1D = array_1D self.subgrid = subgrid if self.subgrid and not self.array_1D: warnings.warn( "Reading spatial subsets as 2D arrays ony works if there " "is an equal number of points in each line")
[docs] def read(self, timestamp=None): """ Read data from the loaded image file. Parameters --------- timestamp : datetime, optional (default: None) Specific date (time) to read the data for. """ return_img = {} return_metadata = {} try: dataset = xr.open_dataset( self.filename, engine="netcdf4", mask_and_scale=True) except IOError as e: print(e) print(" ".join([self.filename, "can not be opened"])) raise e res_lat, res_lon = get_grid_resolution( dataset.variables["latitude"][:], dataset.variables["longitude"][:]) grid = ( ERA_RegularImgGrid(res_lat, res_lon) if not self.subgrid else self.subgrid) if self.mask_seapoints: if "lsm" not in dataset.variables.keys(): raise IOError("No land sea mask parameter (lsm) in" " passed image for masking.") else: sea_mask = dataset.variables["lsm"].values for name in dataset.variables: if name in self.parameter: variable = dataset[name] if 'expver' in variable.dims and (variable.data.ndim == 3): warnings.warn( f"Found experimental data in {self.filename}") param_data = variable.data[-1] for vers_data in variable.data: if not np.all(np.isnan(vers_data)): param_data = vers_data else: param_data = variable.data if self.mask_seapoints: param_data = np.ma.array( param_data, mask=np.logical_not(sea_mask), fill_value=np.nan, ) param_data = param_data.filled() param_data = param_data.flatten() return_metadata[name] = variable.attrs return_img.update( dict([(str(name), param_data[grid.activegpis])])) try: return_img[name] except KeyError: path, thefile = os.path.split(self.filename) warnings.warn( "Cannot load variable {var} from file {thefile}. " "Filling image with NaNs.".format( var=name, thefile=thefile)) return_img[name] = np.empty(grid.activegpis.size).fill( np.nan) dataset.close() if self.array_1D: return Image( grid.activearrlon, grid.activearrlat, return_img, return_metadata, timestamp, ) else: nlat = np.unique(grid.activearrlat).size nlon = np.unique(grid.activearrlon).size for key in return_img: return_img[key] = return_img[key].reshape((nlat, nlon)) return Image( grid.activearrlon.reshape(nlat, nlon), grid.activearrlat.reshape(nlat, nlon), return_img, return_metadata, timestamp, )
[docs] def write(self, data): raise NotImplementedError()
[docs] def flush(self): pass
[docs] def close(self): pass
[docs]class ERANcDs(MultiTemporalImageBase): """ Class for reading ERA 5 images in nc format. Parameters ---------- root_path: str Root path where image data is stored. parameter: list or str, optional (default: ['swvl1', 'swvl2']) Parameter or list of parameters to read from image files. subgrid: pygeogrids.CellGrid, optional (default: None) Read only data for points of this grid and not global values. mask_seapoints : bool, optional (default: False) Use the land-sea-mask parameter in the file to mask points over water. h_steps : list, optional (default: [0,6,12,18]) List of full hours for which images exist. array_1D: bool, optional (default: False) Read data as list, instead of 2D array, used for reshuffling. """ def __init__( self, root_path, product, parameter=("swvl1", "swvl2"), subgrid=None, mask_seapoints=False, h_steps=(0, 6, 12, 18), array_1D=False, ): self.h_steps = h_steps subpath_templ = ["%Y", "%j"] if type(parameter) == str: parameter = [parameter] ioclass_kws = { 'product': product, 'parameter': parameter, 'subgrid': subgrid, 'mask_seapoints': mask_seapoints, 'array_1D': array_1D } # the goal is to use expERA5*.nc if necessary, but perfer ERA5*.nc self.fn_templ_priority = [ 'ERA5_*_{datetime}.nc', 'expERA5_*_{datetime}.nc' ] super(ERANcDs, self).__init__( root_path, ERANcImg, fname_templ='*_{datetime}.nc', datetime_format="%Y%m%d_%H%M", subpath_templ=subpath_templ, exact_templ=False, ioclass_kws=ioclass_kws) def _search_files(self, timestamp, custom_templ=None, str_param=None, custom_datetime_format=None): """ override the original filename generation to allow multiple files for time stamp """ if custom_templ is not None: raise NotImplementedError else: fname_templ = self.fname_templ if custom_datetime_format is not None: dFormat = {self.dtime_placeholder: custom_datetime_format} else: dFormat = {self.dtime_placeholder: self.datetime_format} sub_path = '' if self.subpath_templ is not None: for s in self.subpath_templ: sub_path = os.path.join(sub_path, timestamp.strftime(s)) fname_templ = fname_templ.format(**dFormat) if str_param is not None: fname_templ = fname_templ.format(**str_param) search_file = os.path.join(self.path, sub_path, timestamp.strftime(fname_templ)) if self.exact_templ: raise NotImplementedError else: filename = glob.glob(search_file) if len(filename) > 1: for templ in self.fn_templ_priority: fname_templ = templ.format(**dFormat) if str_param is not None: fname_templ = fname_templ.format(**str_param) search_file = os.path.join(self.path, sub_path, timestamp.strftime(fname_templ)) filename = glob.glob(search_file) if len(filename) == 1: break if not filename: filename = [] return filename
[docs] def tstamps_for_daterange(self, start_date, end_date): """ Get datetimes in the correct sub-daily resolution between 2 dates Parameters ---------- start_date: datetime Start datetime end_date: datetime End datetime Returns ---------- timestamps : list List of datetimes """ img_offsets = np.array([timedelta(hours=h) for h in self.h_steps]) timestamps = [] diff = end_date - start_date for i in range(diff.days + 1): daily_dates = start_date + timedelta(days=i) + img_offsets timestamps.extend(daily_dates.tolist()) return timestamps
[docs]class ERAGrbImg(ImageBase): """ Base class for reader for a single ERA Grib file. Parameters ---------- filename: str Path to the image file to read. product : str ERA5 or ERAINT parameter: list or str, optional (default: ['swvl1', 'swvl2']) Name of parameters to read from the image file. subgrid: pygeogrids.CellGrid, optional (default:None) Read only data for points of this grid and not global values. mask_seapoints : bool, optional (default: False) Read the land-sea mask to mask points over water and set them to nan. This option needs the 'lsm' parameter to be in the file! array_1D: bool, optional (default: False) Read data as list, instead of 2D array, used for reshuffling. mode : str, optional (default: 'r') Mode in which to open the file, changing this can cause data loss. This argument should not be changed! """ def __init__( self, filename, product, parameter=("swvl1", "swvl2"), subgrid=None, mask_seapoints=False, array_1D=True, mode="r", ): super(ERAGrbImg, self).__init__(filename, mode=mode) if type(parameter) == str: parameter = [parameter] self.parameter = lookup( product, parameter)["short_name"].values # look up short names self.product = product self.mask_seapoints = mask_seapoints self.array_1D = array_1D self.subgrid = subgrid
[docs] def read(self, timestamp=None): """ Read data from the loaded image file. Parameters --------- timestamp : datetime, optional (default: None) Specific date (time) to read the data for. """ grbs = pygrib.open(self.filename) grid = self.subgrid return_img = {} return_metadata = {} var_msg_lut = {p: None for p in self.parameter} sea_mask = None for N in range(grbs.messages): n = N + 1 message = grbs.message(n) param_name = str(message.cfVarNameECMF) if param_name == "lsm": if self.mask_seapoints and sea_mask is None: sea_mask = message.values.flatten() if param_name not in self.parameter: continue else: var_msg_lut[param_name] = n # available variables shape = None for param_name, n in var_msg_lut.items(): if n is None: continue return_metadata[param_name] = {} message = grbs.message(n) param_data = message.values.flatten() if not shape: shape = param_data.shape return_img[param_name] = param_data if grid is None: lats, lons = message.latlons() try: res_lat, res_lon = get_grid_resolution(lats, lons) grid = ERA_RegularImgGrid(res_lat, res_lon) except ValueError: # when grid not regular lons_gt_180 = np.where(lons > 180.0) lons[lons_gt_180] = lons[lons_gt_180] - 360 grid = ERA_IrregularImgGrid(lons, lats) return_metadata[param_name]["units"] = message["units"] return_metadata[param_name]["long_name"] = \ message["parameterName"] if "levels" in message.keys(): return_metadata[param_name]["depth"] = "{:} cm".format( message["levels"]) if self.mask_seapoints: if sea_mask is None: raise IOError( "No land sea mask parameter (lsm) in passed image" " for masking.") else: # mask the loaded data for name in return_img.keys(): param_data = return_img[name] param_data = np.ma.array( param_data, mask=np.logical_not(sea_mask), fill_value=np.nan, ) param_data = param_data.filled() return_img[name] = param_data grbs.close() # missing variables for param_name, n in var_msg_lut.items(): if n is not None: continue param_data = np.full(shape, np.nan) warnings.warn("Cannot load variable {var} from file {thefile}. " "Filling image with NaNs.".format( var=param_name, thefile=self.filename)) return_img[param_name] = param_data return_metadata[param_name] = {} return_metadata[param_name]["long_name"] = lookup( self.product, [param_name]).iloc[0]["long_name"] if self.array_1D: return Image( grid.activearrlon, grid.activearrlat, return_img, return_metadata, timestamp, ) else: nlat = np.unique(grid.activearrlat).size nlon = np.unique(grid.activearrlon).size for key in return_img: return_img[key] = return_img[key].reshape((nlat, nlon)) return Image( grid.activearrlon.reshape(nlat, nlon), grid.activearrlat.reshape(nlat, nlon), return_img, return_metadata, timestamp, )
[docs] def write(self, data): raise NotImplementedError()
[docs] def flush(self): pass
[docs] def close(self): pass
[docs]class ERAGrbDs(MultiTemporalImageBase): """ Reader for a stack of ERA grib files. Parameters ---------- root_path: string Root path where the data is stored product : str ERA5 or ERAINT parameter: list or str, optional (default: ['swvl1', 'swvl2']) Parameter or list of parameters to read expand_grid: bool, optional (default: True) If the reduced gaussian grid should be expanded to a full gaussian grid. """ def __init__( self, root_path, product, parameter=("swvl1", "swvl2"), subgrid=None, mask_seapoints=False, h_steps=(0, 6, 12, 18), array_1D=True, ): self.h_steps = h_steps subpath_templ = ["%Y", "%j"] if type(parameter) == str: parameter = [parameter] ioclass_kws = { "product": product, "parameter": parameter, "subgrid": subgrid, "mask_seapoints": mask_seapoints, "array_1D": array_1D, } super(ERAGrbDs, self).__init__( root_path, ERAGrbImg, fname_templ="*_{datetime}.grb", datetime_format="%Y%m%d_%H%M", subpath_templ=subpath_templ, exact_templ=False, ioclass_kws=ioclass_kws, )
[docs] def tstamps_for_daterange(self, start_date, end_date): """ Get datetimes in the correct sub-daily resolution between 2 dates Parameters ---------- start_date: datetime Start datetime end_date: datetime End datetime Returns ---------- timestamps : list List of datetimes """ img_offsets = np.array([timedelta(hours=h) for h in self.h_steps]) timestamps = [] diff = end_date - start_date for i in range(diff.days + 1): daily_dates = start_date + timedelta(days=i) + img_offsets timestamps.extend(daily_dates.tolist()) return timestamps
[docs]class ERATs(GriddedNcOrthoMultiTs): """ Time series reader for all reshuffled ERA reanalysis products in time series format. Use the read_ts(lon, lat) resp. read_ts(gpi) function of this class to read data for locations! Parameters ---------- ts_path : str Directory where the netcdf time series files are stored grid_path : str, optional (default: None) Path to grid file, that is used to organize the location of time series to read. If None is passed, grid.nc is searched for in the ts_path. Optional keyword arguments that are passed to the Gridded Base when used ------------------------------------------------------------------------ parameters : list, optional (default: None) Specific variable names to read, if None are selected, all are read. offsets : dict, optional (default: None) Offsets (values) that are added to the parameters (keys) scale_factors : dict, optional (default: None) Offset (value) that the parameters (key) is multiplied with ioclass_kws: dict, (optional) Optional keyword arguments, passed to the OrthoMultiTs class when used ---------------------------------------------------------------------- read_bulk : boolean, optional (default: False) If set to True, the data of all locations is read into memory, and subsequent calls to read_ts then read from cache and not from disk. This makes reading complete files faster. read_dates : boolean, optional (default: False) If false, dates will not be read automatically but only on specific request useable for bulk reading because currently the netCDF num2date routine is very slow for big datasets. """ def __init__(self, ts_path, grid_path=None, **kwargs): if grid_path is None: grid_path = os.path.join(ts_path, "grid.nc") grid = load_grid(grid_path) super(ERATs, self).__init__(ts_path, grid, **kwargs)