import tempfile
from datetime import datetime
import logging
import os
import pandas as pd
import xarray as xr
import zipfile
import shutil
import numpy as np
from ecmwf_models.globals import (IMG_FNAME_TEMPLATE,
IMG_FNAME_DATETIME_FORMAT, EXPVER, SUBDIRS)
from ecmwf_models.globals import (
Cdo,
cdo_available,
CdoNotFoundError,
pygrib,
pygrib_available,
PygribNotFoundError,
)
def create_dt_fpath(dt, root, fname, subdirs=None):
    """
    Create a file path from root, a list of subdirectories and a file name.

    ``fname`` and every element of ``subdirs`` are put through
    ``dt.strftime`` before they are joined.

    Parameters
    ----------
    dt: datetime.datetime
        Date used to fill the strftime placeholders.
    root: string
        Root of the file path.
    fname: string
        File name to use (may contain strftime placeholders).
    subdirs: list or tuple, optional (default: None)
        Sequence of strings, each element represents one subdirectory level.
        For example the list ['%Y', '%m'] would lead to a path of
        ``root/YYYY/MM/fname``, i.e. for a dt of datetime(2000,12,31)
        ``root/2000/12/fname``. None (or an empty sequence) means that no
        subdirectories are used.

    Returns
    -------
    fpath: string
        Full filename including path
    """
    # None instead of a shared mutable ``[]`` default.
    if subdirs is None:
        subdirs = ()

    parts = [root]
    parts.extend(dt.strftime(subdir) for subdir in subdirs)
    parts.append(dt.strftime(fname))
    return os.path.join(*parts)
def unzip_nc(
    input_zip,
    output_nc,
):
    """
    Unzip and merge all netcdf files downloaded from CDS.

    If the zip file contains only one netcdf file, it is only extracted and
    moved to ``output_nc``. If it contains several, they are merged:

    - the ``expver`` coordinates (where present) are converted to int and
      the element-wise maximum across all files is written back to every
      dataset as zero-padded 4-digit strings (e.g. 1 -> '0001');
    - the datasets are combined with ``xr.combine_by_coords``;
    - the result is written with zlib compression (level 6) on all data
      variables.

    The input zip file is deleted afterwards.

    Parameters
    ----------
    input_zip: str
        Path to the downloaded zip file containing one or more (datastream)
        netcdf files.
    output_nc: str
        Path to the netcdf file to write

    Raises
    ------
    ValueError
        If the zip file does not contain any ``.nc`` file. The zip file is
        kept in that case.
    """
    with tempfile.TemporaryDirectory() as tmpdir:
        with zipfile.ZipFile(input_zip, "r") as zip_ref:
            zip_ref.extractall(tmpdir)

        # Sorted so the merge order does not depend on os.listdir.
        ncfiles = sorted(
            os.path.join(tmpdir, f)
            for f in os.listdir(tmpdir)
            if f.endswith(".nc")
        )

        if not ncfiles:
            # Do not write an empty file and delete the download silently.
            raise ValueError(f"No netcdf file found in {input_zip}")

        if len(ncfiles) == 1:
            shutil.move(ncfiles[0], output_nc)
        else:
            # Sometimes CDS returns multiple netcdf files, merge them
            datasets = []
            try:
                # ncfiles already hold full paths.
                for ncfile in ncfiles:
                    datasets.append(xr.open_dataset(ncfile))

                expvers = [
                    d.coords['expver'].values.astype(int)
                    for d in datasets
                    if 'expver' in d.coords
                ]
                if len(expvers) > 0:
                    expvers = np.array(expvers).max(axis=0)
                    for d in datasets:
                        # NOTE(review): a bare 1-D array presumably makes
                        # 'expver' a dimension coordinate of its own —
                        # confirm this is intended.
                        # ':04d' (no sign flag) -> '0001', not ' 001'.
                        d.coords['expver'] = np.array(
                            [f"{int(e):04d}" for e in expvers])

                merged = xr.combine_by_coords(
                    datasets, combine_attrs="override", compat='override')
                merged.to_netcdf(
                    output_nc,
                    encoding={
                        v: {
                            'zlib': True,
                            'complevel': 6
                        } for v in merged.data_vars
                    })
            finally:
                # Release file handles before the temporary directory is
                # removed (open files block its deletion on Windows).
                for d in datasets:
                    d.close()

    os.remove(input_zip)
