天天看點

python使用tushare擷取資料,實作多因子雙重排序選股回測

1.導入所需的庫,設定全局變量

import pandas as pd
import tushare as ts
import numpy as np
from scipy import stats
import matplotlib.pyplot as plt
import seaborn as sns

# Global settings shared by the download step and the backtest below
part = 5                 # passed to bivariate_sort as `part`: number of quantile groups per sorting variable
start_date = '20160101'  # first calendar date requested from the trading calendar (YYYYMMDD)
end_date = '20201231'    # last calendar date requested from the trading calendar (YYYYMMDD)
interval = 22            # step, counted in open trading days, between two sampled/rebalancing dates
           

資料源采用的是 Tushare 大資料社群平台 (waditu.com)。除了使用免費的 basic 接口外,還可以通過送出學生證明擷取積分,進而使用大多數接口完成代碼測試任務;詳情可見連結:Tushare大資料社群 (waditu.com)

 2.擷取資料

# Tushare API client (replace the placeholder with your own token)
token='你的token 可在tushare網頁檢視'
ts.set_token(token)
pro = ts.pro_api(token)

# Build the list of open trading days between start_date and end_date
trade_date = pro.trade_cal(exchange='', start_date=start_date, end_date=end_date)
trade_date = trade_date[trade_date['is_open'] == 1]
# The code below treats date_list[0] as the first day and walks forward by
# index, so enforce chronological order instead of relying on whatever order
# the API happens to return. cal_date is a YYYYMMDD string, so a plain
# lexicographic sort is chronological.
trade_date = trade_date.sort_values('cal_date')
date_list = trade_date['cal_date'].tolist()

# Run this section only once: it downloads the data and caches it in Data.csv.
# One daily_basic snapshot is fetched every `interval` open trading days,
# starting from the first day in date_list.
days = len(date_list)
# Indices into date_list of every sampled day: 0, interval, 2*interval, ...
rebalance_days = list(range(0, days, interval))
# Collect the snapshots first and concatenate once; calling pd.concat inside
# the loop would re-copy all previously downloaded rows on every iteration.
frames = [
    pro.daily_basic(ts_code='', trade_date=date_list[day_index],
                    fields='ts_code,trade_date,turnover_rate,pb,close,total_mv')
    for day_index in rebalance_days
]
rebalance_day = rebalance_days[-1]
df_list = pd.concat(frames)
df_list.to_csv('Data.csv', index=False)

# Load the cached CSV and turn the YYYYMMDD trade_date column into datetimes
data = pd.read_csv('Data.csv').assign(
    trade_date=lambda frame: pd.to_datetime(frame['trade_date'], format='%Y%m%d')
)
           

3.編寫雙重排序函數

def _quantile_labels(lowest, highest, part):
    """Return the group labels: `lowest`, '2', ..., str(part - 1), `highest`."""
    return [lowest] + [str(i) for i in range(2, part)] + [highest]


def _holding_period_return(group, end_close, weight):
    """Return the weighted return of one portfolio over one holding period.

    group:      rows (one per stock) held at the start of the period; needs the
                columns 'ts_code' and 'close', plus `weight` when value-weighting.
    end_close:  Series of closing prices at the end of the period, indexed by
                ts_code (unique index).
    weight:     'equally_weighted', or the name of the column to weight by.

    Weights are normalised over every stock in `group`. A stock with no row at
    the end of the period adds nothing to the sum but still counts in the
    denominator, i.e. it is treated as a zero return.
    """
    if group.empty:
        return 0
    if weight == 'equally_weighted':
        weights = pd.Series(1 / len(group), index=group.index)
    else:
        weights = group[weight] / group[weight].sum()
    start_close = group['close']
    matched_close = group['ts_code'].map(end_close)
    contributions = weights * (matched_close - start_close) / start_close
    has_end_price = group['ts_code'].isin(end_close.index)
    # skipna=False: a NaN price that IS present at the end date still propagates
    # to the result; only stocks with no end-date row at all are skipped.
    return contributions[has_end_price].sum(skipna=False)


