Source code for ecmwf_models.era5.reshuffle

# -*- coding: utf-8 -*-
"""
Image to time series conversion tools.
"""
import warnings

import os
import pandas as pd
import numpy as np
from datetime import time, datetime, timedelta
from typing import Tuple, Union
from pygeogrids.grids import CellGrid
import inspect

from repurpose.img2ts import Img2Ts
from c3s_sm.misc import update_ts_summary_file, read_summary_yml

from repurpose.process import ImageBaseConnection

from ecmwf_models.era5.img import ERA5NcDs, ERA5GrbDs
from ecmwf_models.grid import ERA_RegularImgGrid, ERA5_RegularImgLandGrid
from ecmwf_models.utils import (parse_filetype, parse_product,
                                get_first_last_image_date)


[docs]class Reshuffler: def __init__(self, input_root, outputpath, variables=None, h_steps=(0, 6, 12, 18), product=None, land_points=False): """ Parameters ---------- input_root: str Input path where ERA image data was downloaded to. outputpath : str Output path, where the reshuffled netcdf time series are stored. variables: tuple or str Variables to read from the passed images and convert into time series format. h_steps : list or tuple, optional (default: (0, 6, 12, 18)) Hours at which images are read for each day and used for reshuffling, therefore this defines the sub-daily temporal resolution of the time series that are generated. product : str, optional (default: None) Either era5 or era5-land, if None is passed we guess the product from the downloaded image files. land_points: bool, optional (default: False) Reshuffle only land points. Uses the ERA5 land mask to create a land grid. The land grid is fixed to 0.25*0.25 or 0.1*0.1 deg for now. """ self.input_root = input_root self.outputpath = outputpath self.land_points = land_points self.variables = variables self.h_steps = h_steps self.filetype = parse_filetype(input_root) self.product = product or parse_product(input_root)
[docs] def find_first_last_file_date(self) -> Tuple[str, str]: """ Derive time stamp of the first and last available image """ first_date = get_first_last_image_date(self.input_root, start_from_last=False) last_date = get_first_last_image_date(self.input_root, start_from_last=True) return first_date, last_date
[docs] def load_grid(self, bbox: Tuple = None, cellsize=5.) -> CellGrid: """ Generate ERA5 and ERA5-Land grid in the given bounding box. Ensures that GPIs are consistent with global grid. """ if self.land_points: if self.product == "era5": grid = ERA5_RegularImgLandGrid(resolution=0.25, bbox=bbox, cellsize=cellsize) elif self.product == "era5-land": grid = ERA5_RegularImgLandGrid(resolution=0.1, bbox=bbox, cellsize=cellsize) else: raise NotImplementedError( self.product, "Land grid not implemented for product.") else: if self.product.lower() in ['era5-land', 'era5-land-t']: grid = ERA_RegularImgGrid(resolution=0.1, bbox=bbox, cellsize=cellsize) elif self.product.lower() in ['era5', 'era5-t']: grid = ERA_RegularImgGrid(resolution=0.25, bbox=bbox, cellsize=cellsize) else: raise NotImplementedError(self.product, "Grid not implemented for product.") return grid
[docs] def get_img_reader(self, grid: Union[CellGrid, None], h_steps: tuple) -> Union[ERA5GrbDs, ERA5NcDs]: """ Set up the Multi Image reader class """ if self.filetype == "grib": input_dataset = ERA5GrbDs( root_path=self.input_root, parameter=self.variables, subgrid=grid, array_1D=True, h_steps=h_steps, product=self.product, mask_seapoints=False, ) elif self.filetype == "netcdf": input_dataset = ERA5NcDs( root_path=self.input_root, parameter=self.variables, subgrid=grid, array_1D=True, h_steps=h_steps, product=self.product, mask_seapoints=False, ) else: raise Exception("Unknown file format") return input_dataset
[docs] def reshuffle(self, startdate=None, enddate=None, bbox=None, cellsize=5.0, imgbuffer=50, n_proc=1): """ Reshuffle method applied to ERA images for conversion into netcdf time series format. Note: ERA5 and ERA5-Land files are preferred over their T-counterpart, in case that multiple files for a time stamp are present! Parameters ---------- startdate : str Start date, from which images are read and time series are generated. Format YYYY-mm-dd enddate : str, optional (default: None) End date, from which images are read and time series are generated. Format YYYY-mm-dd. If None is passed, then the last available image date is used. variables: tuple or str Variables to read from the passed images and convert into time series format. product : str, optional (default: None) Either era5 or era5-land, if None is passed we guess the product from the downloaded image files. bbox: tuple optional (default: None) (min_lon, min_lat, max_lon, max_lat) - wgs84. To load only a subset of the global grid / file. cellsize: float, optional (default: 5.0) Cell chunking of the time series in degrees. imgbuffer: int, optional (default: 50) How many images to read at once before writing time series. This number affects how many images are stored in memory and should be chosen according to the available amount of memory and the size of a single image. n_proc: int, optional Number of parallel processes """ if (startdate is None) or (enddate is None): _first, _last = self.find_first_last_file_date() startdate = _first if startdate is None else startdate enddate = _last if enddate is None else enddate startdate = pd.to_datetime(startdate).to_pydatetime().date() enddate = pd.to_datetime(enddate).to_pydatetime().date() if startdate > enddate: warnings.warn( f"Start date {startdate} is greater than end date {enddate}. " f"Abort." ) return # get time series attributes from first day of data. first_date_time = datetime.combine( startdate, time(self.h_steps[0], 0)) if bbox is None: # If a subset was downloaded, otherwise repurpose would resample input_dataset = self.get_img_reader(None, self.h_steps) _img = input_dataset.read(first_date_time) bbox = [round(float(_img.lon.min()), 3), round(float(_img.lat.min()), 3), round(float(_img.lon.max()), 3), round(float(_img.lat.max()), 3)] grid = self.load_grid(bbox, cellsize=cellsize) input_dataset = self.get_img_reader(grid, self.h_steps) global_attr = { f"product": f"{self.product.upper()} (from {self.filetype})" } # get time series attributes from first day of data. data = input_dataset.read(first_date_time) ts_attributes = data.metadata props = { 'product': self.product, 'filetype': self.filetype, 'last_update': datetime.now() } kwargs = { 'input_root': self.input_root, 'outputpath': self.outputpath, 'variables': None if self.variables is None else list(self.variables), 'land_points': self.land_points, 'startdate': startdate, 'enddate': enddate, 'h_steps': list(self.h_steps), 'bbox': None if bbox is None else list(bbox), 'imgbuffer': imgbuffer, 'cellsize': cellsize, } props["img2ts_kwargs"] = kwargs os.makedirs(self.outputpath, exist_ok=True) reshuffler = Img2Ts( input_dataset=ImageBaseConnection(input_dataset), outputpath=self.outputpath, startdate=str(startdate), enddate=str(enddate), input_grid=grid, imgbuffer=imgbuffer, ts_dtypes=np.dtype("float32"), global_attr=global_attr, zlib=True, unlim_chunksize=1000, ts_attributes=ts_attributes, backend='loky', n_proc=n_proc, ) reshuffler.calc() update_ts_summary_file(self.outputpath, props)
[docs]def extend_ts(ts_path, **img2ts_kwargs): """ Append any new data from the image path to the time series data. This function is only applied to time series file that were created using the img2ts function. This will use the start from the previously written metadata, and process only parameters that are already present in the time series files. Parameters ---------- ts_path: str Directory containing time series files to extend. It is also expected that there is a `overview.yml` file in this directory. img2ts_kwargs: All kwargs are optional, if they are not given, we use them from the previously created `overview.yml` file. If they are passed here, they will override the values from the yml file. """ if not os.path.isfile(os.path.join(ts_path, 'overview.yml')): raise FileNotFoundError( "No overview.yml file found in the passed directory. " "This file is required to use the same settings to extend an " "existing record. NOTE: Use the `era5 download` or " "`era5land download` programs first.") ts_props = read_summary_yml(ts_path) for k, v in ts_props['img2ts_kwargs'].items(): if k not in img2ts_kwargs: if k == 'startdate': old_enddate = pd.to_datetime( ts_props["img2ts_kwargs"]["enddate"]).to_pydatetime() startdate = old_enddate + timedelta(days=1) img2ts_kwargs['startdate'] = startdate elif k in ['enddate', 'outputpath']: # fields from yml to ignore continue else: img2ts_kwargs[k] = v if 'enddate' not in img2ts_kwargs: img2ts_kwargs['enddate'] = None # All available images reshuffler_kwargs = {} for n in inspect.getfullargspec(Reshuffler).args[1:]: if n in img2ts_kwargs: reshuffler_kwargs[n] = img2ts_kwargs[n] reshuffle_kwargs = {} for n in inspect.getfullargspec(Reshuffler.reshuffle).args[1:]: if n in img2ts_kwargs: reshuffle_kwargs[n] = img2ts_kwargs[n] reshuffler = Reshuffler(outputpath=ts_path, **reshuffler_kwargs) reshuffler.reshuffle(**reshuffle_kwargs)