期現套利策略,要能夠穩定獲得各幣種期貨和現貨的報價,計算各幣種的基差資料,然後當基差突破門檻值時觸發交易。以下代碼可以得到穩定報價,計算基差資料,並在基差突破門檻值時發送提示郵件(交易模組還沒寫好)。
這裡記錄一下,開發過程遇到的主要問題,以及如何解決的,如果不感興趣,可以跳過,直接看代碼。
問題一:websocket報價資料不更新
簡單的ws.run_forever,通常在運作兩三天後,會出現報價卡在某個時點,不能更新的情況。例如,現貨報價停留在2021年12月25日11:00。
解決方案:用 while True + try/except 語句,一旦 websocket 連結有問題,就發送郵件通知(不知道為啥,一直沒有收到報錯郵件,發送郵件的代碼可能有問題;也可能是因為 run_forever 會在內部把連線錯誤交給 on_error 回呼處理,而不是向外拋出異常,所以 except 分支很少被觸發),回收記憶體,再重新連結。
def s_start(self):
    """Keep the spot-price websocket running, reconnecting whenever it stops.

    Loops forever: builds a ``WebSocketApp`` for the Binance spot stream,
    blocks in ``run_forever`` and, once that returns or raises, collects
    garbage, waits briefly and connects again. If ``run_forever`` raises,
    a notification email is attempted first.
    """
    while True:
        try:
            s_ws = WebSocketApp(
                url='wss://stream.binance.com:9443/ws',
                on_open=self._s_on_open,
                on_message=self._s_on_message,
                on_error=self._s_on_error,
                on_close=self._s_on_close,
            )
            s_ws.run_forever()
        except Exception as e:
            content = "%s the spot webscoket is broken,check if it restart" % e
            try:
                self._email(content)
            except Exception as mail_error:
                # A failing mail server must not escape the handler and end
                # the reconnect loop; report it and carry on.
                print("send email failed:%s" % mail_error)
        gc.collect()
        # Short pause so a persistent outage does not become a tight
        # reconnect loop.
        time.sleep(1)
問題二:啟動階段報價資料為空
因為涉及到多幣種,剛剛建立連結的時候,各幣種報價資料的到達有先後,有可能btc資料有了,但是bnb資料要到10秒甚至20秒後才能收到。此時若用尚未收到(仍為None)的報價去計算基差,程式會因報錯而中斷。
解決方案:定義好報錯的類型,用@retry裝飾器,保證程式在遇到特定類型報錯的時候,能夠持續運作下去。
def retry_if_index_error(exception):
    """Tell ``@retry`` whether *exception* is a "quotes not ready yet" error.

    Returns True for ``IndexError`` and for ``TypeError``. The latter is what
    Python raises when a price that is still ``None`` is used in arithmetic,
    which is the start-up situation the retry is meant to ride out.
    Any other exception returns False so that it is not retried.
    """
    return isinstance(exception, (IndexError, TypeError))
