Source code for nwsrfs_py.adjustq

'''
These class functions replicate the CHPS FEWS AdjustQ tool.

Used as a preprocessing step to create upstream streamflow timeseries for routing reaches where LAG-K is being optimized

AdjustQ adjusts observed mean daily discharges using observed instantaneous and simulated discharges. If both observed 
mean daily and instantaneous data are missing, simulated discharge is used (if available).   
The process combines two CHPS FEWS transformations:

    1. **AdjustQUsingInstantaneousDischarge**: Corrects simulated discharges based on instantaneous observations.
    2. **AdjustQUsingMeanDailyDischarge**: Applies additional corrections if mean daily discharges exceed the error tolerance.

Any resulting negative discharge values are set to zero.
'''

import os, sys, argparse, warnings
import pandas as pd, numpy as np
from .simulation import NwsrfsRun as nwsrfs_sim
from . import utils
#import pdb; pdb.set_trace()

class _AdjustQPrep:

    '''
    Internal-only class to create Pandas Series for observed instantaneous data, observed daily data, 
    and CHPS NWSRFS simulation results. Used within the adjustq class.

    Args:
        daily_flow_path (str): Path to daily average observed streamflow csv provided in a NWRFC autocalb directory [ex: flow_daily_[lid].csv].
        inst_flow_path (str): Path to instantaneous observed streamflow csv provided in a NWRFC autocalb directory [ex: flow_instantaneous_[lid].csv].
        ac_run_path (str | None): Optional path to NWRFC autocalibration results directory [ex: 2zone/[lid]/results_por_02].
    Attributes:
        obs_daily (pd.Series): The loaded daily observation timeseries. (units: cfs)
        obs_inst (pd.Series): The loaded instantaneous observation timeseries. (units: cfs)
        sim (pd.Series): The simulation timeseries (if provided). (units: cfs)
    '''

    def __init__(self,
                daily_flow_path: str,
                inst_flow_path: str,
                ac_run_path: str | None = None):
        
        #import observations csvs
        ob_daily_flow = pd.read_csv(daily_flow_path)
        ob_daily_flow['date'] = pd.to_datetime(ob_daily_flow[['year','month','day']])
        ob_daily_flow.set_index('date',inplace=True)
        self.obs_daily =  ob_daily_flow.flow_cfs
        

        ob_inst_flow = pd.read_csv(inst_flow_path)
        ob_inst_flow['datetime'] = pd.to_datetime(ob_inst_flow[['year','month','day','hour']])
        ob_inst_flow.set_index('datetime',inplace=True)
        self.obs_inst =  ob_inst_flow.flow_cfs

        if ac_run_path is not None:
            ac_path, ac_run = os.path.split(ac_run_path)
            ac_sim = nwsrfs_sim(ac_path,ac_run)
            ac_sim_run = ac_sim.sim
        else:
            ac_sim_run = None
        self.sim = ac_sim_run
        
