# Source code for powersimdata.input.converter.pypsa_to_grid

import warnings

import numpy as np
import pandas as pd

from powersimdata.input.abstract_grid import AbstractGrid
from powersimdata.input.const import grid_const
from powersimdata.input.const.pypsa_const import pypsa_const
from powersimdata.network.constants.carrier.storage import storage as storage_const


def _translate_df(df, key):
    """Rename columns of a data frame.

    :param pandas.DataFrame df: data frame to operate on.
    :param str key: key in :data:`powersimdata.input.const.pypsa_const.pypsa_const`
        dictionary.
    :return: (*pandas.DataFrame*) -- data frame with translated columns.
    """
    # The constant maps PSD names to PyPSA names; invert it to go the other way.
    return df.rename(columns=_invert_dict(pypsa_const[key]["rename"]))


def _invert_dict(d):
    """Revert dictionary

    :param dict d: dictionary to revert.
    :return: (*dict*) -- reverted dictionary.
    """
    return {v: k for k, v in d.items()}


def _get_storage_storagedata(df, storage_type):
    """Get storage data from PyPSA for data frame "StorageData" in PSD's
    storage dict.

    :param pandas.DataFrame df: PyPSA component dataframe.
    :param str storage_type: key for PyPSA storage type.
    :return: (*pandas.DataFrame*) -- data frame with storage data.
    :raises ValueError: if ``storage_type`` is neither ``"storage_units"``
        nor ``"stores"``.
    """
    storagedata = _translate_df(df, "storage_storagedata")

    if storage_type == "storage_units":
        energy_capacity = df["p_nom"] * df["max_hours"]
        initial_soc = df["state_of_charge_initial"]
    elif storage_type == "stores":
        energy_capacity = df["e_nom"]
        initial_soc = df["e_initial"]
        # Efficiencies of Store are captured in link/dcline
        storagedata["OutEff"] = 1
        storagedata["InEff"] = 1
    else:
        raise ValueError(
            "Inapplicable storage_type passed to function _get_storage_storagedata."
        )

    # Initial storage bounds: PSD's default is same as initial storage
    storagedata["InitialStorageLowerBound"] = initial_soc
    storagedata["InitialStorageUpperBound"] = initial_soc
    # Apply PSD's default relationships/assumptions for remaining columns
    storagedata["InitialStorageCost"] = storage_const["energy_value"]
    storagedata["TerminalStoragePrice"] = storage_const["energy_value"]
    storagedata["rho"] = 1
    storagedata["InitialStorage"] = energy_capacity / 2

    # fill with heuristic defaults if non-existent
    fallbacks = {
        "MinStorageLevel": energy_capacity * storage_const["min_stor"],
        "MaxStorageLevel": energy_capacity * storage_const["max_stor"],
        "ExpectedTerminalStorageMax": energy_capacity * storage_const["terminal_max"],
        "ExpectedTerminalStorageMin": energy_capacity * storage_const["terminal_min"],
    }
    for column, fallback in fallbacks.items():
        if column not in storagedata:
            storagedata[column] = fallback

    return storagedata


def _get_storage_gencost(df):
    """Get storage data from PyPSA for data frame "gencost" in PSD's storage
    dict.

    :param pandas.DataFrame df: PyPSA component dataframe.
    :return: (*pandas.DataFrame*) -- data frame with storage data.
    """
    # The "type" column is reserved for the gen data frame (both gen and
    # gencost carry one), so drop it here before renaming.
    gencost = _translate_df(df.drop(columns="type", errors="ignore"), "storage_gencost")
    # Polynomial cost model (type=2) with three coefficients; c0/c2 zeroed.
    gencost["type"] = 2
    gencost["n"] = 3
    gencost["c0"] = 0
    gencost["c2"] = 0
    return gencost