@retry(retry_on_exception=retry_if_index_error)
def get_basis(self):
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Sun Dec 5 20:15:41 2021
It seems to work really well; the price and basis data are still correct after running for 13 days.
2021/12/13 16:30 begin
@author: yszhu
"""
from websocket import WebSocketApp
import json,threading
import pysnooper
import time
import smtplib
from email.mime.text import MIMEText
from retrying import retry
import logging
import gc
class basis_dif(object):
    """Track delivery-contract and spot prices and the basis between them.

    ``d_start`` and ``s_start`` each run a websocket feed that stores the
    latest price and trade time per coin on the instance. ``get_basis`` polls
    those attributes, prints the basis of every coin and emails an alert when
    the greatest basis rises above ``basis_dif_threshold``.
    """

    # Attribute-name prefixes of the coins that are tracked.
    _COINS = ("btc", "eth", "bnb")

    def __init__(self, contract_date=220325):
        """Create an instance with every quote unset.

        param:contract_date: suffix of the delivery contract symbols,
            e.g. 220325 for ``BTCUSD_220325``.
        """
        # delivery contract price
        self.d_eth_price = None
        self.d_eth_time = None
        self.d_btc_price = None
        self.d_btc_time = None
        self.d_bnb_price = None
        # Was a second ``d_bnb_price`` assignment, which left ``d_bnb_time``
        # undefined until the first BNB delivery message arrived.
        self.d_bnb_time = None
        # spot price
        self.eth_price = None
        self.eth_time = None
        self.btc_price = None
        self.btc_time = None
        self.bnb_price = None
        self.bnb_time = None
        # delivery and spot basis difference
        self.btc_basis_dif = None
        self.eth_basis_dif = None
        self.bnb_basis_dif = None
        self.contract_date = contract_date
        self.basis_dif_threshold = 0.035
        # email
        # NOTE(review): credentials are hard-coded in source; consider loading
        # them from the environment or a config file kept out of version control.
        self._mail_host = "smtp.qq.com"
        self._mail_user = "315960451"
        self._mail_pwd = "brvjltflaofrbhcb"
        self._sender = "[email protected]"
        self._receivers = "[email protected]"
        # ``_break_threshold`` keeps the same alert from being mailed on every
        # pass. True means the alert is armed: the next time the greatest
        # basis is above the threshold an email is sent and the flag is
        # cleared. It is armed again once the greatest basis is below the
        # threshold.
        self._break_threshold = True

    @staticmethod
    def _format_trade_time(timestamp_ms):
        """Turn a millisecond epoch timestamp into a local 'Y-m-d H:M:S' string."""
        local = time.localtime(int(timestamp_ms / 1000))
        return time.strftime("%Y-%m-%d %H:%M:%S", local)

    def _store_quote(self, prefix, msg):
        """Save price ``msg['p']`` and time ``msg['T']`` under ``<prefix>price/time``."""
        setattr(self, prefix + "price", float(msg['p']))
        setattr(self, prefix + "time", self._format_trade_time(msg['T']))

    # websocket for delivery contract price
    def _d_on_open(self, d_ws):
        """Subscribe to the three delivery-contract streams once connected."""
        # NOTE(review): the stream suffix was redacted in the source text.
        # "aggTrade" is assumed because _d_on_message reads the 's', 'p' and
        # 'T' fields - confirm against the Binance stream documentation.
        data = {
            "method": "SUBSCRIBE",
            "params": [
                "btcusd_%s@aggTrade" % self.contract_date,
                "ethusd_%s@aggTrade" % self.contract_date,
                "bnbusd_%s@aggTrade" % self.contract_date,
            ],
            "id": 1,
        }
        d_ws.send(json.dumps(data))

    def _d_on_message(self, d_ws, d_msg):
        """Store the price and time carried by one delivery-contract message.

        Messages without an 's' key, or for a symbol that is not tracked,
        are ignored.
        """
        d_msg = json.loads(d_msg)
        if 's' not in d_msg:
            return
        prefixes = {
            "BTCUSD_%s" % self.contract_date: "d_btc_",
            "ETHUSD_%s" % self.contract_date: "d_eth_",
            "BNBUSD_%s" % self.contract_date: "d_bnb_",
        }
        prefix = prefixes.get(d_msg['s'])
        if prefix is not None:
            self._store_quote(prefix, d_msg)

    def _d_on_close(self, d_ws, close_status_code=None, close_msg=None):
        """Report that the delivery websocket closed.

        The two optional parameters accept the close status and message that
        newer websocket-client releases pass to ``on_close``; the defaults
        keep the older single-argument call working.
        """
        print("##connection closed##")

    def _d_on_error(self, d_ws, error):
        """Print an error reported by the delivery websocket."""
        print(f"on error:{error}")

    def _report_broken(self, content):
        """Email *content*; print instead of raising when the email fails."""
        try:
            self._email(content)
        except Exception as mail_error:
            # Raising here would leave the reconnect loop and stop the feed.
            print("send email failed:%s" % mail_error)

    def _run_with_reconnect(self, url, on_open, on_message, on_error,
                            on_close, broken_template, reconnect_delay=1):
        """Run a websocket forever, reconnecting each time it stops.

        param:broken_template: '%s' template for the notification mailed when
            ``run_forever`` raises; the exception is substituted in.
        param:reconnect_delay: seconds to wait before connecting again.
        """
        while True:
            try:
                ws = WebSocketApp(
                    url=url,
                    on_open=on_open,
                    on_message=on_message,
                    on_error=on_error,
                    on_close=on_close,
                )
                # NOTE(review): websocket-client appears to hand most
                # connection errors to on_error and return from run_forever
                # rather than raise - confirm. Either way the loop reconnects.
                ws.run_forever()
            except Exception as e:
                self._report_broken(broken_template % e)
            gc.collect()
            # Avoid a tight reconnect loop while the endpoint is unreachable.
            time.sleep(reconnect_delay)

    def d_start(self):
        """Run the delivery-contract feed; never returns."""
        self._run_with_reconnect(
            'wss://dstream.binance.com/ws',
            self._d_on_open,
            self._d_on_message,
            self._d_on_error,
            self._d_on_close,
            "%s the future webscoket is broken,check if it restart",
        )

    # websocket for spot price
    def _s_on_open(self, s_ws):
        """Subscribe to the three spot streams once connected."""
        # NOTE(review): the stream names were redacted in the source text.
        # "<symbol>@aggTrade" is assumed because _s_on_message matches
        # BTCUSDT/ETHUSDT/BNBUSDT and reads 's', 'p' and 'T' - confirm.
        data = {
            "method": "SUBSCRIBE",
            "params": [
                "btcusdt@aggTrade",
                "ethusdt@aggTrade",
                "bnbusdt@aggTrade",
            ],
            "id": 2,
        }
        s_ws.send(json.dumps(data))

    def _s_on_message(self, s_ws, s_msg):
        """Store the price and time carried by one spot message.

        Messages without an 's' key, or for a symbol that is not tracked,
        are ignored.
        """
        s_msg = json.loads(s_msg)
        if 's' not in s_msg:
            return
        prefixes = {
            "BTCUSDT": "btc_",
            "ETHUSDT": "eth_",
            "BNBUSDT": "bnb_",
        }
        prefix = prefixes.get(s_msg['s'])
        if prefix is not None:
            self._store_quote(prefix, s_msg)

    def _s_on_close(self, s_ws, close_status_code=None, close_msg=None):
        """Report that the spot websocket closed.

        The two optional parameters accept the close status and message that
        newer websocket-client releases pass to ``on_close``; the defaults
        keep the older single-argument call working.
        """
        print("##connection closed##")

    def _s_on_error(self, s_ws, error):
        """Print an error reported by the spot websocket."""
        print(f"on error:{error}")

    def s_start(self):
        """Run the spot feed; never returns."""
        self._run_with_reconnect(
            'wss://stream.binance.com:9443/ws',
            self._s_on_open,
            self._s_on_message,
            self._s_on_error,
            self._s_on_close,
            "%s the spot webscoket is broken,check if it restart",
        )

    # Right after start-up some quotes are still None (a less liquid coin may
    # deliver its first message later than the others). Using None in the
    # basis arithmetic raises TypeError, so TypeError has to be retried as
    # well as IndexError. This stays a plain function without ``self``
    # because the decorator below consumes it while the class body runs.
    def retry_if_index_error(exception):
        """Return True when *exception* should make ``get_basis`` start over."""
        return isinstance(exception, (IndexError, TypeError))

    def _update_basis(self):
        """Recompute ``<coin>_basis_dif`` as delivery price / spot price - 1."""
        for coin in self._COINS:
            delivery = getattr(self, "d_%s_price" % coin)
            spot = getattr(self, "%s_price" % coin)
            # Raises TypeError while either quote is still None.
            setattr(self, "%s_basis_dif" % coin, delivery / spot - 1)

    def _print_quotes(self):
        """Print basis, delivery price and spot price of every coin."""
        for coin in self._COINS:
            print("%s_basis_dif is %f" % (
                coin, getattr(self, "%s_basis_dif" % coin)))
            print("%s_d_price is %f %s" % (
                coin,
                getattr(self, "d_%s_price" % coin),
                getattr(self, "d_%s_time" % coin),
            ))
            print("%s_s_price is %f %s" % (
                coin,
                getattr(self, "%s_price" % coin),
                getattr(self, "%s_time" % coin),
            ))

    def _greatest_basis(self):
        """Return ``(coin, [basis, spot price, delivery price])`` with the largest basis."""
        candidates = [
            (coin, [
                getattr(self, "%s_basis_dif" % coin),
                getattr(self, "%s_price" % coin),
                getattr(self, "d_%s_price" % coin),
            ])
            for coin in self._COINS
        ]
        # Lists compare element by element, so the basis decides first.
        return max(candidates, key=lambda item: item[1])

    def _alert_on_break(self, greatest_basis_dif, content):
        """Email *content* the first time the basis is above the threshold."""
        if greatest_basis_dif > self.basis_dif_threshold:
            if self._break_threshold:
                self._email(content)
                # Disarm so the same break is not mailed again.
                self._break_threshold = False
        elif greatest_basis_dif < self.basis_dif_threshold:
            self._break_threshold = True

    # wait_fixed is in milliseconds: pause between retries instead of
    # spinning while the first quotes are still missing.
    @retry(retry_on_exception=retry_if_index_error, wait_fixed=1000)
    def get_basis(self, poll_interval=1.0):
        """Poll the stored quotes forever, printing them and alerting on a break.

        param:poll_interval: seconds to sleep between two passes.
        """
        while True:
            self._update_basis()
            self._print_quotes()
            coin, (greatest_basis_dif, spot, future) = self._greatest_basis()
            content = "the greatest basis is %s %f,Spot pirice %f,future price %f" % (
                coin,
                greatest_basis_dif,
                spot,
                future,
            )
            print(content)
            self._alert_on_break(greatest_basis_dif, content)
            time.sleep(poll_interval)

    def _email(self, content):
        '''
        Send *content* as a plain-text email through the configured SMTP host.

        param:content: which coin's basis_dif has reached the threshold, at
            what price, or a description of a broken websocket.
        Exceptions raised by smtplib are not caught here.
        '''
        receivers = self._receivers
        if isinstance(receivers, str):
            # A bare string is a single address; indexing it with [0] would
            # yield only its first character.
            receivers = [receivers]
        message = MIMEText(content, 'plain', "utf-8")
        message['Subject'] = "from new greatest basis dif:basis difference reached the threshold"
        message['From'] = self._sender
        message['To'] = ",".join(receivers)
        # The context manager ends the SMTP session even when login or send
        # fails; the timeout keeps a stalled server from blocking the caller.
        with smtplib.SMTP(self._mail_host, 25, timeout=30) as smtp_obj:
            smtp_obj.login(self._mail_user, self._mail_pwd)
            smtp_obj.sendmail(self._sender, receivers, message.as_string())
if __name__ == "__main__":
    test = basis_dif()
    # The delivery and spot feeds each block in their own loop, so each one
    # gets a thread. They are daemon threads so that the process can exit
    # when get_basis stops (error or Ctrl-C) instead of being kept alive by
    # two feeds nobody is reading.
    t1 = threading.Thread(target=test.d_start, daemon=True)
    t2 = threading.Thread(target=test.s_start, daemon=True)
    t1.start()
    t2.start()
    test.get_basis()