Source code for postreise.plot.plot_shadowprice_map

import numpy as np
import pandas as pd
from bokeh.layouts import row
from bokeh.models import ColorBar, ColumnDataSource
from bokeh.plotting import figure
from bokeh.tile_providers import Vendors, get_provider
from bokeh.transform import linear_cmap
from powersimdata.input.check import _check_date
from powersimdata.scenario.scenario import Scenario

from postreise.plot.colors import shadow_price_pallette
from postreise.plot.projection_helpers import project_branch, project_bus


[docs]def plot_shadowprice(scenario_id, datetime, lmp_split_points=None): """Map lmp variation and shadow prices :param str/int scenario_id: scenario id :param pandas.Timestamp/numpy.datetime64/datetime.datetime datetime: timestamp of the hour to analyze :param list/tuple/set/numpy.array lmp_split_points: the locational marginal pricing (lmp) values we have chosen to split the bus data. Includes min and max values. Must have 10 items or fewer. defaults to None. Example: [-1, 1, 20, 25, 30, 35, 40, 100] :return: (*bokeh.models.layout.Row*) -- bokeh map visual in row layout. :raises TypeError: if *'scenario_id'* is not a str/int, *'datetime'* is not a str or *'lmp_split_points'* is not a list :raises ValueError: if *'lmp_split_points'* array has more than 10 elements """ if not isinstance(scenario_id, (str, int)): raise TypeError("scenario_id must be a str/int") _check_date(datetime) if lmp_split_points is not None: if not isinstance(lmp_split_points, list): raise TypeError("lmp_split_points must be a list") if len(lmp_split_points) > 10: raise ValueError("lmp_split_points must have 10 items or fewer") interconnect, bus, lmp, branch, cong = _get_shadowprice_data(scenario_id) lmp_split_points, bus_segments = _construct_bus_data( bus, lmp, lmp_split_points, datetime ) branches_selected = _construct_branch_data(branch, cong, datetime) return _construct_shadowprice_visuals( interconnect, lmp_split_points, bus_segments, branches_selected )
def _get_shadowprice_data(scenario_id): """Gets data necessary for plotting shadow price :param str/int scenario_id: scenario id :return: (*tuple*) -- interconnect as a str, bus data as a data frame, lmp data as a data frame, branch data as a data frame and congestion data as a data frame """ s = Scenario(scenario_id) interconnect = s.info["interconnect"] interconnect = " ".join(interconnect.split("_")) s_grid = s.get_grid() # Get bus and add location data bus_map = project_bus(s_grid.bus) # get branch and add location data branch_map = project_branch(s_grid.branch) # get congestion congu = s.get_congu() congl = s.get_congl() cong_abs = pd.DataFrame( np.maximum(congu.to_numpy(), congl.to_numpy()), columns=congu.columns, index=congu.index, ) return interconnect, bus_map, s.get_lmp(), branch_map, cong_abs def _construct_bus_data(bus_map, lmp, user_set_split_points, datetime): """Adds lmp data to each bus, splits buses into 9 segments by lmp :param pandas.DataFrame bus_map: bus data frame with location data :param pandas.DataFrame lmp: lmp data frame :param list/tuple/set/numpy.array user_set_split_points: user-set lmp values to split the bus data. Must have 10 items or fewer. Example: [-1, 1, 20, 25, 30, 35, 40, 100] :param pandas.Timestamp/numpy.datetime64/datetime.datetime datetime: timestamp of the hour to analyze :return: (*tuple*) -- the lmp vals we have chosen to split the bus data, bus data split into segments """ lmp_hour = lmp.loc[datetime] lmp_hour = lmp_hour.T lmp_hour.name = "lmp" bus_map = pd.concat([bus_map, lmp_hour], axis=1) lmp_split_points = ( user_set_split_points if user_set_split_points is not None else _get_lmp_split_points(bus_map) ) bus_segments = [ bus_map[(bus_map["lmp"] > p) & (bus_map["lmp"] <= c)] for p, c in zip(lmp_split_points, lmp_split_points[1:]) ] return lmp_split_points, bus_segments def _get_lmp_split_points(bus_map): """Determine up to ten points to split the bus data (inc. min and max lmp). Always split on lmp -1 and 1 if possible. Split the rest of the buses into groups with an equal number of members :param pandas.DataFrame bus_map: bus data with lmp :return: (*list*) -- lmp values for split points """ min_lmp = round(bus_map["lmp"].min(), 2) max_lmp = round(bus_map["lmp"].max(), 2) split_points = [min_lmp] # If we have busses with negative lmp we always split at -1 and 1 if min_lmp < -1: split_points.append(-1) if min_lmp < 1: split_points.append(1) # remove busses with lmp below 1 bus_map_pos = bus_map[(bus_map["lmp"] >= 1)] else: bus_map_pos = bus_map # split remaining busses into equally sized groups num_splits = 9 - len(split_points) quantiles = [(i + 1) / (num_splits + 1) for i in range(num_splits)] split_points += [round(bus_map_pos.lmp.quantile(val), 2) for val in quantiles] return split_points + [max_lmp] def _construct_branch_data(branch_map, cong, datetime): """Add congestion data for each branch :param pandas.DataFrame branch_map: data frame of branch data :param pandas.DataFrame cong: data frame of congestion data for the selected hour :param pandas.Timestamp/numpy.datetime64/datetime.datetime datetime: timestamp of the hour to analyze :return: (*pandas.DataFrame*) -- modified branch data with congestion data added for each branch """ # Add congestion to branch dataframe cong_hour = cong.loc[datetime] cong_hour = cong_hour.T cong_hour.name = "medianval" branch_map = pd.concat([branch_map, cong_hour], axis=1) # select branches that have a binding constraint and are of type Line branch_map = branch_map.loc[ (branch_map["medianval"] > 1e-6) & (branch_map["branch_device_type"] == "Line") ] return branch_map def _construct_shadowprice_visuals( interconnect, lmp_split_points, bus_segments, branch_data ): """Use bokeh to plot variation in lmp and shadow prices :param str interconnect: scenario interconnect :param list/tuple/set/numpy.array lmp_split_points: lmp values chosen to split the bus data :param list bus_segments: bus data split into 9 segments :param pandas.DataFrame branch_data: branch data :return: (*bokeh.models.layout.Row*) -- bokeh map visual in row layout. """ tools = "pan,wheel_zoom,reset,hover,save" p = figure( title=f"{interconnect} Interconnect", tools=tools, x_axis_location=None, y_axis_location=None, plot_width=800, plot_height=800, ) # Add USA map p.add_tile(get_provider(Vendors.CARTODBPOSITRON)) # Add colored circles for bus locations indices = list(range(len(bus_segments))) indices.reverse() # We want the lowest prices on top for i in indices: bus_cds = ColumnDataSource( { "x": bus_segments[i]["x"], "y": bus_segments[i]["y"], "lmp": bus_segments[i]["lmp"], } ) p.circle( "x", "y", color=shadow_price_pallette[i], alpha=0.4, size=11, source=bus_cds ) # Add branches branch_cds = ColumnDataSource( { "xs": branch_data[["from_x", "to_x"]].values.tolist(), "ys": branch_data[["from_y", "to_y"]].values.tolist(), "medianval": branch_data.medianval, } ) # branch outline p.multi_line("xs", "ys", color="black", line_width=14, source=branch_cds) # branch color palette = shadow_price_pallette[-5:] mapper = linear_cmap(field_name="medianval", palette=palette, low=0, high=2000) p.multi_line("xs", "ys", color=mapper, line_width=9, source=branch_cds) # Add legends bus_legend = _construct_bus_legend(lmp_split_points) branch_legend = ColorBar( color_mapper=mapper["transform"], width=16, location=(0, 0), title="SP ($/MWh)", title_text_font_size="8pt", title_standoff=8, ) p.add_layout(branch_legend, "right") return row(bus_legend, p) def _construct_bus_legend(lmp_split_points): """Construct the legend for lmp at each bus :param list/tuple/set/numpy.array lmp_split_points: lmp values chosen to split the bus data :return: (*bokeh.plotting.figure*) -- the legend showing lmp for each bus """ x_range = [""] bars, bar_len_sum, labels = _get_bus_legend_bars_and_labels(lmp_split_points) # Make legend p = figure( x_range=x_range, plot_height=800, plot_width=110, toolbar_location=None, tools="", ) p.vbar_stack( [*bars], x="x_range", width=0.9, color=shadow_price_pallette[: len(bars)], source={**{"x_range": [""]}, **bars}, ) p.y_range.start = -1 p.x_range.range_padding = 0.1 p.xgrid.grid_line_color = None p.axis.minor_tick_line_color = None p.outline_line_color = None p.yaxis.ticker = [*labels] p.yaxis.major_label_overrides = labels p.text( x=[-0.05], y=[bar_len_sum * 1.01], text=["LMP ($/MWh)"], text_font_size="8pt", text_font_style="italic", ) return p def _get_bus_legend_bars_and_labels(lmp_split_points): """Get the bar lengths and labels for the bus legend :param list/tuple/set/numpy.array lmp_split_points: lmp values chosen to split the bus data :return: (*tuple*) -- bar lengths and labels for the bus legend """ bars = {} bar_length_sum = 0 labels = {} # { y-position: label_text, ... } for i, (p, c) in enumerate(zip(lmp_split_points, lmp_split_points[1:])): bar_length = max(1, min(5, c - p)) bars[str(i)] = [bar_length] labels[bar_length_sum] = str(p) bar_length_sum += bar_length labels[bar_length_sum] = str(c) return bars, bar_length_sum, labels