Source code for forecastflowml.preprocessing
import pyspark
import pyspark.sql.functions as F
from pyspark.sql.window import Window
from pyspark.sql import SparkSession
import pandas as pd
from forecastflowml.utils import _check_input_type, _check_spark
from typing import Dict, List, Optional, Union
def _history_length(df, id_col, date_col):
w = Window.partitionBy(id_col).orderBy(date_col)
df = df.withColumn("history_length", F.row_number().over(w))
return df
def _lag_window_summarizer(df, id_col, target_col, date_col, features):
w1 = Window.partitionBy(id_col).orderBy(date_col)
for key, values in features.items():
if key == "lag":
for lag in values:
df = df.withColumn(f"lag_{lag}", F.lag(target_col, lag).over(w1))
else:
for window, lag in values:
w2 = w1.rowsBetween(-(lag + window - 1), -lag)
df = df.withColumn(
f"window_{window}_lag_{lag}_{key}",
F.expr(f"{key}({target_col})").over(w2),
)
return df
def _count_consecutive_values(df, id_col, value_col, date_col, value, lags):
w1 = (
Window.partitionBy(id_col)
.orderBy(date_col)
.rowsBetween(Window.unboundedPreceding, 0)
)
w2 = (
Window.partitionBy(id_col, "value_group")
.orderBy(date_col)
.rowsBetween(Window.unboundedPreceding, 0)
)
df = (
df.withColumn("mask", F.when(F.col(value_col) == value, 1).otherwise(0))
.withColumn("value_group", F.sum(1 - F.col("mask")).over(w1))
.withColumn("count", F.sum("mask").over(w2))
)
w3 = Window.partitionBy(id_col).orderBy(date_col)
for lag in lags:
output_col = f"count_consecutive_value_lag_{lag}"
df = df.withColumn(output_col, F.lag("count", lag).over(w3))
df = df.drop("mask", "value_group", "count")
return df
def _date_features(df, date_col, features):
supported_features = [
"day_of_week",
"day_of_year",
"day_of_month",
"week_of_year",
"week_of_month",
"weekend",
"month",
"quarter",
"year",
]
not_supported = set(features) - set(supported_features)
if len(not_supported) > 0:
raise ValueError(f"{', '.join(not_supported)} feature(s) not supported.")
for feature in features:
if feature == "day_of_week":
df = df.withColumn(feature, F.dayofweek(F.col(date_col)).cast("tinyint"))
if feature == "day_of_year":
df = df.withColumn(feature, F.dayofyear(F.col(date_col)).cast("smallint"))
if feature == "day_of_month":
df = df.withColumn(feature, F.dayofmonth(F.col(date_col)).cast("tinyint"))
if feature == "week_of_year":
df = df.withColumn(feature, F.weekofyear(F.col(date_col)).cast("tinyint"))
if feature == "week_of_month":
df = df.withColumn(
feature, F.ceil(F.dayofmonth(F.col(date_col)) / 7).cast("tinyint")
)
if feature == "weekend":
df = df.withColumn(
feature,
F.when(F.dayofweek(F.col(date_col)).isin([1, 7]), 1)
.otherwise(0)
.cast("tinyint"),
)
if feature == "month":
df = df.withColumn("month", F.month(F.col(date_col)).cast("tinyint"))
if feature == "quarter":
df = df.withColumn("quarter", F.quarter(F.col(date_col)).cast("tinyint"))
if feature == "year":
df = df.withColumn("year", F.year(F.col(date_col)).cast("smallint"))
return df
[docs]class FeatureExtractor:
"""Extract features from time series
Parameters
----------
id_col
Id column name.
date_col
Date column name.
target_col
Target column name.
lag_window_features
Dictionary that contains different types of functions as keys and
their corresponding lag-window arguments as values.
The lag argument specifies how many units in the past the window should start,
while the window specifies the size of the window to apply the function across.
- For the lag function, only list of integers needs to be provided.
- For all other functions, list of lists such that [[window, lag]] needs to be provided.
========= ====================================================================
function example
========= ====================================================================
lag {"lag": [1, 2, 3, 4]}
mean {"mean": [[window, lag] for lag in [1, 2, 3] for window in [7, 14]]}
stddev {"stddev": [window, lag] for lag in [1, 2, 3] for window in [7, 14]}
========= ====================================================================
The logic of the code is represented visually using symbols:
- o: denotes the time stamp for which the window is summarized to
- x: represents other time stamps within the window being summarized.
- -: is used to denote observations, past or future, that are not part of the window.
==== ====== ============================
lag window calculation
==== ====== ============================
1 3 [- - - - - * * * o - - - -]
2 3 [- - - - * * * - o - - - -]
1 5 [- - - * * * * * o - - - -]
==== ====== ============================
Keys needs to be a native pyspark functions.
date_features
Date features to extract: day_of_week, day_of_year, day_of_month,
week_of_year, week_of_month, weekend, month, quarter, year.
count_consecutive_values
Counts consecutive apperance of spesific value. Needs to be a dictionary that
contains value for counting, and lags for how many units in the past the counting should start,
- Example: count_consecutive_values={"value": 0, "lags": [7, 14, 21, 28]}
history_length
Whether to count number of time periods after the start of time series.
"""
def __init__(
self,
id_col: str,
date_col: str,
target_col: str,
lag_window_features: Optional[Dict[str, List[Union[int, List[int]]]]] = None,
date_features: List[str] = None,
count_consecutive_values: Optional[
Dict[str, List[Union[int, List[int]]]]
] = None,
history_length: bool = False,
):
self.id_col = id_col
self.date_col = date_col
self.target_col = target_col
self.lag_window_features = lag_window_features
self.date_features = date_features
self.count_consecutive_values = count_consecutive_values
self.history_length = history_length
[docs] def transform(
self,
df: Union[pd.DataFrame, pyspark.sql.DataFrame],
spark: Optional[SparkSession] = None,
) -> Union[pd.DataFrame, pyspark.sql.DataFrame]:
"""Extract features
Parameters
----------
df
DataFrame to extract features.
spark
Spark session instance. Only provide when ``df`` is a pandas DataFrame.
Returns
-------
DataFrame with features added.
"""
input_type = _check_input_type(df)
_check_spark(df, input_type, spark)
df = spark.createDataFrame(df) if input_type == "df_pandas" else df
if self.lag_window_features is not None:
df = _lag_window_summarizer(
df,
self.id_col,
self.target_col,
self.date_col,
self.lag_window_features,
)
if self.count_consecutive_values is not None:
df = _count_consecutive_values(
df,
self.id_col,
self.target_col,
self.date_col,
self.count_consecutive_values["value"],
self.count_consecutive_values["lags"],
)
if self.history_length:
df = _history_length(df, self.id_col, self.date_col)
if self.date_features is not None:
df = _date_features(df, self.date_col, self.date_features)
if input_type == "df_pandas":
df = df.toPandas()
df[self.date_col] = pd.to_datetime(df[self.date_col])
return df