天天看點

PySpark-prophet預測1.導入庫和初始化設定2.資料預處理3.模組化4.讀取hive資料,調用spark進行prophet模型預測

大家好,又見面了,我是你們的朋友全棧君。

簡介

Prophet是facebook開源的時間序列預測工具,使用時間序列分解與機器學習拟合的方法進行模組化預測,關于prophet模型優點本文不再累述,網絡上的文章也比較多了,各種可視化,參數的解釋與demo示範,但是真正用到工業上大規模的可供學習的中文材料并不多。

本文打算使用PySpark進行多序列預測模組化,會給出一個比較詳細的腳本,供交流學習,重點在于使用hive資料/分布式,資料預處理,以及pandas_udf對多條序列進行循環執行。

tips:背景說明,在十萬級别的sku序列上使用prophet預測每個序列未來七天的銷售。

文章目錄

  • 1.導入庫和初始化設定
  • 2.資料預處理
  • 3.模組化
  • 4.讀取hive資料,調用spark進行prophet模型預測

1.導入庫和初始化設定

Pandas Udf 建構在 Apache Arrow 之上,是以具有低開銷,高性能的特點,udf對每條記錄都會操作一次,資料在 JVM 和 Python 中傳輸,pandas_udf就是使用 Java 和 Scala 中定義 UDF,然後在 python 中調用。

#導入庫
import datetime
from dateutil.relativedelta import relativedelta
from fbprophet import Prophet
import pandas as pd
import numpy as np
import warnings
warnings.filterwarnings('ignore')
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import *

#初始化
spark = SparkSession. \
    Builder(). \
    config("spark.sql.execution.arrow.enabled", "true"). \
    enableHiveSupport(). \
    getOrCreate()           

複制

其中初始化config:開啟spark df與pandas df 互相轉化的性能優化配置.

2.資料預處理

def sale_ds(df):
    df['ds'] = pd.to_datetime(df['ds'])
    df = df[['store_sku', 'ds', 'y']]
    # 控制長度,周期不用太長,關注最近的幾個完整周期即可
    start_day = (
            df['ds'].max() -
            relativedelta(
                days=63)).strftime('%Y-%m-%d')
    df = df[df['ds'] >= start_day][['store_sku', 'ds', 'y']]
    # 篩選條件:1 序列長度大于等于14,且過去最少有七天的銷售記錄;
    # 條件1,保障模型有兩個完整的周期資料;
    # 條件2,避免出現0,0,0,0,0,0,1,0,1這樣資料稀疏的資料出現
    sale_set = df.groupby(
        ['store_sku']).filter(
        lambda x: len(x) >= 14 and np.sum(
            x['y']) > 7)
    return sale_set


def replace_fill(data):
    """ 先嘗試使用上周的資料填補,再針對極端的資料進行cap,保障序列的完整和平滑性 :param data:單個序列 :param name: 序列名稱,store_sku :return: 修複後的一條序列 """
    data['ds'] = pd.to_datetime(data['ds'], format='%Y-%m-%d')
    data['y'] = data['y'].astype(float)
    data.loc[data['y'] <= 0, 'y'] = np.NaN
    data.loc[data['y'].isnull(), 'y'] = data['y'].shift(7).values[0]
    data.loc[data['y'].isnull(), 'y'] = data['y'].shift(-7).values[0]
    data.loc[data['y'].isnull(), 'y'] = data['y'].shift(-14).values[0]
    data.loc[data['y'].isnull(), 'y'] = data['y'].shift(14).values[0]
    data.loc[data['y'].isnull(), 'y'] = data['y'].interpolate(methon='nearest', order=3)
    low = data[data['y'] > 0]['y'].quantile(0.10)
    high = data[data['y'] > 0]['y'].quantile(0.90)
    data.loc[data['y'] < low, 'y'] = np.NaN
    data.loc[data['y'] > high, 'y'] = np.NaN
    data['y'] = data['y'].fillna(data['y'].mean())
    data['y'] = np.log1p(data['y'])
    return data           

複制

以上為資料預處理,具體内容見注釋.

放入模型中的時間和y值名稱必須是ds和y,首先控制資料的周期長度,如果預測天這種粒度的任務,則使用最近的4-6周即可。

因為是放入了長度不一的多個序列,為了讓預測更加可靠,對序列的長度有一定的限定,比如,序列長度至少有14天,還要一個需要注意的問題是,如果出現0,0,0,0,0,0,1,0,1這樣資料稀疏的資料的時候,prophet會報錯,報錯内容大緻為,std太低,反推回去就是放入的資料類似于常量,模型無法拟合。

