Save/Load ForecastFlowML#

This guide shows how the ForecastFlowML can be saved and loaded to be used afterwards.

Import packages#

from forecastflowml import ForecastFlowML
from forecastflowml import FeatureExtractor
from forecastflowml.data.loader import load_walmart_m5
from lightgbm import LGBMRegressor
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pickle
import sys
import os

os.environ["PYSPARK_PYTHON"] = sys.executable

Initialize Spark#

spark = (
    SparkSession.builder.master("local[4]")
    .config("spark.driver.memory", "4g")
    .config("spark.sql.shuffle.partitions", "4")
    .config("spark.sql.execution.pyarrow.enabled", "true")
    .getOrCreate()
)

Sample Dataset#

df = load_walmart_m5(spark)
df.show(10)
+--------------------+-----------+-------+------+--------+--------+----------+-----+
|                  id|    item_id|dept_id|cat_id|store_id|state_id|      date|sales|
+--------------------+-----------+-------+------+--------+--------+----------+-----+
|FOODS_1_002_TX_1_...|FOODS_1_002|FOODS_1| FOODS|    TX_1|      TX|2015-01-15|  3.0|
|FOODS_1_002_TX_1_...|FOODS_1_002|FOODS_1| FOODS|    TX_1|      TX|2015-01-16|  0.0|
|FOODS_1_002_TX_1_...|FOODS_1_002|FOODS_1| FOODS|    TX_1|      TX|2015-01-17|  1.0|
|FOODS_1_002_TX_1_...|FOODS_1_002|FOODS_1| FOODS|    TX_1|      TX|2015-01-18|  0.0|
|FOODS_1_002_TX_1_...|FOODS_1_002|FOODS_1| FOODS|    TX_1|      TX|2015-01-19|  0.0|
|FOODS_1_002_TX_1_...|FOODS_1_002|FOODS_1| FOODS|    TX_1|      TX|2015-01-20|  0.0|
|FOODS_1_002_TX_1_...|FOODS_1_002|FOODS_1| FOODS|    TX_1|      TX|2015-01-21|  0.0|
|FOODS_1_002_TX_1_...|FOODS_1_002|FOODS_1| FOODS|    TX_1|      TX|2015-01-22|  0.0|
|FOODS_1_002_TX_1_...|FOODS_1_002|FOODS_1| FOODS|    TX_1|      TX|2015-01-23|  0.0|
|FOODS_1_002_TX_1_...|FOODS_1_002|FOODS_1| FOODS|    TX_1|      TX|2015-01-24|  0.0|
+--------------------+-----------+-------+------+--------+--------+----------+-----+
only showing top 10 rows

Feature Engineering#

feature_extractor = FeatureExtractor(
    id_col="id",
    date_col="date",
    target_col="sales",
    lag_window_features={
        "lag": [7 * (i + 1) for i in range(4)],
    },
)
df_features = feature_extractor.transform(df).localCheckpoint()
df_features.show(10)
+--------------------+-----------+-------+------+--------+--------+----------+-----+-----+------+------+------+
|                  id|    item_id|dept_id|cat_id|store_id|state_id|      date|sales|lag_7|lag_14|lag_21|lag_28|
+--------------------+-----------+-------+------+--------+--------+----------+-----+-----+------+------+------+
|FOODS_1_002_TX_1_...|FOODS_1_002|FOODS_1| FOODS|    TX_1|      TX|2015-01-15|  3.0| null|  null|  null|  null|
|FOODS_1_002_TX_1_...|FOODS_1_002|FOODS_1| FOODS|    TX_1|      TX|2015-01-16|  0.0| null|  null|  null|  null|
|FOODS_1_002_TX_1_...|FOODS_1_002|FOODS_1| FOODS|    TX_1|      TX|2015-01-17|  1.0| null|  null|  null|  null|
|FOODS_1_002_TX_1_...|FOODS_1_002|FOODS_1| FOODS|    TX_1|      TX|2015-01-18|  0.0| null|  null|  null|  null|
|FOODS_1_002_TX_1_...|FOODS_1_002|FOODS_1| FOODS|    TX_1|      TX|2015-01-19|  0.0| null|  null|  null|  null|
|FOODS_1_002_TX_1_...|FOODS_1_002|FOODS_1| FOODS|    TX_1|      TX|2015-01-20|  0.0| null|  null|  null|  null|
|FOODS_1_002_TX_1_...|FOODS_1_002|FOODS_1| FOODS|    TX_1|      TX|2015-01-21|  0.0| null|  null|  null|  null|
|FOODS_1_002_TX_1_...|FOODS_1_002|FOODS_1| FOODS|    TX_1|      TX|2015-01-22|  0.0|  3.0|  null|  null|  null|
|FOODS_1_002_TX_1_...|FOODS_1_002|FOODS_1| FOODS|    TX_1|      TX|2015-01-23|  0.0|  0.0|  null|  null|  null|
|FOODS_1_002_TX_1_...|FOODS_1_002|FOODS_1| FOODS|    TX_1|      TX|2015-01-24|  0.0|  1.0|  null|  null|  null|
+--------------------+-----------+-------+------+--------+--------+----------+-----+-----+------+------+------+
only showing top 10 rows