[docs]class AdjustQ(_AdjustQPrep): ''' Class to perform an equivalent CHPS FEWS AdjustQ calculation, inheriting from :class:`_AdjustQPrep`. Intended to be used within the NWRFC autocalibration folder/data structure. Uses two different procedures to determine if observed instantaneous or simulated data is used to shape daily average observed streamflow: * One to correct for small gaps in observed instantaneous data. * Another method for correcting large gaps in observed instantaneous data. The ``blend`` input variable defines which procedure to use. If both observed mean daily and instantaneous data is missing, simulated discharge is used (if available). Args: daily_flow_path (str): Path to daily average observed streamflow csv provided in a NWRFC autocalb directory. inst_flow_path (str): Path to instantaneous observed streamflow csv provided in a NWRFC autocalb directory. ac_run_path (str | None): Optional path to NWRFC autocalibration results directory. blend (int): Threshold for determining how many time steps of missing observed instantaneous data constitutes a "large gap". Default: 10. interp_type (str): Correction procedure used to correct simulated discharges for missing gaps smaller than the blend threshold. Accepts ``'ratio'`` or ``'difference'``. Default: ``'ratio'``. * **ratio**: Correction factor is based on the ratio between observed and simulated discharges. * **difference**: Correction value is based on the difference between observed and simulated discharges. error_tol (float): Percent tolerance that the instantaneous daily average must match the observed daily flow average. Default: 0.01. max_iterations (int): Maximum number of iterations to adjust the instantaneous daily average to match the observed daily flow. Default: 15 Attributes: obs_daily (pd.Series): Daily observation streamflow timeseries. (units: cfs) obs_inst (pd.Series): Instantaneous observation streamflow timeseries. (units: cfs) sim (pd.Series): Simulation streamflow timeseries (if provided). (units: cfs) ''' def __init__(self, daily_flow_path: str, inst_flow_path: str, ac_run_path: str | None = None, blend: int=10, interp_type: str='ratio', error_tol: float=0.01, max_iterations: int=15): #Get input timeseries super().__init__(daily_flow_path, inst_flow_path, ac_run_path) #Set options if interp_type == 'ratio' or interp_type == 'difference': self.interp_type = interp_type else: msg = "interp_type input must be either 'ratio' or 'difference'. Default is 'ratio'" raise ValueError(msg) self.blend = int(blend) self.error_tol = error_tol self.max_iterations = int(max_iterations) #Set raw_output to None until run function is executed self.__raw_output = None @property def adjustq(self) -> pd.Series: """ This function detects if a simulation timeseries is provided and uses the appropriate AdjustQ calculation a. :meth:`.AdjustQ.adjustq_calc` is used if NWRFC autocalibration simulation (``sim``) is provided. b. Otherwise :meth:`.AdjustQ.inst_mean_q_merge` is used. Returns a pd.Series. """ #Get adjustq if (self.sim is not None) & (self.__raw_output is None): self.adjustq_calc() elif self.__raw_output is None: self.inst_mean_q_merge() return self.__raw_output def _adjustq_inst_smallgaps(self,obs_sim_working,gap_list): """ Steps to fill in missing data where the gap is < blend parameter. This procedure uses observed instantaneous discharges to correct simulated discharges. If observed data exists, it replaces the simulated value. If not, it calculates a corrected value. **Correction Methods:** * **Ratio Option**: Correction factor based on the ratio between observed/simulated at start/end of gap. * **Difference Option**: Correction based on the difference between observed/simulated. **Special Case:** The program overrides the configured method and uses **difference** interpolation if: 1. The ratio between start and end ratios exceeds 2. 2. Either ratio is greater than 5. Args: obs_sim_working (pd.DataFrame): Internal working DataFrame used to build the adjustq timeseries. gap_list (pd.DataFrame): Internal DataFrame used to identify periods with small gaps. Returns: pd.DataFrame: The updated working DataFrame with small gaps filled. """ for index, row in gap_list.iterrows(): #Grab the period with the missing values end=index periods=row.Period_Gap start=end-pd.Timedelta(periods*6, unit='h') interp_period=obs_sim_working.loc[start:end].copy() #Check if the ratio tolerance limits are violated start_ratio=interp_period.iloc[0].Inst_Ratio end_ratio=interp_period.iloc[-1].Inst_Ratio ratio_check1=abs(start_ratio-end_ratio)>2 ratio_check2=max(start_ratio,end_ratio)>5 #If interpolation type is difference or if ratio check are violated use the difference method if (self.interp_type[0].lower()=='d') | (ratio_check1) | (ratio_check2): #print('Difference Procedure Initiated') interp_period['Inst_Difference']=interp_period.Inst_Difference.interpolate() interp_period['AdjustQ_Inst']=interp_period.simulated+interp_period.Inst_Difference #If not using difference interpolation method, use the ratio method elif self.interp_type[0].lower()=='r': #print('Ratio Procedure Initiated') interp_period['Inst_Ratio']=interp_period.Inst_Ratio.interpolate() interp_period['AdjustQ_Inst']=interp_period.simulated*interp_period.Inst_Ratio #If you get here, an erroneous interpolation type was selected else: msg = "Interp method must be 'ratio' or 'difference', got '{self.interp_type}'." raise ValueError(msg) #Add the interpolated data to the obs_sim_working dataframe obs_sim_working.loc[start:end,['AdjustQ_Inst']]=interp_period.loc[start:end,['AdjustQ_Inst']] return obs_sim_working #Steps to fill in missing data where the gap is < blend parameter def _adjustq_inst_largegaps(self,obs_sim_working:pd.DataFrame,gap_list:pd.DataFrame): ''' Steps to fill in missing data where the gap is > blend parameter When the gap in observed data is too large to fill with interpolation, this procedure uses a "blend" method to ensure a smooth transition. It blends the difference between the simulated and observed discharge at the start and end of the gap over the specified ``blend`` steps. Args: obs_sim_working (pd.DataFrame): Internal working DataFrame which is used to build the adjustq timeseries gap_list (pd.DataFrame): Internal DataFrame used to identify periods with large gaps Returns: pd.DataFrame: The updated working DataFrame with large gaps filled. ''' #Steps to fill in missing data where the gap is > blend parameter for index, row in gap_list.iterrows(): #Pull back end of the blend period end=index end_blend=end-pd.Timedelta(self.blend*6, unit='h') end_blend_period=obs_sim_working.loc[end_blend:end].copy() end_blend_period.reset_index(inplace=True) #Get the difference to blend end_diff=end_blend_period.iloc[-1].Inst_Difference #Blend the difference over the specified blend period end_blend_period['AdjustQ_Inst']=end_blend_period.simulated+(end_blend_period.index/self.blend)*end_diff end_blend_period.set_index('datetime_local_tz',inplace=True) #Add the interpolated data to the obs_sim_working dataframe obs_sim_working.loc[end_blend:end,['AdjustQ_Inst']]=end_blend_period.loc[:,['AdjustQ_Inst']] periods=row.Period_Gap #Pull front end of the blend period start=end-pd.Timedelta(periods*6, unit='h') start_blend=start+pd.Timedelta(self.blend*6, unit='h') start_blend_period=obs_sim_working.loc[start:start_blend].copy() start_blend_period.reset_index(inplace=True) #Get the difference to blend start_diff=start_blend_period.iloc[0].Inst_Difference #Blend the difference over the specified blend period start_blend_period['AdjustQ_Inst']=start_blend_period.simulated+(1-start_blend_period.index/self.blend)*start_diff start_blend_period.set_index('datetime_local_tz',inplace=True) #Add the interpolated data to the obs_sim_working dataframe obs_sim_working.loc[start:start_blend,['AdjustQ_Inst']]=start_blend_period.loc[:,['AdjustQ_Inst']] return obs_sim_working def _adjustq_daily(self,obs_sim_working:pd.DataFrame): ''' Corrects simulated discharge using mean daily discharge values. This iterative procedure scales the instantaneous discharge so that its daily average matches the observed daily average. It repeats until the error is within the specified tolerance (``error_tol``) or ``max_iterations`` is reached. Args: obs_sim_working (pd.DataFrame): Internal working DataFrame which is used to build the adjustq timeseries. Returns: pd.DataFrame: The updated working DataFrame with daily volume corrections applied. ''' i = 1 max_error=self.error_tol+1 while (i < self.max_iterations+1) & (self.error_tol<max_error): #print(i) #Get Daily Average simulated flow considering both 00:00 time values on either end (given half weight). daily_avg=pd.DataFrame((obs_sim_working.AdjustQ_Inst.rolling(5,center=True).sum()+obs_sim_working.AdjustQ_Inst.rolling(3,center=True).sum())/8) #12z time has the correct daily weighted average flow daily_avg=daily_avg.loc[daily_avg.index.hour==12] daily_avg.rename(columns={'AdjustQ_Inst':'Daily_Sim'},inplace=True) #Remove any Na values daily_avg=daily_avg.loc[~daily_avg.Daily_Sim.isna()] #change timestep to daily daily_avg=daily_avg.resample('1D').sum() #Pull observed daily data daily_avg['Daily_Obs']=self.obs_daily.loc[self.obs_daily.index.isin(daily_avg.index)] #Get the daily ratio to adjust the instantaneous flow daily_avg['Daily_Ratio']=daily_avg.Daily_Obs.divide(daily_avg.Daily_Sim) #Get the pbias to track the tolerance daily_avg['Pbias']=abs((daily_avg.Daily_Sim-daily_avg.Daily_Obs)/daily_avg.Daily_Sim) #Convert the index to a include a hour timestep. daily_avg.index=daily_avg.index+pd.Timedelta(12, unit='h') #Add the daily ratio to the obs_sim_working dataframe set at non 00:00 time values #For the 00:00 time use average of the two days obs_sim_working['Daily_Ratio']=daily_avg.Daily_Ratio obs_sim_working['Daily_Ratio']=obs_sim_working.Daily_Ratio.interpolate(method='nearest',limit=1,limit_direction='both') obs_sim_working['Daily_Ratio']=obs_sim_working.Daily_Ratio.interpolate(limit_direction='both',limit=2) #update the instant flow values using the ratio daily_index = obs_sim_working.loc[obs_sim_working.Daily_Ratio.notna()].index obs_sim_working.loc[daily_index,'AdjustQ_Inst']=obs_sim_working.loc[daily_index,'AdjustQ_Inst']*obs_sim_working.loc[daily_index,'Daily_Ratio'] i += 1 max_error=daily_avg.Pbias.max() #print(max_error) return obs_sim_working
[docs] def adjustq_calc(self): ''' Primary AdjustQ calculation function when NWRFC autocalibration simulation (``sim``) is provided. This method orchestrates the full adjustment process: 1. Identifies gaps in observed data. 2. Calls :meth:`.AdjustQ._adjustq_inst_smallgaps` for short missing periods. 3. Calls :meth:`.AdjustQ._adjustq_inst_largegaps` for long missing periods. 4. Calls :meth:`.AdjustQ._adjustq_daily` to match daily volumes. 5. Clips any negative results to zero. Returns: pd.Series: A Series with a adjustq timeseries (units: cfs) ''' ###############PREP Data################################ if self.sim is None: msg = "No model simulation timeseries defined. Use `inst_mean_q_merge` instead." raise ValueError(msg) #Format the daily observed flow # daily_q = self.obs_daily.copy(deep=True) # daily_q.index.rename('date_local_tz',inplace=True) # daily_q.rename('Daily_Avg_Streamflow_cfs',inplace=True) #Format the instantaneous observed flow inst_q = self.obs_inst.copy(deep=True) inst_q.index.rename('datetime_local_tz',inplace=True) inst_q.rename('observed',inplace=True) #Grab the nearest instantaneous value, within 15min, to each 6hr timestep obs_6h_begin=inst_q.index[0].floor(freq='D') obs_6h_end=inst_q.index[-1].ceil(freq='D') obs_6h=pd.DataFrame(index=pd.date_range(start=obs_6h_begin,end=obs_6h_end,freq='6h')) obs_6h.index.rename('datetime_local_tz',inplace=True) obs_6h=pd.merge_asof(obs_6h,inst_q,left_index=True,right_index=True,tolerance=pd.Timedelta('15m'),direction='nearest') #Remove any values below zero obs_6h=obs_6h[obs_6h.observed>0] #Format Simulated 6hr Data sim = self.sim.copy(deep=True) sim.rename('simulated',inplace=True) sim.index.rename('datetime_local_tz',inplace=True) #Extend the simulated data to span complete days sim_begin=sim.index[0].floor(freq='D') sim_end=sim.index[-1].ceil(freq='D') sim_extend=pd.Index(data=pd.date_range(start=sim_begin,end=sim_end,freq='6h'),name='datetime_local_tz') sim = sim.reindex(sim_extend) sim = sim.interpolate(limit_direction='both') #Working DataFrame working=pd.concat([obs_6h,sim],axis=1) working['Inst_Ratio']=working['observed']/working['simulated'] working['Inst_Difference']=working['observed']-working['simulated'] working['AdjustQ_Inst']=working['observed'] ####################AdjustQ Instantaneous Discharge################# #Find gaps in the observed period obs_gaps=obs_6h.loc[~obs_6h.observed.isna()].copy() obs_gaps['Period_Gap']=(obs_gaps.index.to_series().diff()/pd.to_timedelta('1 hour'))/6 obs_gaps=obs_gaps.loc[obs_gaps.Period_Gap>1,['Period_Gap']].sort_values(by='Period_Gap') obs_gaps_interp=obs_gaps.loc[obs_gaps.Period_Gap<self.blend] obs_gaps_blend=obs_gaps.loc[obs_gaps.Period_Gap>=self.blend] #Adjust gaps in the observed data less than the blend length working=self._adjustq_inst_smallgaps(working,obs_gaps_interp) #Adjust gaps in the observed data more than the blend length working=self._adjustq_inst_largegaps(working,obs_gaps_blend) #Condense working dataframe to only include the period where there is simulation data working=working.loc[~working.simulated.isna()] #For any rows that haven't been filled (didn't meet critera to use inst data), use simulation data directly working.loc[working.AdjustQ_Inst.isna(),['AdjustQ_Inst']]=working.loc[working.AdjustQ_Inst.isna(),['simulated']].values ####################AdjustQ Mean Daily################# working=self._adjustq_daily(working) #Replace any negative flow value with zero working['AdjustQ_Inst']=working.AdjustQ_Inst.clip(lower=0) self.__raw_output = working.AdjustQ_Inst return self.__raw_output
[docs] def inst_mean_q_merge(self): ''' Merges instantaneous observed streamflow data with daily mean streamflow data when NWRFC autocalibration simulation (``sim``) is ``'None'``. For locations like reservoir outflows. Where available this method uses instantaneous data to shape daily mean flow using :meth:`.AdjustQ._adjustq_daily`. Returns: pd.Series: A Series containing the merged and adjusted discharge timeseries (units: cfs). ''' ###############PREP Data################################ #Format the daily observed flow #Shifting forward 1 timestep to have 00:00 be part of previous day average daily_q_6h = self.obs_daily.copy(deep=True) daily_q_6h = daily_q_6h.resample('6h').ffill() daily_q_6h.index = daily_q_6h.index+pd.Timedelta(6, unit='h') daily_q_6h.index.rename('datetime_local_tz',inplace=True) daily_q_6h.rename('Inst_Streamflow_cfs',inplace=True) #Format the instantaneous observed flow inst_q = self.obs_inst.copy(deep=True) inst_q.index.rename('datetime_local_tz',inplace=True) inst_q.rename('Inst_Streamflow_cfs',inplace=True) #Grab the nearest instantaneous value, within 2hours, to each 6hr timestep inst_q_6h_begin=daily_q_6h.index[0].floor(freq='D') inst_q_6h_end=daily_q_6h.index[-1].ceil(freq='D') inst_q_6h=pd.DataFrame(index=pd.date_range(start=inst_q_6h_begin,end=inst_q_6h_end,freq='6h')) inst_q_6h.index.rename('datetime_local_tz',inplace=True) #Grab the nearest available the instantaneous data within 15min of the 6hr timesteps inst_q_6h=pd.merge_asof(inst_q_6h,inst_q,left_index=True,right_index=True,tolerance=pd.Timedelta('15m'),direction='nearest') #Remove any values below zero inst_q_6h=inst_q_6h[inst_q_6h.Inst_Streamflow_cfs>0] #For timestep where instantaneousdata was not available, supplement with daily data inst_q_6h_merge=pd.DataFrame({'AdjustQ_Inst':inst_q_6h.Inst_Streamflow_cfs.combine_first(daily_q_6h)}) ####################AdjustQ Mean Daily################# inst_q_6h_merge=self._adjustq_daily(inst_q_6h_merge) #Replace any negative flow value with zero inst_q_6h_merge['AdjustQ_Inst']=inst_q_6h_merge.AdjustQ_Inst.clip(lower=0) self.__raw_output = inst_q_6h_merge.AdjustQ_Inst return self.__raw_output
[docs] @classmethod def load_example(cls, sim:bool=True, **kwargs): ''' Generates a :class:`AdjustQ` for the package provided example data. Example data are NWRFC auto calibration results for location: * **NRKW1**: Nooksack River at North Cedarville, WA (USGS-12210700). The simulated timeseries is utilizing the following models: :class:`~nwsrfs_py.nwsrfs.SacSnow`, :class:`~nwsrfs_py.nwsrfs.GammaUh`, :class:`~nwsrfs_py.nwsrfs.Lagk`. Any optional arguments associated with :class:`AdjustQ`, can be passed. Args: sim (bool): Incorporate the model simulation results in the AdjustQ calculation. Default: ``True``. **kwargs: Optional keyword arguments passed to :class:`AdjustQ`. Common options include: * **blend** (int): Threshold for determining how many time steps of missing observed instantaneous data constitutes a "large gap". Default: 10. * **interp_type** (str): Correction procedure used to correct simulated discharges for missing gaps smaller than the blend threshold. Accepts ``'ratio'`` or ``'difference'``. Default: ``'ratio'``. * **error_tol** (float): Percent tolerance that the instantaneous daily average must match the observed daily flow average. Default: 0.01. * **max_iterations** (int): Maximum number of iterations to adjust the instantaneous daily average to match the observed daily flow. Default: 15. ''' # Define the mapping within the method to keep it clean config = { 'daily_flow_csv': 'flow_daily_NRKW1.csv', 'inst_flow_csv': 'flow_instantaneous_NRKW1.csv', 'sim_flow_dir': 'results_por_02', } #If the sim option is set to False then set the sim_flow_dir key to None if not sim: config['sim_flow_dir'] = None #Return :class:`nwsrfs_py.adjustq.AdjustQ` for NRKW1. with utils._get_example_dir('NRKW1') as example_dir: # Note: Ensure __init__ loads all data BEFORE the context manager closes #Set paths daily_flow_path = os.path.join(example_dir,config['daily_flow_csv']) inst_flow_path = os.path.join(example_dir,config['inst_flow_csv']) #If the sim option is set to False then set the sim_path to None if sim: sim_path = os.path.join(example_dir,config['sim_flow_dir']) else: sim_path = None return cls(daily_flow_path, inst_flow_path, sim_path, **kwargs)