天天看點

Python量化交易學習筆記(47)——因子擴充

用這篇文章記錄一下目前學習過程中使用到的因子,包括使用純名額規則以及使用機器學習方法所使用到的因子,以便于後續文章描述使用。這裡以日線資料為例,周線、月線的因子擴充需要注意适度減小均線周期大小等參數。

規則選股因子擴充

全部因子擴充代碼在本節末,這裡進行幾點說明:

  • 隻實作了部分因子的擴充,便于後面規則選股時使用。可以根據具體需要,增删相應因子。
  • 在首次運作代碼時,會進行擴充因子的全量計算;當後續日線資料更新後,再運作代碼,隻會計算更新日期的擴充因子,避免了全量計算而帶來的過長時間消耗。即實作了增量計算。
  • MACD及均線名額實作,參考了字王的topq_talib包。
  • macd_ext名額實作的是,目前時間點向前,第1、2、3塊紅柱、綠柱的面積,可用于輔助背離判斷。
  • 異動量及收複參考了微網誌大V煎蛋的因子。
  • 底分型的實作隻用了近3根K線資料,未實作纏論中的合并規則。
  • 使用shift的操作,将前n日因子合并到目前日期上,便于後續選股使用。合并後的名稱采用“因子_na”的形式來表示,例如close_3a表示3天前的收盤價。
import os.path  # 用于管理路徑
import sys  # 用于在argvTo[0]中找到腳本名稱
import pandas as pd
import time

# 擷取目前目錄
proj_path = os.path.dirname(os.path.abspath(sys.argv[0])) + '/../'

g_ma_list = [5, 10, 20, 30, 60, 120, 250]
g_vol_ma_list = [5, 10, 135]
g_shift_n = 5
g_min_period = 150


# macd
def MACD(df, n_fast, n_slow, ksgn='close'):
    xnam = 'mdiff'  # 'macd'
    xnam2 = 'mdea'  # 'msign'
    xnam3 = 'macd'  # 'mdiff'
    EMAfast = df[ksgn].ewm(span=n_fast, min_periods=n_fast - 1).mean()
    EMAslow = df[ksgn].ewm(span=n_slow, min_periods=n_slow - 1).mean()
    mdiff = pd.Series(EMAfast - EMAslow, name=xnam)  # dif
    xnum = max(int((n_fast + n_slow) / 4), 2)
    mdea = mdiff.ewm(span=xnum, min_periods=xnum - 1).mean()  # DEA or DEM
    mdea.name = xnam2
    macd = pd.Series(mdiff - mdea, name=xnam3).map(lambda x: x * 2)
    df = df.join(macd)
    df = df.join(mdea)
    df = df.join(mdiff)
    return df


# 均線
def MA_n(df, n, ksgn='close'):
    xnam = '{}ma_{}'.format('' if 'close' == ksgn else ksgn + '_', n)
    ds2 = pd.Series(df[ksgn], name=xnam, index=df.index)
    ds5 = ds2.rolling(center=False, window=n).mean()
    df = df.join(ds5)
    return df


# macd名額中,前n段中,紅色、綠色柱面積
def macd_ext(df, n):
    df['macd_1a'] = df[['macd']].shift(1)
    df['macd_switch'] = df.apply(
        lambda x: 1 if x.macd > 0 and x.macd_1a < 0 else (
            -1 if x.macd < 0 and x.macd_1a > 0 else 0), axis=1
    )

    red = []
    green = []
    # 深拷貝
    for i in range(n):
        red.append([0.0] * df.shape[0])
        green.append([0.0] * df.shape[0])

    curr_red = [0.0] * n
    curr_green = [0.0] * n
    accu_value = 0

    for i in range(df.shape[0]):
        if pd.isna(df['macd'].iloc[i]):
            continue
        if 1 == df['macd_switch'].iloc[i]:
            for j in range(n - 1, 0, -1):
                curr_green[j] = curr_green[j - 1]
            curr_green[0] = accu_value
            accu_value = df['macd'].iloc[i]
        elif -1 == df['macd_switch'].iloc[i]:
            for j in range(n - 1, 0, -1):
                curr_red[j] = curr_red[j - 1]
            curr_red[0] = accu_value
            accu_value = df['macd'].iloc[i]
        else:
            accu_value += df['macd'].iloc[i]
        for j in range(n):
            red[j][i] = curr_red[j]
            green[j][i] = curr_green[j]

    for i in range(n):
        temp_series = pd.Series(red[i], name='red{}'.format(i))
        temp_series.index = df.index
        df = df.join(temp_series)

        temp_series = pd.Series(green[i], name='green{}'.format(i))
        temp_series.index = df.index
        df = df.join(temp_series)

    return df


