Source code for pcmdi_metrics.utils.grid

import numpy as np
import xarray as xr
import xcdat as xc

from pcmdi_metrics.io import (
    get_grid,
    get_latitude_bounds_key,
    get_latitude_key,
    get_longitude_bounds_key,
    get_longitude_key,
)


[docs] def create_target_grid( lat1: float = -90.0, lat2: float = 90.0, lon1: float = 0.0, lon2: float = 360.0, target_grid_resolution: str = "2.5x2.5", grid_type: str = "uniform", ) -> xr.Dataset: """Generate a uniform grid for given latitude/longitude ranges and resolution Parameters ---------- lat1 : float, optional Starting latitude, by default -90. lat2 : float, optional Starting latitude, by default 90. lon1 : float, optional Starting latitude, by default 0. lon2 : float, optional Starting latitude, by default 360. target_grid_resolution : str, optional grid resolution in degree for lat and lon, by default "2.5x2.5" grid_type : str, optional type of the grid ('uniform' or 'gaussian'), by default "uniform" Returns ------- xr.Dataset generated grid Examples --------- Import: >>> from pcmdi_metrics.utils import create_target_grid Global uniform grid: >>> grid = create_target_grid(-90, 90, 0, 360, target_grid_resolution="5x5") Regional uniform grid: >>> grid = create_target_grid(30, 50, 100, 150, target_grid_resolution="0.5x0.5") """ # generate target grid res = target_grid_resolution.split("x") lat_res = float(res[0]) lon_res = float(res[1]) start_lat = lat1 + lat_res / 2.0 start_lon = lon1 + lon_res / 2.0 end_lat = lat2 - lat_res / 2 end_lon = lon2 - lon_res / 2 if grid_type == "uniform": grid = xc.create_uniform_grid( start_lat, end_lat, lat_res, start_lon, end_lon, lon_res ) elif grid_type == "gaussian": nlat = int(180 / lat_res) grid = xc.create_gaussian_grid(nlat) # If the longitude values include 0 and 360, then remove 360 to avoid having repeating grid if 0 in grid.lon.values and 360 in grid.lon.values: min_lon = grid.lon.values[0] # 0 # max_lon = grid.lon.values[-1] # 360 second_max_lon = grid.lon.values[-2] # 360-dlat grid = grid.sel(lon=slice(min_lon, second_max_lon)) # Reverse latitude if needed if grid.lat.values[0] > grid.lat.values[-1]: grid = grid.isel(lat=slice(None, None, -1)) grid = grid.sel(lat=slice(start_lat, end_lat), lon=slice(start_lon, end_lon)) else: raise ValueError( f"grid_type {grid_type} is undefined. Please use either 'uniform' or 'gaussian'" ) return grid
def __haversine(lat1, lon1, lat2, lon2): """ Calculate the great-circle distance (in meters) between two points on the Earth's surface given their latitude and longitude in decimal degrees. Parameters: - lat1 (float or array-like): Latitude of the first point in decimal degrees. - lon1 (float or array-like): Longitude of the first point in decimal degrees. - lat2 (float or array-like): Latitude of the second point in decimal degrees. - lon2 (float or array-like): Longitude of the second point in decimal degrees. Returns: - distance (float): Great-circle distance between the two points in meters. """ R = 6371000 # Earth radius in meters dlat = np.radians(lat2 - lat1) dlon = np.radians(lon2 - lon1) a = ( np.sin(dlat / 2) ** 2 + np.cos(np.radians(lat1)) * np.cos(np.radians(lat2)) * np.sin(dlon / 2) ** 2 ) c = 2 * np.arctan2(np.sqrt(a), np.sqrt(1 - a)) distance = R * c return distance def calculate_grid_area(ds: xr.Dataset) -> xr.DataArray: """ Calculate the area of each grid cell in a latitude-longitude dataset and compute area weights for each cell. Parameters: - ds (xarray.Dataset): Input dataset containing latitude and longitude bounds. Returns: - ds_new (xarray.Dataset): New dataset with additional variables: - 'grid_area': Area of each grid cell in square meters. - 'area_weights': Area weights for each grid cell, normalized to sum to 1. """ lat_key = get_latitude_key(ds) lon_key = get_longitude_key(ds) lat_bounds_key = get_latitude_bounds_key(ds) lon_bounds_key = get_longitude_bounds_key(ds) lat_bounds = ds[lat_bounds_key] lon_bounds = ds[lon_bounds_key] lat1, lat2 = lat_bounds[:, 0], lat_bounds[:, 1] lon1, lon2 = lon_bounds[:, 0], lon_bounds[:, 1] # Calculate distances using Haversine formula delta_lat = __haversine(lat1, lon1, lat2, lon1) # trapezoid height delta_lon1 = __haversine(lat1, lon1, lat1, lon2) # trapezoid lower base delta_lon2 = __haversine(lat2, lon1, lat2, lon2) # trapezoid upper base delta_lon = (delta_lon1 + delta_lon2) / 2.0 # Calculate area area = delta_lat * delta_lon # Create a new variable in the dataset with the calculated areas grid_area = xr.DataArray(area, dims=[lat_key, lon_key], attrs={"units": "m2"}) return grid_area def calculate_area_weights(grid_area): # Calculate area weighting for each grid total_area = grid_area.sum() area_weights = np.sqrt(grid_area / total_area) return area_weights
[docs] def regrid( ds: xr.Dataset, data_var: str, target_grid: xr.Dataset, regrid_tool: str = "regrid2", regrid_method: str = "bilinear", fill_zero: bool = False, ) -> xr.Dataset: """ Regrid the dataset to a given grid. Parameters ---------- ds : xr.Dataset Dataset to regrid. data_var : str Variable in the dataset. target_grid : xr.Dataset Grid to interpolate to. regrid_tool : str, optional Regrid option: "regrid2" or "xesmf". Default is "regrid2". regrid_method : str, optional Regrid method option that is required for xesmf regridder. Default is "bilinear". fill_zero : bool, optional Fill NaN value with zero if exists. Default is False. Returns ------- xr.Dataset Regridded dataset. Examples -------- >>> from pcmdi_metrics.utils import regrid """ target_grid = get_grid(target_grid) # To remove time dimension if exist # regrid if regrid_tool == "regrid2": ds_regridded = ds.regridder.horizontal(data_var, target_grid, tool=regrid_tool) elif regrid_tool in ["esmf", "xesmf"]: regrid_tool = "xesmf" regrid_method = "bilinear" ds_regridded = ds.regridder.horizontal( data_var, target_grid, tool=regrid_tool, method=regrid_method ) if fill_zero: ds_regridded = ds_regridded.fillna(0) ds_regridded = ds_regridded.bounds.add_missing_bounds() # just in case return ds_regridded