import copy
import numpy as np
import pandas as pd
from powersimdata.input.configure import linearize_gencost
from powersimdata.input.grid import Grid
from powersimdata.utility.helpers import _check_import
def get_supply_data(grid, num_segments=1, save=None):
    """Accesses the generator cost and plant information data from a specified Grid
    object.

    The returned table joins the plant columns ``type``, ``interconnect`` and
    ``zone_name`` with every column of the linearized generator cost table except
    ``type``, ``startup``, ``shutdown``, ``n`` and ``interconnect``. For each segment
    ``i`` two columns are added: ``p_diff<i>`` (``p<i+1> - p<i>``) and ``slope<i>``
    (``(f<i+1> - f<i>) / p_diff<i>``).

    :param powersimdata.input.grid.Grid grid: Grid object.
    :param int num_segments: The number of segments into which the piecewise linear
        cost curve will be split.
    :param str save: Saves a .csv if a str representing a valid file path and file
        name is provided. If None, nothing is saved.
    :return: (*pandas.DataFrame*) -- Supply information needed to analyze cost and
        supply curves.
    :raises TypeError: if a powersimdata.input.grid.Grid object is not input, or
        if the save parameter is not input as a str.
    """
    # Check that a Grid object is input
    if not isinstance(grid, Grid):
        raise TypeError("A Grid object must be input.")

    # Validate the save target before any data is copied or processed so that a
    # bad argument is reported immediately
    if save is not None and not isinstance(save, str):
        raise TypeError("The file path and file name must be input as a str.")

    # Operate on a deep copy of the Grid object
    grid = copy.deepcopy(grid)

    # Access the generator cost and plant information data
    gencost_df = linearize_gencost(grid.gencost["before"], grid.plant, num_segments)
    plant_df = grid.plant

    # Create a new DataFrame with the desired columns
    cost_cols = gencost_df.columns.difference(
        ["type", "startup", "shutdown", "n", "interconnect"], sort=False
    )
    supply_df = pd.concat(
        [plant_df[["type", "interconnect", "zone_name"]], gencost_df[cost_cols]],
        axis=1,
    )

    # Add p_diff and slope according to the number of cost curve segments.
    # NOTE: no guard against p_diff == 0 here; callers in this module drop rows
    # whose slope1 is null afterwards.
    for seg in range(1, num_segments + 1):
        p_diff = supply_df[f"p{seg + 1}"] - supply_df[f"p{seg}"]
        f_diff = supply_df[f"f{seg + 1}"] - supply_df[f"f{seg}"]
        supply_df[f"p_diff{seg}"] = p_diff
        supply_df[f"slope{seg}"] = f_diff / p_diff

    # Save the supply data to a .csv file if desired
    if save is not None:
        supply_df.to_csv(save)

    # Return the necessary supply information
    return supply_df
def check_supply_data(supply_data, num_segments=1):
    """Checks to make sure that the input supply data is a DataFrame and has the
    correct columns. This is especially needed for checking instances where the input
    supply data is not the DataFrame returned from get_supply_data().

    The mandatory columns are ``type``, ``interconnect``, ``zone_name``, ``c2``,
    ``c1``, ``c0``, the break points ``p1``/``f1`` through
    ``p<num_segments + 1>``/``f<num_segments + 1>``, and the per-segment columns
    ``p_diff1``/``slope1`` through ``p_diff<num_segments>``/``slope<num_segments>``.

    :param pandas.DataFrame supply_data: DataFrame containing the supply curve
        information.
    :param int num_segments: The number of segments into which the piecewise linear
        cost curve will be split.
    :raises TypeError: if the input supply data is not a pandas.DataFrame.
    :raises ValueError: if one of the mandatory columns is missing from the input
        supply data. The missing column names are listed in sorted order.
    """
    # Check that the data is input as a DataFrame
    if not isinstance(supply_data, pd.DataFrame):
        raise TypeError("supply_data must be input as a DataFrame.")

    # Mandatory columns that do not depend on the number of segments
    mand_cols = {"type", "interconnect", "zone_name", "c2", "c1", "c0"}

    # num_segments segments are delimited by num_segments + 1 (p, f) break points
    for point in range(1, num_segments + 2):
        mand_cols.update([f"p{point}", f"f{point}"])

    # Each segment contributes one p_diff column and one slope column
    for seg in range(1, num_segments + 1):
        mand_cols.update([f"p_diff{seg}", f"slope{seg}"])

    # Make sure all of the mandatory columns are contained in the input DataFrame
    miss_cols = mand_cols - set(supply_data.columns)
    if miss_cols:
        # Sort so that the message does not depend on set iteration order
        missing = ", ".join(sorted(miss_cols))
        raise ValueError(f"Missing columns: {missing}")
