import os, sys, copy
from typing import NewType
import pandas as pd
import numpy as np
from . import nwsrfs
from . import utils
#import pdb; pdb.set_trace()
#Create a custom type alias for sacsnow_tci output
SACSnowTCI = NewType('SACSnowTCI',pd.DataFrame)
"""
Custom type alias for total channel inflow (tci) output.
This type represents a pandas DataFrame containing tci values with a column
for each zone (units: mm). It is strictly used to ensure type safety
when passing tci data between the :class:`~nwsrfs_py.nwsrfs.SacSnow` and :class:`~nwsrfs_py.nwsrfs.GammaUh` classes.
"""
class _NwrfcAcPrep:
    '''
    Reads in forcing, streamflow, and optimized parameters from the NWRFC autocalibration process.
    Files and folders in ``autocalb_dir`` must follow file conventions of the NOAA-NWRFC/nwsrfs-hydro-autocalibration repository
    optimization tools. The forcing csv files must be in the directory ``'forcing_por_*'``, even if it is a routing only reach
    (i.e. no local inflow modeled by SAC-SMA, Snow17, UH).
    If no ``run_dir`` is provided, the first ``'results_*'`` directory found within the ``autocalb_dir`` path will be used.
    Arguments:
        autocalb_dir (str): Path to a NWRFC autocalibration directory
        run_dir (str | None): Name of optimization run subdirectory within the ``autocal_dir``. If ``'None'`` is provided, defaults to using first ``'results_*'`` directory found within the ``autocalb_dir``.
    '''
    def __init__(self,
                 autocalb_dir: str,
                 run_dir: str | None = None):
        #Check if directory exists and assign
        if run_dir is not None:
            check_path = os.path.join(autocalb_dir,run_dir)
        else:
            check_path = autocalb_dir
        self.autocalb_dir = autocalb_dir
        self._autocalb_dir_basename = os.path.basename(self.autocalb_dir)
        if not os.path.isdir(check_path):
            msg = f'{check_path} is not a directory.'
            raise ValueError(msg)
        #Search autocalb directory for csv files.
        self.autocalb_files = self._create_dir_df(self.autocalb_dir)
        #Assign run_dir if not specified: default to the alphabetically first results_* folder
        if run_dir is not None:
            self.run_dir = run_dir
        else:
            results_dir_query = self.autocalb_files.loc[self.autocalb_files.folder.str.startswith('results'),'folder'].sort_values().unique()
            if len(results_dir_query) == 0:
                raise ValueError(f"{autocalb_dir} contains no results_* folder.")
            self.run_dir = results_dir_query[0]
            msg = f'Defaulting to using the following results directory: {self.run_dir}'
            print(msg)
        #Filter out files that are not in either autocalb_dir or the run_dir
        self.autocalb_files = self.autocalb_files.loc[self.autocalb_files.folder.isin([self._autocalb_dir_basename,self.run_dir])]
        #Validate autocalb_dir and run_dir contents
        self._validate_contents()
        #Read in files
        self._load_files()
        #Catalog nwsrfs model being utilized
        self._interrogate_pars()
        #Extract time vectors (one entry per model timestep)
        self.dates = self.forcings_raw['map'].index.rename('datetime')
        self.year = self.forcings_raw['map'].index.year.to_numpy()
        self.month = self.forcings_raw['map'].index.month.to_numpy()
        self.day = self.forcings_raw['map'].index.day.to_numpy()
        self.hour = self.forcings_raw['map'].index.hour.to_numpy()
        #Model timestep in hours
        self.dt_hours = int(utils._define_timestep_sec(self.year, self.month, self.day, self.hour)/3600)
        #If a routing-only reach, set forcings to None, otherwise assign zones to each DataFrame's column names
        if not self.localflow_logic:
            self.forcings_raw = None
        else:
            self.forcings_raw = {key: df.set_axis(self.zone_names, axis=1) for key, df in self.forcings_raw.items()}
        #If a routing component, assign upflow names to columns
        if self.upflow_logic:
            self.upflow = self.upflow.set_axis(self.upflow_names, axis=1)
    def _validate_contents(self):
        '''
        Validates that necessary files exist within ``autocalb_dir``, and checks which auxiliary files are present.
        Raises:
            ValueError: If a required file is missing, or if a file expected to be unique is ambiguous.
        '''
        #Check for optimal parameter file
        self.pars_path = os.path.join(self.autocalb_dir,self.run_dir,'pars_optimal.csv')
        if not os.path.isfile(self.pars_path):
            msg = f'{self.pars_path} is missing'
            raise ValueError(msg)
        #Check for flow_daily files (required, exactly one)
        self._daily_flow_df = self.autocalb_files.loc[self.autocalb_files.file_name.str.startswith('flow_daily')]
        if len(self._daily_flow_df) == 0:
            msg = 'Daily flow csv file is missing. File must start with flow_daily_*.'
            raise ValueError(msg)
        elif len(self._daily_flow_df) > 1:
            msg = 'Daily flow csv file is ambiguous. Only one csv file must start with flow_daily_*.'
            raise ValueError(msg)
        self.daily_flow_path = os.path.join(self._daily_flow_df.path.squeeze(),self._daily_flow_df.file_name.squeeze())
        #Check for forcing files (required, one or more)
        self.forcings_por_df = self.autocalb_files.loc[self.autocalb_files.file_name.str.startswith('forcing_por')].copy()
        self.forcings_por_df.sort_values(by='file_name',inplace=True)
        if len(self.forcings_por_df) == 0:
            msg = 'POR forcing csv files are missing. Files must start with forcing_por_*.'
            raise ValueError(msg)
        #Check for instantaneous flow file (optional, at most one)
        self._inst_flow_df = self.autocalb_files.loc[self.autocalb_files.file_name.str.startswith('flow_instantaneous')]
        if len(self._inst_flow_df) == 0:
            self.inst_flow_logic = False
            self.inst_flow_path = None
        elif len(self._inst_flow_df) == 1:
            self.inst_flow_logic = True
            self.inst_flow_path = os.path.join(self._inst_flow_df.path.squeeze(),self._inst_flow_df.file_name.squeeze())
        else:
            msg = 'Instantaneous flow csv file is ambiguous. Only one csv file must start with flow_instantaneous_*.'
            raise ValueError(msg)
        #Check for upflow flow files (optional)
        self.upflow_df = self.autocalb_files.loc[self.autocalb_files.file_name.str.startswith('upflow')].copy()
        self.upflow_df.sort_values(by='file_name',inplace=True)
        if len(self.upflow_df) == 0:
            self.upflow_logic = False
        else:
            self.upflow_logic = True
    def _load_files(self):
        '''
        Reads the parameter, daily flow, forcing, instantaneous flow, and upflow csv
        files cataloged by :meth:`_validate_contents` into DataFrame attributes.
        '''
        #Date-part columns are dropped once a datetime index has been built from them
        drop_cols = ['year','month','day','hour']
        #Read pars file
        self.pars=pd.read_csv(self.pars_path)
        self.pars = self.pars.sort_values(['name', 'zone'])
        #Make par file edits for CHANLOSS
        self.pars = self._cl_parfile_edits(self.pars)
        #Read daily flow file
        self.daily_flow = pd.read_csv(self.daily_flow_path)
        self.daily_flow.index = pd.to_datetime(self.daily_flow[['year', 'month', 'day']])
        #Added errors='ignore' to ignore missing hour column
        self.daily_flow.drop(drop_cols,axis=1,inplace=True,errors='ignore')
        #Upsample daily values onto a 6-hour index via forward fill
        self.daily_flow =self.daily_flow.resample('6h').ffill()
        #Read forcing file data as DataFrame and reconstruct as a dictionary with each forcing as a key
        forcings_import = self._read_tsfiles(self.forcings_por_df, drop_cols)
        #NOTE(review): this dict is immediately overwritten by the concat result below — dead initialization
        self.forcings_raw = {'map': pd.DataFrame(index=forcings_import[0].index),
                             'mat': pd.DataFrame(index=forcings_import[0].index),
                             'ptps': pd.DataFrame(index=forcings_import[0].index)}
        #Create lists for each type of forcing, one series per zone file
        map_list, mat_list, ptps_list = [], [], []
        for i in range(len(forcings_import)):
            map_list.append(forcings_import[i].loc[:,'map_mm'].rename(f'zone_{str(i)}'))
            mat_list.append(forcings_import[i].loc[:,'mat_degc'].rename(f'zone_{str(i)}'))
            ptps_list.append(forcings_import[i].loc[:,'ptps'].rename(f'zone_{str(i)}'))
        #Concatenate forcing lists into a dictionary
        self.forcings_raw = {
            'map': pd.concat(map_list, axis=1),
            'mat': pd.concat(mat_list, axis=1),
            'ptps': pd.concat(ptps_list, axis=1)
        }
        #If exists, read in instantaneous flow file
        if self.inst_flow_logic:
            self.inst_flow = pd.read_csv(self.inst_flow_path)
            self.inst_flow.index = pd.to_datetime(self.inst_flow[['year', 'month', 'day', 'hour']])
            self.inst_flow.drop(drop_cols,axis=1,inplace=True)
        else:
            self.inst_flow = None
        #If exists, read in upflow flow files and reconstruct as a single DataFrame
        if self.upflow_logic:
            upflow_import = self._read_tsfiles(self.upflow_df, drop_cols)
            self.upflow = pd.DataFrame(index=upflow_import[0].index)
            for i in range(len(upflow_import)):
                self.upflow = pd.concat([self.upflow,upflow_import[i].flow_cfs.rename(f'upflow_{i}')],axis=1)
        else:
            self.upflow = None
    def _interrogate_pars(self):
        '''
        Using the parfile, gathers number of zones, zone names, number of upstream points, upstream point names, if CONSUSE is used, and/or if CHANLOSS is used.
        '''
        #Get zone names and number, if present (zone names are identified by containing a '-')
        self.zone_names = tuple(self.pars.loc[(self.pars.zone.str.contains('-'))].zone.sort_values().unique())
        self.n_zones = len(self.zone_names)
        if self.n_zones > 0:
            self.localflow_logic = True
        else:
            self.localflow_logic = False
        #Catalog routing, if present
        if self.upflow_logic:
            self.upflow_names = tuple(self.pars.loc[self.pars.type=='lagk'].zone.unique())
            self.n_upflow = len(self.upflow_names)
        else:
            self.upflow_names = None
            self.n_upflow = 0
        #Catalog CONSUSE, if present (consuse zones carry a '-CU' suffix)
        if self.pars.zone.str.contains('-CU').any():
            self.consuse_logic = True
            self.consuse_names = tuple(self.pars.loc[self.pars.type=='consuse'].zone.unique())
            self.n_consuse = len(self.consuse_names)
        else:
            self.consuse_logic = False
            self.consuse_names = None
            self.n_consuse=0
        #Catalog CHANLOSS info if present (the '_CL' suffix is added by _cl_parfile_edits)
        if self.pars.zone.str.contains('_CL').any():
            self.chanloss_logic = True
        else:
            self.chanloss_logic = False
    def _cl_parfile_edits(self, pars:pd.DataFrame):
        '''
        Makes changes to ``pars`` DataFrame in regards to CHANLOSS parameter rows'
        naming convention and parameters provided.
        Args:
            pars (pd.DataFrame): A DataFrame containing model parameters
        Returns:
            pars_edits (pd.DataFrame): A DataFrame with edits to naming convention of CHANLOSS parameters
        '''
        pars_edits = pars.copy()
        #Rename zones for each CL module. But do not rename cl_type, n_clmods, cl_min_q
        cl_exclude_logic=(pars_edits.type=='chanloss')&(~pars_edits.name.isin(['cl_type','n_clmods','cl_min_q']))
        #Suffix the zone with '_CL<module index>' taken from the trailing token of the parameter name
        pars_edits.loc[cl_exclude_logic,['zone']]=pars_edits.loc[cl_exclude_logic].zone + \
            '_CL'+ pars_edits.loc[cl_exclude_logic].name.str.split('_').str[-1]
        #n_clmods pars row now unnecessary
        pars_edits.drop(pars_edits.loc[pars_edits.name=='n_clmods'].index,inplace=True)
        #Remove the CL module name from the name column, except for cl_min_q
        cl_remove_logic = (pars_edits.type=='chanloss')&(pars_edits.name!='cl_min_q')
        pars_edits.loc[cl_remove_logic ,['name']]= \
            pars_edits.loc[cl_remove_logic].name.str.split('_').str[:-1].str.join('_')
        #Correct the cl_type name
        pars_edits.loc[pars_edits.p_name.str.contains('cl_type'),['name']]='cl_type'
        return pars_edits
    @staticmethod
    def _create_dir_df(path:str) -> pd.DataFrame:
        '''
        Scans the directory structure to inventory CSV files.
        Args:
            path (str): Path to directory with a basin nwrfc autocalibration run(s).
        Returns:
            pd.DataFrame: Columns ``path``, ``file_name``, and ``folder`` (basename of ``path``), one row per csv file found.
        '''
        autocalb_files=pd.DataFrame(columns=['path','file_name'])
        n=1
        for root, dirs, files in os.walk(path):
            for file in files:
                if '.csv' in file:
                    autocalb_files=pd.concat([autocalb_files,
                                              pd.DataFrame({'path':root,
                                                            'file_name':file},index=[n])],axis=0)
                    n=n+1
        autocalb_files['folder']=autocalb_files.path.apply(os.path.basename)
        return autocalb_files
    @staticmethod
    def _read_tsfiles(path_ts: pd.DataFrame, drop_cols: list[str]) -> list[pd.DataFrame]:
        '''
        Reads in csv timeseries files produced via the nwrfc autocalibration run(s) as
        pandas DataFrames with a datetime index.
        Args:
            path_ts (pd.DataFrame): DataFrame with ``path`` and ``file_name`` columns, one row per csv timeseries file. Each file is expected to have ['year', 'month', 'day', 'hour'] columns.
            drop_cols (list[str]): List of strings with columns to drop from each dataframe
        Returns:
            list[pd.DataFrame]: One DataFrame per row of ``path_ts``, in row order.
        '''
        list_df = []
        for index, row in path_ts.iterrows():
            ts_df = pd.read_csv(os.path.join(row.path,row.file_name))
            ts_df.index=pd.to_datetime(ts_df[['year', 'month', 'day', 'hour']])
            ts_df.drop(drop_cols,axis=1,inplace=True)
            list_df.append(ts_df)
        return list_df
