import gc
import math
import warnings
import numpy as np
import pandas as pd
from scipy.optimize import linprog
from prereise.gather.demanddata.transportation_electrification import (
charging_optimization,
const,
data_helper,
dwelling,
)
warnings.simplefilter(action="ignore", category=FutureWarning)
[docs]def ldv_weekday_weekend_check(x, y):
"""Helper function to select weekday/weekend data rows
:param int x: data weekday/weekend value
:param int y: model year weekday/weekend value
:return: (*bool*) -- if data row matches whether the current day is a weekday/weekend
"""
return x == y
[docs]def hdv_use_all_data_rows(x, y):
"""Helper function to select weekday/weekend data rows
:param int x: data weekday/weekend value
:param int y: model year weekday/weekend value
:return: (*bool*) -- always returns true to use all data rows
"""
return True
[docs]def smart_charging(
model_year,
veh_range,
power,
location_strategy,
veh_type,
filepath,
external_signal,
bev_vmt,
census_region=None,
daily_values=None,
kwhmi=None,
trip_strategy=1,
input_day=None,
debug_printout=False,
):
"""Smart charging function
:param int model_year: year that is being modelled/projected to, 2017, 2030, 2040,
2050.
:param int veh_range: 100, 200, or 300, represents how far vehicle can travel on
single charge.
:param int power: charger power, EVSE kW.
:param int location_strategy: where the vehicle can charge-1, 2, 3, 4, or 5;
1-home only, 2-home and work related, 3-anywhere if possibile,
4-home and school only, 5-home and work and school.
:param str veh_type: determine which category (LDV or LDT) to produce charging
profiles for
:param str filepath: the path to the nhts mat file.
:param np.array external_signal: the initial load demand
:param float bev_vmt: BEV VMT value / scaling factor loaded from regional_scaling_factors.csv
:param int census_region: any of the 9 census regions defined by US census bureau.
:param pandas.Series daily_values: daily weight factors returned from
:func:`generate_daily_weighting`.
:param int kwhmi: fuel efficiency, should vary based on vehicle type and model_year.
:param int trip_strategy: determine to charge after any trip (1) or only after the
last trip (2)
:param numpy.ndarray input_day: daily list which specifies 1 if the day is a weekend,
and 2 if the day is a weekday
:param bool debug_printout: specify if additional model parameters will be printed
:return: (*numpy.ndarray*) -- charging profiles.
"""
# load NHTS data from function
if veh_type.lower() == "ldv":
newdata = data_helper.remove_ldt(data_helper.load_data(census_region, filepath))
# updates the weekend and weekday values in the nhts data
newdata = data_helper.update_if_weekend(newdata)
elif veh_type.lower() == "ldt":
newdata = data_helper.remove_ldv(data_helper.load_data(census_region, filepath))
# updates the weekend and weekday values in the nhts data
newdata = data_helper.update_if_weekend(newdata)
elif veh_type.lower() == "mdv":
newdata = data_helper.load_hdv_data("mhdv", filepath)
elif veh_type.lower() == "hdv":
newdata = data_helper.load_hdv_data("hhdv", filepath)
new_columns = [
"trip start battery charge",
"trip end battery charge",
"BEV could be used",
"Battery size",
"Electricity cost",
"Battery discharge",
"Battery charge",
"trip_number",
]
newdata = newdata.reindex(list(newdata.columns) + new_columns, axis=1, fill_value=0)
newdata["trip_number"] = newdata.groupby("vehicle_number").cumcount() + 1
if input_day is None:
input_day = data_helper.get_input_day(
data_helper.get_model_year_dti(model_year)
)
external_signal -= min(external_signal)
if kwhmi is None:
kwhmi = data_helper.get_kwhmi(model_year, veh_type, veh_range)
kwh = kwhmi * veh_range
if power > 19.2:
charging_efficiency = 0.95
else:
charging_efficiency = 0.9
# Add booleans for whether the location allows charging
if location_strategy == 3:
newdata["location_allowed"] = True
else:
if veh_type.lower() in {"ldv", "ldt"}:
allowed = const.ldv_location_allowed[location_strategy]
newdata["location_allowed"] = newdata["dwell_location"].isin(allowed)
elif veh_type.lower() in {"mdv", "hdv"}:
allowed = const.hdv_location_allowed[location_strategy]
newdata["location_allowed"] = newdata["dwell_location"].isin(allowed)
# Add booleans for whether the trip_number (compared to total trips) allows charging
if trip_strategy == 1:
newdata["trip_allowed"] = True
elif trip_strategy == 2:
newdata["trip_allowed"] = newdata["trip_number"] == newdata["total_trips"]
else:
raise ValueError("trip_strategy parameter is not valid, i.e. either 1 or 2")
# Add booleans for whether the dwell time is long enough to allow charging
newdata["dwell_allowed"] = newdata["dwell_time"] > 0.2
# Add boolean for whether this trip allows charging
allowed_cols = [
"location_allowed",
"trip_allowed",
"dwell_allowed",
]
newdata["charging_allowed"] = newdata[allowed_cols].apply(all, axis=1)
newdata["dwell_charging"] = (
newdata["charging_allowed"]
* newdata["dwell_time"]
* power
* charging_efficiency
)
grouped_trips = newdata.groupby("vehicle_number")
for vehicle_num, group in grouped_trips:
newdata.loc[group.index, "max_charging"] = newdata.loc[
group.index, "dwell_charging"
].sum()
newdata.loc[group.index, "required_charging"] = (
newdata.loc[group.index, "trip_miles"].sum() * kwhmi
)
# Filter for whenever available charging is insufficient to meet required charging
newdata = newdata.loc[
(newdata["required_charging"] <= newdata["max_charging"])
].copy()
# Filter for vehicle's battery range
newdata = newdata.loc[(newdata["total vehicle miles traveled"] < veh_range)]
# LDV and LDT filter for cyclical trips
if veh_type.lower() in {"ldv", "ldt"}:
filtered_census_data = pd.DataFrame(columns=const.nhts_census_column_names)
i = 0
while i < len(newdata):
total_trips = int(newdata.iloc[i, newdata.columns.get_loc("total_trips")])
# copy one vehicle information to the block
individual = newdata.iloc[i : i + total_trips].copy()
if individual["why_from"].iloc[0] == individual["dwell_location"].iloc[-1]:
filtered_census_data = pd.concat(
[filtered_census_data, individual], ignore_index=True
)
i += total_trips
newdata = filtered_census_data
nd_len = len(newdata)
model_year_profile = np.zeros(24 * len(input_day))
if veh_type.lower() in {"ldv", "ldt"}:
data_day = data_helper.get_data_day(newdata)
elif veh_type.lower() in {"mdv", "hdv"}:
data_day = np.ones(len(newdata)) + 1
daily_vmt_total = data_helper.get_total_daily_vmt(
newdata, input_day, veh_type.lower()
)
if veh_type.lower() in {"ldv", "ldt"}:
location_allowed = const.ldv_location_allowed
use_data_row = ldv_weekday_weekend_check
elif veh_type.lower() in {"mdv", "hdv"}:
location_allowed = const.hdv_location_allowed
use_data_row = hdv_use_all_data_rows
newdata = charging_optimization.get_constraints(
newdata,
kwhmi,
power,
trip_strategy,
location_strategy,
location_allowed,
charging_efficiency,
)
output_load_sum_list = []
day_num = len(input_day)
for day_iter in range(day_num):
print(f"Day: {day_iter}")
gc.collect()
adjusted_load = [
external_signal[i] + model_year_profile[i]
for i in range(
day_iter * 24, (day_iter + 1) * 24 + min(day_num - day_iter - 1, 2) * 24
)
]
if 3 - day_num + day_iter > 0:
adjusted_load += [
external_signal[i] + model_year_profile[i]
for i in range(24 * (3 - day_num + day_iter))
]
cost = np.array(adjusted_load)
g2v_load = np.zeros((100, 72))
individual_g2v_load = np.zeros((nd_len, 72))
# code developer debugging variables
individual_vmt = 0
individual_trip_miles = 0
linprog_charge_results = 0
optimization_fail = 0
flag_1_fail = 0
flag_2_fail = 0
flag_3_fail = 0
flag_4_fail = 0
missed_vmt = 0
if veh_type.lower() in {"ldv", "ldt"}:
missed_vehicles = pd.DataFrame(columns=const.nhts_census_column_names)
elif veh_type.lower() in {"mdv", "hdv"}:
missed_vehicles = pd.DataFrame(
columns=list(newdata.columns)
+ [
"power_allowed",
"charging consumption",
"seg",
"power",
"segcum",
"energy limit",
"rates",
]
)
i = 0
while i < nd_len:
# trip amount for each vehicle
total_trips = int(newdata.iloc[i, newdata.columns.get_loc("total_trips")])
if use_data_row(data_day[i], input_day[day_iter]):
# copy one vehicle information to the block
individual = newdata.iloc[i : i + total_trips].copy()
individual_vmt += float(
individual["total vehicle miles traveled"].iloc[0]
)
individual_trip_miles += float(individual["trip_miles"].sum())
individual["rates"] = individual.apply(
lambda d: dwelling.get_rates(
cost,
d["trip_end"],
d["dwell_time"],
),
axis=1,
)
charging_consumption = individual["charging consumption"].to_numpy()
rates = individual["rates"]
rates = [r for trip_rates in rates for r in trip_rates]
elimit = individual["energy limit"]
elimit = [el for energy_lim in elimit for el in energy_lim]
seg = individual["seg"].apply(int).to_numpy()
linprog_inputs = charging_optimization.calculate_optimization(
charging_consumption,
rates,
elimit,
seg,
total_trips,
kwh,
charging_efficiency,
)
linprog_result = linprog(**linprog_inputs)
exitflag = linprog_result.status
state_of_charge = np.zeros((total_trips, 2))
# find the feasible points
if exitflag == 0:
# fval is the value of the final cost, exitflag is the reason why the optimization terminates
# 0-success, 1-limit reached, 2-problem infeasible, 3-problem unbounded, 4-numerical difficulties
x_array = np.array(linprog_result.x)
# DANL EDITS
linprog_charge_results += x_array.sum().sum()
# can be an EV
individual.iloc[:, newdata.columns.get_loc("BEV could be used")] = 1
for n in range(total_trips):
# SOC drop in kwh, from driving
individual.iloc[
n, newdata.columns.get_loc("Battery discharge")
] = charging_consumption[n]
# G2V results
# define the G2V load during a trip
trip_g2v_load = np.zeros((1, 72))
start = math.floor(
individual.iloc[
n,
individual.columns.get_loc("trip_end"),
]
)
end = math.floor(
individual.iloc[
n,
individual.columns.get_loc("trip_end"),
]
+ individual.iloc[
n,
individual.columns.get_loc("dwell_time"),
]
)
dwell_location = int(
individual.iloc[
n, newdata.columns.get_loc("dwell_location")
]
)
segcum = np.cumsum(seg)
trip_g2v_load[:, start : end + 1] = (
x_array[segcum[n] - seg[n] : segcum[n]]
/ charging_efficiency
)
g2v_load[dwell_location, :] += trip_g2v_load[0, :]
individual_g2v_load[i + n][:] = trip_g2v_load
trip_g2v_cost = np.matmul(trip_g2v_load, cost)[0]
# charging charge. in DC
charge = sum(x_array[segcum[n] - seg[n] : segcum[n]])
# V2G results
trip_v2g_load = np.zeros((1, 72))
electricitycost = trip_g2v_cost
tripload = trip_v2g_load + trip_g2v_load
# update the cost function and convert from KW to MW
cost += (tripload / 1000 / daily_vmt_total[day_iter] * bev_vmt)[
0, :
]
# SOC rise in kwh, from charging
individual.iloc[
n, newdata.columns.get_loc("Battery charge")
] = charge
if n == 0:
state_of_charge[n][0] = charging_consumption[n]
state_of_charge[n][1] = state_of_charge[n][0] + charge
else:
state_of_charge[n][0] = (
state_of_charge[n - 1][1] + charging_consumption[n]
)
state_of_charge[n][1] = state_of_charge[n][0] + charge
individual.iloc[
n, newdata.columns.get_loc("Electricity cost")
] = electricitycost
# copy SOC back
individual.iloc[
n, newdata.columns.get_loc("trip start battery charge")
] = state_of_charge[n, 0]
individual.iloc[
n, newdata.columns.get_loc("trip end battery charge")
] = state_of_charge[n, 1]
# find the acutal battery size, in DC
batterysize = max(state_of_charge[:, 1]) - min(
state_of_charge[:, 0]
)
# copy to individual
individual.iloc[
:, newdata.columns.get_loc("Battery size")
] = batterysize
else:
# collected for debugging if desired
optimization_fail += 1
missed_vehicles = pd.concat(
[
missed_vehicles,
individual.reset_index(inplace=True, drop=True),
],
ignore_index=True,
)
missed_vmt += individual["trip_miles"].sum()
if exitflag == 1:
flag_1_fail += 1
elif exitflag == 2:
flag_2_fail += 1
elif exitflag == 3:
flag_3_fail += 1
elif exitflag == 4:
flag_4_fail += 1
# update the counter to the next vehicle
i += total_trips
outputelectricload = sum(g2v_load)
output_load_sum_list.append(np.sum(outputelectricload))
# create wrap-around indexing function
trip_window_indices = np.arange(day_iter * 24, day_iter * 24 + 72) % len(
model_year_profile
)
# MW
scaled_output = (
outputelectricload
* daily_values[day_iter]
/ (daily_vmt_total[day_iter] * 1000)
* bev_vmt
)
if debug_printout is True:
output_check_array = (
outputelectricload
/ kwhmi
/ daily_vmt_total[day_iter]
* charging_efficiency
)
output_check_sum = output_check_array.sum()
print(
f"Unscaling the optimiation output should be close to or equal to 1 and is : {output_check_sum}"
)
print(
f"Original vmt sum via the column total vehicle miles traveled was : {individual_vmt}"
)
print(
f"Original vmt sum via the column trip miles sum was : {individual_trip_miles}"
)
print(
f"Charging summation from linprog_x directly is : {linprog_charge_results}"
)
print(f"optimization output sum: {np.sum(outputelectricload)}")
print(f"scaled output sum: {np.sum(scaled_output)}")
print(f"Vmt missed in optimization: {missed_vmt}")
print(f"Exit Flag 1: {flag_1_fail}")
print(f"Exit Flag 2: {flag_2_fail}")
print(f"Exit Flag 3: {flag_3_fail}")
print(f"Exit Flag 4: {flag_4_fail}")
print(
missed_vehicles[
[
"dwell_time",
"total vehicle miles traveled",
"dwell_location",
"vehicle_number",
"location_allowed",
"trip_allowed",
"dwell_allowed",
"charging_allowed",
"dwell_charging",
"required_charging",
"max_charging",
# ,"trip_number","Date","Day of Week",
# "If Weekend","Trip start time (HHMM)","Trip end time (HHMM)","Travel Minutes",
# "trip_miles","Vehicle miles traveled","why_from",
]
]
)
model_year_profile[trip_window_indices] += scaled_output
return model_year_profile, output_load_sum_list, newdata