def build_supply_curve(grid, num_segments, area, gen_type, area_type=None, plot=True):
    """Builds a supply curve for a specified area and generation type.

    :param powersimdata.input.grid.Grid grid: Grid object.
    :param int num_segments: The number of segments into which the piecewise linear
        cost curve is split.
    :param str area: Either the load zone, state name, state abbreviation, or
        interconnect.
    :param str/iterable gen_type: Generation type(s).
    :param str area_type: one of *'loadzone'*, *'state'*, *'state_abbr'*,
        *'interconnect'*. If set to None, type will be inferred.
    :param bool plot: If True, the supply curve plot is shown. If False, the plot is
        not shown.
    :return: (*tuple*) -- First element is a list of capacity (MW) amounts needed
        to create supply curve. Second element is a list of bids ($/MW) in the supply
        curve. Both lists are empty if no generator remains after filtering by area
        and generation type.
    :raises TypeError: if a powersimdata.input.grid.Grid object is not input, or if
        num_segments is not an int.
    :raises ValueError: if the specified area or generator type is not applicable.
    """
    # Check that a Grid object is input
    if not isinstance(grid, Grid):
        raise TypeError("A Grid object must be input.")

    # Check that the desired number of linearized cost curve segments is an int
    if not isinstance(num_segments, int):
        raise TypeError(
            "The number of linearized cost curve segments must be input as an int."
        )

    # Normalize gen_type to a set: a lone str becomes a one-element set and any
    # other iterable (list, tuple, ...) is converted, so the set difference below
    # works for every documented input form
    gen_type = {gen_type} if isinstance(gen_type, str) else set(gen_type)

    # Obtain the desired generator cost and plant information data
    supply_data = get_supply_data(grid, num_segments)

    # Check the input supply data
    check_supply_data(supply_data, num_segments)

    # Check to make sure the generator type is valid
    if len(gen_type - set(supply_data["type"].unique())) > 0:
        raise ValueError(f"{gen_type} contains invalid generation type.")

    # Identify the load zones that correspond to the specified area and area_type
    returned_zones = grid.model_immutables.area_to_loadzone(area, area_type=area_type)

    # Trim the DataFrame to only be of the desired area and generation type
    supply_data = supply_data.loc[supply_data["zone_name"].isin(returned_zones)]
    supply_data = supply_data.loc[supply_data["type"].isin(gen_type)]

    # Remove generators that have no capacity (e.g., Maine coal generators).
    # Reassign rather than drop in place: supply_data is a filtered slice here.
    supply_data = supply_data.dropna(subset=["slope1"])

    # Check if the area contains generators of the specified type
    if supply_data.empty:
        return [], []

    # Combine the p_diff and slope information for each cost segment
    segments = [
        supply_data[[f"p_diff{seg}", f"slope{seg}"]].rename(
            columns={f"p_diff{seg}": "p_diff", f"slope{seg}": "slope"}
        )
        for seg in range(1, num_segments + 1)
    ]
    supply_df = pd.concat(segments, axis=0)

    # Sort the trimmed DataFrame by slope
    supply_df = supply_df.sort_values(by="slope").reset_index(drop=True)

    # Determine the points that comprise the supply curve: every segment adds a
    # horizontal step from the running capacity total to that total plus p_diff
    capacity_data = []
    price_data = []
    capacity_diff_sum = 0
    for p_diff, slope in zip(supply_df["p_diff"].values, supply_df["slope"].values):
        capacity_data.append(capacity_diff_sum)
        price_data.append(slope)
        capacity_data.append(p_diff + capacity_diff_sum)
        price_data.append(slope)
        capacity_diff_sum += p_diff

    # Plot the curve
    if plot:
        plt = _check_import("matplotlib.pyplot")
        plt.figure(figsize=[20, 10])
        plt.plot(capacity_data, price_data)
        plt.title(f"Supply curve for selected generators in {area}", fontsize=20)
        # Sorted so the legend text does not depend on set iteration order
        plt.legend(
            ["Generation types:\n{}".format("\n".join(sorted(gen_type)))], loc="best"
        )
        plt.xlabel("Capacity (MW)", fontsize=20)
        plt.ylabel("Price ($/MW)", fontsize=20)
        plt.xticks(fontsize=20)
        plt.yticks(fontsize=20)
        plt.show()

    # Return the capacity and bid amounts
    return capacity_data, price_data