[docs]class NwsrfsRun(_NwrfcAcPrep,
nwsrfs.FA,
nwsrfs.SacSnow,
nwsrfs.GammaUh,
nwsrfs.Lagk,
nwsrfs.Chanloss):
'''
Orchestrates the execution of NWSRFS models (SAC-SMA, Snow17, lagk, etc.) using NWRFC autocalibration data.
Files and folders in ``autocalb_dir`` must follow file conventions of the
`NOAA-NWRFC/nwsrfs-hydro-autocalibration repository <https://github.com/NOAA-NWRFC/nwsrfs-hydro-autocalibration>`_ optimization tools.
Sample calibration data is available on `Zenodo <https://doi.org/10.5281/zenodo.18829935>`_.
If no ``run_dir`` is provided, the first ``'results_*'`` directory found within the ``autocalb_dir`` path will be used.
Arguments:
autocalb_dir (str): Path to a NWRFC autocalibration run directory
run_dir (str | None): Name of optimization run subdirectory within the ``autocal_dir``. If ``'None'`` is provided, defaults to using first ``'results_*'`` directory found within the ``autocalb_dir``.
forcing_adj (bool | list[str]): If ``True`` monthly climatological forcing adjustments will be applied to all forcings. Alternatively, a list with
with specific forcing to apply climatological forcing adjustments can be supplied: ``'map'``, ``'mat'``, ``'ptps'``, ``'pet'``. Default: ``True``.
return_inst (bool): Specifies to return instantaneous streamflow, rather than period average. Default: ``True``.
shift_sf (bool): Shifts :class:`~nwsrfs_py.nwsrfs.GammaUh` derived streamflow forward on one timestep. Requirement for NWRFC calibrations. Default: ``True``.
Attributes:
pars (pd.DataFrame): Contains all parameters used with NWSRFS models
forcings_raw (dict[str, pd.DataFrame]): A dictionary with forcings prior to applying :class:`~nwsrfs_py.nwsrfs.FA` adjustments
The dictionary keys are:
* **map**: Precipitation (units: mm)
* **mat**: Air temperature (units: degc)
* **ptps**: Fraction of precipitation as snow (units: fraction 0-1)
zone_names (tuple): Names of zones modeled in basin.
cu_names (tuple): Names of consuse zones modeled in basin.
daily_flow (pd.DataFrame): Daily mean observed flow.
inst_flow (pd.DataFrame|None): Instantaneous observed flow.
upflow (pd.DataFrame|None): Upstream flow derived from the :class:`adjustq.AdjustQ`.
fa_pars (dict[str, nwsrfs.FAPars]|None): If ``localflow_logic`` is ``True``, then dictionary of :class:`~nwsrfs_py.nwsrfs.FAPars` classes representing ``'map'``, ``'mat'``, ``'ptps'``, and ``'pet'``, otherwise ``'None'``.
sacsnow_pars (nwsrfs.SacSnowPars|None): If ``localflow_logic`` is ``True``, then :class:`~nwsrfs_py.nwsrfs.SacSnowPars` class, otherwise ``'None'``.
gammauh_pars (nwsrfs.GammaUhPars|None): If ``localflow_logic`` is ``True``, then :class:`~nwsrfs_py.nwsrfs.GammaUhPars` class, otherwise ``'None'``.
lagk_pars (nwsrfs.LagkPars|None): If ``upflow_logic`` is ``True``, then :class:`~nwsrfs_py.nwsrfs.LagkPars` class, otherwise ``'None'``.
chanloss_pars (nwsrfs.ChanlossPars|None): If ``chanloss_logic`` is ``True``, then :class:`~nwsrfs_py.nwsrfs.ChanlossPars` class, otherwise ``'None'``.
consuse_pars (dict[str, nwsrfs.ConsusePars]|None): If ``consuse_logic`` is ``True``, then dictionary of :class:`~nwsrfs_py.nwsrfs.ConsusePars` classes representing each ``cu_names``, otherwise ``'None'``.
localflow_logic (bool): Logic if :class:`~nwsrfs_py.nwsrfs.FAPars`, :class:`~nwsrfs_py.nwsrfs.SacSnow`, and :class:`~nwsrfs_py.nwsrfs.GammaUh` models exist in configuration.
upflow_logic (bool): Logic if :class:`~nwsrfs_py.nwsrfs.Lagk` model exist in configuration.
chanloss_logic (bool): Logic if :class:`~nwsrfs_py.nwsrfs.Chanloss` model exist in configuration.
consuse_logic (bool): Logic if :class:`~nwsrfs_py.nwsrfs.Consuse` model exist in configuration.
n_zones (integer): Number of zones in the configuration.
'''
def __init__(self,
autocalb_dir: str,
run_dir: str | None = None,
forcing_adj: bool | list[str] = True,
return_inst: bool = True,
shift_sf: bool = True):
#Initiate nwsrfs_prep
_NwrfcAcPrep.__init__(self, autocalb_dir, run_dir)
#Set return_inst attribute
self.return_inst = return_inst
#Set shift_sf attribute
self.shift_sf = shift_sf
#Validate forcing_adj argument
self._interrogate_fa_arg(forcing_adj)
#Create instances of all relevant NWSRFS models
self.nwsrfs_run()
def _interrogate_fa_arg(self,forcing_adj):
'''
Add appropriate attributes regarding climatological forcing adjustments as specified by the forcing_adj argument and if
SAC-SMA, Snow17, UH are being utilized.
Args:
forcing_adj (bool | list[str]): If ``True`` monthly climatological forcing adjustments will be applied to all forcings. Alternatively, a list with
with specific forcing to apply climatological forcing adjustments can be supplied: 'map','mat', 'ptps','pet'
'''
#Validate forcing_adj argument
if not self.localflow_logic:
self.forcing_adj_logic = None
self.forcing_adj_types = ()
elif isinstance(forcing_adj,bool):
if forcing_adj:
self.forcing_adj_logic = True
self.forcing_adj_types = ('map','mat', 'ptps','pet')
else:
self.forcing_adj_logic = None
self.forcing_adj_types = ()
else:
self.forcing_adj_logic = True
#Just in case, set string entries to lower case and remove duplicates entries
forcing_adj = [s.lower() for s in forcing_adj]
forcing_adj = list(set(forcing_adj))
#Check if forcing_adj string inputs match expected forcing types
forcing_types = {'map','mat','ptps','pet'}
validate_types = set(forcing_adj).issubset(forcing_types)
#validate_types is true than assign forcing_adj_types else return error
if validate_types:
self.forcing_adj_types = tuple(forcing_types.intersection(forcing_adj))
else:
msg = f'One or more string forcing_adj argument string not understood, expecting: {", ".join(forcing_types)}'
raise ValueError(msg)
[docs] def nwsrfs_run(self):
'''
Prepares and creates instances of all nwsrfs models with parameters provided in the ``pars`` attribute.
'''
#Set all dataclass attributes to None
self.fa_pars = self.sacsnow_pars = self.gammauh_pars = self.lagk_pars = self.chanloss_pars = self.consuse_pars = None
#If there is a sac-snow model, perform forcing adjustments and activate SAC-SMA, SNOW17, UH Gamma models
if not self.localflow_logic:
self.fa_factors = self.forcings_climo = None
else:
self._fa_run()
self._sacsnow_uh_run()
#If there is a lagk model, activate
if self.upflow_logic:
self._lagk_run()
#If there is a chanloss model, activate
if self.chanloss_logic:
self._chanloss_run()
#If there is a consuse model, activate
if self.consuse_logic:
self._consuse_run()
[docs] def update_pars(self, new_pars: pd.DataFrame):
'''
Updates the ``pars`` file with new values.
**DataFrame Format**
``new_pars`` must have a ``'p_name'`` and ``'value'`` column. All values in the ``'p_name'`` columns must map to an row in the exiting ``pars`` DataFrame attribute.
Args:
new_pars (pd.DataFrame): dataframe with parameters to update nwsrfs models
'''
new_pars = new_pars.sort_values('p_name')
#Make par file edits for CHANLOSS
new_pars = self._cl_parfile_edits(new_pars)
#If the p_name column can't be mapped to p_name values in the existing ``pars`` attribute, then throw a error
if not set(new_pars.p_name).issubset(self.pars.p_name):
msg = 'All values in the p_value column of "new_pars" argument must exist in the ``pars`` attribute.'
raise ValueError(msg)
#Get old parameter dataframe and rename values column
old_pars = self.pars.rename({'value':'old_value'},axis=1)
#Merge old parameter file with new parameter file
pars = old_pars.merge(new_pars.loc[:,['p_name','value']],on='p_name',how='left')
#For row with which no new value was provided use the old value
pars['value'] = pars.value.where(~pars.value.isna(),pars.old_value)
#Reassign parameter attribute
self.pars = pars.drop('old_value',axis=1)
#Rerun all the NWRFS models with new pars file
self.nwsrfs_run()
    def _fa_run(self,validate:bool = True):
        '''
        Apply monthly climatological forcing adjustments to forcings specified by the ``forcing_adj_types`` attribute.
        Args:
            validate (bool): Validate that map, mat, ptps, and pet inputs are correct format/type. Default: True
        '''
        #If SAC-SMA and SNOW17 parameters don't exist return none
        if not self.localflow_logic:
            return None
        #Neutral fa parameters (scale=1, p_redist=0, std=10, shift=0) effectively disable adjustment for a forcing
        nofa_pars = np.array([1,0,10,0])
        #fa adjustment parameters: scale, p_redist, std, shift
        df_2_dict = dict(zip(self.pars.loc[self.pars.type == 'fa'].name,self.pars.loc[self.pars.type == 'fa'].value))
        fa_pars = {}
        for f in ['map','mat','ptps','pet']:
            #Turn off fa adjustments for all forcings not within forcing_adj_types attribute
            if f not in self.forcing_adj_types:
                fa_pars[f] = nofa_pars
            #Otherwise use forcing adjustment parameters provided in pars
            else:
                fa_pars[f] = np.array([df_2_dict[f'{f}_scale'], df_2_dict[f'{f}_p_redist'],
                                       df_2_dict[f'{f}_std'], df_2_dict[f'{f}_shift']])
        #Monthly forcing adjustment limits: one (lower, upper) pair per month for each forcing
        fa_limits = {key: np.full([12, 2], np.nan) for key in ['map','mat','ptps','pet']}
        peadj_m = np.full([12, self.n_zones], np.nan)
        peadj = {key:None for key in ['map','mat','ptps']}
        for i in range(12):
            m = i + 1
            for f in ['map','mat','ptps','pet']:
                #Limits are read from the first zone only — assumes limits are identical across zones; TODO confirm
                fa_limits[f][i, 0] = self.pars[(self.pars.name == f'{f}_lower_{m:02}')&(self.pars.zone==self.zone_names[0])].value.item()
                fa_limits[f][i, 1] = self.pars[(self.pars.name == f'{f}_upper_{m:02}')&(self.pars.zone==self.zone_names[0])].value.item()
            for j, z in zip(range(self.n_zones), self.zone_names):
                #Monthly PE adjustment, per zone, from the sac parameter block
                peadj_m[i, j] = self.pars.loc[(self.pars.name == f'peadj_{m:02}') & (self.pars.zone == z) &
                                              (self.pars.type == 'sac')].value.item()
        #Only pet carries a monthly per-zone adjustment; map/mat/ptps stay None
        peadj['pet'] = peadj_m
        #Make dummy climo input
        climo = None
        #Create forcing arrays (pet has no raw input; it is derived by the FA class)
        forcings_raw = {}
        for f in ['map','mat','ptps']:
            forcings_raw[f] = self.forcings_raw[f].to_numpy()
        forcings_raw['pet'] = None
        #Get alat and area parameters
        alat=self.pars.loc[self.pars.name.str.contains('alat')].value.to_numpy()
        area=self.pars.loc[self.pars.name.str.contains('zone_area')].value.to_numpy()
        #Create a dictionary for each forcings dataclass
        fa_dc = {}
        for f in ['map','mat','ptps','pet']:
            fa_dc[f] = nwsrfs.FAPars(year = self.year,month =self.month,day = self.day,hour = self.hour,
                                     pars = fa_pars[f],
                                     area = area,
                                     alat = alat,
                                     limits = fa_limits[f],
                                     forcings = forcings_raw[f],
                                     peadj_m = peadj[f],
                                     climo = climo)
        #Initiate fa wrapper class
        nwsrfs.FA.__init__(self,
                           map_dataclass = fa_dc['map'],
                           mat_dataclass = fa_dc['mat'],
                           ptps_dataclass = fa_dc['ptps'],
                           pet_dataclass = fa_dc['pet'],
                           validate = validate)
