Source code for prereise.gather.hydrodata.eia.decompose_profile

import pandas as pd
from powersimdata.input.grid import Grid
from powersimdata.network.model import ModelImmutables

# Zone lookup tables of the "usa_tamu" grid model, read by the functions below.
mi = ModelImmutables("usa_tamu")
# Indexed by state abbreviation; the value is handed to Grid to build the base grid.
abv2interconnect = mi.zones["abv2interconnect"]
# Indexed by state abbreviation; the values are matched against the plants' zone_name.
abv2loadzone = mi.zones["abv2loadzone"]


def get_profile_by_state(profile, state, grid=None):
    """Decompose total hydro profile into plant level profile based on hydro
    generator capacities in the query state.

    :param pandas.Series/list profile: profile in query state.
    :param str state: the query state.
    :param powersimdata.input.grid.Grid grid: Grid instance. Use the generator
        capacities in the given grid if provided, otherwise use the base grid.
    :return: (*pandas.DataFrame*) -- hydro profile for each plant in the query
        state. Rows follow the index of ``profile`` when it is a pandas.Series
        and are numbered by position (0..n-1) when it is a list. Every column is
        zero when the total hydro capacity in the state is zero.
    :raises TypeError: if profile is not a time-series and/or state is not a str.
    :raises ValueError: if state is invalid.
    """
    if not isinstance(profile, (pd.Series, list)):
        raise TypeError("profile must be a pandas.Series object or list")
    if not isinstance(state, str):
        raise TypeError("state must be a str")
    if state not in abv2loadzone:
        # Sorted so that the message is the same from one run to the next.
        raise ValueError(
            "Invalid state. Possible states are %s" % " | ".join(sorted(abv2loadzone))
        )

    # Explicit None check: only a missing grid triggers the base-grid fallback.
    if grid is None:
        grid = Grid([abv2interconnect[state]])

    plant = grid.plant
    hydro_plant_in_state = plant[
        (plant.type == "hydro") & (plant["zone_name"].isin(abv2loadzone[state]))
    ]
    hydro_capacity_in_state = hydro_plant_in_state["Pmax"].sum()

    # A list carries no index of its own, so fall back to positional labels.
    if isinstance(profile, pd.Series):
        index = profile.index
    else:
        index = pd.RangeIndex(len(profile))
    values = list(profile)

    hydro_profile = pd.DataFrame(columns=hydro_plant_in_state.index, index=index)
    for plant_id in hydro_profile.columns:
        if hydro_capacity_in_state == 0:
            # No capacity to share out: mirror get_profile_by_plant and emit
            # zeros instead of dividing by zero.
            factor = 0
        else:
            factor = (
                hydro_plant_in_state.loc[plant_id, "Pmax"] / hydro_capacity_in_state
            )
        hydro_profile[plant_id] = [v * factor for v in values]
    return hydro_profile


def get_profile_by_plant(plant_df, total_profile):
    """Decompose total hydro profile into plant level profile based on hydro
    generator capacities in the dataframe.

    :param pandas.DataFrame plant_df: plant dataframe contains generator capacity
        as 'Pmax' for each entry.
    :param pandas.Series/list total_profile: aggregated profile to decompose
    :return: (*pandas.DataFrame*) -- hydro profile for each plant decomposed from
        the total_profile. Rows follow the index of ``total_profile`` when it is
        a pandas.Series and are numbered by position (0..n-1) when it is a list.
        Every column is zero when the total capacity is zero.
    :raises TypeError: if plant_df is not a pandas.Dataframe and/or total_profile
        is not a time-series and/or not all elements in total_profile are numbers.
    :raises ValueError: if plant_df does not contain 'Pmax' as a column.
    """
    if not isinstance(plant_df, pd.DataFrame):
        raise TypeError("plant_df must be a pandas.DataFrame object")
    if not isinstance(total_profile, (pd.Series, list)):
        raise TypeError("total_profile must be a pandas.Series object or list")
    if not all(isinstance(val, (float, int)) for val in total_profile):
        raise TypeError("total_profile must be all numbers")
    if "Pmax" not in plant_df.columns:
        raise ValueError("Pmax must be one of the columns of plant_df")

    # A list carries no index of its own, so fall back to positional labels.
    if isinstance(total_profile, pd.Series):
        index = total_profile.index
    else:
        index = pd.RangeIndex(len(total_profile))
    values = list(total_profile)

    total_hydro_capacity = plant_df["Pmax"].sum()
    res_profile = pd.DataFrame(index=index, columns=plant_df.index)
    for plantid in res_profile.columns:
        if total_hydro_capacity == 0:
            # Nothing to share out; avoid dividing by zero.
            factor = 0
        else:
            factor = plant_df.loc[plantid, "Pmax"] / total_hydro_capacity
        res_profile[plantid] = [val * factor for val in values]
    return res_profile


def get_normalized_profile(plant_df, plant_profile):
    """Scale each plant's generation profile by that plant's capacity.

    :param pandas.DataFrame plant_df: plant dataframe holding the generator
        capacity of each entry in its 'Pmax' column.
    :param pandas.DataFrame plant_profile: plant level profile with values of
        generation, one column per plant.
    :return: (*pandas.DataFrame*) -- plant level profile divided by 'Pmax',
        bounded to the [0, 1] range, with missing values replaced by 0.
    :raises TypeError: if plant_df and/or plant_profile is not a pandas.Dataframe
    :raises ValueError: if plant_df does not contain 'Pmax' as a column and/or
        plants in plant_df don't match the ones in plant_profile
    """
    # Both arguments share the same type requirement; check them in call order.
    for label, frame in (("plant_df", plant_df), ("plant_profile", plant_profile)):
        if not isinstance(frame, pd.DataFrame):
            raise TypeError(f"{label} must be a pandas.DataFrame object")

    if "Pmax" not in plant_df.columns:
        raise ValueError("Pmax must be one of the columns of plant_df")

    known_plants = sorted(plant_df.index)
    profiled_plants = sorted(plant_profile.columns)
    if known_plants != profiled_plants:
        raise ValueError("plant_df and plant_profile must contain same plants")

    capacity = plant_df["Pmax"]
    ratio = plant_profile.divide(capacity)
    bounded = ratio.clip(0, 1)
    return bounded.fillna(0)