def lower_bound_index(desired_capacity, capacity_data):
    """Determines the index of the lower capacity value that defines a price segment.
    Useful for accessing the prices associated with capacity values that aren't
    explicitly stated in the capacity lists that are generated by the
    build_supply_curve() function. Needed for ks_test().

    The list is scanned from the front for the first entry that is strictly greater
    than ``desired_capacity``; the index just before that entry is returned.

    :param float/int desired_capacity: Capacity value for which you want to determine
        the index of the lowest capacity value in a price segment.
    :param list capacity_data: List of capacity values used to generate a supply curve.
    :return: (*int*) -- Index of a price segment's capacity lower bound. None is
        returned if the list is empty, if its first entry already exceeds
        ``desired_capacity``, or if no entry exceeds ``desired_capacity``.
    """
    # Nothing to look up in an empty list or below the first capacity value
    if not capacity_data or capacity_data[0] > desired_capacity:
        return None

    # Position of the first capacity strictly above the desired capacity, if any
    first_above = next(
        (
            position
            for position, capacity in enumerate(capacity_data)
            if capacity > desired_capacity
        ),
        None,
    )
    if first_above is None:
        return None

    # The segment's lower bound sits immediately before that position
    return first_above - 1
def ks_test(
    capacity_data1,
    price_data1,
    capacity_data2,
    price_data2,
    area=None,
    gen_type=None,
    plot=True,
):
    """Runs a test that is similar to the Kolmogorov-Smirnov test. This function takes
    two supply curves as inputs and returns the greatest difference in price between
    the two supply curves. This function requires that the supply curves offer the same
    amount of capacity.

    :param list capacity_data1: List of capacity values for the first supply curve.
    :param list price_data1: List of price values for the first supply curve.
    :param list capacity_data2: List of capacity values for the second supply curve.
    :param list price_data2: List of price values for the second supply curve.
    :param str area: Either the load zone, state name, state abbreviation, or
        interconnect. Defaults to None because it's not essential.
    :param str gen_type: Generation type. Defaults to None because it's not essential.
    :param bool plot: If True, the supply curve plot is shown. If False, the plot is
        not shown.
    :return: (*float*) -- The maximum price difference between the two supply curves.
    :raises TypeError: if the capacity and price inputs are not provided as lists.
    :raises ValueError: if any of the input lists is empty, or if the supply curves
        do not offer the same amount of capacity.
    """
    curve_data = [capacity_data1, price_data1, capacity_data2, price_data2]

    # Check that input capacities and prices are provided as lists
    if not all(isinstance(data, list) for data in curve_data):
        raise TypeError("Supply curve data must be input as lists.")

    # Reject empty curves explicitly (build_supply_curve() returns two empty lists
    # when no generator matches) instead of failing inside max() below
    if not all(curve_data):
        raise ValueError("Supply curve data must not be empty.")

    # Check that the supply curves offer the same amount of capacity
    if max(capacity_data1) != max(capacity_data2):
        raise ValueError(
            "The two supply curves do not offer the same amount of capacity (MW)."
        )

    # Create a list that has every capacity value in which either supply curve steps up
    capacity_data_all = sorted(set(capacity_data1) | set(capacity_data2))

    # Determine the price difference between the two curves at each capacity value
    price_data_diff = [
        abs(
            _price_at_capacity(capacity, capacity_data1, price_data1)
            - _price_at_capacity(capacity, capacity_data2, price_data2)
        )
        for capacity in capacity_data_all
    ]

    # Determine the maximum price difference
    max_diff = max(price_data_diff)

    # Plot the two supply curves overlaid
    if plot:
        plt = _check_import("matplotlib.pyplot")
        plt.figure(figsize=[20, 10])
        plt.plot(capacity_data1, price_data1)
        plt.plot(capacity_data2, price_data2)
        if None in {area, gen_type}:
            plt.title("Supply Curve Comparison", fontsize=20)
        else:
            plt.title(
                f"Supply curve comparison for {gen_type} generators in {area}",
                fontsize=20,
            )
        plt.xlabel("Capacity (MW)", fontsize=20)
        plt.ylabel("Price ($/MW)", fontsize=20)
        plt.xticks(fontsize=20)
        plt.yticks(fontsize=20)
        plt.show()

    # Return the maximum price difference (this corresponds to the K-S statistic)
    return max_diff