def _get_storage_gen(df, storage_type):
    """Get storage data from PyPSA for data frame "gen" in PSD's storage dict.

    :param pandas.DataFrame df: PyPSA component dataframe.
    :param str storage_type: key for PyPSA storage type.
    :return: (*pandas.DataFrame*) -- data frame with storage data.
    :raises ValueError: if ``storage_type`` is neither ``"storage_units"``
        nor ``"stores"``.
    """
    if storage_type == "storage_units":
        # Convert per-unit limits to absolute power limits.
        pmax = df["p_nom"] * df["p_max_pu"]
        pmin = df["p_nom"] * df["p_min_pu"]
    elif storage_type == "stores":
        # Stores have no power rating of their own; leave limits unbounded.
        pmax, pmin = np.inf, -np.inf
    else:
        raise ValueError(
            "Inapplicable storage_type passed to function _get_storage_gen."
        )

    gen = _translate_df(df, "storage_gen")
    constants = {
        "Pmax": pmax,
        "Pmin": pmin,
        "ramp_30": pmax,
        "Vg": 1,
        "mBase": 100,
        "status": 1,
    }
    for column, value in constants.items():
        gen[column] = value
    gen["bus_id"] = pd.to_numeric(gen.bus_id, errors="ignore")
    gen["type"] = pd.to_numeric(gen.type, errors="ignore")

    return gen