@property
def forcings(self) -> dict[str, pd.DataFrame]:
'''
Generates a dictionary of :class:`~nwsrfs_py.nwsrfs.FA` adjusted forcings as DataFrames.
The dictionary keys are:
* **map**: Precipitation (units: mm)
* **mat**: Air temperature (units: degc)
* **ptps**: Fraction of precipitation as snow (units: fraction 0-1)
* **pet**: Potential evaporation (units: mm)
* **etd**: Evaporation demand (units: mm)
'''
#If SAC-SMA and SNOW17 parameter don't exist return none
if not self.localflow_logic:
return None
#Get forcings from fa class
#Use .fget (Function Get) to grab the actual function inside because forcings is a property object
raw_output = nwsrfs.FA.forcings.fget(self)
#Remove "_fa" from dictionary keys and rename columns based on zone_names
fixed_output = {key.split('_')[0]:value.set_axis(self.zone_names, axis=1) for key, value in raw_output.items()}
return fixed_output
    def _sacsnow_uh_run(self, validate:bool=True):
        '''
        Run SAC-SMA, SNOW17, and gamma UH with the parameters specified in ``pars``.
        Args:
            validate (bool): Validate that SAC-SMA, Snow17, and gamma UH inputs are correct format/type. Default: True
        '''
        #If SAC-SMA and SNOW17 parameters don't exist return none
        if not self.localflow_logic:
            return None
        #Get a nested dictionary for SAC-SMA, Snow17, and gamma UH with parameter values
        #Each value is an array with one entry per zone, ordered by zone name
        pars_dict = {}
        for par_type in ['sac','snow','uh']:
            pars_dict[par_type]= {}
            for par in self.pars.loc[self.pars.type==par_type].name.unique():
                pars_dict[par_type][par] = self.pars.loc[(self.pars.type==par_type)&
                                                         (self.pars['name'] == par)].sort_values(by='zone')['value'].to_numpy()
        #Apply toc adjustment to toc parameter
        toc = pars_dict['uh']['unit_toc'] * pars_dict['uh']['unit_toc_adj']
        #Initiate gammauh_pars data class
        gamma_dc = nwsrfs.GammaUhPars(dt_hours = self.dt_hours, area = pars_dict['uh']['zone_area'],
                                      shape = pars_dict['uh']['unit_shape'], toc = toc)
        #Initiate GammaUh wrapper class
        nwsrfs.GammaUh.__init__(self,pars_dataclass = gamma_dc,validate = validate)
        #Format required dataclass inputs sac_pars and snow_pars
        #Row order is positional and must not be changed: each row is one parameter, each column one zone
        sac_pars = np.concatenate([[pars_dict['sac']['uztwm']], [pars_dict['sac']['uzfwm']], [pars_dict['sac']['lztwm']],
                                   [pars_dict['sac']['lzfpm']],[pars_dict['sac']['lzfsm']], [pars_dict['sac']['adimp']],
                                   [pars_dict['sac']['uzk']],[pars_dict['sac']['lzpk']], [pars_dict['sac']['lzsk']],
                                   [pars_dict['sac']['zperc']],[pars_dict['sac']['rexp']], [pars_dict['sac']['pctim']],
                                   [pars_dict['sac']['pfree']],[pars_dict['sac']['riva']], [pars_dict['sac']['side']],
                                   [pars_dict['sac']['rserv']],[pars_dict['sac']['efc']]
                                   ],axis=0)
        snow_pars = np.concatenate([[pars_dict['snow']['scf']], [pars_dict['snow']['mfmax']], [pars_dict['snow']['mfmin']],
                                    [pars_dict['snow']['uadj']],[pars_dict['snow']['si']], [pars_dict['snow']['nmf']],
                                    [pars_dict['snow']['tipm']],[pars_dict['snow']['mbase']], [pars_dict['snow']['plwhc']],
                                    [pars_dict['snow']['daygm']],[pars_dict['snow']['adc_a']], [pars_dict['snow']['adc_b']],
                                    [pars_dict['snow']['adc_c']]
                                    ],axis=0)
        #Initiate SacSnowPars data class (forcings come from the FA-adjusted `forcings` property)
        sacsnow_dc = nwsrfs.SacSnowPars(year = self.year,month =self.month,day = self.day,hour = self.hour,
                                        alat = pars_dict['snow']['alat'], elev = pars_dict['snow']['elev'],
                                        sac_pars = sac_pars, snow_pars = snow_pars,
                                        init_swe = pars_dict['snow']['init_swe'],
                                        pxadj = pars_dict['sac']['pxadj'], peadj = pars_dict['sac']['peadj'],
                                        forcings_map = self.forcings['map'].to_numpy(),
                                        forcings_mat = self.forcings['mat'].to_numpy(),
                                        forcings_ptps = self.forcings['ptps'].to_numpy(),
                                        forcings_etd = self.forcings['etd'].to_numpy())
        #Initiate sacsnow wrapper class
        nwsrfs.SacSnow.__init__(self,
                                pars_dataclass = sacsnow_dc,
                                validate = validate)