def bivariate_sort(interval, date_list, weight = 'total_mv',part = 3, remove = True,
                   data_path = 'Data.csv'):
    """Back-test an independent double sort on book-to-market and market cap.

    On every rebalancing day (every `interval`-th entry of `date_list`) the
    stocks are split into `part` groups by BM (= 1 / pb) and, independently,
    into `part` groups by total_mv. Each of the part * part intersections is
    held until the next rebalancing day and its return is recorded.

    Parameters
    ----------
    interval : int
        Number of positions in `date_list` between two rebalancing days
        (must be positive).
    date_list : list
        Trading days in chronological order, comparable with the parsed
        'trade_date' column (e.g. 'YYYYMMDD' strings).
    weight : str
        'equally_weighted', or the name of the column used for weighting
        (default 'total_mv').
    part : int
        Number of quantile groups per sorting variable.
    remove : bool
        When true, drop the 30% smallest stocks by total_mv and four
        hard-coded large-cap tickers before sorting.
    data_path : str
        CSV file with the columns ts_code, trade_date (YYYYMMDD), pb, close,
        total_mv (rows containing any NaN are dropped on the sorting day).

    Returns
    -------
    list of list
        part * part lists, one per portfolio, each holding one return per
        holding period. Portfolio index is bm_group * part + mv_group, both
        counted from the lowest group. With fewer than interval + 1 dates
        every inner list is empty.

    Raises
    ------
    ValueError
        If `interval` is not positive.
    """
    if interval <= 0:
        raise ValueError('interval must be a positive number of trading days')

    data = pd.read_csv(data_path)
    data['trade_date'] = pd.to_datetime(data['trade_date'], format='%Y%m%d')

    # Quantile break points 0, 1/part, ..., 1 and the matching group labels
    q = [0] + [i / part for i in range(1, part)] + [1]
    BM_list_labels = _quantile_labels('Low', 'High', part)
    total_mv_list_labels = _quantile_labels('Small', 'Big', part)

    # Created up front so the result is well-defined even with zero periods
    returns_of_group = [[] for _ in range(part * part)]

    days = len(date_list)
    # Same period starts as the original `while start + interval + 1 <= days`
    for start in range(0, days - interval, interval):
        # Data cleaning on the formation day
        df = data[data['trade_date'] == date_list[start]].dropna(axis=0)
        # A zero price-to-book ratio has no defined book-to-market value
        df = df[df['pb'] != 0]
        if remove:
            # Drop the 30% smallest stocks and four large-cap outliers
            remove_quantile = df['total_mv'].quantile(q=[0.3]).tolist()[0]
            df = df[df['total_mv'] >= remove_quantile]
            remove_ts_code = ['601857.SH', '600028.SH', '601088.SH', '601766.SH']
            df = df[~df['ts_code'].isin(remove_ts_code)]
        # Work on an independent copy before adding columns
        df = df.copy()
        df['BM'] = 1 / df['pb']

        # Assign every stock to a BM group and to a market-cap group
        BM_quantile_list = df['BM'].quantile(q=q).tolist()
        df['BM_level'] = pd.cut(df['BM'], bins=BM_quantile_list,
                                labels=BM_list_labels, duplicates='drop',
                                include_lowest=True)
        total_mv_quantile_list = df['total_mv'].quantile(q=q).tolist()
        df['total_mv_level'] = pd.cut(df['total_mv'], bins=total_mv_quantile_list,
                                      labels=total_mv_list_labels, duplicates='drop',
                                      include_lowest=True)

        # Closing prices on the next rebalancing day, looked up by ts_code
        end_df = data[data['trade_date'] == date_list[start + interval]]
        end_close = end_df.drop_duplicates('ts_code').set_index('ts_code')['close']

        # Return of every BM x market-cap intersection over this period
        group_index = 0
        for bm_label in BM_list_labels:
            for mv_label in total_mv_list_labels:
                group = df[(df['BM_level'] == bm_label)
                           & (df['total_mv_level'] == mv_label)]
                returns_of_group[group_index].append(
                    _holding_period_return(group, end_close, weight))
                group_index += 1

    return returns_of_group
           

4.運作并得到結果

returns_by_month = bivariate_sort(interval, date_list, weight='total_mv', part = part, remove = True)

# Per-portfolio statistics of the holding-period returns.
# The accumulators must exist before the loop appends to them.
mean_list, se_list, t_list, p_list = [], [], [], []
for group_returns in returns_by_month:
    # One-sample t-test of the mean return against zero (computed once)
    t_value, p_value = stats.ttest_1samp(group_returns, 0)
    mean_list.append(np.mean(group_returns))
    # Standard error with the sample standard deviation (ddof=1), which is
    # what ttest_1samp uses, so that t-value == mean / se
    se_list.append(stats.sem(group_returns))
    t_list.append(t_value)
    p_list.append(p_value)

# Express mean and standard error in percent
mean_array = np.array(mean_list) * 100
mean_list = mean_array.tolist()
se_array = np.array(se_list) * 100
se_list = se_array.tolist()

results_by_month = pd.DataFrame(np.array([mean_list,se_list,t_list,p_list]))
results_by_month.index=['mean(%)','se(%)','t-value','p-value']
# One column per portfolio, in the order bivariate_sort returns them:
# BM group in the outer position, market-cap group in the inner position.
bm_names = ['Low'] + [str(i) for i in range(2, part)] + ['High']
mv_names = ['Small'] + [str(i) for i in range(2, part)] + ['Big']
results_by_month.columns = ['BM_' + bm + '/MV_' + mv
                            for bm in bm_names for mv in mv_names]

# Plot
BM_list_labels = ['Low'] + [str(i) for i in range(2, part)] + ['High']
total_mv_list_labels = ['Small'] + [str(i) for i in range(2, part)] + ['Big']

# Grid of mean holding-period returns (in %) per portfolio.
# returns_by_month[bm * part + mv] holds the returns of one portfolio, so
# reshaping gives rows = BM group, columns = market-cap group; transpose so
# that BM runs along the x axis and total_mv along the y axis, matching the
# axis labels below.
# NOTE(review): the original never defined results_in_groups; plotting the
# mean returns is an assumption based on the surrounding text - confirm.
mean_returns = np.array([np.mean(group) for group in returns_by_month]) * 100
results_in_groups = pd.DataFrame(mean_returns.reshape(part, part).T,
                                 index=total_mv_list_labels,
                                 columns=BM_list_labels)

sns.set_style('whitegrid')
sns.heatmap(results_in_groups,
            cmap=sns.diverging_palette(20, 220, n=200),
            annot=True,  # write the value into every cell
            center=0,    # value at which the diverging colormap is centered
            )
plt.xlabel('BM')
plt.ylabel('total_mv')
plt.show()
           
python使用tushare擷取資料,實作多因子雙重排序選股回測

高BM和低市值表現良好,符合預期。