def _price_at_capacity(capacity, capacity_data, price_data):
    """Looks up the price that one supply curve associates with a capacity value.

    :param float/int capacity: Capacity value to look up.
    :param list capacity_data: List of capacity values of the supply curve.
    :param list price_data: List of price values of the supply curve.
    :return: (*float*) -- The last price if ``capacity`` equals the last capacity
        value, otherwise the price at the index given by lower_bound_index().
    """
    # lower_bound_index() finds no entry above the final capacity value, so that
    # case is answered directly with the final price
    if capacity == capacity_data[-1]:
        return price_data[-1]
    return price_data[lower_bound_index(capacity, capacity_data)]
def plot_linear_vs_quadratic_terms(
    grid,
    area,
    gen_type,
    area_type=None,
    plot=True,
    zoom=False,
    num_sd=3,
    alpha=0.1,
):
    """Compares the linear (c1) and quadratic (c2) parameters from the quadratic
    generator cost curves.

    :param powersimdata.input.grid.Grid grid: Grid object.
    :param str area: Either the load zone, state name, state abbreviation, or
        interconnect.
    :param str gen_type: Generation type.
    :param str area_type: one of: *'loadzone'*, *'state'*, *'state_abbr'*,
        *'interconnect'*. if set to None, the type will be inferred.
    :param bool plot: If True, the linear term vs. quadratic term plot is shown. If
        False, the plot is not shown.
    :param bool zoom: If True, filters out quadratic term outliers to enable better
        visualization. If False, there is no filtering.
    :param float/int num_sd: The number of standard deviations used to filter out
        quadratic term outliers.
    :param float alpha: The alpha blending value for the scatter plot; takes values
        between 0 (transparent) and 1 (opaque).
    :return: (*None*) -- The linear term vs. quadratic term plot is displayed according
        to the user. Nothing is plotted if no generator remains after filtering.
    :raises TypeError: if a powersimdata.input.grid.Grid object is not input.
    :raises ValueError: if the specified area or generator type is not applicable.
    """
    plt = _check_import("matplotlib.pyplot")

    # Only Grid objects are accepted
    if not isinstance(grid, Grid):
        raise TypeError("A Grid object must be input.")

    # Work on a deep copy of the Grid object
    grid = copy.deepcopy(grid)
    cost_table = grid.gencost["before"]
    plant_table = grid.plant

    # Join the plant columns of interest with the retained cost columns
    cost_columns = cost_table.columns.difference(
        ["type", "startup", "shutdown", "n", "interconnect"], sort=False
    )
    supply_data = pd.concat(
        [
            plant_table[["type", "interconnect", "zone_name", "Pmin", "Pmax"]],
            cost_table[cost_columns],
        ],
        axis=1,
    )

    # The generation type must be present in the data
    if gen_type not in supply_data["type"].unique():
        raise ValueError(f"{gen_type} is not a valid generation type.")

    # Load zones matching the requested area and area_type
    returned_zones = grid.model_immutables.area_to_loadzone(area, area_type=area_type)

    # Keep the requested area, then the requested generation type
    in_area = supply_data.zone_name.isin(returned_zones)
    supply_data = supply_data.loc[in_area]
    of_type = supply_data["type"] == gen_type
    supply_data = supply_data.loc[of_type]

    # Discard generators whose Pmin equals Pmax (no capacity, e.g., Maine coal
    # generators)
    has_capacity = supply_data["Pmin"] != supply_data["Pmax"]
    supply_data = supply_data[has_capacity]

    # Nothing left to show for this area / generation type
    if supply_data.empty:
        return

    # Optionally drop large c2 outliers so the overall trend is easier to see
    zoom_name = ""
    apply_limits = False
    if zoom:
        quad_term_sd = np.std(supply_data["c2"])
        quad_term_mean = np.mean(supply_data["c2"])
        cutoff = quad_term_mean + num_sd * quad_term_sd
        # Axis limits are only tightened when at least one outlier was removed
        if (supply_data["c2"] > cutoff).any():
            apply_limits = True
            supply_data = supply_data[supply_data["c2"] <= cutoff]
            max_ylim = np.max(supply_data["c2"] + 0.01)
            min_ylim = np.min(supply_data["c2"] - 0.01)
            max_xlim = np.max(supply_data["c1"] + 1)
            min_xlim = np.min(supply_data["c1"] - 1)
            zoom_name = "(zoomed)"

    if not plot:
        return

    # Scatter c1 against c2; marker size and color both follow Pmax
    fig, _ = plt.subplots()
    fig.set_size_inches(20, 10)
    plt.scatter(
        supply_data["c1"],
        supply_data["c2"],
        s=np.sqrt(supply_data["Pmax"]) * 10,
        alpha=alpha,
        c=supply_data["Pmax"],
        cmap="plasma",
    )
    plt.grid()
    title = (
        f"Linear term vs. Quadratic term for {gen_type} generator cost curves in "
        + f"{area} {zoom_name}"
    )
    plt.title(title, fontsize=20)
    if apply_limits:
        plt.ylim([min_ylim, max_ylim])
        plt.xlim([min_xlim, max_xlim])
    plt.xlabel("Linear Term", fontsize=20)
    plt.ylabel("Quadratic Term", fontsize=20)
    plt.xticks(fontsize=20)
    plt.yticks(fontsize=20)
    cbar = plt.colorbar()
    cbar.set_label("Capacity (MW)", fontsize=20)
    cbar.ax.tick_params(labelsize=20)
    plt.show()
