import datetime
import sys
import networkx as nx
import numpy as np
import pandas as pd
# Importing the module, not anything in it, to avid a circular import
import powersimdata.input.grid as _grid
from powersimdata.network.model import ModelImmutables
[docs]def check_grid(grid):
"""Check whether an object is an internally-consistent Grid object.
:param powersimdata.input.grid.Grid grid: grid or grid-like object to check.
:raises ValueError: if ``grid`` has any inconsistency
"""
_check_grid_type(grid)
error_messages = []
# Run all checks which operate on a Grid object
for check in [
_check_attributes,
_check_for_islanded_buses,
_check_for_undescribed_buses,
_check_bus_against_bus2sub,
_check_ac_interconnects,
_check_transformer_substations,
_check_line_voltages,
_check_plant_against_gencost,
_check_connected_components,
_check_for_loop_branches,
]:
try:
check(grid, error_messages)
except Exception:
error_messages.append(
f"Exception during {check.__name__}: {sys.exc_info()[1]!r}"
)
# Run checks which operate on a pandas data frame
for gencost_key in ("before", "after"):
try:
_check_gencost(grid.gencost[gencost_key], error_messages)
except Exception:
error_messages.append(
f"Exception during _check_gencost: {gencost_key}: "
f"{sys.exc_info()[1]!r}"
)
if len(error_messages) > 0:
collected = "\n".join(error_messages)
raise ValueError(f"Problem(s) found with grid:\n{collected}")
def _check_attributes(grid, error_messages):
"""Check whether a Grid object has the required attributes.
:param powersimdata.input.grid.Grid grid: grid or grid-like object to check.
:param list error_messages: list, to be appended to with a str if:
``grid`` is missing one or more required attributes.
"""
required = {
"branch",
"bus",
"bus2sub",
"dcline",
"data_loc",
"gencost",
"grid_model",
"interconnect",
"model_immutables",
"plant",
"storage",
"sub",
}
for r in required:
if not hasattr(grid, r):
error_messages.append(f"grid object must have attribute {r}.")
def _check_for_islanded_buses(grid, error_messages):
"""Check whether a transmission network (AC & DC) does not connect to one or more
buses.
:param powersimdata.input.grid.Grid grid: grid or grid-like object to check.
:param list error_messages: list, to be appended to with a str if:
branches/DC lines exist in the ``grid``, but one or more buses are islanded.
"""
if len(grid.branch) + len(grid.dcline) > 0:
connected_buses = set().union(
set(grid.branch.from_bus_id),
set(grid.branch.to_bus_id),
set(grid.dcline.from_bus_id),
set(grid.dcline.to_bus_id),
)
isolated_buses = set(grid.bus.index) - connected_buses
if len(isolated_buses) > 0:
error_messages.append(f"islanded buses detected: {isolated_buses}.")
def _check_for_undescribed_buses(grid, error_messages):
"""Check whether any transmission elements are connected to buses that are not
described in the bus table.
:param powersimdata.input.grid.Grid grid: grid or grid-like object to check.
:param list error_messages: list, to be appended to with a str if:
any transmission elements are connected to buses that are not described in the
bus table of the ``grid``.
"""
expected_buses = set().union(
set(grid.branch.from_bus_id),
set(grid.branch.to_bus_id),
set(grid.dcline.from_bus_id),
set(grid.dcline.to_bus_id),
)
undescribed_buses = expected_buses - set(grid.bus.index)
if len(undescribed_buses) > 0:
error_messages.append(
"buses present in transmission network but missing from bus table: "
f"{undescribed_buses}."
)
def _check_bus_against_bus2sub(grid, error_messages):
"""Check whether indices of bus and bus2sub tables match.
:param powersimdata.input.grid.Grid grid: grid or grid-like object to check.
:param list error_messages: list, to be appended to with a str if:
indices of bus and bus2sub tables of the ``grid`` don't match.
"""
if not set(grid.bus.index) == set(grid.bus2sub.index):
error_messages.append("indices for bus and bus2sub don't match.")
def _check_ac_interconnects(grid, error_messages):
"""Check whether any AC branches bridge across interconnections.
:param powersimdata.input.grid.Grid grid: grid or grid-like object to check.
:param list error_messages: list, to be appended to with a str if:
any AC branches bridge across interconnections of the ``grid``.
"""
from_interconnect = grid.branch.from_bus_id.map(grid.bus.interconnect)
to_interconnect = grid.branch.to_bus_id.map(grid.bus.interconnect)
if not all(from_interconnect == to_interconnect):
non_matching_ids = grid.branch.index[from_interconnect != to_interconnect]
error_messages.append(
"branch(es) connected across multiple interconnections: "
f"{non_matching_ids}."
)
def _check_transformer_substations(grid, error_messages):
"""Check whether any transformers are are not within exactly one same substation.
:param powersimdata.input.grid.Grid grid: grid or grid-like object to check.
:param list error_messages: list, to be appended to with a str if:
any transformers in the ``grid`` are not within exactly one same substation.
"""
txfmr_branch_types = {"Transformer", "TransformerWinding"}
branch = grid.branch
transformers = branch.loc[branch.branch_device_type.isin(txfmr_branch_types)]
from_sub = transformers.from_bus_id.map(grid.bus2sub.sub_id)
to_sub = transformers.to_bus_id.map(grid.bus2sub.sub_id)
if not all(from_sub == to_sub):
non_matching_transformers = transformers.index[from_sub != to_sub]
error_messages.append(
"transformer(s) connected across multiple substations: "
f"{non_matching_transformers}."
)
def _check_line_voltages(grid, error_messages):
"""Check whether any lines connect across different voltage levels.
:param powersimdata.input.grid.Grid grid: grid or grid-like object to check.
:param list error_messages: list, to be appended to with a str if:
any lines in the ``grid`` connect across different voltage levels.
"""
lines = grid.branch.query("branch_device_type == 'Line'")
from_kV = lines.from_bus_id.map(grid.bus.baseKV) # noqa: N806
to_kV = lines.to_bus_id.map(grid.bus.baseKV) # noqa: N806
if not all(from_kV == to_kV):
non_matching_lines = lines.index[from_kV != to_kV]
error_messages.append(
f"line(s) connected across multiple voltages: {non_matching_lines}."
)
def _check_plant_against_gencost(grid, error_messages):
"""Check whether indices of plant and gencost tables match.
:param powersimdata.input.grid.Grid grid: grid or grid-like object to check.
:param list error_messages: list, to be appended to with a str if:
indices of plant and gencost tables of the ``grid`` don't match.
"""
if not (
set(grid.plant.index)
== set(grid.gencost["before"].index)
== set(grid.gencost["after"].index)
):
error_messages.append("indices for plant and gencost don't match.")
def _check_connected_components(grid, error_messages):
"""Check whether connected components and listed interconnects of a grid match.
:param powersimdata.input.grid.Grid grid: grid or grid-like object to check.
:param list error_messages: list, to be appended to with a str if:
connected components and listed interconnects of a ``grid`` don't match.
"""
g = nx.from_pandas_edgelist(grid.branch, "from_bus_id", "to_bus_id")
num_connected_components = len([c for c in nx.connected_components(g)])
if len(grid.interconnect) == 1:
# Check for e.g. ['USA'] interconnect, which is really three interconnects
interconnect_aliases = grid.model_immutables.zones["name2interconnect"]
if grid.interconnect[0] in interconnect_aliases:
num_interconnects = len(interconnect_aliases[grid.interconnect[0]])
else:
num_interconnects = 1
else:
num_interconnects = len(grid.interconnect)
if num_interconnects != num_connected_components:
error_messages.append(
f"This grid contains {num_connected_components} connected components, "
f"but is specified as having {num_interconnects} interconnects: "
f"{grid.interconnect}."
)
def _check_for_loop_branches(grid, error_messages):
"""Check whether any branches in a grid have the same start and end bus.
:param powersimdata.input.grid.Grid grid: grid or grid-like object to check.
:param list error_messages: list, to be appended to with a str if:
there are any branches with the same start and end bus.
"""
if not all(grid.branch.from_bus_id != grid.branch.to_bus_id):
loop_lines = grid.branch.query("from_bus_id == to_bus_id").index # noqa: F841
error_messages.append(f"This grid contains loop lines: {list(loop_lines)}")
def _check_grid_models_match(grid1, grid2):
"""Check whether an object is an internally-consistent Grid object.
:param powersimdata.input.grid.Grid grid1: first Grid instance.
:param powersimdata.input.grid.Grid grid2: second Grid instance.
:raises ValueError: if the grid models don't match.
"""
_check_grid_type(grid1)
_check_grid_type(grid2)
if not grid1.grid_model == grid2.grid_model:
raise ValueError(
f"Grid models don't match: {grid1.grid_model}, {grid2.grid_model}"
)
def _check_data_frame(df, label):
"""Ensure that input is a pandas data frame.
:param pandas.DataFrame df: a data frame.
:param str label: name of data frame (used for error messages).
:raises TypeError: if df is not a data frame or label is not a str.
:raises ValueError: if data frame is empty.
"""
if not isinstance(label, str):
raise TypeError("label must be a str")
if not isinstance(df, pd.DataFrame):
raise TypeError(label + " must be a pandas.DataFrame object")
if not df.shape[0] > 0:
raise ValueError(label + " must have at least one row")
if not df.shape[1] > 0:
raise ValueError(label + " must have at least one column")
def _check_time_series(ts, label):
"""Check that a time series is specified properly.
:param pandas.DataFrame/pandas.Series ts: time series to check.
:param str label: name of time series (used for error messages).
:raises TypeError: if ts is not a data frame/time series or label is not a str.
:raises ValueError: if indices are not timestamps.
"""
if not isinstance(label, str):
raise TypeError("label must be a str")
if not isinstance(ts, (pd.DataFrame, pd.Series)):
raise TypeError(label + " must be a pandas.DataFrame or pandas.Series object")
if not ts.shape[0] > 0:
raise ValueError(label + " must have at least one row")
if not isinstance(ts.index, pd.DatetimeIndex):
raise ValueError(label + " must be a time series")
def _check_grid_type(grid):
"""Ensure that ``grid`` is a Grid object.
:param powersimdata.input.grid.Grid grid: a Grid instance.
:raises TypeError: if input is not a Grid instance.
"""
if not isinstance(grid, _grid.Grid):
raise TypeError(f"grid must be a {_grid.Grid} object")
def _check_areas_and_format(areas, mi=None):
"""Ensure that areas are valid. Duplicates are removed and state/country
abbreviations are converted to their actual name.
:param str/list/tuple/set areas: areas(s) to check. Could be load zone name(s),
state/country name(s)/abbreviation(s) or interconnect(s).
:param powersimdata.network.model.ModelImmutables mi: immutables of a grid model.
:raises TypeError: if ``areas`` is not a list/tuple/set of str.
:raises ValueError: if ``areas`` is empty or not valid.
:return: (*set*) -- areas as a set. State/Country abbreviations are converted to
state/country names.
"""
if mi is None:
mi = ModelImmutables("usa_tamu")
if isinstance(areas, str):
areas = {areas}
elif isinstance(areas, (list, set, tuple)):
if not all([isinstance(z, str) for z in areas]):
raise TypeError("all areas must be str")
areas = set(areas)
else:
raise TypeError("areas must be a str or a list/tuple/set of str")
if len(areas) == 0:
raise ValueError("areas must be non-empty")
all_areas = set().union(*(mi.zones[z] for z in mi.zones["mappings"]))
if not areas <= all_areas:
diff = areas - all_areas
raise ValueError("invalid area(s): %s" % " | ".join(diff))
abv_in_areas = [z for z in areas if z in mi.zones["abv"]]
for a in abv_in_areas:
areas.remove(a)
areas.add(mi.zones[f"abv2{mi.zones['division']}"][a])
return areas
def _check_resources_and_format(resources, mi=None):
"""Ensure that resources are valid and convert variable to a set.
:param str/list/tuple/set resources: resource(s) to check.
:param powersimdata.network.model.ModelImmutables mi: immutables of a grid model.
:raises TypeError: if resources is not a list/tuple/set of str.
:raises ValueError: if resources is empty or not valid.
:return: (*set*) -- resources as a set.
"""
if mi is None:
mi = ModelImmutables("usa_tamu")
if isinstance(resources, str):
resources = {resources}
elif isinstance(resources, (list, set, tuple)):
if not all([isinstance(r, str) for r in resources]):
raise TypeError("all resources must be str")
resources = set(resources)
else:
raise TypeError("resources must be a str or a list/tuple/set of str")
if len(resources) == 0:
raise ValueError("resources must be non-empty")
if not resources <= mi.plants["all_resources"]:
diff = resources - mi.plants["all_resources"]
raise ValueError("invalid resource(s): %s" % " | ".join(diff))
return resources
def _check_resources_are_renewable_and_format(resources, mi=None):
"""Ensure that resources are valid renewable resources and convert variable to
a set.
:param powersimdata.network.model.ModelImmutables mi: immutables of a grid model.
:param str/list/tuple/set resources: resource(s) to analyze.
:raises ValueError: if resources are not renewables.
return: (*set*) -- resources as a set
"""
if mi is None:
mi = ModelImmutables("usa_tamu")
resources = _check_resources_and_format(resources, mi=mi)
if not resources <= mi.plants["renewable_resources"]:
diff = resources - mi.plants["all_resources"]
raise ValueError("invalid renewable resource(s): %s" % " | ".join(diff))
return resources
def _check_areas_are_in_grid_and_format(areas, grid):
"""Ensure that list of areas are in grid.
:param dict areas: keys are area types: '*loadzone*', '*state*'/'*country*'' or
'*interconnect*'. Values are str/list/tuple/set of areas.
:param powersimdata.input.grid.Grid grid: Grid instance.
:return: (*dict*) -- modified areas dictionary. Keys are area types ('*loadzone*',
'*state*'/'*country*' or '*interconnect*'). State abbreviations, if present,
are converted to state names. Values are areas as a set.
:raises TypeError: if areas is not a dict or its keys are not str.
:raises ValueError: if area type is invalid, an area in not in ``grid`` or an
invalid loadzone, state/country or interconnect is passed.
"""
_check_grid_type(grid)
if not isinstance(areas, dict):
raise TypeError("areas must be a dict")
mi = grid.model_immutables
division_type = mi.zones["division"]
areas_formatted = {}
for a in areas.keys():
if a in ["loadzone", division_type, "interconnect"]:
areas_formatted[a] = set()
all_loadzones = set()
for k, v in areas.items():
if not isinstance(k, str):
raise TypeError("area type must be a str")
elif k == "interconnect":
interconnects = _check_areas_and_format(v, mi)
for i in interconnects:
try:
all_loadzones.update(mi.zones["interconnect2loadzone"][i])
except KeyError:
raise ValueError("invalid interconnect: %s" % i)
areas_formatted["interconnect"].update(interconnects)
elif k == division_type:
divisions = _check_areas_and_format(v, mi)
for s in divisions:
try:
all_loadzones.update(mi.zones[f"{division_type}2loadzone"][s])
except KeyError:
raise ValueError(f"invalid {division_type}: %s" % s)
areas_formatted[division_type].update(divisions)
elif k == "loadzone":
loadzones = _check_areas_and_format(v, mi)
for l in loadzones:
if l not in mi.zones["loadzone"]:
raise ValueError("invalid load zone: %s" % l)
all_loadzones.update(loadzones)
areas_formatted["loadzone"].update(loadzones)
else:
raise ValueError("invalid area type")
valid_loadzones = set(grid.plant["zone_name"].unique())
if not all_loadzones <= valid_loadzones:
diff = all_loadzones - valid_loadzones
raise ValueError("%s not in in grid" % " | ".join(diff))
return areas_formatted
def _check_resources_are_in_grid_and_format(resources, grid):
"""Ensure that resource(s) is represented in at least one generator in the grid
used for the scenario.
:param str/list/tuple/set resources: resource(s) to analyze.
:param powersimdata.input.grid.Grid grid: a Grid instance.
:return: (*set*) -- resources as a set.
:raises ValueError: if resources is not used in scenario.
"""
_check_grid_type(grid)
resources = _check_resources_and_format(resources, grid.model_immutables)
valid_resources = set(grid.plant["type"].unique())
if not resources <= valid_resources:
diff = resources - valid_resources
raise ValueError("%s not in in grid" % " | ".join(diff))
return resources
def _check_plants_are_in_grid(plant_id, grid):
"""Ensure that list of plant id are in grid.
:param list/tuple/set plant_id: list of plant id.
:param powersimdata.input.grid.Grid grid: Grid instance.
:raises TypeError: if plant_id is not a list of int or str.
:raises ValueError: if plant id is not in network.
"""
_check_grid_type(grid)
if not (
isinstance(plant_id, (list, tuple, set))
and all([isinstance(p, (int, str)) for p in plant_id])
):
raise TypeError("plant_id must be a a list/tuple/set of int or str")
if not set([str(p) for p in plant_id]) <= set([str(i) for i in grid.plant.index]):
raise ValueError("plant_id must be subset of plant index")
def _check_number_hours_to_analyze(scenario, hours):
"""Ensure that number of hours is greater than simulation length.
:param powersimdata.scenario.scenario.Scenario scenario: scenario instance.
:param int hours: number of hours to analyze.
:raises TypeError: if hours is not int.
:raises ValueError: if hours is negative or greater than simulation length
"""
start_date = pd.Timestamp(scenario.info["start_date"])
end_date = pd.Timestamp(scenario.info["end_date"])
if not isinstance(hours, int):
raise TypeError("hours must be an int")
if hours < 1:
raise ValueError("hours must be positive")
if hours > (end_date - start_date).total_seconds() / 3600 + 1:
raise ValueError("hours must not be greater than simulation length")
def _check_date(date):
"""Check date is valid.
:param pandas.Timestamp/numpy.datetime64/datetime.datetime date: timestamp.
:raises TypeError: if date is improperly formatted.
"""
if not isinstance(date, (pd.Timestamp, np.datetime64, datetime.datetime)):
raise TypeError(
"date must be a pandas.Timestamp, a numpy.datetime64 or a datetime.datetime object"
)
def _check_date_range_in_scenario(scenario, start, end):
"""Check if start time and end time define a valid time range of the given scenario.
:param powersimdata.scenario.scenario.Scenario scenario: scenario instance.
:param pandas.Timestamp/numpy.datetime64/datetime.datetime start: start date.
:param pandas.Timestamp/numpy.datetime64/datetime.datetime end: end date.
:raises ValueError: if the date range is invalid.
"""
_check_date(start)
_check_date(end)
scenario_start = pd.Timestamp(scenario.info["start_date"])
scenario_end = pd.Timestamp(scenario.info["end_date"])
if not scenario_start <= start <= end <= scenario_end:
raise ValueError("Must have scenario_start <= start <= end <= scenario_end")
def _check_date_range_in_time_series(ts, start, end):
"""Check if start time and end time define a valid time range of the time series.
:param pandas.DataFrame/pandas.Series ts: a time series with timestamp as indices.
:param pandas.Timestamp/numpy.datetime64/datetime.datetime start: start date.
:param pandas.Timestamp/numpy.datetime64/datetime.datetime end: end date.
:raises ValueError: if the date range is invalid.
"""
_check_time_series(ts, "time series")
_check_date(start)
_check_date(end)
if not ts.index[0] <= start <= end <= ts.index[-1]:
raise ValueError(
"Must have time_series_start <= start <= end <= time_series_end"
)
def _check_epsilon(epsilon):
"""Ensure epsilon is valid.
:param float/int epsilon: precision for binding constraints.
:raises TypeError: if epsilon is not a float or an int.
:raises ValueError: if epsilon is negative.
"""
if not isinstance(epsilon, (float, int)):
raise TypeError("epsilon must be numeric")
if epsilon < 0:
raise ValueError("epsilon must be non-negative")
def _check_gencost(gencost, error_messages=None):
"""Check that gencost is valid.
:param pandas.DataFrame gencost: cost curve polynomials.
:param list error_messages: list to append error messages to. If `error_messages``
is None and an error is encountered, an Exception will be raised instead.
:raises TypeError: if ``error_messages`` is None and: gencost is not a data frame,
or polynomial degree is not an int.
:raises ValueError: if data frame has no rows, does not have the required columns,
curves are not polynomials and have not the appropriate coefficients.
"""
try:
# check for nonempty dataframe
if isinstance(gencost, pd.DataFrame):
_check_data_frame(gencost, "gencost")
else:
print(gencost)
raise TypeError("gencost must be a pandas.DataFrame object")
# check for proper columns
required_columns = ("type", "n")
for r in required_columns:
if r not in gencost.columns:
raise ValueError("gencost must have column " + r)
# check that gencosts are all specified as type 2 (polynomial)
cost_type = gencost["type"]
if not cost_type.where(cost_type == 2).equals(cost_type):
raise ValueError("each gencost must be type 2 (polynomial)")
# check that all gencosts are specified as same order polynomial
if not (gencost["n"].nunique() == 1):
raise ValueError("all polynomials must be of same order")
# check that this order is an integer > 0
n = gencost["n"].iloc[0]
if not isinstance(n, (int, np.integer)):
raise TypeError("polynomial degree must be specified as an int")
if n < 1:
raise ValueError("polynomial must be at least of order 1 (constant)")
# check that the right columns are here for this dataframe
coef_columns = ["c" + str(i) for i in range(n)]
for c in coef_columns:
if c not in gencost.columns:
raise ValueError(f"gencost of order {n} must have column {c}")
except Exception:
if error_messages is not None:
error_messages.append(repr(sys.exc_info()[1]))
else:
raise