至于缺失值的填充,prophet可以設定y為nan,模型在拟合過程中也會自動填充一個預測值,因為我們預測的為sku銷量,是具有星期這種周期性的,是以如果出現某一天的缺失,我們傾向于使用最近幾周同期資料進行填充,沒有優先使用均值或衆數進行填充,是因為,均值和衆數會掩蓋序列的周期性,破壞整個序列的規律,為了進一步對資料進行平滑,對于異常值還進行了分位數蓋帽,因為時序資料往往是偏态分布,是以我們對原始值做了取對數處理。

以上的資料預處理比較簡單,其中多數可以使用hive進行操作,會更加高效,這裡放出來的目的是示範一種思路以及python函數和最後的pandas_udf互動。

3.模組化

def prophet_train(data):
    model = Prophet(
        daily_seasonality=False,
        yearly_seasonality=False,
        holidays=holiday_df,
        holidays_prior_scale=10)
    model.add_seasonality(
        name='weekly',
        period=7,
        fourier_order=3,
        prior_scale=0.10)
    model.fit(data)
    future = model.make_future_dataframe(periods=7, freq='d')
    forecast = model.predict(future)
    forecast['pro_pred'] = np.expm1(forecast['yhat'])
    forecast_df=forecast[['store_sku','ds','pro_pred']]
    # 對預測值修正
    forecast_df.loc[forecast_df['pro_pred'] < 0, 'pro_pred'] = 0
    low = (1 + 0.1) * data['y'].min()
    hight = min((1 + 0.05) * data['y'].max(), 10000)
    forecast_df.loc[forecast_df['pro_pred'] < low, 'pro_pred'] = low
    forecast_df.loc[forecast_df['pro_pred'] > hight, 'pro_pred'] = hight
    return forecast_df           

複制

以上參數設定詳見https://zhuanlan.zhihu.com/p/52330017

函數内部的holiday_df是假日資料,資料格式需要按照文檔要求進行定義,改函數部分也會和整個代碼一起放在github,如果序列中最近呈現出較大的下滑或者增長,那麼預測值很容易得到負數或者非常大,這個時候我們依然需要對預測值進行修正,而非完全交給模型,當然你也可以在放入資料中設定上下限。

data['cap'] = 1000  #上限
data['floor'] = 6  #下限           

複制

該函數把前面的資料預處理函數和模型訓練函數放在一個函數中,類似于主函數,目的是使用統一的輸入和輸出。

def prophet_main(data):
    true_time = pd.datetime.now().strftime('%Y-%m-%d')
    data.dropna(inplace=True)
    data['ds'] = pd.to_datetime(data['ds'])
    data = data[data['ds'] < true_time]
    data['ds'] = data['ds'].astype(str)
    data['ds'] = pd.to_datetime(data['ds'])
    # 異常值替換
    data = replace_fill(data)
    pro_back = prophet_train(data)
    return pro_back           

複制

4.讀取hive資料,調用spark進行prophet模型預測

schema = StructType([
    StructField("store_sku", StringType()),
    StructField("ds", StringType()),
    StructField("pro_pred", DoubleType())
])

@pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP)
def run_model(data):
    data['store_sku']=data['store_sku'].astype(str)
    df = prophet_main(data)
    uuid = data['store_sku'].iloc[0]
    df['store_sku']=unid
    df['ds']=df['ds'].astype(str)
    df['pro_pred']=df['pro_pred'].astype(float)
    cols=['store_sku','ds','pro_pred']
    return df[cols]           

複制

假設我們希望輸出的結果為三列,分别是store_sku,ds,pro_pred,則定義它們的資料類型,定義的資料類型和順序要和放入的資料類型一緻,然後通過

@pandas_udf

進行裝飾,PandasUDFType有兩種類型一種是

Scalar

(标量映射),另一種是

Grouped Map

(分組映射).我們顯然是要使用分組映射,通過store_sku作為id進行分組,進而實作

split-apply-combine

以上是純python内容,下面展示通過hive資料庫讀取和運作python并把結果寫入hive中。

data = spark.sql(
    """ select concat(store_code,'_',goods_code) as store_sku,qty_fix as y,ds from scmtemp.redsku_store_sku_sale_fix_d""")
data.createOrReplaceTempView('data')
sale_predict = data.groupby(['store_sku']).apply(run_model)
sale_predict.createOrReplaceTempView('test_read_data')
# 儲存到資料庫
spark.sql(f"drop table if exists scmtemp.store_sku_sale_prophet")
spark.sql(f"create table scmtemp.store_sku_sale_prophet as select * from store_sku_predict_29 ")
print('完成預測')           

複制

當然也可以不用pandas_udf的形式進行

,在舊版spark中使用sc.parallelize()實作分組并行化

如:sc.parallelize(data,800).map(run_model).reduce(merge)

上文還有一個節假日資料沒有給出來,限于篇幅有限,整個代碼就放在github上了,如需要請自取。

基本交代清楚了,暫更于此。

完整代碼[pyspark_prophet]

釋出者:全棧程式員棧長,轉載請注明出處:https://javaforall.cn/151737.html原文連結:https://javaforall.cn