# 縮量陰線,前1日暴漲
def shrink_negative_line(df):
    df['shrink_negative_line'] = df.apply(
        lambda x: 1 if ((x.close_1 - x.close_2) / x.close_2) > 0.09 and \
                       x.volume < x.volume_1 and \
                       x.close < x.open and \
                       x.low > x.low_1 and \
                       x.close < x.close_1 else 0, axis=1
    )
    return df


# 縮量
def shrink_volume(df):
    df['shrink_volume'] = df.apply(
        lambda x: 1 if x.volume < x.volume_1a else 0, axis=1
    )
    return df


# 暴量,成交量大于135日均量線
def volume_boom(df):
    df['volume_boom'] = df.apply(
        lambda x: 1 if x.volume > x.volume_ma_135 else 0, axis=1)
    return df


# 暴漲,漲幅大于9%
def value_boom(df):
    df['value_boom'] = df.apply(
        lambda x: 1 if (x.close - x.close_1a) / x.close_1a > 0.09 else 0, axis=1)
    return df


# 底分型
def bottom_shape(df):
    df['bottom_shape'] = df.apply(
        lambda x: 1 if x.low_1a < x.low_2a and x.low_1a < x.low and x.high_1a < x.high_2a and x.high_1a < x.high else 0,
        axis=1)
    return df


# 基于異動量計算異動量收複
def retrieve_special_volume(df):
    # 按條件生成新列
    df['retrieve_special_volume'] = df.apply(
        lambda x: 1 if 1 == x.special_volume_1a and x.close > x.high_1a and x.close > x.open else 0, axis=1)
    return df


# 陽線
def positive(df):
    df['positive'] = df.apply(
        lambda x: 1 if x.close > x.open else 0, axis=1
    )
    return df


# 陰線
def negative(df):
    df['negative'] = df.apply(
        lambda x: 1 if x.close < x.open else 0, axis=1
    )
    return df


# 異動量
def special_volume(df):
    # 按條件生成新列
    df['special_volume'] = df.apply(
        lambda x: 1 if x.open > x.close and x.close < x.close_1a and x.volume > x.volume_1a else 0, axis=1)
    return df


# 将前n日的名額列入當日名額
def shift_till_n(df, indicator_list, n):
    for i in range(n):
        shift_i(df, indicator_list, i + 1)
    return df


# 将第前n日的名額列入當日名額
def shift_i(df, indicator_list, i):
    for ind in indicator_list:
        df['{}_{}a'.format(ind, i)] = df[ind].shift(i)
    return df


