import glob
import os
import sys
from typing import Union
import xarray as xr
import xcdat as xc
import xmltodict
from pcmdi_metrics.io.xcdat_dataset_io import get_calendar
def xcdat_open(
infile: Union[str, list],
    data_var: Union[str, None] = None,
decode_times: bool = True,
chunks=None,
) -> xr.Dataset:
"""
    Open an input file (netCDF, or an xml catalog generated by cdscan).
Parameters
----------
infile : Union[str, list]
        String or list of strings giving the path(s) of the file(s) to open using xcdat.
data_var : str, optional
key of the non-bounds data variable to keep in the Dataset, alongside any existing bounds data variables.
By default None, which loads all data variables.
decode_times : bool, optional
If True, attempt to decode times encoded in the standard NetCDF datetime format into cftime.datetime objects.
Otherwise, leave them encoded as numbers. This keyword may not be supported by all the backends, by default True.
chunks : int, "auto", dict, or None, optional
The chunk size used to load data into dask arrays.
Returns
-------
xr.Dataset
xarray dataset opened via xcdat
Examples
--------
>>> from pcmdi_metrics.io import xcdat_open
# Open a single netCDF file
>>> ds = xcdat_open('mydata.nc')
# Open multiple files
    >>> ds = xcdat_open(['mydata1.nc', 'mydata2.nc'])
# Open multiple files using a wildcard
>>> ds = xcdat_open('mydata*.nc')
    # Open while specifying the variable 'ts'
>>> ds = xcdat_open(['mydata1.nc', 'mydata2.nc'], data_var='ts')
# Open an xml file
>>> ds = xcdat_open('mydata.xml')
"""
if isinstance(infile, list) or "*" in infile:
try:
ds = xc.open_mfdataset(
infile, data_var=data_var, decode_times=decode_times, chunks=chunks
)
except (
ValueError
): # Could be due to non-cf-compliant calendar or other attribute
ds = xc.open_mfdataset(
infile, data_var=data_var, decode_times=False, chunks=chunks
)
ds = fix_noncompliant_attr(ds)
else:
if infile.split(".")[-1].lower() == "xml":
try:
ds = _xcdat_openxml(
infile, data_var=data_var, decode_times=decode_times, chunks=chunks
)
except (
ValueError
): # Could be due to non-cf-compliant calendar or other attribute
ds = _xcdat_openxml(
infile, data_var=data_var, decode_times=False, chunks=chunks
)
ds = fix_noncompliant_attr(ds)
else:
try:
ds = xc.open_dataset(
infile, data_var=data_var, decode_times=decode_times, chunks=chunks
)
except (
ValueError
): # Could be due to non-cf-compliant calendar or other attribute
ds = xc.open_dataset(
infile, data_var=data_var, decode_times=False, chunks=chunks
)
ds = fix_noncompliant_attr(ds)
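    # Add any missing coordinate bounds before returning the dataset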
return ds.bounds.add_missing_bounds()
def fix_noncompliant_attr(ds: xr.Dataset) -> xr.Dataset:
"""Fix dataset attributes that do not meet cf standards
Parameters
----------
ds: xr.Dataset
xarray dataset to fix
Returns
-------
xr.Dataset
xarray dataset with updated attributes
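    Examples
    --------
    >>> # Hypothetical usage: retry time decoding after a non-CF-compliant
    >>> # calendar attribute caused the default decoding to fail
    >>> ds = xc.open_dataset('mydata.nc', decode_times=False)
    >>> ds = fix_noncompliant_attr(ds)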
"""
# Add any calendar fixes here
cal = get_calendar(ds)
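    # Replace hyphens with underscores so the calendar name matches the
    # CF/cftime identifiers (e.g. "proleptic-gregorian" -> "proleptic_gregorian")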
cal = cal.replace("-", "_")
ds.time.attrs["calendar"] = cal
ds = xc.decode_time(ds)
return ds
def _xcdat_openxml(
    xmlfile: str, data_var: Union[str, None] = None, decode_times: bool = True, chunks=None
) -> xr.Dataset:
"""Open input file (xml generated by cdscan)
Parameters
----------
    xmlfile : str
        path of the xml file to open using xcdat
    data_var : str, optional
        key of the non-bounds data variable to keep in the Dataset, alongside any existing bounds data variables.
        By default None, which loads all data variables.
    decode_times : bool, optional
        If True, attempt to decode times encoded in the standard NetCDF datetime format into cftime.datetime objects.
        Otherwise, leave them encoded as numbers. This keyword may not be supported by all the backends, by default True.
    chunks : int, "auto", dict, or None, optional
        The chunk size used to load data into dask arrays.
Returns
-------
xr.Dataset
xarray dataset opened via xcdat
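    Notes
    -----
    The xml file itself is only parsed for the ``directory`` attribute of its
    ``dataset`` element; every ``*.nc`` file found in that directory is then
    opened with xcdat.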
"""
if not os.path.exists(xmlfile):
sys.exit(f"ERROR: File not exist: {xmlfile}")
with open(xmlfile, encoding="utf-8") as fd:
doc = xmltodict.parse(fd.read())
ncfile_list = glob.glob(os.path.join(doc["dataset"]["@directory"], "*.nc"))
    if len(ncfile_list) == 0:
        # Fail loudly, consistent with the missing-file check above
        sys.exit(f"ERROR: No netCDF files found in {doc['dataset']['@directory']}")
    elif len(ncfile_list) > 1:
ds = xc.open_mfdataset(
ncfile_list, data_var=data_var, decode_times=decode_times, chunks=chunks
)
else:
ds = xc.open_dataset(
ncfile_list[0], data_var=data_var, decode_times=decode_times, chunks=chunks
)
return ds