Source code for pcmdi_metrics.io.xcdat_openxml

import glob
import os
import sys
from typing import Union

import xarray as xr
import xcdat as xc
import xmltodict

from pcmdi_metrics.io.xcdat_dataset_io import get_calendar


def xcdat_open(
    infile: Union[str, os.PathLike, list],
    data_var: str = None,
    decode_times: bool = True,
    chunks=None,
) -> xr.Dataset:
    """
    Open input file (netCDF, or xml generated by cdscan)

    Parameters
    ----------
    infile : Union[str, os.PathLike, list]
        list of string, or string, for path of file(s) to open using xcdat.
        A string containing ``*`` is treated as a wildcard pattern for
        multiple files; a string ending in ``.xml`` (case-insensitive) is
        treated as a cdscan-generated xml file. A single path-like object
        (e.g. ``pathlib.Path``) is converted to a string first.
    data_var : str, optional
        key of the non-bounds data variable to keep in the Dataset, alongside
        any existing bounds data variables. By default None, which loads all
        data variables.
    decode_times : bool, optional
        If True, attempt to decode times encoded in the standard NetCDF
        datetime format into cftime.datetime objects. Otherwise, leave them
        encoded as numbers. This keyword may not be supported by all the
        backends, by default True.
    chunks : int, "auto", dict, or None, optional
        The chunk size used to load data into dask arrays.

    Returns
    -------
    xr.Dataset
        xarray dataset opened via xcdat, with missing bounds added

    Notes
    -----
    If opening raises ``ValueError`` (which could be due to a
    non-cf-compliant calendar or other attribute), the file is re-opened
    with ``decode_times=False`` and passed through ``fix_noncompliant_attr``,
    which repairs the calendar attribute and decodes time.

    Examples
    --------
    >>> from pcmdi_metrics.io import xcdat_open

    # Open a single netCDF file
    >>> ds = xcdat_open('mydata.nc')

    # Open multiple netCDF files
    >>> ds = xcdat_open(['mydata1.nc', 'mydata2.nc'])

    # Open multiple files using a wildcard
    >>> ds = xcdat_open('mydata*.nc')

    # Open with specifying the variable 'ts'
    >>> ds = xcdat_open(['mydata1.nc', 'mydata2.nc'], data_var='ts')

    # Open an xml file
    >>> ds = xcdat_open('mydata.xml')
    """
    # Normalise a single path-like object so the string checks below work.
    if isinstance(infile, os.PathLike):
        infile = os.fspath(infile)

    # Pick the opener once; all three share the same keyword interface.
    if isinstance(infile, list) or "*" in infile:
        opener = xc.open_mfdataset
    elif infile.rsplit(".", 1)[-1].lower() == "xml":
        opener = _xcdat_openxml
    else:
        opener = xc.open_dataset

    try:
        ds = opener(
            infile, data_var=data_var, decode_times=decode_times, chunks=chunks
        )
    except ValueError:
        # Could be due to non-cf-compliant calendar or other attribute:
        # re-open without decoding, then fix the attributes and decode.
        ds = opener(infile, data_var=data_var, decode_times=False, chunks=chunks)
        ds = fix_noncompliant_attr(ds)

    return ds.bounds.add_missing_bounds()
def fix_noncompliant_attr(ds: xr.Dataset) -> xr.Dataset:
    """Fix dataset attributes that do not meet cf standards

    The calendar name obtained from ``get_calendar`` has every ``-`` replaced
    by ``_`` and is written to the ``calendar`` attribute of ``ds.time``;
    the dataset's time is then decoded with ``xc.decode_time``.

    Parameters
    ----------
    ds: xr.Dataset
        xarray dataset to fix. Note that the ``calendar`` attribute of its
        time coordinate is modified in place.

    Returns
    -------
    xr.Dataset
        xarray dataset with updated attributes and decoded time
    """
    # Add any calendar fixes here
    calendar = get_calendar(ds).replace("-", "_")
    ds.time.attrs["calendar"] = calendar
    return xc.decode_time(ds)


def _xcdat_openxml(
    xmlfile: str, data_var: str = None, decode_times: bool = True, chunks=None
) -> xr.Dataset:
    """Open input file (xml generated by cdscan)

    The xml file is parsed only for the ``directory`` attribute of its
    ``dataset`` element; every ``*.nc`` file found in that directory is then
    opened with xcdat.

    Parameters
    ----------
    xmlfile: str
        path of xml file to open using xcdat
    data_var: str, optional
        key of the non-bounds data variable to keep in the Dataset, alongside
        any existing bounds data variables, by default None, which loads all
        data variables
    decode_times : bool, optional
        If True, attempt to decode times encoded in the standard NetCDF
        datetime format into cftime.datetime objects. Otherwise, leave them
        encoded as numbers. This keyword may not be supported by all the
        backends, by default True
    chunks : int, "auto", dict, or None, optional
        The chunk size used to load data into dask arrays, passed through to
        xcdat.

    Returns
    -------
    xr.Dataset
        xarray dataset opened via xcdat

    Raises
    ------
    SystemExit
        If ``xmlfile`` does not exist.
    FileNotFoundError
        If the directory referenced by the xml file contains no ``*.nc``
        files.
    """
    if not os.path.exists(xmlfile):
        sys.exit(f"ERROR: File not exist: {xmlfile}")

    with open(xmlfile, encoding="utf-8") as fd:
        doc = xmltodict.parse(fd.read())

    nc_dir = doc["dataset"]["@directory"]
    # glob returns files in arbitrary order; sort for a deterministic list.
    ncfile_list = sorted(glob.glob(os.path.join(nc_dir, "*.nc")))

    if not ncfile_list:
        # Fail with a clear message instead of an IndexError further down.
        raise FileNotFoundError(
            f"No netCDF (*.nc) files found in directory '{nc_dir}' "
            f"referenced by {xmlfile}"
        )

    if len(ncfile_list) > 1:
        return xc.open_mfdataset(
            ncfile_list, data_var=data_var, decode_times=decode_times, chunks=chunks
        )

    return xc.open_dataset(
        ncfile_list[0], data_var=data_var, decode_times=decode_times, chunks=chunks
    )