Train/Test Dataset#

df_train = df_features.filter(F.col("date") < "2016-04-25")
df_test = df_features.filter(F.col("date") >= "2016-04-25")

Initialize Model#

forecast_flow = ForecastFlowML(
    group_col="store_id",
    id_col="id",
    date_col="date",
    target_col="sales",
    date_frequency="days",
    model_horizon=7,
    max_forecast_horizon=28,
    model=LGBMRegressor(),
)

Distributed Results#

Save#

forecast_flow.train(df_train).write.parquet("trained_models.parquet")
with open("forecast_flow.pickle", "wb") as f:
    pickle.dump(forecast_flow, f)

Load#

trained_models = spark.read.parquet("trained_models.parquet")
with open("forecast_flow.pickle", "rb") as f:
    forecast_flow = pickle.load(f)
forecast_flow.predict(df_test, trained_models).show(10)
+--------+--------------------+-------------------+----------+
|store_id|                  id|               date|prediction|
+--------+--------------------+-------------------+----------+
|    CA_1|FOODS_1_064_CA_1_...|2016-04-25 00:00:00| 0.5733835|
|    CA_1|FOODS_1_064_CA_1_...|2016-04-26 00:00:00| 0.5733835|
|    CA_1|FOODS_1_064_CA_1_...|2016-04-27 00:00:00| 0.5733835|
|    CA_1|FOODS_1_064_CA_1_...|2016-04-28 00:00:00| 1.0461307|
|    CA_1|FOODS_1_064_CA_1_...|2016-04-29 00:00:00|   1.69175|
|    CA_1|FOODS_1_064_CA_1_...|2016-04-30 00:00:00| 2.5920947|
|    CA_1|FOODS_1_064_CA_1_...|2016-05-01 00:00:00| 1.0461307|
|    CA_1|FOODS_1_121_CA_1_...|2016-04-25 00:00:00| 1.0461307|
|    CA_1|FOODS_1_121_CA_1_...|2016-04-26 00:00:00| 1.0461307|
|    CA_1|FOODS_1_121_CA_1_...|2016-04-27 00:00:00| 1.0461307|
+--------+--------------------+-------------------+----------+
only showing top 10 rows

Local Results#

Save#

forecast_flow.train(df_train, local_result=True)
with open("forecast_flow.pickle", "wb") as f:
    pickle.dump(forecast_flow, f)

Load#

with open("forecast_flow.pickle", "rb") as f:
    forecast_flow = pickle.load(f)
forecast_flow.predict(df_test, spark=spark).show(10)
+--------+--------------------+-------------------+----------+
|store_id|                  id|               date|prediction|
+--------+--------------------+-------------------+----------+
|    CA_1|FOODS_1_064_CA_1_...|2016-04-25 00:00:00| 0.5733835|
|    CA_1|FOODS_1_064_CA_1_...|2016-04-26 00:00:00| 0.5733835|
|    CA_1|FOODS_1_064_CA_1_...|2016-04-27 00:00:00| 0.5733835|
|    CA_1|FOODS_1_064_CA_1_...|2016-04-28 00:00:00| 1.0461307|
|    CA_1|FOODS_1_064_CA_1_...|2016-04-29 00:00:00|   1.69175|
|    CA_1|FOODS_1_064_CA_1_...|2016-04-30 00:00:00| 2.5920947|
|    CA_1|FOODS_1_064_CA_1_...|2016-05-01 00:00:00| 1.0461307|
|    CA_1|FOODS_1_121_CA_1_...|2016-04-25 00:00:00| 1.0461307|
|    CA_1|FOODS_1_121_CA_1_...|2016-04-26 00:00:00| 1.0461307|
|    CA_1|FOODS_1_121_CA_1_...|2016-04-27 00:00:00| 1.0461307|
+--------+--------------------+-------------------+----------+
only showing top 10 rows