@property
def sacsnow_tci(self) -> 'SACSnowTCI':
'''
Generates a custom type alias for total channel inflow (tci) using :class:`~nwsrfs_py.nwsrfs.SacSnow` with a column for each zone (units - mm).
'''
#If SAC-SMA and SNOW17 parameters don't exist return none
if not self.localflow_logic:
return None
#Get tci from sacsnow class
#Use .fget (Function Get) to grab the actual function inside because forcings is a property object
raw_output = nwsrfs.SacSnow.sacsnow_tci.fget(self)
#Rename column of Dataframe to correpond to zone names
return raw_output.set_axis(self.zone_names, axis=1)
    @property
    def sacsnow_states(self) -> dict[str, pd.DataFrame]:
        '''
        Returns a dictionary of DataFrames containing all :class:`~nwsrfs_py.nwsrfs.SacSnow` model states with a column for each zone.
        The dictionary keys are:
        * **tci**: Total channel inflow (units: mm).
        * **map_pxadj**: Precipitation after pxadj applied (units: mm).
        * **etd_peadj**: Evaporation demand after peadj, efc, and aesc adjustments applied (units: mm).
        * **aet**: Actual evapotranspiration (units: mm).
        * **uztwc**: Upper zone tension water contents (units: mm).
        * **uzfwc**: Upper zone free water contents (units: mm).
        * **lztwc**: Lower zone tension water contents (units: mm).
        * **lzfsc**: Lower zone free supplemental water contents (units: mm).
        * **lzfpc**: Lower zone free primary water contents (units: mm).
        * **adimc**: Additional impervious area water contents (units: mm).
        * **roimp**: Impervious runoff prior to riparian vegetation adjustment (units: mm).
        * **sdro**: Direct runoff prior to riparian vegetation adjustment (units: mm).
        * **ssur**: Surface runoff prior to riparian vegetation adjustment (units: mm).
        * **sif**: Interflow prior to riparian vegetation adjustment (units: mm).
        * **bfs**: Baseflow supplemental runoff prior to riparian vegetation adjustment (units: mm).
        * **bfp**: Baseflow primary runoff prior to riparian vegetation adjustment (units: mm).
        * **swe**: Snow water equivalent (units: mm).
        * **aesc**: Areal extent of snow cover (units: fraction 0-1).
        * **neghs**: Snowpack heat deficit (units: mm).
        * **liqw**: Liquid water held by snow against gravity drainage (units: mm).
        * **raim**: Total rain plus snowmelt (units: mm).
        * **psfall**: Precipitation falling as snow after scf adjustment has been applied (units: mm).
        * **prain**: Precipitation falling as rain (units: mm).
        '''
        #If SAC-SMA and SNOW17 parameters don't exist return none
        if not self.localflow_logic:
            return None
        #Get states from sacsnow class
        #Use .fget (Function Get) to grab the actual function inside because sacsnow_states is a property object
        raw_output = nwsrfs.SacSnow.sacsnow_states.fget(self)
        #For each dictionary value, rename columns of the DataFrame to correspond to zone names
        return {key: df.set_axis(self.zone_names, axis=1) for key, df in raw_output.items()}
