import pickle
import datetime
import sklearn
import pyspark
import pandas as pd
import pyspark.sql.functions as F
from forecastflowml.model_selection import (
_cross_val_predict,
_score_func,
_TimeBasedSplit,
)
from forecastflowml.direct_forecaster import _DirectForecaster
from typing import List, Optional, Union, Dict
class ForecastFlowML:
    """Create forecaster instance.

    Parameters
    ----------
    id_col
        Time series identifier column.
    group_col
        Column to partition the dataframe.
    date_col
        Date column.
    target_col
        Target column.
    date_frequency
        Date frequency of the dataframe.
    max_forecast_horizon
        Maximum horizon to generate the forecast. Needs to be multiple of
        ``model_horizon``.
    model_horizon
        Forecast horizon for a single model.
    model
        Regressor compatible with ``scikit-learn`` API.
    categorical_cols
        List of columns to treat as categorical.
    use_lag_range
        Extra lag range to use in addition to allowed lag values.
    """

    def __init__(
        self,
        id_col: str,
        group_col: str,
        date_col: str,
        target_col: str,
        date_frequency: str,
        max_forecast_horizon: int,
        model_horizon: int,
        model: "sklearn.base.BaseEstimator",
        categorical_cols: Optional[List[str]] = None,
        use_lag_range: int = 0,
    ) -> None:
        self.id_col = id_col
        self.group_col = group_col
        self.date_col = date_col
        self.target_col = target_col
        self.categorical_cols = categorical_cols
        self.date_frequency = date_frequency
        self.model = model
        self.max_forecast_horizon = max_forecast_horizon
        self.model_horizon = model_horizon
        self.use_lag_range = use_lag_range

    @property
    def model_(self) -> pd.DataFrame:
        """Trained models in pickled format.

        Raises ``AttributeError`` until the attribute has been set (which
        ``train`` does when called with ``local_result=True``).
        """
        return self._model_

    @model_.setter
    def model_(self, value: pd.DataFrame) -> None:
        """Set models attribute after training.

        Parameters
        ----------
        value
            pandas DataFrame containing trained models.
        """
        self._model_ = value

    def _forecaster_params(self) -> dict:
        """Collect the keyword arguments shared by every ``_DirectForecaster``.

        The dictionary is built on the driver so that the pandas UDFs close
        over plain values instead of ``self``.
        """
        return {
            "id_col": self.id_col,
            "group_col": self.group_col,
            "date_col": self.date_col,
            "date_frequency": self.date_frequency,
            "target_col": self.target_col,
            "categorical_cols": self.categorical_cols,
            "model": self.model,
            "model_horizon": self.model_horizon,
            "max_forecast_horizon": self.max_forecast_horizon,
            "use_lag_range": self.use_lag_range,
        }

    @staticmethod
    def _spark_type_name(value) -> str:
        """Return the Spark DDL type name matching a Python hyperparameter value.

        Python's ``str`` and ``bool`` type names are not valid Spark DDL types,
        so they are translated; every other type name is passed through as is.
        """
        python_name = type(value).__name__
        return {"str": "string", "bool": "boolean"}.get(python_name, python_name)

    def _check_input_type(self, df: "pyspark.sql.DataFrame") -> None:
        """Raise ``NotImplementedError`` unless ``df`` is a pyspark DataFrame."""
        if not isinstance(df, pyspark.sql.dataframe.DataFrame):
            raise NotImplementedError(
                "Input is expected to be pyspark.sql.dataframe.DataFrame"
            )

    def _check_fitted(
        self,
        trained_models: Optional["pyspark.sql.DataFrame"],
        spark: Optional["pyspark.sql.SparkSession"],
    ) -> None:
        """Validate that models are available for prediction.

        Raises
        ------
        ValueError
            If no models were stored locally and ``trained_models`` is not
            supplied, or if locally stored models must be used but no
            ``spark`` session was supplied.
        """
        if trained_models is not None:
            return
        if not hasattr(self, "model_"):
            raise ValueError(
                "fit method should be called before predict or trained_models needs to be supplied"
            )
        # Locally stored models have to be converted back to a pyspark
        # DataFrame, which requires a session.
        if spark is None:
            raise ValueError(
                "spark instance must be supplied in case of local_result was set to True during training"
            )

    def get_feature_importance(
        self,
        df_model: Optional["pyspark.sql.DataFrame"] = None,
    ) -> pd.DataFrame:
        """The feature importances.

        Parameters
        ----------
        df_model
            pyspark DataFrame that contains the trained models. Only needs to be
            supplied if ``local_result`` is set to ``False`` during training.

        Returns
        -------
        DataFrame that includes the feature importances.
        """
        group_col = self.group_col

        def _feature_importance_udf(df):
            group = df[group_col].iloc[0]
            horizons = df["forecast_horizon"].iloc[0]
            pickled_models = df["model"].iloc[0]
            importance_list = []
            for forecast_horizon, pickled_model in zip(horizons, pickled_models):
                # NOTE: unpickling executes arbitrary code; the model table
                # must come from a trusted source.
                model = pickle.loads(pickled_model)
                importance = pd.DataFrame(
                    zip(
                        [forecast_horizon] * model.n_features_,
                        model.feature_name_,
                        model.feature_importances_,
                    ),
                    columns=["forecast_horizon", "feature", "importance"],
                )
                importance_list.append(importance)
            df_importance = pd.concat(importance_list)
            df_importance.insert(0, group_col, group)
            return df_importance

        if df_model is not None:
            schema = (
                f"{group_col}:string, forecast_horizon:array<int>, "
                "feature:string, importance:float"
            )
            return (
                df_model.groupby(group_col)
                .applyInPandas(_feature_importance_udf, schema=schema)
                .toPandas()
            )

        # Iterate over the groups explicitly: the helper reads the grouping
        # column, which ``groupby.apply`` no longer reliably passes through.
        per_group = [
            _feature_importance_udf(group_df)
            for _, group_df in self.model_.groupby(group_col)
        ]
        if not per_group:
            return pd.DataFrame(
                columns=[group_col, "forecast_horizon", "feature", "importance"]
            )
        return pd.concat(per_group).reset_index(drop=True)

    def train(
        self,
        df: "pyspark.sql.DataFrame",
        local_result: bool = False,
    ) -> Optional["pyspark.sql.DataFrame"]:
        """Train models.

        Parameters
        ----------
        df
            Dataset to fit.
        local_result
            Whether to store trained models as attribute. Only provide ``True``
            in case of the trained models are not expected to overload the
            driver node.

        Returns
        -------
        None if ``local_result=True``. Otherwise, pyspark DataFrame that
        includes the trained models.
        """
        self._check_input_type(df)
        group_col = self.group_col
        date_col = self.date_col
        forecaster_params = self._forecaster_params()

        def _train_udf(df):
            start = datetime.datetime.now()
            forecaster = _DirectForecaster(**forecaster_params)
            forecaster.fit(df)
            end = datetime.datetime.now()
            seconds = round((end - start).total_seconds(), 1)
            # One output row per group; horizons and pickled models are kept
            # as parallel lists.
            return pd.DataFrame(
                [
                    {
                        group_col: df[group_col].iloc[0],
                        "forecast_horizon": [
                            list(x) for x in forecaster.model_.keys()
                        ],
                        "model": [
                            pickle.dumps(x) for x in forecaster.model_.values()
                        ],
                        "start_time": start.strftime("%d-%b-%Y (%H:%M:%S)"),
                        "end_time": end.strftime("%d-%b-%Y (%H:%M:%S)"),
                        "elapsed_seconds": seconds,
                    },
                ]
            )

        schema = (
            f"{group_col}:string, forecast_horizon:array<array<int>>, model:array<binary>,"
            "start_time:string, end_time:string, elapsed_seconds:float"
        )
        model_ = (
            df.withColumn(date_col, F.to_timestamp(date_col))
            .groupby(group_col)
            .applyInPandas(_train_udf, schema=schema)
        )
        if local_result:
            self.model_ = model_.toPandas()
            return None
        return model_

    def cross_validate(
        self,
        df: "pyspark.sql.DataFrame",
        n_cv_splits: int = 3,
        max_train_size: Optional[int] = None,
        cv_step_length: Optional[int] = None,
        refit: bool = True,
    ) -> "pyspark.sql.DataFrame":
        """Time series cross validation predictions.

        Parameters
        ----------
        df
            Dataset to fit.
        n_cv_splits
            Number of cross validation folds.
        max_train_size
            Number of max periods to use as training set.
        cv_step_length
            Number of periods to put between each cv folds. Defaults to
            ``max_forecast_horizon`` when not provided.
        refit
            Whether to refit model for each training dataset.

        Returns
        -------
        DataFrame that contains target and predictions over cross validation
        folds.
        """
        self._check_input_type(df)
        id_col = self.id_col
        group_col = self.group_col
        date_col = self.date_col
        date_frequency = self.date_frequency
        target_col = self.target_col
        max_forecast_horizon = self.max_forecast_horizon
        forecaster_params = self._forecaster_params()
        if cv_step_length is None:
            cv_step_length = max_forecast_horizon

        def _cross_validate_udf(df):
            forecaster = _DirectForecaster(**forecaster_params)
            cv = _TimeBasedSplit(
                date_col=date_col,
                date_frequency=date_frequency,
                n_splits=int(n_cv_splits),
                forecast_horizon=list(range(1, max_forecast_horizon + 1)),
                step_length=int(cv_step_length),
                max_train_size=max_train_size,
            ).split(df)
            return _cross_val_predict(
                forecaster=forecaster,
                df=df,
                cv=cv,
                refit=refit,
            )

        schema = (
            f"{group_col}:string, {id_col}:string, {date_col}:timestamp, cv:string,"
            f"{target_col}:float, prediction:float"
        )
        return (
            df.withColumn(date_col, F.to_timestamp(date_col))
            .groupby(group_col)
            .applyInPandas(_cross_validate_udf, schema=schema)
        )

    def grid_search(
        self,
        df: "pyspark.sql.DataFrame",
        param_grid: Dict[str, List[Union[str, float, int]]],
        n_cv_splits: int = 3,
        max_train_size: Optional[int] = None,
        cv_step_length: Optional[int] = None,
        scoring_metric: str = "neg_mean_squared_error",
        refit: bool = True,
    ) -> pd.DataFrame:
        """Grid search with time series cross validation.

        Parameters
        ----------
        df
            Dataset to fit.
        param_grid
            Dictionary with parameters as keys and lists of parameter settings
            to try as values. The result column type of each parameter is
            taken from the first value of its list.
        n_cv_splits
            Number of cross validation folds.
        max_train_size
            Number of max periods to use as training set.
        cv_step_length
            Number of periods to put between each cv folds. Defaults to
            ``max_forecast_horizon`` when not provided.
        scoring_metric
            ``scikit-learn`` scoring metric.
            See list of available metrics:
            https://scikit-learn.org/stable/modules/model_evaluation.html.
        refit
            Whether to refit model for each training dataset.

        Returns
        -------
        pandas DataFrame that includes score per parameter combination.

        Raises
        ------
        ValueError
            If a parameter in ``param_grid`` has no candidate values.
        """
        self._check_input_type(df)
        for key, values in param_grid.items():
            if len(values) == 0:
                raise ValueError(
                    f"param_grid['{key}'] must contain at least one value"
                )

        group_col = self.group_col
        date_col = self.date_col
        date_frequency = self.date_frequency
        target_col = self.target_col
        max_forecast_horizon = self.max_forecast_horizon
        model = self.model
        forecaster_params = self._forecaster_params()
        if cv_step_length is None:
            cv_step_length = max_forecast_horizon

        def _grid_search_udf(df):
            # Every row of the group carries the same hyperparameter values.
            hyperparams = {param: df[param].iloc[0] for param in param_grid}
            try_model = model.set_params(**hyperparams)
            forecaster = _DirectForecaster(
                **{**forecaster_params, "model": try_model}
            )
            cv = _TimeBasedSplit(
                date_col=date_col,
                date_frequency=date_frequency,
                n_splits=int(n_cv_splits),
                forecast_horizon=list(range(1, max_forecast_horizon + 1)),
                step_length=int(cv_step_length),
                max_train_size=max_train_size,
            ).split(df)
            cv_predictions = _cross_val_predict(
                forecaster=forecaster,
                df=df,
                cv=cv,
                refit=refit,
            )
            # Score each fold separately, then average over the folds.
            score = (
                cv_predictions.groupby("cv")
                .apply(
                    lambda x: _score_func(
                        x[target_col], x["prediction"], scoring_metric
                    )
                )
                .mean()
            )
            return pd.DataFrame(
                [
                    {
                        group_col: df[group_col].iloc[0],
                        scoring_metric: score,
                        **hyperparams,
                    }
                ]
            )

        # Cross join the data with every parameter combination by exploding
        # one array column per parameter.
        for key, values in param_grid.items():
            df = df.withColumn(key, F.explode(F.array([F.lit(v) for v in values])))

        schema = ", ".join(
            [f"{group_col}:string", f"{scoring_metric}:float"]
            + [
                f"{key} {self._spark_type_name(values[0])}"
                for key, values in param_grid.items()
            ]
        )
        result = (
            df.withColumn(date_col, F.to_timestamp(date_col))
            .groupby([group_col, *param_grid.keys()])
            .applyInPandas(_grid_search_udf, schema=schema)
        )
        return result.orderBy(group_col, scoring_metric, ascending=False).toPandas()

    def _serialize(self, df):
        """Pickle each group's rows into a single binary ``data`` cell."""
        group_col = self.group_col

        def _serialize_udf(df):
            return pd.DataFrame(
                [
                    {
                        group_col: df[group_col].iloc[0],
                        "data": pickle.dumps(df),
                    }
                ]
            )

        schema = f"{group_col}:string, data:binary"
        return df.groupby(group_col).applyInPandas(_serialize_udf, schema=schema)

    def _predict_grid(self, df, trained_models):
        """Left join the serialized data of each group with its trained models."""
        serialized = self._serialize(df)
        return serialized.join(
            trained_models.select(self.group_col, "forecast_horizon", "model"),
            on=self.group_col,
            how="left",
        )

    def predict(
        self,
        df: "pyspark.sql.DataFrame",
        trained_models: Optional["pyspark.sql.DataFrame"] = None,
        spark: Optional["pyspark.sql.SparkSession"] = None,
    ) -> "pyspark.sql.DataFrame":
        """Make predictions.

        Parameters
        ----------
        df
            Dataset to perform predictions on.
        trained_models
            pyspark DataFrame that contains the trained models.
            Does not need to be provided in case ``local_result``
            is set to ``True`` during training.
        spark
            Spark session instance. Only provide when ``local_result=True``
            during training.

        Returns
        -------
        DataFrame that contains predictions per time series.

        Raises
        ------
        ValueError
            If no trained models are available, or if locally stored models
            are to be used and ``spark`` is not supplied.
        """
        self._check_input_type(df)
        self._check_fitted(trained_models, spark)
        id_col = self.id_col
        group_col = self.group_col
        date_col = self.date_col
        forecaster_params = self._forecaster_params()

        def _predict_udf(df):
            # NOTE: unpickling executes arbitrary code; ``trained_models``
            # must come from a trusted source.
            data = pickle.loads(df["data"].iloc[0])
            horizons = [tuple(h) for h in df["forecast_horizon"].iloc[0]]
            fitted_models = [pickle.loads(m) for m in df["model"].iloc[0]]
            forecaster = _DirectForecaster(**forecaster_params)
            forecaster.model_ = dict(zip(horizons, fitted_models))
            return forecaster.predict(data)

        if trained_models is None:
            trained_models = spark.createDataFrame(self.model_)

        df = df.withColumn(date_col, F.to_timestamp(date_col))
        df = self._predict_grid(df, trained_models)
        schema = f"{group_col}:string, {id_col}:string, {date_col}:timestamp, prediction:float"
        return df.groupby(group_col).applyInPandas(_predict_udf, schema=schema)