if __name__ == '__main__':
    # 程式開始時的時間
    time_start = time.time()
    # 讀入股票代碼
    stock_codes = pd.read_csv(proj_path + 'data/tdx/all_stock_codes.csv', encoding='unicode_escape')
    # 建立寫出目錄
    out_dir = proj_path + 'data/extension/d/hard_rules/'
    if not os.path.exists(out_dir):
        os.makedirs(out_dir)

    # 循環處理每隻股票
    for code in stock_codes['code']:
        print('processing {}...'.format(code))
        input_file = proj_path + 'data/tdx/day/' + code + '.csv'
        if not os.path.exists(input_file):
            continue
        output_file = out_dir + code + '.csv'
        exist_df = pd.DataFrame()
        df = pd.read_csv(input_file)
        df = df.sort_index(ascending=True)
        # 用于更新資料時,減少計算規模
        g_min_period = max(g_min_period, g_shift_n, max(g_vol_ma_list), max(g_ma_list))
        new_lines = 0

        # 已有部分計算結果
        if os.path.exists(output_file):
            exist_df = pd.read_csv(output_file)
            last_date = exist_df['date'].iloc[-1]
            date_index = df[df.date == last_date].index.tolist()[0]
            new_lines = df.shape[0] - date_index - 1
            df = df.iloc[- g_min_period - new_lines:]

            # 沒有要新計算的行
            if not new_lines:
                continue

        # vol_MA
        for i in g_vol_ma_list:
            df = MA_n(df, i, 'volume')

        # ma
        for i in g_ma_list:
            df = MA_n(df, i)

        # 計算復原參數
        indicator_list = ['open', 'high', 'low', 'close', 'volume', 'amount']
        indicator_list.extend(list(map(lambda x: 'ma_' + str(x), g_ma_list)))
        indicator_list.extend(list(map(lambda x: 'volume_ma_' + str(x), g_vol_ma_list)))
        df = shift_till_n(df, indicator_list, g_shift_n)

        # 計算異動量
        df = special_volume(df)
        df = shift_till_n(df, ['special_volume'], g_shift_n)

        # 異動量收複
        df = retrieve_special_volume(df)

        # 底分型
        df = bottom_shape(df)

        # MACD
        df = MACD(df, 12, 26)
        df = macd_ext(df, 3)

        # 計算暴漲
        df = value_boom(df)
        df = shift_till_n(df, ['value_boom'], g_shift_n)

        # 計算量暴漲
        df = volume_boom(df)
        df = shift_till_n(df, ['volume_boom'], g_shift_n)

        # 計算縮量
        df = shrink_volume(df)
        df = shift_till_n(df, ['shrink_volume'], g_shift_n)
        # df = shrink_negative_line(df)

        # 計算陽線、陰線
        df = positive(df)
        df = negative(df)
        df = shift_till_n(df, ['positive', 'negative'], g_shift_n)

        if new_lines:
            df = exist_df.append(df.iloc[-new_lines:])

        # 寫出檔案
        df.to_csv(output_file, index=False)
        print(code + ' done!')

    # 程式結束時系統時間
    time_end = time.time()

    print('程式所耗時間:', time_end - time_start)
           

機器學習選股因子擴充

全部擴充因子在本節末,幾點說明:

  • 這裡的擴充因子拟應用于機器學習,将選股處理成二分類問題,是以需要計算标簽資訊。使用class_label方法來計算相應的标簽值。
  • 複用了上一節的因子,也使用pandas_ta實作了大量因子的計算。關鍵代碼:

其中,去除的因子在很多時候沒有輸出值,會影響機器學習的計算。處理後因子總次元為303。

  • 未實作增量計算,即每次都對全量因子進行計算。主要原因是沒有對pandas_ta進行深度研究,無法判斷增量計算的結果。
  • 對因子和最後結果的相關性進行計算,發現與成交量相關的因子和最後的結果相關性最高。
import os.path  # 用于管理路徑
import sys  # 用于在argvTo[0]中找到腳本名稱
import pandas as pd
import time
import pandas_ta as ta

# 擷取目前目錄
proj_path = os.path.dirname(os.path.abspath(sys.argv[0])) + '/../'

g_ma_list = [5, 10, 20, 30, 60, 120, 250]
g_vol_ma_list = [5, 10, 135]
g_shift_n = 5
g_ml_min_period = 1500


# macd
def MACD(df, n_fast, n_slow, ksgn='close'):
    xnam = 'mdiff'  # 'macd'
    xnam2 = 'mdea'  # 'msign'
    xnam3 = 'macd'  # 'mdiff'
    EMAfast = df[ksgn].ewm(span=n_fast, min_periods=n_fast - 1).mean()
    EMAslow = df[ksgn].ewm(span=n_slow, min_periods=n_slow - 1).mean()
    mdiff = pd.Series(EMAfast - EMAslow, name=xnam)  # dif
    xnum = max(int((n_fast + n_slow) / 4), 2)
    mdea = mdiff.ewm(span=xnum, min_periods=xnum - 1).mean()  # DEA or DEM
    mdea.name = xnam2
    macd = pd.Series(mdiff - mdea, name=xnam3).map(lambda x: x * 2)
    df = df.join(macd)
    df = df.join(mdea)
    df = df.join(mdiff)
    return df


# 均線
def MA_n(df, n, ksgn='close'):
    xnam = '{}ma_{}'.format('' if 'close' == ksgn else ksgn + '_', n)
    ds2 = pd.Series(df[ksgn], name=xnam, index=df.index)
    ds5 = ds2.rolling(center=False, window=n).mean()
    df = df.join(ds5)
    return df


