Source code for powersimdata.scenario.execute
import pickle
import pandas as pd
from powersimdata.data_access.context import Context
from powersimdata.input.configure import adjust_pmin, adjust_ramp30, linearize_gencost
from powersimdata.input.grid import Grid
from powersimdata.input.input_data import InputData
from powersimdata.input.transform_grid import TransformGrid
from powersimdata.input.transform_profile import TransformProfile
from powersimdata.scenario.ready import Ready
from powersimdata.utility.config import get_deployment_mode
[docs]class Execute(Ready):
"""Scenario is in a state of being executed.
:param powersimdata.scenario.scenario.Scenario scenario: scenario instance.
"""
name = "execute"
allowed = ["delete"]
exported_methods = {
"check_progress",
"extract_simulation_output",
"launch_simulation",
"prepare_simulation_input",
"print_scenario_status",
"scenario_id",
} | Ready.exported_methods
def __init__(self, scenario):
"""Constructor."""
super().__init__(scenario)
self.refresh(scenario)
@property
def scenario_id(self):
"""Get the current scenario id
:return: (*str*) -- scenario id
"""
return self._scenario_info["id"]
[docs] def refresh(self, scenario):
"""Called during state changes to ensure instance is properly initialized
:param powersimdata.scenario.scenario.Scenario scenario: scenario instance
"""
print(
"SCENARIO: %s | %s\n"
% (self._scenario_info["plan"], self._scenario_info["name"])
)
print("--> State\n%s" % self.name)
print("--> Status\n%s" % self._scenario_status)
self._set_ct_and_grid()
self._launcher = Context.get_launcher(scenario)
def _get_kwargs(self):
"""Hack to determine reduction parameter for europe_tub
:return: (*dict*) -- additional kwargs to create a Grid object
"""
info = self._scenario_info
if info["grid_model"] == "europe_tub":
version = info["base_demand"].split("_")
if len(version) > 1:
return {"reduction": version[1]}
return {}
def _set_ct_and_grid(self):
"""Sets change table and grid."""
extra_args = self._get_kwargs()
base_grid = Grid(
self._scenario_info["interconnect"].split("_"),
source=self._scenario_info["grid_model"],
**extra_args,
)
if self._scenario_info["change_table"] == "Yes":
input_data = InputData()
self.ct = input_data.get_data(self._scenario_info, "ct")
self.grid = TransformGrid(base_grid, self.ct).get_grid()
else:
self.ct = {}
self.grid = base_grid
def _update_scenario_status(self):
"""Updates scenario status."""
self._scenario_status = self._execute_list_manager.get_status(self.scenario_id)
def _update_scenario_info(self):
"""Updates scenario information."""
self._scenario_info = self._scenario_list_manager.get_scenario(self.scenario_id)
[docs] def print_scenario_status(self):
"""Prints scenario status."""
print("---------------")
print("SCENARIO STATUS")
print("---------------")
self._update_scenario_status()
print(self._scenario_status)
[docs] def prepare_simulation_input(self, profiles_as=None):
"""Prepares scenario for execution
:param int/str/None profiles_as: if given, copy profiles from this scenario.
:raises TypeError: if profiles_as parameter not a str or int.
"""
if profiles_as is not None and not isinstance(profiles_as, (str, int)):
raise TypeError("profiles_as must be None, str, or int.")
self._update_scenario_status()
if self._scenario_status == "created":
print("---------------------------")
print("PREPARING SIMULATION INPUTS")
print("---------------------------")
si = SimulationInput(
self._data_access, self._scenario_info, self.grid, self.ct
)
for p in ["demand", "hydro", "solar", "wind"]:
si.prepare_profile(p, profiles_as)
si.prepare_grid()
if "demand_flexibility" in self.ct:
# Prepare all specified demand flexibility profiles
for p in list(self.ct["demand_flexibility"]):
if p != "demand_flexibility_duration":
si.prepare_profile(p, profiles_as)
si.prepare_demand_flexibility_parameters()
prepared = "prepared"
self._execute_list_manager.set_status(self.scenario_id, prepared)
self._scenario_status = prepared
else:
print("---------------------------")
print("SCENARIO CANNOT BE PREPARED")
print("---------------------------")
print("Current status: %s" % self._scenario_status)
return
def _check_if_ready(self):
"""Check if the current scenario is ready to launch
:raises ValueError: if status is invalid
"""
self._update_scenario_status()
valid_status = ["prepared", "failed", "finished"]
if self._scenario_status not in valid_status:
raise ValueError(
f"Status must be one of {valid_status}, but got status={self._scenario_status}"
)
[docs] def launch_simulation(self, threads=None, solver=None, extract_data=True):
"""Launches simulation on target environment
:param int/None threads: the number of threads to be used. This defaults to None,
where None means auto.
:param str solver: the solver used for optimization. This defaults to
None, which translates to gurobi
:param bool extract_data: whether the results of the simulation engine should
automatically extracted after the simulation has run. This defaults to True.
:return: (*subprocess.Popen*) or (*dict*) - the process, if using ssh to server,
otherwise a dict containing status information.
"""
self._check_if_ready()
mode = get_deployment_mode()
print(f"--> Launching simulation on {mode.lower()}")
return self._launcher.launch_simulation(threads, solver, extract_data)
[docs] def check_progress(self):
"""Get the status of an ongoing simulation, if possible
:return: (*dict*) -- either None if using ssh, or a dict which contains
"output", "errors", "scenario_id", and "status" keys which map to
stdout, stderr, and the respective scenario attributes
"""
return self._launcher.check_progress()
[docs] def extract_simulation_output(self):
"""Extracts simulation outputs {PG, PF, LMP, CONGU, CONGL} on server.
:return: (*subprocess.Popen*) -- new process used to extract output
data.
"""
self._update_scenario_status()
if self._scenario_status == "finished":
return self._launcher.extract_simulation_output()
else:
print("---------------------------")
print("OUTPUTS CANNOT BE EXTRACTED")
print("---------------------------")
print("Current status: %s" % self._scenario_status)
return
[docs]class SimulationInput:
"""Prepares scenario for execution.
:param powersimdata.data_access.data_access.DataAccess data_access:
data access object.
:param dict scenario_info: scenario information.
:param powersimdata.input.grid.Grid grid: a Grid object.
:param dict ct: change table.
"""
def __init__(self, data_access, scenario_info, grid, ct):
"""Constructor."""
self._data_access = data_access
self._scenario_info = scenario_info
self.grid = grid
self.ct = ct
self.scenario_id = scenario_info["id"]
self.REL_TMP_DIR = self._data_access.tmp_folder(self.scenario_id)
[docs] def prepare_grid(self):
"""Prepare grid for simulation."""
print("--> Preparing grid data")
grid = self.grid
storage = grid.storage
adjust_pmin(grid)
adjust_ramp30(grid.plant)
grid.gencost["after"] = linearize_gencost(grid.gencost["before"], grid.plant)
storage["gencost"] = linearize_gencost(storage["gencost"], storage["gen"])
dest_path = "/".join([self.REL_TMP_DIR, "grid.pkl"])
with self._data_access.write(dest_path, save_local=False) as f:
pickle.dump(self.grid, f)
[docs] def prepare_profile(self, kind, profile_as=None, slice=False):
"""Prepares profile for simulation.
:param kind: one of *demand*, *'hydro'*, *'solar'*, *'wind'*,
*'demand_flexibility_up'*, *'demand_flexibility_dn'*,
*'demand_flexibility_cost_up'*, or *'demand_flexibility_cost_dn'*.
:param int/str profile_as: if given, copy profile from this scenario.
:param bool slice: whether to slice the profiles by the Scenario's time range.
"""
file_name = f"{kind}.csv"
dest_path = "/".join([self.REL_TMP_DIR, file_name])
if profile_as is None:
tp = TransformProfile(self._scenario_info, self.grid, self.ct, slice)
profile = tp.get_profile(kind)
with self._data_access.write(dest_path, save_local=False) as f:
profile.to_csv(f)
else:
from_dir = self._data_access.tmp_folder(profile_as)
src = "/".join([from_dir, file_name])
self._data_access.fs.copy(src, dest_path)
[docs] def prepare_demand_flexibility_parameters(self):
"""Prepares demand_flexibility parameters file for simulation."""
params = {}
params["enabled"] = [True]
params["interval_balance"] = [True]
params["rolling_balance"] = [
True
if "demand_flexibility_duration" in self.ct["demand_flexibility"]
else False
]
params["duration"] = [
self.ct["demand_flexibility"]["demand_flexibility_duration"]
if "demand_flexibility_duration" in self.ct["demand_flexibility"]
else 0
]
params = pd.DataFrame.from_dict(params)
dest_path = "/".join([self.REL_TMP_DIR, "demand_flexibility_parameters.csv"])
with self._data_access.write(dest_path, save_local=False) as f:
params.to_csv(f)