[docs] def return_uh(self,tstep):
'''
Generates a unit hydrograph as a DataFrame, using :class:`~nwsrfs_py.nwsrfs.GammaUh`, at a timestep specified by ``tstep``.
Args:
tstep (int): Specifies timestep of unit hydrograph (units: hours).
Returns:
pd.DataFrame: A DataFrame containing unit hydrograph (uh) with a column for each zone (units: cfs/in).
'''
#If SAC-SMA and SNOW17 parameter don't exist return none
if not self.localflow_logic:
return None
raw_output = nwsrfs.GammaUh.return_uh(self,tstep)
#Rename column of Dataframe to correpond to zone names
return raw_output.set_axis(self.zone_names, axis=1)
@property
def uh(self) -> pd.DataFrame:
'''
Generates a unit hydrograph as a DataFrame, using :class:`~nwsrfs_py.nwsrfs.GammaUh`, at a timestep specified by the ``dt_hours`` attribute (units - cfs/in).
'''
#If SAC-SMA and SNOW17 parameters don't exist return none
if not self.localflow_logic:
return None
#Get uh from gamma_uh class
#Use .fget (Function Get) to grab the actual function inside because forcings is a property object
raw_output = nwsrfs.GammaUh.uh.fget(self)
#Rename column of Dataframe to correpond to zone names
return raw_output.set_axis(self.zone_names, axis=1)
[docs] def return_sf(self,tci:SACSnowTCI):
'''
Generates a timeseries of streamflow as a Series using :class:`~nwsrfs_py.nwsrfs.GammaUh` and input ``tci``.
Args:
tci (SACSnowTCI): Custom type alias for total channel inflow from :class:`~nwsrfs_py.nwsrfs.SacSnow` (units: mm).
Returns:
pd.DataFrame: A DataFrame containing streamflow with a column for each zone (units: cfs).
'''
#If SAC-SMA and SNOW17 parameter don't exist return none
if not self.localflow_logic:
return None
raw_output = nwsrfs.GammaUh.return_sf(self,tci,self.return_inst)
#Rename column of Dataframe to correpond to zone names
return raw_output.set_axis(self.zone_names, axis=1)
@property
def sacsnow_sf(self)-> pd.DataFrame:
'''
Calculates streamflow for each zone using :class:`~nwsrfs_py.nwsrfs.SacSnow` and :class:`~nwsrfs_py.nwsrfs.GammaUh` models (units - cfs).
'''
#If SAC-SMA and SNOW17 parameters don't exist return none
if not self.localflow_logic:
return None
#Get tci from sacsnow class
tci_output = self.sacsnow_tci
#Get streamflow from return_sf gamma_uh class function
sf_output = self.return_sf(tci=tci_output)
#For NWRFC Calibrations, UH output needs to be shifted forward one timestep because of how forcings are treated in AutoCalb
#Repeat the first flow data point and append to the beginning of the ts, so there is no loss in a timestep
if self.shift_sf:
shift = sf_output.shift(1)
sf_output = shift.combine_first(sf_output)
return sf_output
    def _lagk_run(self,validate:bool=True):
        '''
        Runs Lagk with the parameters specified in ``pars``.
        Args:
            validate (bool): Validate that LagK inputs are correct format/type. Default: True
        '''
        #If lagk parameters don't exist return none
        if not self.upflow_logic:
            return None
        #Get a dictionary of lagk parameter values, one array entry per upstream reach (ordered by zone)
        pars_dict = {}
        for par in self.pars.loc[self.pars.type=='lagk'].name.unique():
            pars_dict[par] = self.pars.loc[(self.pars.type=='lagk')&
                                           (self.pars['name'] == par)].sort_values(by='zone')['value'].to_numpy()
        #Map parameter-file names onto the LagkPars dataclass fields (lag/K table coefficients, limits, and initial states)
        lagk_dc = nwsrfs.LagkPars(year = self.year,month =self.month,day = self.day,hour = self.hour,
                                  tbl_lageq_a = pars_dict['lagtbl_a'], tbl_lageq_b = pars_dict['lagtbl_b'],
                                  tbl_lageq_c = pars_dict['lagtbl_c'], tbl_lageq_d = pars_dict['lagtbl_d'],
                                  tbl_keq_a = pars_dict['ktbl_a'], tbl_keq_b = pars_dict['ktbl_b'],
                                  tbl_keq_c = pars_dict['ktbl_c'], tbl_keq_d = pars_dict['ktbl_d'],
                                  tbl_lagmax = pars_dict['lagk_lagmax'], tbl_lagmin = pars_dict['lagk_lagmin'],
                                  tbl_kmax = pars_dict['lagk_kmax'], tbl_kmin = pars_dict['lagk_kmin'],
                                  tbl_qmax = pars_dict['lagk_qmax'], tbl_qmin = pars_dict['lagk_qmin'],
                                  init_co = pars_dict['init_co'], init_stor = pars_dict['init_stor'],
                                  init_qin = pars_dict['init_if'], init_qout = pars_dict['init_of'],
                                  qin = self.upflow.to_numpy())
        #Initiate Lagk wrapper class
        nwsrfs.Lagk.__init__(self,
                             pars_dataclass = lagk_dc,
                             validate = validate)
