Source code for lib.adf_info

"""
Information/Parameter (Info) class for
the Atmospheric Diagnostics Framework (ADF).

This class inherits from the AdfConfig class.
Currently this class does four things:

1.  Initializes an instance of AdfConfig.

2.  Checks for the three, required upper-level
    dictionaries specified in the config file,
    and makes copies where the variables have
    been expanded.

3.  Extract values for "compare_obs", "diag_var_list",
    and "plot_location", and provide properties to
    access these values to the rest of ADF.

4.  Set "num_procs" variable, and provde num_procs
    property to the rest of ADF.

This class also provide methods for extracting
variables from the standard, expanded config
dictionaries.
"""

#++++++++++++++++++++++++++++++
#Import standard python modules
#++++++++++++++++++++++++++++++

from pathlib import Path
import copy
import os
import getpass

#+++++++++++++++++++++++++++++++++++++++++++++++++
#import non-standard python modules, including ADF
#+++++++++++++++++++++++++++++++++++++++++++++++++

# pylint: disable=unused-import
import numpy as np
import xarray as xr
# pylint: enable=unused-import

#ADF modules:
from .adf_config import AdfConfig
from .adf_base   import AdfError

#+++++++++++++++++++
#Define Obs class
#+++++++++++++++++++

[docs] class AdfInfo(AdfConfig): """ Information/Parameter class, which initializes an AdfConfig object and provides additional variables and methods to simplify access to the standard, expanded config dictionaries. """ def __init__(self, config_file, debug=False): """ Initalize ADF Info object. """ #Initialize Config attributes: super().__init__(config_file, debug=debug) #Add basic diagnostic info to object: self.__basic_info = self.read_config_var('diag_basic_info', required=True) #Expand basic info variable strings: self.expand_references(self.__basic_info) #Add CAM climatology info to object: self.__cam_climo_info = self.read_config_var('diag_cam_climo', required=True) #Expand CAM climo info variable strings: self.expand_references(self.__cam_climo_info) # Add CVDP info to object: self.__cvdp_info = self.read_config_var("diag_cvdp_info") # Expand CVDP climo info variable strings: if self.__cvdp_info is not None: self.expand_references(self.__cvdp_info) # End if # Add MDTF info to object: self.__mdtf_info = self.read_config_var("diag_mdtf_info") if self.__mdtf_info is not None: self.expand_references(self.__mdtf_info) # End if # Get the current system user self.__user = getpass.getuser() # Check if inputs are of the correct type: # ------------------------------------------- #Use "cam_case_name" as the variable that sets the total number of cases: if isinstance(self.get_cam_info("cam_case_name", required=True), list): #Extract total number of test cases: self.__num_cases = len(self.get_cam_info("cam_case_name")) else: #Set number of cases to one: self.__num_cases = 1 #End if #Loop over all items in config dict: for conf_var, conf_val in self.__cam_climo_info.items(): # Hist_str can be a list for each case, so set it as a nested list here if "hist_str" in conf_var: self.hist_str_to_list(conf_var, conf_val) elif isinstance(conf_val, list): # If a list, then make sure it is has the correct number of entries: if not len(conf_val) == self.__num_cases: emsg = f"diag_cam_climo config variable '{conf_var}' should have" emsg += f" {self.__num_cases} entries, instead it has {len(conf_val)}" self.end_diag_fail(emsg) else: #If not a list, then convert it to one: self.__cam_climo_info[conf_var] = [conf_val] #End if #End for #------------------------------------------- #Initialize ADF variable list: self.__diag_var_list = self.read_config_var('diag_var_list', required=True) #Case names: case_names = self.get_cam_info('cam_case_name', required=True) #Grab test case nickname(s) test_nickname_list = self.get_cam_info('case_nickname') if test_nickname_list: test_nicknames = [] #set to be an empty list for i,nickname in enumerate(test_nickname_list): if nickname is None: test_nicknames.append(case_names[i]) else: test_nicknames.append(test_nickname_list[i]) #End if #End for else: test_nicknames = [] #Re-set to be an empty list for case_name in case_names: test_nicknames.append(case_name) #End for #End if self.__base_hist_str = "" #Initialize "compare_obs" variable: self.__compare_obs = self.get_basic_info('compare_obs') #Check if a CAM vs AMWG obs comparison is being performed: if self.__compare_obs: #If so, then set the baseline info to None, to ensure any scripts #that check this variable won't crash: self.__cam_bl_climo_info = None # Set baseline hist string object to None self.__base_hist_str = None #Also set data name for use below: data_name = "Obs" base_nickname = "Obs" #Set the baseline years to empty strings: syear_baseline = "" eyear_baseline = "" else: #If not, then assume a CAM vs CAM run and add CAM baseline climatology info to object: self.__cam_bl_climo_info = self.read_config_var('diag_cam_baseline_climo', required=True) #Expand CAM baseline climo info variable strings: self.expand_references(self.__cam_bl_climo_info) #Set data name to baseline case name: data_name = self.get_baseline_info('cam_case_name', required=True) #Attempt to grab baseline start_years (not currently required): syear_baseline = self.get_baseline_info('start_year') eyear_baseline = self.get_baseline_info('end_year') #Get climo years for verification or assignment if missing baseline_hist_locs = self.get_baseline_info('cam_hist_loc') # Read hist_str (component.hist_num, eg cam.h0) from the yaml file baseline_hist_str = self.get_baseline_info("hist_str") #Check if any time series files are pre-made baseline_ts_done = self.get_baseline_info("cam_ts_done") #Check if time series files already exist, #if so don't rely on climo years from history location if baseline_ts_done: baseline_hist_locs = None #Grab baseline time series file location input_ts_baseline = self.get_baseline_info("cam_ts_loc", required=True) input_ts_loc = Path(input_ts_baseline) #Get years from pre-made timeseries file(s) found_syear_baseline, found_eyear_baseline = self.get_climo_yrs_from_ts( input_ts_loc, data_name) found_yr_range = np.arange(found_syear_baseline,found_eyear_baseline,1) #History file path isn't needed if user is running ADF directly on time series. #So make sure start and end year are specified: if syear_baseline is None: msg = f"\t WARNING: No given start year for {data_name}, " msg += f"using first found year: {found_syear_baseline}" print(msg) syear_baseline = found_syear_baseline if syear_baseline not in found_yr_range: msg = f"\t WARNING: Given start year '{syear_baseline}' is not in current dataset " msg += f"{data_name}, using first found year: {found_syear_baseline}" print(msg) syear_baseline = found_syear_baseline if eyear_baseline is None: msg = f"\t WARNING: No given end year for {data_name}, " msg += f"using last found year: {found_eyear_baseline}" print(msg) eyear_baseline = found_eyear_baseline if eyear_baseline not in found_yr_range: msg = f"\t WARNING: Given end year '{eyear_baseline}' is not in current dataset " msg += f"{data_name}, using first found year: {found_eyear_baseline}" print(msg) eyear_baseline = found_eyear_baseline # End if # Check if history file path exists: if any(baseline_hist_locs): #Check if user provided if not baseline_hist_str: baseline_hist_str = ['cam.h0a'] else: #Make list if not already if not isinstance(baseline_hist_str, list): baseline_hist_str = [baseline_hist_str] #Initialize baseline history string list self.__base_hist_str = baseline_hist_str #Grab first possible hist string, just looking for years of run base_hist_str = baseline_hist_str[0] starting_location = Path(baseline_hist_locs) print(f"\tChecking history files in '{starting_location}'") file_list = sorted(starting_location.glob("*" + base_hist_str + ".*.nc")) #Check if the history file location exists if not starting_location.is_dir(): msg = "Checking history file location:\n" msg += f"\tThere is no history file location: '{starting_location}'." self.debug_log(msg) emsg = f"{data_name} starting_location: History file location not found!\n" emsg += "\tTry checking the path 'cam_hist_loc' in 'diag_cam_baseline_climo' " emsg += "section in your config file is correct..." self.end_diag_fail(emsg) file_list = sorted(starting_location.glob("*" + base_hist_str + ".*.nc")) #Check if there are any history files if len(file_list) == 0: msg = "Checking history files:\n" msg += f"\tThere are no history files in '{starting_location}'." self.debug_log(msg) emsg = f"{data_name} starting_location {starting_location}: " emsg += f"No history files found for {base_hist_str}!\n" emsg += "\tTry checking the path 'cam_hist_loc' or the 'hist_str' " emsg += " in 'diag_cam_baseline_climo' " emsg += "section in your config file are correct..." self.end_diag_fail(emsg) # Partition string to find exactly where h-number is # This cuts the string before and after the `{hist_str}.` sub-string # so there will always be three parts: # before sub-string, sub-string, and after sub-string #Since the last part always includes the time range, grab that with last index (2) #NOTE: this is based off the current CAM file name structure in the form: # $CASE.cam.h#.YYYY<other date info>.nc base_climo_yrs = [int(str(i).partition(f"{base_hist_str}.")[2][0:4]) for i in file_list] if not base_climo_yrs: msg = f"\t ERROR: No climo years found in {baseline_hist_locs}, " raise AdfError(msg) base_climo_yrs = sorted(np.unique(base_climo_yrs)) base_found_syr = int(base_climo_yrs[0]) base_found_eyr = int(base_climo_yrs[-1]) #Check if start or end year is missing. If so then just assume it is the #start or end of the entire available model data. if syear_baseline is None: msg = f"\t WARNING: No given start year for {data_name}, " msg += f"using first found year: {base_found_syr}" print(msg) syear_baseline = base_found_syr if syear_baseline not in base_climo_yrs: msg = f"\t WARNING: Given start year '{syear_baseline}' is not in current dataset " msg += f"{data_name}, using first found year: {base_climo_yrs[0]}" print(msg) syear_baseline = base_found_syr if eyear_baseline is None: msg = f"\t WARNING: No given end year for {data_name}, " msg += f"using last found year: {base_found_eyr}" print(msg) eyear_baseline = base_found_eyr if eyear_baseline not in base_climo_yrs: msg = f"\t WARNING: Given end year '{eyear_baseline}' is not in current dataset " msg += f"{data_name}, using last found year: {base_climo_yrs[-1]}" print(msg) eyear_baseline = base_found_eyr #Grab baseline nickname base_nickname = self.get_baseline_info('case_nickname') if base_nickname is None: base_nickname = data_name #End if #Grab baseline nickname base_nickname = self.get_baseline_info('case_nickname') if base_nickname is None: base_nickname = data_name #Get integer for baseline years for searching climo files syear_baseline = int(syear_baseline) eyear_baseline = int(eyear_baseline) #Update baseline case name: data_name += f"_{syear_baseline}_{eyear_baseline}" #End if (compare_obs) #Initialize case nicknames: self.__test_nicknames = test_nicknames self.__base_nickname = base_nickname #Save starting and ending years as object variables: self.__syear_baseline = syear_baseline self.__eyear_baseline = eyear_baseline #Create plot location variable for potential use by the website generator. #Please note that this is also assumed to be the output location for the analyses scripts: #------------------------------------------------------------------------- self.__plot_location = [] #Must be a list to manage multiple cases #Plot directory: plot_dir = self.get_basic_info('cam_diag_plot_loc', required=True) #Case names: case_names = self.get_cam_info('cam_case_name', required=True) #Start years (not currently required): syears = self.get_cam_info('start_year') #End year (not currently rquired): eyears = self.get_cam_info('end_year') #Make lists of None to be iterated over for case_names if syears is None: syears = [None]*len(case_names) #End if if eyears is None: eyears = [None]*len(case_names) #End if #Extract cam history files location: cam_hist_locs = self.get_cam_info('cam_hist_loc') #Get cleaned nested list of hist_str for test case(s) (component.hist_num, eg cam.h0) cam_hist_str = self.__cam_climo_info.get('hist_str', None) if not cam_hist_str: hist_str = [['cam.h0a']]*self.__num_cases else: hist_str = cam_hist_str #End if #Initialize CAM history string nested list self.__hist_str = hist_str #Check if using pre-made ts files cam_ts_done = self.get_cam_info("cam_ts_done") #Grab case time series file location(s) input_ts_locs = self.get_cam_info("cam_ts_loc", required=True) #Loop over cases: syears_fixed = [] eyears_fixed = [] for case_idx, case_name in enumerate(case_names): syear = syears[case_idx] eyear = eyears[case_idx] #Check if time series files exist, if so don't rely on climo years if cam_ts_done[case_idx]: cam_hist_locs[case_idx] = None #Grab case time series file location input_ts_loc = Path(input_ts_locs[case_idx]) print(f"Checking existing time-series files in {input_ts_loc}") #Get years from pre-made timeseries file(s) found_syear, found_eyear = self.get_climo_yrs_from_ts(input_ts_loc, case_name) found_yr_range = np.arange(found_syear,found_eyear,1) #History file path isn't needed if user is running ADF directly on time series. #So make sure start and end year are specified: if syear is None: msg = f"\t WARNING: No given start year for {case_name}, " msg += f"using first found year: {found_syear}" print(msg) syear = found_syear if syear not in found_yr_range: msg = f"\t WARNING: Given start year '{syear}' is not in current dataset " msg += f"{case_name}, using first found year: {found_syear}\n" print(msg) syear = found_syear #End if if eyear is None: msg = f"\t WARNING: No given end year for {case_name}, " msg += f"using last found year: {found_eyear}" print(msg) eyear = found_eyear if eyear not in found_yr_range: msg = f"\t WARNING: Given end year '{eyear}' is not in current dataset " msg += f"{case_name}, using last found year: {found_eyear}\n" print(msg) eyear = found_eyear #End if #End if #Check if history file path exists: hist_str_case = hist_str[case_idx] if any(cam_hist_locs): #Grab first possible hist string, just looking for years of run hist_str = hist_str_case[0] #Get climo years for verification or assignment if missing starting_location = Path(cam_hist_locs[case_idx]) print(f"\tChecking history files in '{starting_location}'") file_list = sorted(starting_location.glob('*'+hist_str+'.*.nc')) #Check if the history file location exists if not starting_location.is_dir(): msg = "Checking history file location:\n" msg += f"\tThere is no history file location: '{starting_location}'." self.debug_log(msg) emsg = f"{case_name} starting_location: History file location not found!\n" emsg += "\tTry checking the path 'cam_hist_loc' in 'diag_cam_climo' " emsg += "section in your config file is correct..." self.end_diag_fail(emsg) #Check if there are any history files file_list = sorted(starting_location.glob('*'+hist_str+'.*.nc')) if len(file_list) == 0: msg = "Checking history files:\n" msg += f"\tThere are no history files in '{starting_location}'." self.debug_log(msg) emsg = f"{case_name} starting_location {starting_location}: " emsg += f"No history files found for {hist_str}!\n" emsg += "\tTry checking the path 'cam_hist_loc' or the 'hist_str' " emsg += "in 'diag_cam_climo' " emsg += "section in your config file are correct..." self.end_diag_fail(emsg) #Partition string to find exactly where h-number is #This cuts the string before and after the `{hist_str}.` sub-string # so there will always be three parts: # before sub-string, sub-string, and after sub-string #Since the last part always includes the time range, grab that with last index (2) #NOTE: this is based off the current CAM file name structure in the form: # $CASE.cam.h#.YYYY<other date info>.nc case_climo_yrs = [int(str(i).partition(f"{hist_str}.")[2][0:4]) for i in file_list] if not case_climo_yrs: msg = f"\t ERROR: No climo years found in {cam_hist_locs[case_idx]}, " raise AdfError(msg) case_climo_yrs = sorted(np.unique(case_climo_yrs)) case_found_syr = int(case_climo_yrs[0]) case_found_eyr = int(case_climo_yrs[-1]) #Check if start or end year is missing. If so then just assume it is the #start or end of the entire available model data. if syear is None: msg = f"\t WARNING: No given start year for {case_name}, " msg += f"using first found year: {case_found_syr}" print(msg) syear = case_found_syr if syear not in case_climo_yrs: msg = f"\t WARNING: Given start year '{syear}' is not in current dataset " msg += f"{case_name}, using first found year: {case_climo_yrs[0]}\n" print(msg) syear = case_found_syr #End if if eyear is None: msg = f"\t WARNING: No given end year for {case_name}, " msg += f"using last found year: {case_found_eyr}" print(msg) eyear = case_found_eyr if eyear not in case_climo_yrs: msg = f"\t WARNING: Given end year '{eyear}' is not in current dataset " msg += f"{case_name}, using last found year: {case_climo_yrs[-1]}\n" print(msg) eyear = case_found_eyr #End if #End if #Update climo year lists in case anything changed syear = int(syear) eyear = int(eyear) syears_fixed.append(syear) eyears_fixed.append(eyear) #Update case name with provided/found years: case_name += f"_{syear}_{eyear}" #Set the final directory name and save it to plot_location: direc_name = f"{case_name}_vs_{data_name}" plot_loc = os.path.join(plot_dir, direc_name) self.__plot_location.append(plot_loc) #If first iteration, then save directory name for use by baseline: first_case_dir = '' if case_idx == 0: first_case_dir = direc_name #End if #Go ahead and make the diag plot location if it doesn't exist already diag_location = Path(plot_loc) print(f"\n\tDiagnostic Plot Location: {diag_location}") if not diag_location.is_dir(): print(f"\tINFO: Directory not found, making new diagnostic plot location") diag_location.mkdir(parents=True) #End for self.__syears = syears_fixed self.__eyears = eyears_fixed #Finally add baseline case (if applicable) for use by the website table #generator. These files will be stored in the same location as the first #listed case. if not self.compare_obs: self.__plot_location.append(os.path.join(plot_dir, first_case_dir)) #End if #------------------------------------------------------------------------- #Initialize "num_procs" variable: #----------------------------------------- temp_num_procs = self.get_basic_info('num_procs') if not temp_num_procs: #Variable not present, so set to a single processor: self.__num_procs = 1 else: #Check if variable is a string and matches keyword: if isinstance(temp_num_procs, str) and \ temp_num_procs.strip() == "*": #Set number of processors to total number of CPUs #on the node. Please note that at some point this #may need to be replaced with a DASK implementation #instead: #First try to get CPUs allowed by OS for process to use: try: self.__num_procs = len(os.sched_getaffinity(0)) except AttributeError: #Operating system doesn't support getaffinity, so try #straight CPU number: if os.cpu_count(): self.__num_procs = os.cpu_count() else: #Something is weird with this Operating System, #so warn user and then try to run in serial mode: wmsg = "WARNING!!!! ADF unable to determine how" wmsg += " many processors are availble on this system," wmsg += " so defaulting to a single process/core." print(wmsg) self.__num_procs = 1 #End if #End except else: #If anything else, then try to convert to integer: try: self.__num_procs = int(temp_num_procs) except ValueError: #This variable has been set to something that #can't be converted into an integer, so warn #user and then try to run in serial mode: wmsg = "WARNING!!!! The 'num_procs' variable" wmsg += f" has been set to '{temp_num_procs}'" wmsg += " which cannot be converted to an integer." wmsg += "\nThe ADF will now default to a single core" wmsg += " and attempt to run." print(wmsg) self.__num_procs = 1 #End except #End if #End if #Print number of processors being used to debug log (if requested): self.debug_log(f"ADF is running with {self.__num_procs} processors.") # ----------------------------------------- #########
[docs] def hist_str_to_list(self, conf_var, conf_val): """ Make hist_str a nested list [ncases,nfiles] of the given value(s) """ if isinstance(conf_val, list): hist_str = conf_val else: # one case, one hist str hist_str = [ conf_val ] self.__cam_climo_info[conf_var] = [hist_str]
# ----------------------------------------- ######### # Create property needed to return "user" name to user: @property def user(self): """Return the "user" name if requested.""" return self.__user # Create property needed to return "compare_obs" logical to user: @property def compare_obs(self): """Return the "compare_obs" logical to the user if requested.""" return self.__compare_obs # Create property needed to return the number of test cases (num_cases) to user: @property def num_cases(self): """Return the "num_cases" integer value to the user if requested.""" return self.__num_cases # Create property needed to return "diag_var_list" list to user: @property def diag_var_list(self): """Return a copy of the "diag_var_list" list to the user if requested.""" #Note that a copy is needed in order to avoid having a script mistakenly #modify this variable, as it is mutable and thus passed by reference: return copy.copy(self.__diag_var_list) # Create property needed to return "basic_info" expanded dictionary to user: @property def basic_info_dict(self): """Return a copy of the "basic_info" list to the user if requested.""" #Note that a copy is needed in order to avoid having a script mistakenly #modify this variable, as it is mutable and thus passed by reference: return copy.copy(self.__basic_info) # Create property needed to return "basic_info" expanded dictionary to user: @property def cam_climo_dict(self): """Return a copy of the "cam_climo_dict" list to the user if requested.""" #Note that a copy is needed in order to avoid having a script mistakenly #modify this variable, as it is mutable and thus passed by reference: return copy.copy(self.__cam_climo_info) # Create property needed to return "basic_info" expanded dictionary to user: @property def baseline_climo_dict(self): """Return a copy of the "cam_bl_climo_info" list to the user if requested.""" #Note that a copy is needed in order to avoid having a script mistakenly #modify this variable, as it is mutable and thus passed by reference: return copy.copy(self.__cam_bl_climo_info) # Create property needed to return "num_procs" to user: @property def num_procs(self): """Return the "num_procs" logical to the user if requested.""" return self.__num_procs # Create property needed to return "plot_location" variable to user: @property def plot_location(self): """Return a copy of the '__plot_location' string list to user if requested.""" #Note that a copy is needed in order to avoid having a script mistakenly #modify this variable: return copy.copy(self.__plot_location) # Create property needed to return the climo start (syear) and end (eyear) years to user: @property def climo_yrs(self): """Return the "syear" and "eyear" integer values to the user if requested.""" syears = copy.copy(self.__syears) #Send copies so a script doesn't modify the original eyears = copy.copy(self.__eyears) return {"syears":syears,"eyears":eyears, "syear_baseline":self.__syear_baseline, "eyear_baseline":self.__eyear_baseline} # Create property needed to return the case nicknames to user: @property def case_nicknames(self): """Return the test case and baseline nicknames to the user if requested.""" #Note that copies are needed in order to avoid having a script mistakenly #modify these variables, as they are mutable and thus passed by reference: test_nicknames = copy.copy(self.__test_nicknames) base_nickname = self.__base_nickname return {"test_nicknames":test_nicknames,"base_nickname":base_nickname} @property def hist_string(self): """ Return the CAM history string list to the user if requested.""" cam_hist_strs = copy.copy(self.__hist_str) if self.__base_hist_str: base_hist_strs = copy.copy(self.__base_hist_str) else: base_hist_strs = "" hist_strs = {"test_hist_str":cam_hist_strs, "base_hist_str":base_hist_strs} return hist_strs ######### #Utility function to access expanded 'diag_basic_info' variables:
[docs] def get_basic_info(self, var_str, required=False): """ Return the config variable from 'diag_basic_info' as requested by the user. """ return self.read_config_var(var_str, conf_dict=self.__basic_info, required=required)
######### #Utility function to access expanded 'diag_cam_climo' variables:
[docs] def get_cam_info(self, var_str, required=False): """ Return the config variable from 'diag_cam_climo' as requested by the user. """ return self.read_config_var(var_str, conf_dict=self.__cam_climo_info, required=required)
######### #Utility function to access expanded 'diag_cam_baseline_climo' variables:
[docs] def get_baseline_info(self, var_str, required=False): """ Return the config variable from 'diag_cam_baseline_climo' as requested by the user. This function assumes that if the user is requesting it, then it must be required. """ #Check if the cam baseline dictionary exists: if not self.__cam_bl_climo_info: #If required, then throw an error: if required: emsg = "get_baseline_info: Requested variable cannot be found" emsg += " because no baseline info exists.\n" emsg += "This is likely because an observational comparison is being done," emsg += " so try adding 'required = False' to the get call." self.end_diag_fail(emsg) #End if #If not required, then return none: return None #End if #If basline dictionary exists, then search for variable like normal: return self.read_config_var(var_str, conf_dict=self.__cam_bl_climo_info, required=required)
######### #Utility function to add a new model variable to the ADF (diag) variable list:
[docs] def add_diag_var(self, var_str): """ Adds a new variable to the ADF variable list """ if var_str not in self.__diag_var_list: self.__diag_var_list.append(var_str)
#End if ######### # Utility function to access expanded 'diag_cvdp_info' variables
[docs] def get_cvdp_info(self, var_str, required=False): """ Return the config variable from 'diag_cvdp_info' as requested by the user. If 'diag_cvdp_info' is not found then try grabbing the variable from the top level of the YAML config file dictionary instead. """ return self.read_config_var( var_str, conf_dict=self.__cvdp_info, required=required )
######### # Utility function to access expanded 'diag_mdtf_info' variables
[docs] def get_mdtf_info(self, var_str, required=False): """ Return the config variable from 'diag_mdtf_info' as requested by the user. If 'diag_mdtf_info' is not found then try grabbing the variable from the top level of the YAML config file dictionary instead. """ return self.read_config_var( var_str, conf_dict=self.__mdtf_info, required=required )
######### # Utility function to grab climo years from pre-made time series files:
[docs] def get_climo_yrs_from_ts(self, input_ts_loc, case_name): """ Grab start and end climo years if none are specified in config file for pre-made time series file(s) Return ------ - start year - end year """ #Grab variable list var_list = self.diag_var_list #Create "Path" objects: input_location = Path(input_ts_loc) #Check that time series input directory actually exists: if not input_location.is_dir(): errmsg = f"\t ERROR: Time series directory '{input_ts_loc}' not found. Script is exiting." raise AdfError(errmsg) # Search for first available variable in var_list to get a time series file to read # NOTE: it is assumed all the variables have the same dates! # Also, it is assumed that only h0 files should be climo-ed. for var in var_list: ts_files = sorted(input_location.glob(f"{case_name}*h0*.{var}.*nc")) if ts_files: break else: logmsg = "get years for time series:" logmsg = f"\n\tVar '{var}' not in dataset, skip to next to try and find climo years..." self.debug_log(logmsg) #Read in file(s) if len(ts_files) == 1: cam_ts_data = xr.open_dataset(ts_files[0], decode_times=True) else: cam_ts_data = xr.open_mfdataset(ts_files, decode_times=True, combine='by_coords') #Average time dimension over time bounds, if bounds exist: if 'time_bnds' in cam_ts_data: time_bounds_name = 'time_bnds' elif 'time_bounds' in cam_ts_data: time_bounds_name = 'time_bounds' else: time_bounds_name = None if time_bounds_name: time = cam_ts_data['time'] #NOTE: force `load` here b/c if dask & time is cftime, #throws a NotImplementedError: time = xr.DataArray(cam_ts_data[time_bounds_name].load().mean(dim='nbnd').values, dims=time.dims, attrs=time.attrs) cam_ts_data['time'] = time cam_ts_data.assign_coords(time=time) cam_ts_data = xr.decode_cf(cam_ts_data) #Extract first and last years from dataset: syr = int(cam_ts_data.time[0].dt.year.values) eyr = int(cam_ts_data.time[-1].dt.year.values) if eyr-syr >= 100: msg = f"WARNING: the found climo year range is large: {eyr-syr} years, " msg += "this may take a long time!" print(msg) return syr, eyr
#++++++++++++++++++++ #End Class definition #++++++++++++++++++++