import re
import pandas as pd
import pytz
from powersimdata.input.check import (
    _check_date_range_in_time_series,
    _check_time_series,
)
def slice_time_series(ts, start, end, between_time=None, dayofweek=None):
    """Slice a time series.

    The series is first restricted to ``[start, end]``, then optionally filtered by
    hour of day and by day of week.

    :param pandas.DataFrame/pandas.Series ts: time series to slice.
    :param pandas.Timestamp/numpy.datetime64/datetime.datetime start: start date.
    :param pandas.Timestamp/numpy.datetime64/datetime.datetime end: end date.
    :param list between_time: specify the start hour and end hour of each day
        inclusively, default to None, which includes every hour of a day. Note that if
        the end hour is set before the start hour, the complementary hours of a day are
        picked.
    :param set dayofweek: specify the interest days of week, which is a subset of
        integers in [0, 6] with 0 being Monday and 6 being Sunday, default to None,
        which includes every day of a week. An empty set is treated like None.
    :return: (*pandas.DataFrame/pandas.Series*) -- the sliced time series.
    :raises TypeError:
        if between_time is provided but not a list and/or
        if not all elements of between_time are strings and/or
        if dayofweek is provided but not a set.
    :raises ValueError:
        if between_time is provided but does not have exactly two elements (this
        includes an empty list) and/or
        if not all elements of between_time are in 24 hour format and/or
        if dayofweek is provided but not a subset of integers in [0, 6].
    """
    _check_date_range_in_time_series(ts, start, end)
    ts = ts[start:end]

    # Test against None rather than truthiness so that an empty list is rejected by
    # the length check instead of being silently ignored.
    if between_time is not None:
        if not isinstance(between_time, list):
            raise TypeError("between_time must be a list")
        if len(between_time) != 2:
            raise ValueError("between_time must be a list with start_time and end_time")
        if not all(isinstance(t, str) for t in between_time):
            raise TypeError("every element of between_time must be a string")
        if not all(is_24_hour_format(t) for t in between_time):
            raise ValueError("every element of between_time must be in 24 hour format")
        ts = ts.between_time(*between_time)

    if dayofweek is not None and not isinstance(dayofweek, set):
        raise TypeError("dayofweek must be a set")
    # An empty set falls through here and leaves the series unfiltered.
    if dayofweek:
        if not dayofweek.issubset(set(range(7))):
            raise ValueError(f"dayofweek must be a subset of {set(range(7))}")
        ts = ts[ts.index.dayofweek.isin(dayofweek)]

    return ts
def resample_time_series(ts, freq, agg="sum"):
    """Resample a time series.

    :param pandas.DataFrame/pandas.Series ts: time series to resample.
    :param str freq: frequency. Either *'D'* (day), *'W'* (week), *'M'* (month).
    :param str agg: aggregation method. Either *'sum'* or *'mean'*.
    :return: (*pandas.DataFrame/pandas.Series*) -- the resampled time series.
    :raises ValueError: if freq is not one of *'D'*, *'W'*, *'M'* or agg is not one of
        *'sum'* or *'mean'* or ts is time zone aware with DST.

    .. note::
        When resampling:

        * the left side of the bin interval is closed.
        * the left bin edge is used to label the interval.
        * intervals start at midnight when freq is *'D'*.
        * intervals start on Sunday when freq is *'W'*.
        * incomplete days, weeks and months are clipped when agg is *'sum'*; a
          message saying so is printed to standard output. Completeness is judged
          against hourly data, i.e. 24 samples per day.
        * incomplete days, weeks and months are calculated using available data
          samples when agg is *'mean'*.
    """
    _check_time_series(ts, "time series")
    if is_dst(ts):
        raise ValueError(
            "DST is not supported. Use ETC/GMT+x or ETC/GMT-x where x is the offset"
        )
    if freq not in ["D", "W", "M"]:
        raise ValueError("frequency must be one of 'D', 'W', 'M'")
    if agg not in ["sum", "mean"]:
        raise ValueError("aggregation method must be 'sum' or 'mean'")

    if agg == "sum":
        print("clip incomplete %s" % {"D": "days", "W": "weeks", "M": "months"}[freq])

    if freq == "M":
        if agg == "mean":
            return ts.resample("MS").mean()
        # Months differ in length, so a fixed min_count cannot be used. Count the
        # non-null samples of the first column per month (a Series is coerced to a
        # one-column DataFrame) and keep the months holding 24 samples per day.
        count = pd.DataFrame(ts).iloc[:, 0].resample("MS").count().to_dict()
        keep = [k for k, v in count.items() if k.days_in_month * 24 == v]
        return ts.resample("MS").sum().filter(items=keep, axis=0)

    if freq == "D":
        resampler = ts.resample("D")
        samples_per_bin = 24
    else:
        resampler = ts.resample("W", label="left", closed="left")
        samples_per_bin = 7 * 24

    if agg == "mean":
        return resampler.mean()
    # Bins with fewer valid samples than a full bin sum to NaN and are then dropped.
    return resampler.sum(min_count=samples_per_bin).dropna()
def change_time_zone(ts, tz):
    """Convert hourly time series to new time zone. UTC is assumed if no time zone is
    assigned to the input time series.

    The index of the returned object is named after the new time zone. The input time
    series is left untouched.

    :param pandas.DataFrame/pandas.Series ts: time series.
    :param str tz: new time zone.
    :return: (*pandas.DataFrame/pandas.Series*) -- time series with new time zone.
    :raises TypeError: if tz is not a str.
    :raises ValueError: if tz is invalid or the frequency of the time series is not
        hourly, e.g., because it has already been resampled.
    """
    _check_time_series(ts, "time series")
    # The hourly alias is spelled "H" by older pandas releases and "h" by newer ones,
    # hence the case-insensitive comparison.
    freq = pd.infer_freq(ts.index)
    if freq is None or freq.upper() != "H":
        raise ValueError("frequency of time series must be 1h")
    if not isinstance(tz, str):
        raise TypeError("time zone must be a str")
    try:
        pytz.timezone(tz)
    except pytz.exceptions.UnknownTimeZoneError as err:
        raise ValueError("Unknown time zone %s" % tz) from err

    # A naive index is interpreted as UTC before converting.
    aware = ts.tz_localize("UTC") if ts.index.tz is None else ts
    converted = aware.tz_convert(tz)
    # Name the index of the result only, so the caller's object is not modified.
    converted.index.name = tz
    return converted
def is_dst(ts):
    """Flag Daylight Saving Time (DST) in a time series.

    :param pandas.DataFrame/pandas.Series ts: time series.
    :return: (*bool*) -- True if at least one timestamp of the index has a non-zero
        DST offset, False otherwise. A time zone naive index always yields False.
    """
    if ts.index.tz is None:
        return False
    # ``dst()`` gives None when the tzinfo carries no DST information (e.g. a
    # ``datetime.timezone`` fixed offset) and a zero timedelta when DST is not in
    # effect. Both are falsy, so neither counts as DST.
    return any(stamp.dst() for stamp in ts.index)