@property
def lagk_route(self) -> pd.DataFrame:
    '''
    Generates routed flows as a DataFrame using :class:`~nwsrfs_py.nwsrfs.Lagk` with a column for each upstream reach (units - cfs).
    '''
    # Nothing to route when no upstream reaches are configured
    if not self.upflow_logic:
        return None
    # lagk_route on the parent class is a property; .fget retrieves the
    # underlying function so it can be invoked against this instance
    routed = nwsrfs.Lagk.lagk_route.fget(self)
    # Re-label columns with the upstream reach names before returning
    labelled = routed.set_axis(self.upflow_names, axis=1)
    return labelled
@property
def lagk_states(self) -> dict[str, pd.DataFrame]:
    '''
    Generates a dictionary of DataFrames containing all :class:`~nwsrfs_py.nwsrfs.Lagk` model states with a column for each upstream reach.

    The dictionary keys are:
    * **routed**: Routed flow with lag and k applied (units: cfs).
    * **lag_time**: Lag applied to upstream flow (units: hours).
    * **k_inflow**: upstreamflow with only lag applied (units: cfs).
    * **k_storage**: Attenuation storage (units: cfs).
    '''
    # No upstream reaches -> no lagk states to report
    if not self.upflow_logic:
        return None
    # .fget extracts the parent-class property function so it can be
    # called against this instance directly
    raw = nwsrfs.Lagk.lagk_states.fget(self)
    # Re-label every state DataFrame's columns with the upstream reach names
    relabelled = {}
    for key, frame in raw.items():
        relabelled[key] = frame.set_axis(self.upflow_names, axis=1)
    return relabelled
