from pathlib import Path
import xarray as xr
import warnings # use to warn user about missing files
warnings.formatwarning = my_formatwarning
# "reference data"
# It is often just a "baseline case",
# but could be some totally external data (reanalysis or observation or other model)
# When it is another simulation, it gets treated like another "case"
# When it is external data expect:
# - "climo" files (12 monthly climos in the file)
# - one variable per "climo"
# - source can differ for each variable, requires label
# - resolution can differ for each variable, requires regridded file(s)
# - the variable name and units in the file may differ from CAM; use defaults.yaml to set conversion
# - there could be multiple instances of a variable from different sources (e.g. different observations)
# NOTE: the last item (multiple instances of a variable) is not allowed in AdfObs.var_obs_dict
# Since ADF is not able to handle this case, for now it is excluded the AdfData class.
# NOTE: To make the "baseline case" vs "external data" cases as similar as possible,
# below construct the "baseline case" version to be similar to "external data".
# - provide a dictionary of (variable: file-path)
# + For external data, that dictionay is from AdfObs.var_obs_dict,
# which provides a dict of all the available variables.
# + For reference simulation, look for files that match the diag_var_list
# NOTE: There is currently a "base_nickname" allowed from AdfInfo.
# Set AdfData.ref_nickname to that.
# Could be altered from "Obs" to be the data source label.
[docs]
class AdfData:
"""A class instantiated with an AdfDiag object.
Methods provide means to load data.
This class does not interact with plotting,
just provides access to data locations and loading data.
A future need is to add some kind of frequency/sampling
parameters to allow for non-h0 files.
"""
def __init__(self, adfobj):
self.adf = adfobj # provides quick access to the AdfDiag object
# paths
self.model_rgrid_loc = adfobj.get_basic_info("cam_regrid_loc", required=True)
# variables (and info for unit transform)
# use self.adf.diag_var_list and self.adf.self.adf.variable_defaults
# case names and nicknames
self.case_names = adfobj.get_cam_info("cam_case_name", required=True)
self.test_nicknames = adfobj.case_nicknames["test_nicknames"]
self.base_nickname = adfobj.case_nicknames["base_nickname"]
self.ref_nickname = self.base_nickname
# define reference data
self.set_reference() # specify "ref_labels" -> called "data_list" in zonal_mean (name of data source)
[docs]
def set_reference(self):
"""Set attributes for reference (aka baseline) data location, names, and variables."""
if self.adf.compare_obs:
self.ref_var_loc = {v: self.adf.var_obs_dict[v]['obs_file'] for v in self.adf.var_obs_dict}
self.ref_labels = {v: self.adf.var_obs_dict[v]['obs_name'] for v in self.adf.var_obs_dict}
self.ref_var_nam = {v: self.adf.var_obs_dict[v]['obs_var'] for v in self.adf.var_obs_dict}
self.ref_case_label = "Obs"
if not self.adf.var_obs_dict:
warnings.warn("\t WARNING: reference is observations, but no observations found to plot against.")
else:
self.ref_var_loc = {}
self.ref_var_nam = {}
self.ref_labels = {}
# when using a reference simulation, allow a "special" attribute with the case name:
self.ref_case_label = self.adf.get_baseline_info("cam_case_name", required=True)
for v in self.adf.diag_var_list:
self.ref_var_nam[v] = v
self.ref_labels[v] = self.adf.get_baseline_info("cam_case_name", required=True)
f = self.get_reference_climo_file(v)
if f:
self.ref_var_loc[v] = f
[docs]
def set_ref_var_loc(self):
"""Set reference climo file locations"""
for v in self.adf.diag_var_list:
f = self.get_reference_climo_file(v)
self.ref_var_loc[v] = f
# Time series files
#------------------
# Test case(s)
[docs]
def get_timeseries_file(self, case, field):
"""Return list of test time series files"""
ts_locs = self.adf.get_cam_info("cam_ts_loc", required=True) # list of paths (could be multiple cases)
caseindex = (self.case_names).index(case)
ts_loc = Path(ts_locs[caseindex])
ts_filenames = f'{case}.*.{field}.*nc'
ts_files = sorted(ts_loc.glob(ts_filenames))
return ts_files
# Reference case (baseline/obs)
[docs]
def get_ref_timeseries_file(self, field):
"""Return list of reference time series files"""
if self.adf.compare_obs:
warnings.warn("\t WARNING: ADF does not currently expect observational time series files.")
return None
else:
ts_loc = Path(self.adf.get_baseline_info("cam_ts_loc", required=True))
ts_filenames = f'{self.ref_case_label}.*.{field}.*nc'
ts_files = sorted(ts_loc.glob(ts_filenames))
return ts_files
[docs]
def load_timeseries_dataset(self, fils):
"""Return DataSet from time series file(s) and assign time to midpoint of interval"""
if (len(fils) == 0):
warnings.warn("\t WARNING: Input file list is empty.")
return None
elif (len(fils) > 1):
ds = xr.open_mfdataset(fils, decode_times=False)
else:
sfil = str(fils[0])
if not Path(sfil).is_file():
warnings.warn(f"\t WARNING: Expecting to find file: {sfil}")
return None
ds = xr.open_dataset(sfil, decode_times=False)
if ds is None:
warnings.warn(f"\t WARNING: invalid data on load_dataset")
# assign time to midpoint of interval (even if it is already)
if 'time_bnds' in ds:
t = ds['time_bnds'].mean(dim='nbnd')
t.attrs = ds['time'].attrs
ds = ds.assign_coords({'time':t})
elif 'time_bounds' in ds:
t = ds['time_bounds'].mean(dim='nbnd')
t.attrs = ds['time'].attrs
ds = ds.assign_coords({'time':t})
else:
warnings.warn("\t INFO: Timeseries file does not have time bounds info.")
return xr.decode_cf(ds)
[docs]
def load_timeseries_da(self, case, variablename):
"""Return DataArray from time series file(s).
Uses defaults file to convert units.
"""
add_offset, scale_factor = self.get_value_converters(case, variablename)
fils = self.get_timeseries_file(case, variablename)
if not fils:
warnings.warn(f"\t WARNING: Did not find case time series file(s), variable: {variablename}")
return None
return self.load_da(fils, variablename, add_offset=add_offset, scale_factor=scale_factor)
[docs]
def load_reference_timeseries_da(self, field):
"""Return a DataArray time series to be used as reference
(aka baseline) for variable field.
"""
fils = self.get_ref_timeseries_file(field)
if not fils:
warnings.warn(f"\t WARNING: Did not find reference time series file(s), variable: {field}")
return None
#Change the variable name from CAM standard to what is
# listed in variable defaults for this observation field
if self.adf.compare_obs:
field = self.ref_var_nam[field]
add_offset = 0
scale_factor = 1
else:
add_offset, scale_factor = self.get_value_converters(self.ref_case_label, field)
return self.load_da(fils, field, add_offset=add_offset, scale_factor=scale_factor)
#------------------
# Climatology files
#------------------
# Test case(s)
[docs]
def load_climo_da(self, case, variablename):
"""Return DataArray from climo file"""
add_offset, scale_factor = self.get_value_converters(case, variablename)
fils = self.get_climo_file(case, variablename)
return self.load_da(fils, variablename, add_offset=add_offset, scale_factor=scale_factor)
[docs]
def load_climo_file(self, case, variablename):
"""Return Dataset for climo of variablename"""
fils = self.get_climo_file(case, variablename)
if not fils:
warnings.warn(f"\t WARNING: Did not find climo file for variable: {variablename}. Will try to skip.")
return None
return self.load_dataset(fils)
[docs]
def get_climo_file(self, case, variablename):
"""Retrieve the climo file path(s) for variablename for a specific case."""
a = self.adf.get_cam_info("cam_climo_loc", required=True) # list of paths (could be multiple cases)
caseindex = (self.case_names).index(case) # the entry for specified case
model_cl_loc = Path(a[caseindex])
return sorted(model_cl_loc.glob(f"{case}_{variablename}_climo.nc"))
# Reference case (baseline/obs)
[docs]
def load_reference_climo_da(self, case, variablename):
"""Return DataArray from reference (aka baseline) climo file"""
add_offset, scale_factor = self.get_value_converters(case, variablename)
fils = self.get_reference_climo_file(variablename)
return self.load_da(fils, variablename, add_offset=add_offset, scale_factor=scale_factor)
[docs]
def get_reference_climo_file(self, var):
"""Return a list of files to be used as reference (aka baseline) for variable var."""
if self.adf.compare_obs:
fils = self.ref_var_loc.get(var, None)
return [fils] if fils is not None else None
ref_loc = self.adf.get_baseline_info("cam_climo_loc")
# NOTE: originally had this looking for *_baseline.nc
fils = sorted(Path(ref_loc).glob(f"{self.ref_case_label}_{var}_climo.nc"))
if fils:
return fils
return None
#------------------
# Regridded files
#------------------
# Test case(s)
[docs]
def get_regrid_file(self, case, field):
"""Return list of test regridded files"""
model_rg_loc = Path(self.adf.get_basic_info("cam_regrid_loc", required=True))
rlbl = self.ref_labels[field] # rlbl = "reference label" = the name of the reference data that defines target grid
return sorted(model_rg_loc.glob(f"{rlbl}_{case}_{field}_regridded.nc"))
[docs]
def load_regrid_dataset(self, case, field):
"""Return a data set to be used as reference (aka baseline) for variable field."""
fils = self.get_regrid_file(case, field)
if not fils:
warnings.warn(f"\t WARNING: Did not find regrid file(s) for case: {case}, variable: {field}")
return None
return self.load_dataset(fils)
[docs]
def load_regrid_da(self, case, field):
"""Return a data array to be used as reference (aka baseline) for variable field."""
add_offset, scale_factor = self.get_value_converters(case, field)
fils = self.get_regrid_file(case, field)
if not fils:
warnings.warn(f"\t WARNING: Did not find regrid file(s) for case: {case}, variable: {field}")
return None
return self.load_da(fils, field, add_offset=add_offset, scale_factor=scale_factor)
# Reference case (baseline/obs)
[docs]
def get_ref_regrid_file(self, case, field):
"""Return list of reference regridded files"""
if self.adf.compare_obs:
obs_loc = self.ref_var_loc.get(field, None)
if obs_loc:
fils = [str(obs_loc)]
else:
fils = []
else:
model_rg_loc = Path(self.adf.get_basic_info("cam_regrid_loc", required=True))
fils = sorted(model_rg_loc.glob(f"{case}_{field}_baseline.nc"))
return fils
[docs]
def load_reference_regrid_dataset(self, case, field):
"""Return a data set to be used as reference (aka baseline) for variable field."""
fils = self.get_ref_regrid_file(case, field)
if not fils:
warnings.warn(f"\t WARNING: Did not find regridded file(s) for case: {case}, variable: {field}")
return None
return self.load_dataset(fils)
[docs]
def load_reference_regrid_da(self, case, field):
"""Return a data array to be used as reference (aka baseline) for variable field."""
add_offset, scale_factor = self.get_value_converters(case, field)
fils = self.get_ref_regrid_file(case, field)
if not fils:
warnings.warn(f"\t WARNING: Did not find regridded file(s) for case: {case}, variable: {field}")
return None
#Change the variable name from CAM standard to what is
# listed in variable defaults for this observation field
if self.adf.compare_obs:
field = self.ref_var_nam[field]
return self.load_da(fils, field, add_offset=add_offset, scale_factor=scale_factor)
#------------------
# DataSet and DataArray load
#---------------------------
# Load DataSet
[docs]
def load_dataset(self, fils):
"""Return xarray DataSet from file(s)"""
if (len(fils) == 0):
warnings.warn("\t WARNING: Input file list is empty.")
return None
elif (len(fils) > 1):
ds = xr.open_mfdataset(fils, combine='by_coords')
else:
sfil = str(fils[0])
if not Path(sfil).is_file():
warnings.warn(f"\t WARNING: Expecting to find file: {sfil}")
return None
ds = xr.open_dataset(sfil)
if ds is None:
warnings.warn(f"\t WARNING: invalid data on load_dataset")
return ds
# Load DataArray
[docs]
def load_da(self, fils, variablename, **kwargs):
"""Return xarray DataArray from files(s) w/ optional scale factor, offset, and/or new units"""
ds = self.load_dataset(fils)
if ds is None:
warnings.warn(f"\t WARNING: Load failed for {variablename}")
return None
da = (ds[variablename]).squeeze()
scale_factor = kwargs.get('scale_factor', 1)
add_offset = kwargs.get('add_offset', 0)
da = da * scale_factor + add_offset
if variablename in self.adf.variable_defaults:
vres = self.adf.variable_defaults[variablename]
da.attrs['units'] = vres.get("new_unit", da.attrs.get('units', 'none'))
else:
da.attrs['units'] = 'none'
return da
# Get variable conversion defaults, if applicable
[docs]
def get_value_converters(self, case, variablename):
"""
Get variable defaults if applicable
- This is to get any scale factors or off-sets
Returns
-------
add_offset - int/float
scale_factor - int/float
"""
add_offset = 0
scale_factor = 1
res = self.adf.variable_defaults
if variablename in res:
vres = res[variablename]
if variablename in self.ref_labels:
if (case == self.ref_labels[variablename]) and (self.adf.compare_obs):
scale_factor = vres.get("obs_scale_factor",1)
add_offset = vres.get("obs_add_offset", 0)
else:
scale_factor = vres.get("scale_factor",1)
add_offset = vres.get("add_offset", 0)
return add_offset, scale_factor
#------------------