class FromPyPSA(AbstractGrid):
    """Grid builder for PyPSA network object.

    :param pypsa.Network network: PyPSA network to read in.
    :param bool add_pypsa_cols: PyPSA data frames with renamed columns appended to
        Grid object data frames.
    """

    def __init__(self, network, add_pypsa_cols=True):
        """Constructor"""
        super().__init__()
        self.network = network
        self.add_pypsa_cols = add_pypsa_cols

    def _set_interconnect(self):
        # Derive the interconnect list from the network name, unless it was
        # already set by the parent class.
        if self.interconnect is None:
            self.interconnect = self.network.name.split(", ")

    def _set_data_loc(self):
        # Keep an existing data location; otherwise pick the first
        # interconnect entry or fall back to "pypsa".
        if self.data_loc is not None:
            return
        if len(self.interconnect) > 1:
            self.data_loc = self.interconnect[0]
        else:
            self.data_loc = "pypsa"

    def _set_zone_mapping(self):
        # Build zone2id/id2zone lookups from the bus table, if not set yet.
        n = self.network
        if any(self.id2zone) and any(self.zone2id):
            return
        # only relevant if the PyPSA network was originally created from PSD
        if "zone_id" in n.buses and "zone_name" in n.buses:
            # keep the first occurrence of each non-null zone_id
            uniques = ~n.buses.zone_id.duplicated() * n.buses.zone_id.notnull()
            self.zone2id = (
                n.buses[uniques].set_index("zone_name").zone_id.astype(int).to_dict()
            )
            self.id2zone = _invert_dict(self.zone2id)

    def build(self):
        """PyPSA Network reader.

        Translates the PyPSA network into PSD-style data frames (bus, sub,
        bus2sub, plant, gencost, branch, dcline, storage) and assigns them
        onto this grid object.

        :return: (*FromPyPSA*) -- this object, populated with grid data.
        """
        n = self.network
        add_pypsa_cols = self.add_pypsa_cols

        # Read in data from PyPSA
        bus_pypsa = n.buses
        sub_pypsa = pd.DataFrame()
        gencost_cols = ["start_up_cost", "shut_down_cost", "marginal_cost"]
        gencost_pypsa = n.generators[gencost_cols]
        plant_pypsa = n.generators.drop(gencost_cols, axis=1)
        lines_pypsa = n.lines
        transformers_pypsa = n.transformers
        branch_pypsa = pd.concat(
            [lines_pypsa, transformers_pypsa],
            join="outer",
        )
        # links named "sub..." are substation connectors, not DC lines
        dcline_pypsa = n.links[lambda df: df.index.str[:3] != "sub"]
        storageunits_pypsa = n.storage_units
        stores_pypsa = n.stores

        # bus
        df = bus_pypsa.drop(columns="type")
        bus = _translate_df(df, "bus")
        # map PyPSA control types to PSD numeric bus types (4 = isolated/other)
        bus["type"] = bus.type.replace(
            ["(?i)PQ", "(?i)PV", "(?i)Slack", ""], [1, 2, 3, 4], regex=True
        ).astype(int)

        # substations
        # only relevant if the PyPSA network was originally created from PSD
        sub_cols = ["name", "interconnect_sub_id", "lat", "lon", "interconnect"]
        sub_pypsa_cols = [
            "y",
            "x",
        ]
        if "is_substation" in bus:
            sub = bus[bus.is_substation][sub_cols]
            # strip the "sub" prefix to recover the numeric substation id
            sub.index = sub[sub.index.str.startswith("sub")].index.str[3:]
            sub_pypsa = bus_pypsa[bus_pypsa.is_substation][sub_pypsa_cols]
            sub_pypsa.index = sub.index
            bus = bus[~bus.is_substation]
            bus_pypsa = bus_pypsa[~bus_pypsa.is_substation]
            bus2sub = bus[["substation", "interconnect"]].copy()
            bus2sub["sub_id"] = pd.to_numeric(
                bus2sub.pop("substation").str[3:], errors="coerce"
            )
        else:
            # try to parse typical pypsa-eur(-sec) pattern for substations
            sub_pattern = "[A-Z][A-Z]\d+\s\d+$"
            sub = bus[bus.index.str.match(sub_pattern)].reindex(columns=sub_cols)
            sub["interconnect"] = np.nan
            sub["sub_id"] = sub.index
            sub_pypsa = bus_pypsa[bus_pypsa.index.str.match(sub_pattern)][
                sub_pypsa_cols
            ]
            sub_pattern = "([A-Z][A-Z]\d+\s\d+).*"
            bus2sub = pd.DataFrame(
                {
                    "sub_id": bus.index.str.extract(sub_pattern)[0].values,
                    "interconnect": np.nan,
                },
                index=bus.index,
            )
        if sub.empty and bus2sub.empty:
            warnings.warn("Substations could not be parsed.")
            sub = pd.DataFrame()
            bus2sub = pd.DataFrame()

        # shunts
        # append PyPSA's shunts information to PSD's buses data frame on columns
        if not n.shunt_impedances.empty:
            shunts = _translate_df(n.shunt_impedances, "bus")
            bus[["Bs", "Gs"]] = shunts[["Bs", "Gs"]]

        # plant
        df = plant_pypsa.drop(columns="type")
        plant = _translate_df(df, "generator")
        plant["ramp_30"] = n.generators["ramp_limit_up"].fillna(0)
        plant["Pmin"] *= plant["Pmax"]  # from relative to absolute value
        plant["lat"] = plant.bus_id.map(bus.lat)
        plant["lon"] = plant.bus_id.map(bus.lon)
        plant["bus_id"] = pd.to_numeric(plant.bus_id, errors="ignore")

        # generation costs
        # for type: type of cost model (1 piecewise linear, 2 polynomial),
        # n: number of parameters for total cost function,
        # c(0) to c(n-1): parameters
        gencost = _translate_df(gencost_pypsa, "gencost")
        gencost = gencost.assign(
            type=2, n=3, c0=0, c2=0, interconnect=plant.get("interconnect")
        )

        # branch
        # lines
        drop_cols = ["x", "r", "b", "g"]
        df = lines_pypsa.drop(columns=drop_cols, errors="ignore")
        lines = _translate_df(df, "branch")
        lines["branch_device_type"] = lines.get("branch_device_type", "Line")

        # transformers
        df = transformers_pypsa.drop(columns=drop_cols, errors="ignore")
        transformers = _translate_df(df, "branch")
        transformers["branch_device_type"] = transformers.get(
            "branch_device_type", "Transformer"
        )

        branch = pd.concat([lines, transformers], join="outer")
        # BE model assumes a 100 MVA base, pypsa "assumes" a 1 MVA base
        branch["x"] *= 100
        branch["r"] *= 100
        branch["from_lat"] = branch.from_bus_id.map(bus.lat)
        branch["from_lon"] = branch.from_bus_id.map(bus.lon)
        branch["to_lat"] = branch.to_bus_id.map(bus.lat)
        branch["to_lon"] = branch.to_bus_id.map(bus.lon)
        branch["from_bus_id"] = pd.to_numeric(branch.from_bus_id, errors="ignore")
        branch["to_bus_id"] = pd.to_numeric(branch.to_bus_id, errors="ignore")

        # DC lines
        dcline = _translate_df(dcline_pypsa, "link")
        dcline["Pmin"] *= dcline["Pmax"]  # convert relative to absolute
        dcline["from_bus_id"] = pd.to_numeric(dcline.from_bus_id, errors="ignore")
        dcline["to_bus_id"] = pd.to_numeric(dcline.to_bus_id, errors="ignore")

        # storage units
        c = "storage_units"
        storage_gen_storageunits = _get_storage_gen(n.storage_units, c)
        storage_gencost_storageunits = _get_storage_gencost(n.storage_units)
        storage_storagedata_storageunits = _get_storage_storagedata(n.storage_units, c)
        inflow = n.get_switchable_as_dense("StorageUnit", "inflow")
        has_inflow = inflow.any()
        if has_inflow.any():
            # add artificial buses
            suffix = " inflow"

            def add_suffix(s):
                return str(s) + suffix

            storage_gen_inflow = storage_gen_storageunits[has_inflow]
            buses_old = storage_gen_inflow.bus_id.astype(str)
            buses_new = storage_gen_inflow.index
            bus_rename = dict(zip(buses_old, buses_new))
            bus_inflow = bus.reindex(buses_old).rename(index=bus_rename)
            bus2sub_inflow = bus2sub.reindex(buses_old).rename(index=bus_rename)

            # add discharging dcline (has same index as inflow storages)
            dcline_inflow = pd.DataFrame(
                {
                    "from_bus_id": buses_new,
                    "to_bus_id": buses_old,
                    "Pmax": storage_gen_inflow.Pmax,
                    "Pmin": storage_gen_inflow.Pmin,
                }
            )

            # add inflow generator
            gen_inflow = storage_gen_inflow.rename(index=add_suffix)
            gen_inflow["bus_id"] = buses_new
            gen_inflow["Pmax"] = n.storage_units_t.inflow.max().rename(add_suffix)
            gen_inflow["capital_cost"] = 0.0
            gen_inflow["p_nom_extendable"] = False
            gen_inflow["committable"] = False
            gen_inflow["type"] = "inflow"
            gen_inflow["lat"] = gen_inflow.bus_id.map(bus_inflow.lat)
            gen_inflow["lon"] = gen_inflow.bus_id.map(bus_inflow.lon)
            gen_inflow = gen_inflow.reindex(columns=plant.columns)

            gencost_inflow = storage_gencost_storageunits[has_inflow].rename(
                index=add_suffix
            )
            gencost_inflow = gencost_inflow.assign(
                c0=0, c1=0, c2=0, type=2, startup=0, shutdown=0, n=3
            )

            # add everything to data
            storage_gen_storageunits.loc[has_inflow, "bus_id"] = buses_new
            storage_gen_storageunits.loc[
                has_inflow, "Pmin"
            ] = -np.inf  # don't limit charging from inflow
            bus = pd.concat([bus, bus_inflow])
            bus2sub = pd.concat([bus2sub, bus2sub_inflow])
            plant = pd.concat([plant, gen_inflow])
            gencost = pd.concat([gencost, gencost_inflow])
            dcline = pd.concat([dcline, dcline_inflow])

        # stores
        c = "stores"
        storage_gen_stores = _get_storage_gen(n.stores, c)
        storage_gencost_stores = _get_storage_gencost(n.stores)
        storage_storagedata_stores = _get_storage_storagedata(n.stores, c)

        storage_genfuel = list(n.storage_units.carrier) + list(n.stores.carrier)

        # Pull operational properties into grid object
        if len(n.snapshots) == 1:
            bus = bus.assign(**self._translate_pnl(n.pnl("Bus"), "bus"))
            bus["Va"] = np.rad2deg(bus["Va"])
            bus = bus.assign(**self._translate_pnl(n.pnl("Load"), "bus"))
            plant = plant.assign(**self._translate_pnl(n.pnl("Generator"), "generator"))
            _ = pd.concat(
                [
                    self._translate_pnl(n.pnl(c), "branch")
                    for c in ["Line", "Transformer"]
                ]
            )
            branch = branch.assign(**_)
            dcline = dcline.assign(**self._translate_pnl(n.pnl("Link"), "link"))
        else:
            plant["status"] = n.generators_t.status.any().astype(int)

        # Reindex data and add PyPSA columns
        data = dict()
        keys = [
            "bus",
            "sub",
            "bus2sub",
            "plant",
            "gencost",
            "branch",
            "dcline",
            "storage_gen_storageunits",
            "storage_gencost_storageunits",
            "storage_storagedata_storageunits",
            "storage_gen_stores",
            "storage_gencost_stores",
            "storage_storagedata_stores",
        ]
        values = [
            (bus, bus_pypsa, grid_const.col_name_bus),
            (sub, sub_pypsa, grid_const.col_name_sub),
            (bus2sub, None, grid_const.col_name_bus2sub),
            (plant, plant_pypsa, grid_const.col_name_plant),
            (gencost, gencost_pypsa, grid_const.col_name_gencost),
            (branch, branch_pypsa, grid_const.col_name_branch),
            (dcline, dcline_pypsa, grid_const.col_name_dcline),
            (
                storage_gen_storageunits,
                storageunits_pypsa,
                grid_const.col_name_plant,
            ),
            (
                storage_gencost_storageunits,
                storageunits_pypsa,
                grid_const.col_name_gencost,
            ),
            (
                storage_storagedata_storageunits,
                storageunits_pypsa,
                grid_const.col_name_storage_storagedata,
            ),
            (storage_gen_stores, stores_pypsa, grid_const.col_name_plant),
            (storage_gencost_stores, stores_pypsa, grid_const.col_name_gencost),
            (
                storage_storagedata_stores,
                stores_pypsa,
                grid_const.col_name_storage_storagedata,
            ),
        ]

        for k, v in zip(keys, values):
            df_psd, df_pypsa, const_location = v
            df_psd = df_psd.reindex(const_location, axis="columns")

            # Add renamed PyPSA columns
            if add_pypsa_cols and df_pypsa is not None:
                df_pypsa = df_pypsa.add_prefix("pypsa_")
                df_psd = pd.concat([df_psd, df_pypsa], axis=1)

            # Convert to numeric
            df_psd.index = pd.to_numeric(df_psd.index, errors="ignore")

            data[k] = df_psd

        # Tag which PyPSA component each storage frame came from
        for df in (
            data["storage_gen_storageunits"],
            data["storage_gencost_storageunits"],
            data["storage_storagedata_storageunits"],
        ):
            df["pypsa_component"] = "storage_units"
        for df in (
            data["storage_gen_stores"],
            data["storage_gencost_stores"],
            data["storage_storagedata_stores"],
        ):
            df["pypsa_component"] = "stores"

        # Build PSD grid object
        # Interconnect and data location
        # only relevant if the PyPSA network was originally created from PSD
        self._set_interconnect()
        self._set_data_loc()
        self._set_zone_mapping()

        self.bus = data["bus"]
        self.sub = data["sub"]
        self.bus2sub = data["bus2sub"]
        self.branch = data["branch"].sort_index()
        self.dcline = data["dcline"]
        self.plant = data["plant"]
        self.gencost["before"] = data["gencost"]
        self.gencost["after"] = data["gencost"]
        self.storage["gen"] = pd.concat(
            [data["storage_gen_storageunits"], data["storage_gen_stores"]],
            join="outer",
        )
        self.storage["gencost"] = pd.concat(
            [data["storage_gencost_storageunits"], data["storage_gencost_stores"]],
            join="outer",
        )
        self.storage["StorageData"] = pd.concat(
            [
                data["storage_storagedata_storageunits"],
                data["storage_storagedata_stores"],
            ],
            join="outer",
        )
        self.storage["genfuel"] = storage_genfuel
        self.storage.update(storage_const)

        # Set index names to match PSD
        self.bus.index.name = "bus_id"
        self.plant.index.name = "plant_id"
        self.gencost["before"].index.name = "plant_id"
        self.gencost["after"].index.name = "plant_id"
        self.branch.index.name = "branch_id"
        self.dcline.index.name = "dcline_id"
        self.sub.index.name = "sub_id"
        self.bus2sub.index.name = "bus_id"
        self.storage["gen"].index.name = "storage_id"
        self.storage["gencost"].index.name = "storage_id"
        self.storage["StorageData"].index.name = "storage_id"

        return self

    def _translate_pnl(self, pnl, key):
        """Translate time-dependent data frames with one time step from PyPSA
        to static data frames.

        :param str pnl: name of the time-dependent dataframe.
        :param str key: key in
            :data:`powersimdata.input.const.pypsa_const.pypsa_const` dictionary.
        :return: (*pandas.DataFrame*) -- the static data frame.
        """
        translators = _invert_dict(pypsa_const[key]["rename_t"])
        # take the single snapshot (first row) of each available series
        df = pd.concat(
            {v: pnl[k].iloc[0] for k, v in translators.items() if k in pnl}, axis=1
        )
        return df