@property
def _sacsnow_lagk_sim(self) -> pd.Series:
    '''
    Returns a Series of the sum of any sacsnow run off and upstream routed flow for each timestep (units: cfs)
    '''
    # Start from zero flow at every timestep
    total = pd.Series(0, index=self.dates)
    # Add local runoff (SAC-SMA/Snow17 zones), when present
    if self.localflow_logic:
        total = total + self.sacsnow_sf.sum(axis=1)
    # Add routed upstream inflow, when present
    if self.upflow_logic:
        total = total + self.lagk_route.sum(axis=1)
    return total.rename('sqin')
def _chanloss_run(self, validate: bool = True):
    '''
    Runs chanloss with the parameters specified in ``pars``.

    Args:
        validate (bool): Validate that chanloss inputs are correct format/type. Default: True
    '''
    # No chanloss parameters -> nothing to run
    if not self.chanloss_logic:
        return None
    # Collect every chanloss parameter as a zone-ordered numpy array, keyed by name
    cl_rows = self.pars.loc[self.pars.type == 'chanloss']
    pars_dict = {
        name: cl_rows.loc[cl_rows['name'] == name].sort_values(by='zone')['value'].to_numpy()
        for name in cl_rows.name.unique()
    }
    # Pair each loss-period start/end into an (n, 2) array
    periods = np.column_stack((pars_dict['cl_period_start'], pars_dict['cl_period_end']))
    # Assemble the chanloss parameter dataclass, driven by the combined
    # sacsnow + lagk simulation as inflow
    cl_dc = nwsrfs.ChanlossPars(
        year=self.year, month=self.month, day=self.day, hour=self.hour,
        periods=periods, factors=pars_dict['cl_factor'],
        cl_type=pars_dict['cl_type'].item(), min_flow=pars_dict['cl_min_q'].item(),
        qin=self._sacsnow_lagk_sim)
    # Initialize the parent Chanloss model in-place on this instance
    nwsrfs.Chanloss.__init__(self, pars_dataclass=cl_dc, validate=validate)
def chanloss_apply(self, qin: pd.Series, validate: bool = True):
    '''
    Runs :class:`~nwsrfs_py.nwsrfs.Chanloss` with ``qin`` supplied (units: cfs).

    Args:
        qin (pd.Series): Input streamflow to adjust (units: cfs).
        validate (bool): Validate that chanloss inputs are correct format/type. Default: ``True``
    '''
    # NOTE: a stray Sphinx "[docs]" viewcode artifact preceded this def in the
    # scraped source; it is removed here because it is not valid Python.
    #If chanloss parameters don't exist return none
    if not self.chanloss_logic:
        return None
    #Deep-copy the existing pars dataclass so the instance's own qin is left
    #untouched, then substitute the supplied qin
    dc_pars = copy.deepcopy(self.chanloss_pars)
    dc_pars.qin = qin.to_numpy()
    #Initiate the chanloss class on the substituted inflow
    cl_class = nwsrfs.Chanloss(pars_dataclass=dc_pars, validate=validate)
    #Return adjustments
    return cl_class.chanloss_qadj
@property
def _sacsnow_lagk_chanloss_sim(self) -> pd.Series:
    '''
    Returns a Series of the sum of any sacsnow run off and upstream routed flow for each timestep with chanloss applied (units: cfs)
    '''
    # Without chanloss parameters, the unadjusted simulation is returned as-is
    if not self.chanloss_logic:
        return self._sacsnow_lagk_sim
    # Chanloss-adjusted flow, renamed to the standard simulation label
    return self.chanloss_qadj.rename('sqin')
def _consuse_run(self, validate:bool=True):
    '''
    Runs CONSUSE with the parameters specified in ``pars``.

    Args:
        validate (bool): Validate that CONSUSE inputs are correct format/type. Default: True
    '''
    #If consuse parameters don't exist return none
    if not self.consuse_logic:
        return None
    #Create dictionary of ConsusePars dataclass (one entry per consuse zone)
    self.consuse_pars = dict()
    #Get a dictionary of consuse parameter values: one zone-ordered array per parameter name
    pars_dict = dict()
    for par in self.pars.loc[self.pars.type=='consuse'].name.unique():
        pars_dict[par] = self.pars.loc[(self.pars.type=='consuse')&
            (self.pars['name'] == par)].sort_values(by='zone')['value'].to_numpy()
    #format peadj for consuse calculation: 12 monthly rows x one column per consuse zone,
    #pulled from the 'peadj_cu_01'..'peadj_cu_12' parameter names
    peadj_cu = np.full([12, self.n_consuse], np.nan)
    for i in range(12):
        m = i + 1
        for j, z in enumerate(self.consuse_names):
            peadj_cu[i, j] = self.pars.loc[(self.pars.name == 'peadj_cu_' + f'{m:02}') & (self.pars.zone == z) &
                (self.pars.type == 'consuse')].value.item()
    #Get natural flow (sacsnow + lagk with chanloss applied)
    qnat = self._sacsnow_lagk_chanloss_sim
    #Convert natural streamflow to daily average
    qnat_peravg = self._inst_to_ave(qnat.to_numpy())
    qnat_peravg = pd.Series(qnat_peravg,index=self.dates)
    qnat_daily = qnat_peravg.resample('1D').mean()
    #Get Daily PET
    #NOTE: To match CHPS results have to be shifted back 1 hr so 00:00 timestep
    # is included in previous day
    pet_shift = self.forcings['pet'].shift(periods=-1, freq='h')
    # Repeat the final PET value at the last timestep lost by the -1h shift
    pet_shift.loc[self.dates[-1],:] = pet_shift.iloc[-1,:].values
    pet_daily = pet_shift.resample('1D').sum()
    #Create a blank state dataframe for every CONSUSE state variable
    state_param = ['qadj','qdiv','qrf_in','qrf_out','qol','qcd','ce','rfstor']
    self._consuse_states = {key:pd.DataFrame() for key in state_param}
    #Run consuse for each zone individually
    for n, cu_name in enumerate(self.consuse_names):
        #Get peadj associated with cu zones
        peadj = self.pars.loc[(self.pars.name=='peadj')&(self.pars.zone==cu_name),'value'].squeeze()
        #Concat the two timeseries (daily PET, daily natural flow) and remove any na values
        ts_df = pd.concat([pet_daily[cu_name],qnat_daily],axis=1)
        ts_df = ts_df[~ts_df.isna().any(axis=1)]
        #Get time arrays
        dates = ts_df.index
        year = dates.year.to_numpy()
        month = dates.month.to_numpy()
        day = dates.day.to_numpy()
        #Get timeseries arrays (col 0 = PET, col 1 = natural flow)
        ts_array = ts_df.to_numpy()
        #Create parameter dataclass
        self.consuse_pars[cu_name] = nwsrfs.ConsusePars(year=year,month=month,day=day,
            area=pars_dict['area_km2'][n], irr_eff=pars_dict['irr_eff'][n], min_flow=pars_dict['min_flow'][n],
            rf_accum_rate=pars_dict['rf_accum_rate'][n],rf_decay_rate=pars_dict['rf_decay_rate'][n],
            pet_adj=peadj, peadj_m=peadj_cu[:,n],
            pet=ts_array[:,0],qin=ts_array[:,1])
        cu_run = nwsrfs.Consuse(self.consuse_pars[cu_name], validate=validate)
        #Concat state value for CU zone to dictionary. IF QADJ,replace value
        #(qadj is cumulative across zones, so each run's output overwrites it)
        for count, param in enumerate(state_param):
            if param=='qadj':
                self._consuse_states[param]=cu_run.consuse_qadj
            else:
                self._consuse_states[param]= pd.concat([self._consuse_states[param],
                    cu_run.consuse_states[param].rename(cu_name)],axis=1)