def save_ncs_from_nc(
    input_nc,
    output_path,
    product_name,
    grid=None,
    keep_original=True,
    remap_method="bil",
    keep_prelim=True,
):
    """
    Split the downloaded netcdf file into one file per time stamp and add
    them to the folder structure necessary for reshuffling.

    Parameters
    ----------
    input_nc : str
        Filepath of the downloaded .nc file
    output_path : str
        Where to save the resulting netcdf files
    product_name : str
        Name of the ECMWF model (only for filename generation)
    grid : dict, optional (default: None)
        If given, every image is remapped with CDO. Each key/value pair is
        written as a ``key = value`` line to ``<output_path>/grid.txt``
        (only if that file does not exist yet). Remapping weights are stored
        in ``<output_path>/remap_weights.nc``; an existing weights file is
        reused, otherwise it is generated from the first processed image.
    keep_original: bool
        keep the original downloaded data too, before it is sliced into
        individual images.
    remap_method : str, optional (default: "bil")
        Suffix of the CDO ``gen...`` operator used to create the remapping
        weights (e.g. "bil" -> ``cdo.genbil``). Only used if grid is given.
    keep_prelim: bool, optional (default: True)
        True to keep preliminary data from ERA5T with a different file name,
        or False to drop these files and only keep the final records.

    Raises
    ------
    CdoNotFoundError
        If grid is given but CDO is not available.
    """
    _filename_templ = IMG_FNAME_TEMPLATE.format(
        product="{product}",
        type='AN',
        datetime=IMG_FNAME_DATETIME_FORMAT,
        ext='nc')

    cdo = None
    if grid is not None:
        if not cdo_available:
            raise CdoNotFoundError()
        cdo = Cdo()
        gridpath = os.path.join(output_path, "grid.txt")
        weightspath = os.path.join(output_path, "remap_weights.nc")
        # An existing grid description is reused unchanged.
        if not os.path.exists(gridpath):
            with open(gridpath, "w") as f:
                for k, v in grid.items():
                    f.write(f"{k} = {v}\n")

    try:
        with xr.open_dataset(input_nc, mask_and_scale=True) as nc_in:
            # Normalise the time axis name to 'time'.
            if 'valid_time' in nc_in.dims:
                nc_in = nc_in.rename_dims({"valid_time": 'time'})
            if 'valid_time' in nc_in.variables:
                nc_in = nc_in.rename_vars({"valid_time": 'time'})

            for i, time in enumerate(nc_in["time"].values):
                subset = nc_in.sel({"time": time})

                # Expver identifies preliminary data
                ext = ''
                if 'expver' in subset:
                    ex = np.atleast_1d(subset['expver'].values)
                    # If several expver entries remain after selecting one
                    # time stamp, use the entry at this time stamp's index.
                    expver = str(ex[0]) if len(ex) == 1 else str(ex[i])
                    subset = subset.drop_vars('expver')
                    try:
                        ext = EXPVER[expver]
                    except KeyError:
                        ext = ''

                if len(ext) > 0 and not keep_prelim:
                    logging.info(f"Dropping preliminary data {time}")
                    continue

                if len(ext) > 0:
                    filename_templ = _filename_templ.format(
                        product=product_name + '-' + ext)
                else:
                    filename_templ = _filename_templ.format(
                        product=product_name)

                if 'number' in subset.variables:
                    subset = subset.drop_vars('number')

                timestamp = pd.Timestamp(time).to_pydatetime()
                filepath = create_dt_fpath(
                    timestamp,
                    root=output_path,
                    fname=filename_templ,
                    subdirs=SUBDIRS,
                )
                # exist_ok: no race between the existence check and mkdir.
                os.makedirs(os.path.dirname(filepath), exist_ok=True)

                if grid is not None:
                    if not os.path.exists(weightspath):
                        # create weights file (once; reused afterwards)
                        getattr(cdo, "gen" + remap_method)(
                            gridpath, input=subset, output=weightspath)
                    subset = cdo.remap(
                        ",".join([gridpath, weightspath]),
                        input=subset,
                        returnXDataset=True,
                    )

                # same compression for all variables
                var_encode = {"zlib": True, "complevel": 6}
                # explicitly use netcdf4 engine
                subset.to_netcdf(
                    filepath,
                    encoding={var: var_encode for var in subset.variables},
                    engine="netcdf4")
    finally:
        # Clean CDO's temporary files even if processing failed.
        if cdo is not None:
            cdo.cleanTempDir()

    # Only remove the source after the dataset has been closed.
    if not keep_original:
        os.remove(input_nc)
def save_gribs_from_grib(
    input_grib,
    output_path,
    product_name,
    keep_original=True,
    keep_prelim=True,
):
    """
    Split the downloaded grib file into one file per time stamp (year,
    month, day and hour of each message) and add them to the folder
    structure necessary for reshuffling.

    Parameters
    ----------
    input_grib : str
        Filepath of the downloaded .grb file
    output_path : str
        Where to save the resulting grib files
    product_name : str
        Name of the ECMWF model (only for filename generation)
    keep_original: bool
        keep the original downloaded data too, before it is sliced into
        individual images.
    keep_prelim: bool, optional (default: True)
        True to keep preliminary data from ERA5T with a different file name,
        or False to drop these files and only keep the final records.

    Raises
    ------
    PygribNotFoundError
        If pygrib is not available.
    """
    if not pygrib_available:
        raise PygribNotFoundError()

    _filename_templ = IMG_FNAME_TEMPLATE.format(
        product="{product}",
        type='AN',
        datetime=IMG_FNAME_DATETIME_FORMAT,
        ext='grb')

    # Output files already started during this call. The first message
    # for a file truncates it (overwriting files from earlier runs); all
    # further messages are appended. Tracking paths instead of only the
    # previous date keeps messages from being lost when messages of one
    # date are not contiguous in the input file.
    written = set()

    grib_in = pygrib.open(input_grib)
    try:
        grib_in.seek(0)
        for grb in grib_in:
            filedate = datetime(
                grb["year"], grb["month"], grb["day"], grb["hour"])
            expver = grb['expver']
            try:
                ext = EXPVER[expver]
            except KeyError:
                ext = ''

            if len(ext) > 0 and not keep_prelim:
                logging.info(f"Dropping preliminary data {filedate}")
                continue

            if len(ext) > 0:
                filename_templ = _filename_templ.format(
                    product=product_name + '-' + ext)
            else:
                filename_templ = _filename_templ.format(product=product_name)

            filepath = create_dt_fpath(
                filedate,
                root=output_path,
                fname=filename_templ,
                subdirs=SUBDIRS)
            # exist_ok: no race between the existence check and mkdir.
            os.makedirs(os.path.dirname(filepath), exist_ok=True)

            mode = "ab" if filepath in written else "wb"
            written.add(filepath)
            with open(filepath, mode) as grb_out:
                grb_out.write(grb.tostring())
    finally:
        grib_in.close()

    if not keep_original:
        os.remove(input_grib)