def plot_capacity_vs_price(
    grid, num_segments, area, gen_type, area_type=None, plot=True
):
    """Plots the generator capacity vs. the generator price for a specified area
    and generation type.

    Each cost curve segment of each selected generator is one point (segment
    capacity vs. segment price). A red line marks the capacity-weighted average
    price of all plotted segments.

    :param powersimdata.input.grid.Grid grid: Grid object.
    :param int num_segments: The number of segments into which the piecewise linear
        cost curve is split.
    :param str area: Either the load zone, state name, state abbreviation, or
        interconnect.
    :param str gen_type: Generation type.
    :param str area_type: one of: *'loadzone'*, *'state'*, *'state_abbr'*,
        *'interconnect'*. If set to None, the type will be inferred.
    :param bool plot: If True, the supply curve plot is shown. If False, the plot is
        not shown.
    :return: (*None*) -- The capacity vs. price plot is displayed according to the user.
        Nothing is plotted if no generator remains after filtering.
    :raises TypeError: if a powersimdata.input.grid.Grid object is not input, or if
        num_segments is not an int.
    :raises ValueError: if the specified area or generator type is not applicable.
    """
    plt = _check_import("matplotlib.pyplot")

    # Check that a Grid object is input
    if not isinstance(grid, Grid):
        raise TypeError("A Grid object must be input.")

    # Check that the desired number of linearized cost curve segments is an int
    if not isinstance(num_segments, int):
        raise TypeError(
            "The number of linearized cost curve segments must be input as an int."
        )

    # Obtain the desired generator cost and plant information data
    supply_data = get_supply_data(grid, num_segments)

    # Check the input supply data
    check_supply_data(supply_data, num_segments)

    # Check to make sure the generator type is valid
    if gen_type not in supply_data["type"].unique():
        raise ValueError(f"{gen_type} is not a valid generation type.")

    # Identify the load zones that correspond to the specified area and area_type
    returned_zones = grid.model_immutables.area_to_loadzone(area, area_type=area_type)

    # Trim the DataFrame to only be of the desired area and generation type
    supply_data = supply_data.loc[supply_data["zone_name"].isin(returned_zones)]
    supply_data = supply_data.loc[supply_data["type"] == gen_type]

    # Remove generators that have no capacity (e.g., Maine coal generators).
    # Reassign rather than drop in place: supply_data is a filtered slice here.
    supply_data = supply_data.dropna(subset=["slope1"])

    # Check if the area contains generators of the specified type
    if supply_data.empty:
        return

    # Combine the p_diff and slope information for each cost segment
    segments = [
        supply_data[[f"p_diff{seg}", f"slope{seg}"]].rename(
            columns={f"p_diff{seg}": "p_diff", f"slope{seg}": "slope"}
        )
        for seg in range(1, num_segments + 1)
    ]
    supply_df = pd.concat(segments, axis=0).reset_index(drop=True)

    # Determine the capacity-weighted average price; fall back to 0 when the
    # segments carry no capacity at all to avoid dividing by zero
    total_capacity = supply_df["p_diff"].sum()
    if total_capacity == 0:
        average_price = 0
    else:
        average_price = (
            supply_df["slope"] * supply_df["p_diff"]
        ).sum() / total_capacity

    # Plot the comparison
    if plot:
        ax = supply_df.plot.scatter(
            x="p_diff", y="slope", s=50, figsize=[20, 10], grid=True, fontsize=20
        )
        plt.title(
            f"Capacity vs. Price for {gen_type} generators in {area}", fontsize=20
        )
        plt.xlabel("Segment Capacity (MW)", fontsize=20)
        plt.ylabel("Segment Price ($/MW)", fontsize=20)
        # Horizontal reference line at the average price
        ax.plot(supply_df["p_diff"], [average_price] * len(supply_df.index), c="red")
        plt.show()