@property
def consuse_qadj(self) -> pd.Series:
    '''
    Generates a Series of flow adjusted by :class:`~nwsrfs_py.nwsrfs.Consuse` (units - cfsd).
    '''
    # Only available when CONSUSE parameters were supplied
    if self.consuse_logic:
        return self._consuse_states['qadj']
    return None
@property
def consuse_states(self) -> dict[str, pd.DataFrame]:
    '''
    Generates a dictionary of DataFrames containing all :class:`~nwsrfs_py.nwsrfs.Consuse` states with a column for each zone.

    The dictionary keys are:
    * **qadj**: Streamflow with all consumptive use adjustments applied (units: cfsd)
    * **qdiv**: Flow diverted for consumptive use (units: cfsd)
    * **qrf_in**: Return flow from irrigation area used as input to return flow storage (units:cfsd)
    * **qrf_out**: Return flow to channel from return flow storage (units: cfsd)
    * **qol**: Diverted flow other loses [eg transport, subsurface] (units: cfsd)
    * **qcd**: Crop flow demand (units: cfsd)
    * **ce**: Crop evapotranspiration demand (units: mm)
    * **rfstor**: Return flow storage contents (units: mm)
    '''
    # Only available when CONSUSE parameters were supplied
    if self.consuse_logic:
        return self._consuse_states
    return None
@property
def sim(self) -> pd.Series:
    '''
    Generates a Series of the sum of any local flow runoff (:class:`~nwsrfs_py.nwsrfs.SacSnow` + :class:`~nwsrfs_py.nwsrfs.GammaUh`) and upstream routed (:class:`~nwsrfs_py.nwsrfs.Lagk`) flow for each timestep
    with chanloss (:class:`~nwsrfs_py.nwsrfs.Chanloss`) and consuse (:class:`~nwsrfs_py.nwsrfs.Consuse`) applied (units - cfs).
    '''
    # Flow prior to any potential CONSUSE adjustment
    qnat = self._sacsnow_lagk_chanloss_sim
    # Without CONSUSE areas the natural flow is the simulation
    if not self.consuse_logic:
        return qnat.rename('sqin')
    states = self.consuse_states
    # Net consumptive-use withdrawal = diverted flow minus return flow
    cu_adj = states['qdiv'].sum(axis=1) - states['qrf_out'].sum(axis=1)
    # Shift forward a day so the correct adjustment applies to the correct day
    cu_adj.index = cu_adj.index + pd.Timedelta(1, unit='D')
    # Reindex to the simulation timestep: backfill fills all values after 00:00,
    # and trailing NaNs (the daily value cut off by the reindex) are padded
    # with the last daily cu_adj value
    cu_adj = cu_adj.reindex(qnat.index).bfill().replace({np.nan: cu_adj.values[-1]})
    adjusted = qnat - cu_adj
    # Adjusted flow cannot go negative
    adjusted.loc[adjusted < 0] = 0
    return adjusted.rename('sqin')
@classmethod
def load_example(cls, lid: str = 'NRKW1', **kwargs):
    '''
    Generates a :class:`NwsrfsRun` for the package provided example data.

    Example data are NWRFC auto calibration results for two locations:
    * **NRKW1**: Nooksack River at North Cedarville, WA (USGS-12210700). The following models are being utilized: :class:`~nwsrfs_py.nwsrfs.SacSnow`, :class:`~nwsrfs_py.nwsrfs.GammaUh`, :class:`~nwsrfs_py.nwsrfs.Lagk`.
    * **SFLN2**: Salmon Falls Creek NR San Jacinto NV (USGS 13105000). The following models are being utilized: :class:`~nwsrfs_py.nwsrfs.SacSnow`, :class:`~nwsrfs_py.nwsrfs.GammaUh`, :class:`~nwsrfs_py.nwsrfs.Chanloss`, :class:`~nwsrfs_py.nwsrfs.Consuse`.

    Any optional arguments associated with :class:`NwsrfsRun`, can be passed.

    Args:
        lid (str): Five letter identifier of example data. Input can either be ``'NRKW1'`` or ``'SFLN2'``. Default: ``'NRKW1'``.
        **kwargs: Optional keyword arguments passed to :class:`NwsrfsRun`.
            Common options include:
            * **forcing_adj** (bool | list[str]): Apply monthly climatological adjustments. Default: ``True``.
            * **return_inst** (bool): Return instantaneous vs. period-average flow. Default: ``True``.
            * **shift_sf** (bool): Shift streamflow forward one timestep. Default: ``True``.

    Raises:
        ValueError: If ``lid`` is not one of the recognized example identifiers.
    '''
    # NOTE: a stray Sphinx "[docs]" viewcode artifact preceded the decorator in
    # the scraped source; it is removed here because it is not valid Python.
    # Accept lower-case ids by normalizing to upper case
    lid = lid.upper()
    # Define the mapping within the method to keep it clean
    config = {
        'NRKW1': 'results_por_02',
        'SFLN2': 'results_por_01'
    }
    #Throw an error if the lid argument passed is not recognized.
    if lid not in config:
        msg = f"Station ID '{lid}' not recognized. Available: {list(config.keys())}"
        raise ValueError(msg)
    #Return the class instance for the lid specified.
    with utils._get_example_dir(lid) as example_dir:
        # Note: Ensure __init__ loads all data BEFORE the context manager closes
        return cls(example_dir, config[lid], **kwargs)