"""
Module: regional_map_multicase
Provides a plot with regional maps of specified variables for all cases (up to 10) in a row.
Since this is a specialty plot, it looks for several custom options to be provided in the YAML configuration file. For example, one might include this block in the YAML:
region_multicase:
region_spec: [slat, nlat, wlon, elon]
region_time_option: <calendar | zeroanchor> If calendar, will look for specified years. If zeroanchor will use a nyears starting from year_offset from the beginning of timeseries
region_start_year:
region_end_year:
region_nyear:
region_year_offset:
region_month: <NULL means look for season>
region_season: <NULL means use annual mean>
region_variables: <list of variables to try to use; allows for a subset of the total diag variables>
"""
#
# --- imports and configuration ---
#
from pathlib import Path
import warnings # use to warn user about missing files.
import numpy as np
import xarray as xr
import pandas as pd
import matplotlib as mpl
import matplotlib.pyplot as plt
import cartopy.crs as ccrs
from plotting_functions import pres_from_hybrid, prep_contour_plot
warnings.formatwarning = my_formatwarning
#
# --- Main Function Shares Name with Module: regional_map_multicase ---
#
[docs]
def regional_map_multicase(adfobj):
"""
regional_map_multicase
input -> ADF object
Sketch of workflow:
- check for regional options (see module docstring),
- get case names/locations (time series are used),
- determine plot location and type
- detect per-variable plot options
- Loop through variables: make one multi-panel plot per variable
"""
# Notify user that script has started:
msg = "\n Generating regional contour plots ..."
print(f"{msg}\n {'-' * (len(msg)-3)}")
# We need to know:
# - Variable to plot
# - Region to plot
# - Years to include in average
# - Months/Season to include in average
# - Cases ... reference on left, test(s) to the right --> IMPOSE UPPER LIMIT OF 10 PANELS
#
# Check if regional options were specified... can not proceed without them
#
regional_opts = adfobj.read_config_var("region_multicase")
if not regional_opts:
print(
"Regional options were not specified, so regional_map_multicase can not run. See documentation or config_cam_baseline_example.yaml for options to add to configuration file."
)
return
# case information
case_names = adfobj.get_cam_info("cam_case_name", required=True)
if len(case_names) > 10:
print("ERROR: regional_map_multicase is limited to <= 10 cases.")
return
case_loc = adfobj.get_cam_info("cam_ts_loc", required=True)
#
# Determine input "reference case"
#
if adfobj.get_basic_info("compare_obs"):
print("NotImplementedError: the regional contour plots do not have an observation option yet.")
return
# TODO: add observation (need time series), use `var_obs_dict`.
else:
#Extract required baseline run info:
data_name = adfobj.get_baseline_info("cam_case_name", required=True)
data_loc = adfobj.get_baseline_info("cam_ts_loc", required=True)
#
# Set plot options
#
res = adfobj.variable_defaults # dict of variable-specific plot preferences
basic_info_dict = adfobj.read_config_var("diag_basic_info")
plot_type = basic_info_dict.get("plot_type", "png")
redo_plot = adfobj.get_basic_info("redo_plot")
plot_loc = _get_plot_location(adfobj)
sampling_str = _get_sampling_string(regional_opts)
#
# Determine the list of variables to use (defaults to ADF's diag_var_list)
#
var_list = regional_opts.get("region_variables", adfobj.diag_var_list)
#
# LOOP OVER VARIABLES
#
for v in var_list:
#Notify user of variable being plotted:
print(f"\t - regional plots for {v}")
# the reference case
data_to_plot = {}
refcasetmp = _retrieve(
regional_opts, v, data_name, data_loc
) # get the baseline field
if refcasetmp is None:
wmsg = f"Variable '{v}' not found for case '{data_name}',"
wmsg += " so skipping that case/variable combo."
print(wmsg)
else:
#Add time mean values to dictionary:
data_to_plot[data_name] = refcasetmp.mean(dim="time")
#If the variable has a "lev" dimension, then it is likely
#has a vertical dimension, so go ahead and skip the variable.
#Note that this assume the variable has the same dimensions in
#both the ref case and the test case:
if "lev" in refcasetmp.dims:
print(f"Variable '{v}' has a 'lev' dimension, so no regional maps will be made.")
continue
#End if
#End if
# each of the other cases
for casenumber, case in enumerate(case_names):
casetmp = _retrieve(regional_opts, v, case, case_loc[casenumber])
if casetmp is None:
wmsg = f"Variable '{v}' not found for case '{case}',"
wmsg += " so skipping that case/variable combo."
print(wmsg)
else:
data_to_plot[case] = casetmp.mean(dim="time")
#End if
#End for
#The 'regional_map_multicase' function only works if at
#least two cases are available, so simply skip the variable
#in situations when that is not true:
if len(data_to_plot) < 2:
print(f"Less than two cases have variable `{v}', so no regional maps will be made.")
continue
#End if
plot = regional_map_multicase_plot(
adfobj, data_to_plot, opt=res.get(v)
) # res will be None if v not in the defaults
plot.suptitle(f"{v}, {sampling_str} Mean")
# save the output
plot_name = _construct_outfile_name(
plot_loc, plot_type, v, sampling_str
) # this has to be consistent with web site generator
#Save figure (must be done before website addition):
plot.savefig(plot_name, bbox_inches='tight')
#The website generator should be more flexible and allow
#for plots to be organized by season or variable. For now,
#just force the page to contain the variables by overloading
#the "season" option, even if the resulting HTML looks funny -JN:
adfobj.add_website_data(plot_name, "Regional_Maps", None,
season=v, multi_case=True)
#
# Helper functions
# - _retrieve
# - _get_month_number
# - _get_sampling_string
# - regional_map_multicase_plot
# - _get_plot_location
# - _construct_outfile_name
# - autoscale_title
# - simple_case_shortener
[docs]
def _retrieve(ropt, variable, casename, location, return_dataset=False):
"""Custom function that retrieves a variable.
Returns the variable as a DataArray, unless keyword return_dataset=True.
Applies the regional and time interval selections based in values in ropt.
ropt -> dict, regional options specified via YAML
variable -> str, the variable to get from files
casename -> str, case name for constructing file search
location -> Path, path to case
kwarg:
return_dataset -> bool, if true, return the dataset object, otherwise return the DataArray
with `variable`
"""
# note: this function assumes monthly data
fils = sorted(Path(location).glob(f"{casename}*.{variable}.*.nc"))
if len(fils) == 0:
#The variable doesn't appear to have been output during this
#particular case run, so return None:
return None
elif len(fils) > 1:
ds = xr.open_mfdataset(fils) # allows multi-file datasets
else:
ds = xr.open_dataset(fils[0])
# resampling is easier with DataArray
da = ds[variable]
# Apply regional selection logic
# -- assume lat/lon coords
da = da.sel(
lat=slice(ropt["region_spec"][0], ropt["region_spec"][1]),
lon=slice(ropt["region_spec"][2], ropt["region_spec"][3]),
)
# -- select time interval
if "region_time_option" in ropt:
if ropt["region_time_option"] == "calendar":
if ("region_start_year" in ropt) and ("region_end_year" in ropt):
da = da.loc[
{
"time": (da.time.dt.year >= ropt["region_start_year"])
& (da.time.dt.year <= ropt["region_end_year"])
}
]
elif ropt["region_time_option"] == "zeroanchor":
# This option is just in case one wants a specific length of sample after a given initial time,
# for example, a 20-year (nyear) sample after a 2-year (year_offset) spin up time.
# So if year_offset = 0, just will take the first nyear years.
# note: this is designed to be working with years, and does not care if years are complete in the data set or not.
min_year = da.time.dt.year.min()
da = da.loc[
{
"time": (da.time.dt.year >= min_year + ropt["region_year_offset"])
& (da.time.dt.year
<= min_year
+ ropt["region_year_offset"]
+ ropt["region_nyear"]
)
}
]
else:
print("ERROR: region_time_option must be calendar or zeroanchor")
# -- select month, season, or annual average (precedence in that order)
if "region_month" in ropt:
# print("-->FOUND region_month in ropt")
if ropt["region_month"] is not None:
month_number = _get_month_number(ropt["region_month"])
if month_number is not None:
da = da.loc[{"time": da.time.dt.month == month_number}]
# print(f"--> Good, month_number = {month_number}")
else:
month_number = None
else:
month_number = None
# print(f"Okay second chance -> month_number = {month_number}")
if (month_number is None) and ("region_season" in ropt):
month_length = da.time.dt.days_in_month
da = (da * month_length).resample(time="QS-DEC").sum(
dim="time"
) / month_length.resample(time="QS-DEC").sum(dim="time")
elif (month_number is None) and ("region_season" not in ropt):
print("Regional map is defaulting to annual mean.")
month_length = da.time.dt.days_in_month
da = (da * month_length).groupby("time.year").sum(
dim="time"
) / month_length.groupby("time.year").sum(dim="time")
da = da.rename({"year":"time"})
# else assume month_number is not none
if return_dataset:
return xr.Dataset(da)
else:
return da
[docs]
def _get_month_number(m):
"""
Utility to get the number (1-12) from a string form.
Input, `m`, can be string or integer.
If it is a string, can be 3-letter abbreviation or full month name.
If integer, needs to be 1-12.
Emits a useful string if it looks like a season abbreviation.
"""
from time import strptime
if isinstance(m, str):
if len(m) == 3:
if m in ["DJF", "MAM", "JJA", "SON"]:
print(f"Error: it looks like a season was supplied as a month: {m}")
n = None # fails if season was specified accidentally
else:
n = strptime(m, "%b").tm_mon # asumes 3-letter abbreviation (e.g., jan feb mar apr may ...)
elif len(m) > 3:
n = strptime(m, "%B").tm_mon # guessing full name (january, february, ...)
elif len(m) == 1:
n = strptime(m, "%m").tm_mon # guessing supplied the number
else:
print(f"ERROR: confused about what month is meant by {m}")
n = None
return n
elif isinstance(m, int):
return m
else:
print(f"Error: month supplied is neither string nor integer: {m}, {type(m)}")
return None
[docs]
def _get_sampling_string(ropt):
"""
Determines a string to use for the output file.
The month that is specified as `region_month`,
the season specified as `region_season`,
or `ANN` otherwise,
in that order of precedence.
Note: if month is specified as an integer, that is what will be returned.
"""
result = ropt.get("region_month")
if result is None:
result = ropt.get("region_season")
if result is None:
result = "ANN"
return result
[docs]
def regional_map_multicase_plot(adf, datadict, opt=None):
"""
Make a figure with up to 10 cases in a row of maps for specified region.
"""
ncols = len(datadict)
fig, ax = plt.subplots(
figsize=(4 * ncols, 3),
ncols=ncols,
constrained_layout=True,
sharex=True,
sharey=True,
subplot_kw={"projection": ccrs.PlateCarree()},
)
[a.coastlines() for a in ax]
cntrplts = []
titl = []
if opt is not None:
tiString = opt.pop("tiString", None)
tiFontSize = opt.pop("tiFontSize", 8)
else:
opt = {} # make an empty dict if one is not given
# an attempt to set titles to case names ... short names would be better:
short_names = simple_case_shortener(datadict.keys())
for i, xreg in enumerate(datadict):
assert len(datadict[xreg].dims) == 2, f"ERROR: the array must be 2D... has dims {datadict[xreg].dims}"
lons, lats = np.meshgrid(datadict[xreg].lon, datadict[xreg].lat)
#
# to do: check opt for all the things: cmap, contour levels, etc.
#
popts = prep_contour_plot(datadict[xreg], datadict[xreg], datadict[xreg] - datadict[xreg], **opt)
popts2 = {
**popts["contourf_opt"],
"cmap": popts["cmap1"],
"norm": popts["norm1"],
"levels": popts["levels1"],
}
cb_opt = popts.get("colorbar_opt")
cntrplts.append(
ax[i].contourf(lons, lats, datadict[xreg], transform=ccrs.PlateCarree(), **popts2)
) # can use ax[i] b/c it is only 1-d array of axes
titl.append(ax[i].set_title(short_names[i],loc='left'))
autoscale_title(titl[i], ax[i], fig)
gl = ax[i].gridlines(draw_labels=True, linewidth=0)
gl.top_labels = False
gl.right_labels = False
if ax[i].get_subplotspec().is_first_col():
gl.left_labels = True
else:
gl.left_labels = False
cb = fig.colorbar(
mpl.cm.ScalarMappable(norm=popts2["norm"], cmap=popts2["cmap"]),
ax=ax,
orientation="horizontal",
aspect=60,
**cb_opt,
)
return fig
[docs]
def _get_plot_location(adfobj):
"""
Determine plot location based on ADF object.
Create location if it is not a directory.
Return a Path object to the directory.
"""
plot_location = adfobj.plot_location # ADF output path for plots and tables
if not plot_location:
plot_location = adfobj.get_basic_info("cam_diag_plot_loc")
if isinstance(plot_location, list):
for pl in plot_location:
plpth = Path(pl)
# Check if plot output directory exists, and if not, then create it:
if not plpth.is_dir():
print(f"\t {pl} not found, making new directory")
plpth.mkdir(parents=True)
if len(plot_location) == 1:
plot_loc = Path(plot_location[0])
else:
print(
f"Ambiguous plotting location since all cases go on same plot. Will put them in first location: {plot_location[0]}"
)
plot_loc = Path(plot_location[0])
else:
plot_loc = Path(plot_location)
return plot_loc
[docs]
def _construct_outfile_name(ploc, ptype, *args):
"""
Construct a complete Path object for the output plot.
ploc -> plot location directory (expects Path object)
ptype -> file extension for output (e.g., pdf, png)
args -> arbitrary arguments to be joined by underscores
Example:
_construct_outfile_name("/scratch/plots", "png", "cam6.120", "PRECT", "March", "LabSea")
[returns] /scratch/plots/cam6.120_PRECT_March_LabSea_MultiCaseRegion_Mean.png
"""
cat = "_".join(args)
return ploc / f"{cat}_MultiCaseRegion_Mean.{ptype}"
[docs]
def autoscale_title(tiobj, ax, fig):
"""Utility function that tries to shrink the title to fit on top of the Axes."""
renderer = fig.canvas.get_renderer()
fsz = tiobj.get_fontsize()
while (fsz>6) and (tiobj.get_window_extent(renderer=renderer).width > ax.bbox.width):
fsz -= 1
tiobj.set_fontsize(fsz)
[docs]
def simple_case_shortener(case_names):
"""Reduce case names by removing a common prefix.
Short case names would be better!
"""
szs = [len(c) for c in case_names]
minsize = np.min(szs)
for i in range(minsize):
chkchar = [c[i] for c in case_names]
if all(ele == chkchar[0] for ele in chkchar):
continue
else:
return [ele[i:] for ele in case_names]
return case_names