# macd名額中,前n段中,紅色、綠色柱面積
def macd_ext(df, n):
    df['macd_1a'] = df[['macd']].shift(1)
    df['macd_switch'] = df.apply(
        lambda x: 1 if x.macd > 0 and x.macd_1a < 0 else (
            -1 if x.macd < 0 and x.macd_1a > 0 else 0), axis=1
    )

    red = []
    green = []
    # 深拷貝
    for i in range(n):
        red.append([0.0] * df.shape[0])
        green.append([0.0] * df.shape[0])

    curr_red = [0.0] * n
    curr_green = [0.0] * n
    accu_value = 0

    for i in range(df.shape[0]):
        if pd.isna(df['macd'].iloc[i]):
            continue
        if 1 == df['macd_switch'].iloc[i]:
            for j in range(n - 1, 0, -1):
                curr_green[j] = curr_green[j - 1]
            curr_green[0] = accu_value
            accu_value = df['macd'].iloc[i]
        elif -1 == df['macd_switch'].iloc[i]:
            for j in range(n - 1, 0, -1):
                curr_red[j] = curr_red[j - 1]
            curr_red[0] = accu_value
            accu_value = df['macd'].iloc[i]
        else:
            accu_value += df['macd'].iloc[i]
        for j in range(n):
            red[j][i] = curr_red[j]
            green[j][i] = curr_green[j]

    for i in range(n):
        temp_series = pd.Series(red[i], name='red{}'.format(i))
        temp_series.index = df.index
        df = df.join(temp_series)

        temp_series = pd.Series(green[i], name='green{}'.format(i))
        temp_series.index = df.index
        df = df.join(temp_series)

    return df


# 縮量陰線,前1日暴漲
def shrink_negative_line(df):
    df['shrink_negative_line'] = df.apply(
        lambda x: 1 if ((x.close_1 - x.close_2) / x.close_2) > 0.09 and \
                       x.volume < x.volume_1 and \
                       x.close < x.open and \
                       x.low > x.low_1 and \
                       x.close < x.close_1 else 0, axis=1
    )
    return df


# 縮量
def shrink_volume(df):
    df['shrink_volume'] = df.apply(
        lambda x: 1 if x.volume < x.volume_1a else 0, axis=1
    )
    return df


# 暴量,成交量大于135日均量線
def volume_boom(df):
    df['volume_boom'] = df.apply(
        lambda x: 1 if x.volume > x.volume_ma_135 else 0, axis=1)
    return df


# 暴漲,漲幅大于9%
def value_boom(df):
    df['value_boom'] = df.apply(
        lambda x: 1 if (x.close - x.close_1a) / x.close_1a > 0.09 else 0, axis=1)
    return df


# 底分型
def bottom_shape(df):
    df['bottom_shape'] = df.apply(
        lambda x: 1 if x.low_1a < x.low_2a and x.low_1a < x.low and x.high_1a < x.high_2a and x.high_1a < x.high else 0,
        axis=1)
    return df


# 基于異動量計算異動量收複
def retrieve_special_volume(df):
    # 按條件生成新列
    df['retrieve_special_volume'] = df.apply(
        lambda x: 1 if 1 == x.special_volume_1a and x.close > x.high_1a and x.close > x.open else 0, axis=1)
    return df


# 陽線
def positive(df):
    df['positive'] = df.apply(
        lambda x: 1 if x.close > x.open else 0, axis=1
    )
    return df


# 陰線
def negative(df):
    df['negative'] = df.apply(
        lambda x: 1 if x.close < x.open else 0, axis=1
    )
    return df


# 異動量
def special_volume(df):
    # 按條件生成新列
    df['special_volume'] = df.apply(
        lambda x: 1 if x.open > x.close and x.close < x.close_1a and x.volume > x.volume_1a else 0, axis=1)
    return df


# 将前n日的名額列入當日名額
def shift_till_n(df, indicator_list, n):
    for i in range(n):
        shift_i(df, indicator_list, i + 1)
    return df


# 将第前n日的名額列入當日名額
def shift_i(df, indicator_list, i):
    for ind in indicator_list:
        df['{}_{}a'.format(ind, i)] = df[ind].shift(i)
    return df


# 計算最大收益
def max_profit(x, percent_change=0.1):
    ret = 0
    if (max(x) - x.iloc[-1]) / x.iloc[-1] >= percent_change:
        ret = 1
    return ret


