Source code for postreise.plot.plot_bar_shortfall

import matplotlib.pyplot as plt
import pandas as pd
from powersimdata.design.generation.clean_capacity_scaling import (
    add_demand_to_targets,
    add_resource_data_to_targets,
    add_shortfall_to_targets,
    calculate_overall_shortfall,
)
from powersimdata.scenario.scenario import Scenario


[docs]def plot_bar_shortfall( areas, scenario_ids, target_df, strategy=None, scenario_names=None, baseline_scenario=None, baseline_scenario_name=None, plot_show=True, ): """Plot a stacked bar chart of generation shortfall based on given targets for any number of scenarios. :param list/str areas: list of area(s) to show shortfall bar plots. If the target of an area is not defined in ``target_df``, it will be ignored. :param int/list/str scenario_ids: list of scenario id(s). :param pandas.DataFrame target_df: target data frame, which defines the clean energy target fraction, allowed resources and external historical amount of qualified energy for each area. :param dict strategy: a dictionary with keys being scenario ids and values being strategies, either *"collaborative"* or *"independent"*. *"collaborative"* is used if None. :param list/str scenario_names: list of scenario name(s) of same length as scenario ids, defaults to None. :param str/int baseline_scenario: scenario id that serves as a baseline in the bar chart, default to None. :param str baseline_scenario_name: specify the label of the baseline scenario shown in the bar chart, default to None, in which case the name of the scenario will be used. :param bool plot_show: display the generated figure or not, defaults to True. :return: (*matplotlib.axes.Axes*) -- axes object of the plot. :raises ValueError: if length of ``scenario_names`` and ``scenario_ids`` is different. :raises TypeError: if ``target_df`` is not a pandas.DataFrame. if ``strategy`` is provided but not in a dict format. if ``baseline_scenario`` is provided but not in a str/int format. if ``baseline_scenario_name`` is provided but not in a str format. """ if isinstance(areas, str): areas = [areas] if isinstance(scenario_ids, (int, str)): scenario_ids = [scenario_ids] if not isinstance(target_df, pd.DataFrame): raise TypeError("target_df must be a pandas.DataFrame") if strategy is None: strategy = dict() if not isinstance(strategy, dict): raise TypeError("strategy must be a dict") if isinstance(scenario_names, str): scenario_names = [scenario_names] if scenario_names is not None and len(scenario_names) != len(scenario_ids): raise ValueError("scenario_names must have same size as scenario_ids") if baseline_scenario is not None and not isinstance(baseline_scenario, (str, int)): raise TypeError("baseline_scenario must be a str or int") if baseline_scenario_name is not None and not isinstance( baseline_scenario_name, str ): raise TypeError("baseline_scenario_name must be a str") scenarios = dict() targets = dict() all_sids = scenario_ids + [baseline_scenario] if baseline_scenario else scenario_ids for sid in all_sids: s = Scenario(sid) tmp_df = target_df.copy() tmp_df = add_resource_data_to_targets(tmp_df, s) tmp_df = add_demand_to_targets(tmp_df, s) tmp_df = add_shortfall_to_targets(tmp_df) scenarios[sid] = s targets[sid] = tmp_df for area in areas: if area not in target_df.index and area != "all": print(f"{area} is skipped due to lack of target information in target_df!") continue ax_data = {} for i, sid in enumerate(scenario_ids): label = scenario_names[i] if scenario_names else scenarios[sid].info["name"] if area == "all": demand = targets[sid]["demand"].sum() shortfall = calculate_overall_shortfall( targets[sid], method=strategy.get(sid, "collaborative") ) ce_generated = targets[sid]["ce_target"].sum() - shortfall if baseline_scenario: baseline_shortfall = calculate_overall_shortfall( targets[baseline_scenario], method=strategy.get(baseline_scenario, "collaborative"), ) baseline = ( targets[baseline_scenario]["ce_target"].sum() - baseline_shortfall ) shortfall = max(shortfall, 0) else: demand = targets[sid].loc[area, "demand"] shortfall = targets[sid].loc[area, "ce_shortfall"] ce_generated = ( targets[sid].loc[area, "prev_ce_generation"] + targets[sid].loc[area, "external_ce_addl_historical_amount"] ) if baseline_scenario: baseline = ( targets[baseline_scenario].loc[area, "prev_ce_generation"] + targets[baseline_scenario].loc[ area, "external_ce_addl_historical_amount" ] ) if baseline_scenario: ax_data.update( { label: { baseline_scenario_name: round(100 * baseline / demand, 2), "Increment from baseline": max( 0, round(100 * (ce_generated - baseline) / demand, 2) ), "Missed target": round(100 * shortfall / demand, 2), } } ) else: ax_data.update( { label: { "Qualified clean energy": round( 100 * ce_generated / demand, 2 ), "Missed target": round(100 * shortfall / demand, 2), } } ) if area != "all": target_pct = round(100 * target_df.loc[area, "ce_target_fraction"], 2) else: # The overall target percentage is different among scenarios given the # demand might be different. Here we pick the first scenario in scenario_ids # to calculate the overall target percentage target_demand = targets[scenario_ids[0]]["demand"].sum() target_generation = targets[scenario_ids[0]]["ce_target"].sum() target_pct = round(100 * target_generation / target_demand, 2) if baseline_scenario: axes = _construct_shortfall_visuals( area, ax_data, target_pct, baseline=True, baseline_scenario_name=baseline_scenario_name, ) else: axes = _construct_shortfall_visuals( area, ax_data, target_pct, ) if plot_show: plt.show() else: return axes
def _construct_shortfall_visuals( zone, ax_data, target_pct, baseline=False, baseline_scenario_name=None ): """Plot formatted data. :param str zone: the zone name. :param dict ax_data: nested dictionary with keys on the top layer being scenario names and values are dictionaries with keys being the categories of the bar chart data and values being the numbers in percentage. :param float target_pct: target in terms of percentage of demand. :param bool baseline: a boolean indicator indicates whether there is a baseline scenario specified. :param str baseline_scenario_name: name of the baseline scenario. """ df = pd.DataFrame(ax_data).T if baseline: df = df[[baseline_scenario_name, "Increment from baseline", "Missed target"]] else: df = df[["Qualified clean energy", "Missed target"]] ax = df.plot.bar( stacked=True, color=["darkgreen", "yellowgreen", "salmon"] if baseline else ["yellowgreen", "salmon"], figsize=(10, 8), fontsize=16, ) ax.set_title(zone, fontsize=26) ax.set_ylim(top=1.33 * ax.get_ylim()[1]) ax.set_xticklabels(ax.get_xticklabels(), rotation=45, horizontalalignment="right") # Legend handles, labels = ax.get_legend_handles_labels() ax.legend( reversed(handles), reversed(labels), bbox_to_anchor=(1.556, 1.015), fontsize=14 ) # Add target line if target_pct > 0: ax_text = f"Target {target_pct:}% of demand" ax.text( 1.01, target_pct, ax_text, transform=ax.get_yaxis_transform(), fontsize=16, verticalalignment="center", ) ax.axhline(y=target_pct, dashes=(5, 2), color="black") # Percent numbers patch_indices = list(range(len(ax_data) * (2 + baseline)))[-1 * len(ax_data) :] ax_data_value = list(ax_data.values()) for i, ind in enumerate(patch_indices): if ax_data_value[i]["Missed target"] != 0: b = ax.patches[ind].get_bbox() ax.annotate( f"{ax_data_value[i]['Missed target']}%\nshortfall", (b.x1 - 0.5, b.y1 * 1.02), fontsize=16, )