Save/Load ForecastFlowML#
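This guide shows how to save a ForecastFlowML model after training and load it again later for prediction. It covers two cases: distributed results, where the trained models are written to a Parquet file alongside the pickled ForecastFlowML object, and local results, where only the pickled ForecastFlowML object is saved.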
Import packages#
from forecastflowml import ForecastFlowML
from forecastflowml import FeatureExtractor
from forecastflowml.data.loader import load_walmart_m5
from lightgbm import LGBMRegressor
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pickle
import sys
import os
# Set PYSPARK_PYTHON to the interpreter running this script, so PySpark
# launches its Python processes with the same executable (and therefore the
# same installed packages) as the driver.
os.environ["PYSPARK_PYTHON"] = sys.executable
Initialize Spark#
# Local Spark session: 4 worker threads, 4 GB of driver memory and
# 4 shuffle partitions.
# Arrow-based pandas conversion is switched on with the documented key
# "spark.sql.execution.arrow.pyspark.enabled". Spark silently accepts
# unknown keys, so a misspelled key would leave Arrow disabled without
# raising any error.
spark = (
    SparkSession.builder.master("local[4]")
    .config("spark.driver.memory", "4g")
    .config("spark.sql.shuffle.partitions", "4")
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
    .getOrCreate()
)
Sample Dataset#
# Load the Walmart M5 sample data through the active Spark session and
# print its first 10 rows.
df = load_walmart_m5(spark)
df.show(n=10)
+--------------------+-----------+-------+------+--------+--------+----------+-----+
| id| item_id|dept_id|cat_id|store_id|state_id| date|sales|
+--------------------+-----------+-------+------+--------+--------+----------+-----+
|FOODS_1_002_TX_1_...|FOODS_1_002|FOODS_1| FOODS| TX_1| TX|2015-01-15| 3.0|
|FOODS_1_002_TX_1_...|FOODS_1_002|FOODS_1| FOODS| TX_1| TX|2015-01-16| 0.0|
|FOODS_1_002_TX_1_...|FOODS_1_002|FOODS_1| FOODS| TX_1| TX|2015-01-17| 1.0|
|FOODS_1_002_TX_1_...|FOODS_1_002|FOODS_1| FOODS| TX_1| TX|2015-01-18| 0.0|
|FOODS_1_002_TX_1_...|FOODS_1_002|FOODS_1| FOODS| TX_1| TX|2015-01-19| 0.0|
|FOODS_1_002_TX_1_...|FOODS_1_002|FOODS_1| FOODS| TX_1| TX|2015-01-20| 0.0|
|FOODS_1_002_TX_1_...|FOODS_1_002|FOODS_1| FOODS| TX_1| TX|2015-01-21| 0.0|
|FOODS_1_002_TX_1_...|FOODS_1_002|FOODS_1| FOODS| TX_1| TX|2015-01-22| 0.0|
|FOODS_1_002_TX_1_...|FOODS_1_002|FOODS_1| FOODS| TX_1| TX|2015-01-23| 0.0|
|FOODS_1_002_TX_1_...|FOODS_1_002|FOODS_1| FOODS| TX_1| TX|2015-01-24| 0.0|
+--------------------+-----------+-------+------+--------+--------+----------+-----+
only showing top 10 rows
Feature Engineering#
feature_extractor = FeatureExtractor(
    id_col="id",
    date_col="date",
    target_col="sales",
    # Lags of 7, 14, 21 and 28 (the lag_7 ... lag_28 columns in the output).
    lag_window_features={"lag": list(range(7, 29, 7))},
)
# localCheckpoint() returns a locally checkpointed copy of the transformed
# DataFrame, which is what the later train/test split is built on.
df_features = (
    feature_extractor.transform(df)
    .localCheckpoint()
)
df_features.show(n=10)
+--------------------+-----------+-------+------+--------+--------+----------+-----+-----+------+------+------+
| id| item_id|dept_id|cat_id|store_id|state_id| date|sales|lag_7|lag_14|lag_21|lag_28|
+--------------------+-----------+-------+------+--------+--------+----------+-----+-----+------+------+------+
|FOODS_1_002_TX_1_...|FOODS_1_002|FOODS_1| FOODS| TX_1| TX|2015-01-15| 3.0| null| null| null| null|
|FOODS_1_002_TX_1_...|FOODS_1_002|FOODS_1| FOODS| TX_1| TX|2015-01-16| 0.0| null| null| null| null|
|FOODS_1_002_TX_1_...|FOODS_1_002|FOODS_1| FOODS| TX_1| TX|2015-01-17| 1.0| null| null| null| null|
|FOODS_1_002_TX_1_...|FOODS_1_002|FOODS_1| FOODS| TX_1| TX|2015-01-18| 0.0| null| null| null| null|
|FOODS_1_002_TX_1_...|FOODS_1_002|FOODS_1| FOODS| TX_1| TX|2015-01-19| 0.0| null| null| null| null|
|FOODS_1_002_TX_1_...|FOODS_1_002|FOODS_1| FOODS| TX_1| TX|2015-01-20| 0.0| null| null| null| null|
|FOODS_1_002_TX_1_...|FOODS_1_002|FOODS_1| FOODS| TX_1| TX|2015-01-21| 0.0| null| null| null| null|
|FOODS_1_002_TX_1_...|FOODS_1_002|FOODS_1| FOODS| TX_1| TX|2015-01-22| 0.0| 3.0| null| null| null|
|FOODS_1_002_TX_1_...|FOODS_1_002|FOODS_1| FOODS| TX_1| TX|2015-01-23| 0.0| 0.0| null| null| null|
|FOODS_1_002_TX_1_...|FOODS_1_002|FOODS_1| FOODS| TX_1| TX|2015-01-24| 0.0| 1.0| null| null| null|
+--------------------+-----------+-------+------+--------+--------+----------+-----+-----+------+------+------+
only showing top 10 rows
Train/Test Dataset#
# Split on 2016-04-25: rows dated before it are used for training,
# rows on or after it are held out for prediction.
df_train = df_features.where(F.col("date") < "2016-04-25")
df_test = df_features.where(F.col("date") >= "2016-04-25")
Initialize Model#
# ForecastFlowML configuration used by both save/load variants in this guide.
# NOTE(review): group_col presumably means one model group per store, and
# model_horizon / max_forecast_horizon presumably are counted in
# date_frequency units (days) -- confirm against the ForecastFlowML API docs.
forecast_flow = ForecastFlowML(
    model=LGBMRegressor(),
    group_col="store_id",
    id_col="id",
    date_col="date",
    target_col="sales",
    date_frequency="days",
    model_horizon=7,
    max_forecast_horizon=28,
)
Distributed Results#
Save#
# Train and write the returned DataFrame of trained models to Parquet.
# mode="overwrite" keeps the guide re-runnable: with the default mode Spark
# raises an error when the target path already exists, whereas the pickle
# file below is always overwritten ("wb").
forecast_flow.train(df_train).write.parquet(
    "trained_models.parquet", mode="overwrite"
)
# Pickle the ForecastFlowML object itself so it can be restored for prediction.
with open("forecast_flow.pickle", "wb") as f:
    pickle.dump(forecast_flow, f)
Load#
# Read back the trained models that were saved as Parquet.
trained_models = spark.read.parquet("trained_models.parquet")
# Restore the pickled ForecastFlowML object.
# NOTE: pickle.load can execute arbitrary code -- only load files you trust.
with open("forecast_flow.pickle", "rb") as f:
    forecast_flow = pickle.load(f)
# Predict on the held-out rows using the reloaded models; print 10 rows.
forecast_flow.predict(df_test, trained_models).show(n=10)
+--------+--------------------+-------------------+----------+
|store_id| id| date|prediction|
+--------+--------------------+-------------------+----------+
| CA_1|FOODS_1_064_CA_1_...|2016-04-25 00:00:00| 0.5733835|
| CA_1|FOODS_1_064_CA_1_...|2016-04-26 00:00:00| 0.5733835|
| CA_1|FOODS_1_064_CA_1_...|2016-04-27 00:00:00| 0.5733835|
| CA_1|FOODS_1_064_CA_1_...|2016-04-28 00:00:00| 1.0461307|
| CA_1|FOODS_1_064_CA_1_...|2016-04-29 00:00:00| 1.69175|
| CA_1|FOODS_1_064_CA_1_...|2016-04-30 00:00:00| 2.5920947|
| CA_1|FOODS_1_064_CA_1_...|2016-05-01 00:00:00| 1.0461307|
| CA_1|FOODS_1_121_CA_1_...|2016-04-25 00:00:00| 1.0461307|
| CA_1|FOODS_1_121_CA_1_...|2016-04-26 00:00:00| 1.0461307|
| CA_1|FOODS_1_121_CA_1_...|2016-04-27 00:00:00| 1.0461307|
+--------+--------------------+-------------------+----------+
only showing top 10 rows
Local Results#
Save#
# Train with local_result=True. The return value of train() is not used here;
# only the ForecastFlowML object is saved.
# NOTE(review): presumably the trained models are kept on the object itself
# in this mode -- confirm against the ForecastFlowML API docs.
forecast_flow.train(df_train, local_result=True)
with open("forecast_flow.pickle", "wb") as f:
    pickle.dump(forecast_flow, f)
Load#
# Restore the pickled ForecastFlowML object.
# NOTE: pickle.load can execute arbitrary code -- only load files you trust.
with open("forecast_flow.pickle", "rb") as f:
    forecast_flow = pickle.load(f)
# No trained-models DataFrame is passed in this variant; the Spark session
# is supplied instead.
forecast_flow.predict(df_test, spark=spark).show(n=10)
+--------+--------------------+-------------------+----------+
|store_id| id| date|prediction|
+--------+--------------------+-------------------+----------+
| CA_1|FOODS_1_064_CA_1_...|2016-04-25 00:00:00| 0.5733835|
| CA_1|FOODS_1_064_CA_1_...|2016-04-26 00:00:00| 0.5733835|
| CA_1|FOODS_1_064_CA_1_...|2016-04-27 00:00:00| 0.5733835|
| CA_1|FOODS_1_064_CA_1_...|2016-04-28 00:00:00| 1.0461307|
| CA_1|FOODS_1_064_CA_1_...|2016-04-29 00:00:00| 1.69175|
| CA_1|FOODS_1_064_CA_1_...|2016-04-30 00:00:00| 2.5920947|
| CA_1|FOODS_1_064_CA_1_...|2016-05-01 00:00:00| 1.0461307|
| CA_1|FOODS_1_121_CA_1_...|2016-04-25 00:00:00| 1.0461307|
| CA_1|FOODS_1_121_CA_1_...|2016-04-26 00:00:00| 1.0461307|
| CA_1|FOODS_1_121_CA_1_...|2016-04-27 00:00:00| 1.0461307|
+--------+--------------------+-------------------+----------+
only showing top 10 rows