# 計算是否能夠在days日内的實作收益percent_change
def class_label(df, days, percent_change):
    df['label_{}_{}%'.format(days, percent_change * 100)] = (
                                                                df.iloc[::-1]['close'].rolling(days + 1).apply(
                                                                    max_profit,
                                                                    kwargs={'percent_change': percent_change})).iloc[
                                                            ::-1]
    return df


if __name__ == '__main__':
    # 程式開始時的時間
    time_start = time.time()

    # 機器學習
    stock_code_file = proj_path + 'data/tdx/ml_stock_code.csv'
    if not os.path.exists(stock_code_file):
        all_stock_code_file = proj_path + 'data/tdx/all_stock_codes.csv'
        stock_codes = pd.read_csv(all_stock_code_file, encoding='unicode_escape')
        ml_stock_list = []
        # 篩選股票,確定有充足的訓練資料
        for code in stock_codes['code']:
            input_file = proj_path + 'data/tdx/day/' + code + '.csv'
            if not os.path.exists(input_file):
                continue
            df = pd.read_csv(input_file)
            if df.shape[0] > g_ml_min_period:
                ml_stock_list.append(code)
        out_df = pd.DataFrame(ml_stock_list, columns=['code'])
        out_df.to_csv(stock_code_file, index=False)
    stock_codes = pd.read_csv(stock_code_file, encoding='unicode_escape')

    # 建立寫出目錄
    out_dir = proj_path + 'data/extension/d/ml/'
    if not os.path.exists(out_dir):
        os.makedirs(out_dir)

    # 循環處理每隻股票
    for code in stock_codes['code']:
        print('processing {}...'.format(code))
        input_file = proj_path + 'data/tdx/day/' + code + '.csv'
        if not os.path.exists(input_file):
            continue
        output_file = out_dir + code + '.csv'
        exist_df = pd.DataFrame()
        df = pd.read_csv(input_file)
        df = df.sort_index(ascending=True)
        # 用于更新資料時,減少計算規模

        df.ta.strategy(exclude=['dpo', 'psar', 'supertrend', 'ichimoku', 'hilo'], verbose=True, timed=True)

        # vol_MA
        for i in g_vol_ma_list:
            df = MA_n(df, i, 'volume')

        # ma
        for i in g_ma_list:
            df = MA_n(df, i)

        # 計算復原參數
        indicator_list = ['open', 'high', 'low', 'close', 'volume', 'amount']
        indicator_list.extend(list(map(lambda x: 'ma_' + str(x), g_ma_list)))
        indicator_list.extend(list(map(lambda x: 'volume_ma_' + str(x), g_vol_ma_list)))
        df = shift_till_n(df, indicator_list, g_shift_n)

        # 計算異動量
        df = special_volume(df)
        df = shift_till_n(df, ['special_volume'], g_shift_n)

        # 異動量收複
        df = retrieve_special_volume(df)

        # 底分型
        df = bottom_shape(df)

        # MACD
        df = MACD(df, 12, 26)
        df = macd_ext(df, 3)

        # 計算暴漲
        df = value_boom(df)
        df = shift_till_n(df, ['value_boom'], g_shift_n)

        # 計算量暴漲
        df = volume_boom(df)
        df = shift_till_n(df, ['volume_boom'], g_shift_n)

        # 計算縮量
        df = shrink_volume(df)
        df = shift_till_n(df, ['shrink_volume'], g_shift_n)
        # df = shrink_negative_line(df)

        # 計算陽線、陰線
        df = positive(df)
        df = negative(df)
        df = shift_till_n(df, ['positive', 'negative'], g_shift_n)


        # 計算分類标準
        df = class_label(df, 1, 0.095)
        df = class_label(df, 2, 0.095)
        df = class_label(df, 5, 0.095)
        df = class_label(df, 10, 0.095)
        df = class_label(df, 2, 0.195)
        df = class_label(df, 5, 0.195)
        df = class_label(df, 10, 0.195)


        # 寫出檔案
        df.to_csv(output_file, index=False)
        print(code + ' done!')

    # 程式結束時系統時間
    time_end = time.time()

    print('程式所耗時間:', time_end - time_start)
           

歡迎大家關注、點贊、轉發、留言,感謝支援!

微信群用于學習交流,感興趣的讀者請掃碼加微信!

QQ群(676186743)用于資料共享,歡迎加入!

Python量化交易學習筆記(47)——因子擴充
Python量化交易學習筆記(47)——因子擴充

繼續閱讀