
Binance Coin-Margined Delivery Contract Cash-and-Carry Arbitrage (3): Getting Basis Data

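A cash-and-carry (futures versus spot) arbitrage strategy needs a stable feed of futures and spot quotes for each coin. From those quotes it computes each coin's basis and triggers a trade when the basis breaks through a threshold. The code below obtains stable quotes, computes the basis, and sends a notification email when the threshold is crossed (the trading module is not written yet).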
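
To make the basis and the threshold check concrete, here is a minimal sketch. It uses the same formula as the full listing (futures price divided by spot price, minus 1) and the same 0.035 threshold; the class name `ThresholdAlert` and the sample prices are hypothetical and only serve the illustration.

```python
def basis(future_price: float, spot_price: float) -> float:
    """Relative premium of the delivery contract over the spot price."""
    return future_price / spot_price - 1


class ThresholdAlert:
    """Hypothetical helper: signals once each time the value rises above the threshold."""

    def __init__(self, threshold: float = 0.035):
        self.threshold = threshold
        self.armed = True  # True means the next crossing has not been reported yet

    def update(self, value: float) -> bool:
        """Return True only on the first reading above the threshold."""
        if value > self.threshold and self.armed:
            self.armed = False
            return True
        if value < self.threshold:
            # The value fell back below the threshold, so re-arm the alert.
            self.armed = True
        return False


if __name__ == "__main__":
    alert = ThresholdAlert(0.035)
    # Illustrative prices only, not market data.
    for future, spot in [(50500.0, 50000.0), (52000.0, 50000.0), (52500.0, 50000.0)]:
        value = basis(future, spot)
        print("basis %.4f alert %s" % (value, alert.update(value)))
```

Re-arming only after the basis falls back below the threshold keeps a basis that hovers above it from producing a notification on every tick.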

Here I record the main problems I ran into during development and how I solved them. If you are not interested, skip ahead to the code.

Problem 1: WebSocket quotes stop updating

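With a plain ws.run_forever, after running for two or three days the quotes usually get stuck at some point in time and stop updating. For example, the spot quote stayed at 11:00 on December 25, 2021.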
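
One way to notice a frozen feed like this is to compare the time of the last quote for each coin with the current time. The sketch below assumes the last update is stored as epoch seconds (for example the message's 'T' field divided by 1000) rather than as a formatted string; the function name `stale_symbols` and the 60-second limit are hypothetical.

```python
import time
from typing import Dict, List, Optional


def stale_symbols(
    last_update: Dict[str, Optional[float]],
    max_age_seconds: float,
    now: Optional[float] = None,
) -> List[str]:
    """Return the symbols whose last quote is missing or older than max_age_seconds."""
    if now is None:
        now = time.time()
    return sorted(
        symbol
        for symbol, timestamp in last_update.items()
        if timestamp is None or now - timestamp > max_age_seconds
    )


if __name__ == "__main__":
    # Illustrative timestamps: btc is fresh, eth is old, bnb never arrived.
    current = time.time()
    updates = {"btc": current - 5, "eth": current - 600, "bnb": None}
    print(stale_symbols(updates, max_age_seconds=60))
```

A check like this catches a connection that stays open but stops delivering messages, which a reconnect loop alone would not detect.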

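Solution: wrap the connection in while True plus try/except. As soon as the WebSocket connection has a problem, send a notification email, reclaim memory, and reconnect. (For some reason I never received an error email, so the email-sending code may be at fault. Another likely reason is that run_forever() usually handles connection errors itself, passes them to on_error and returns, so the except branch is rarely reached.)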

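def s_start(self):
    while True:
        try:
            s_ws=WebSocketApp(
                url='wss://stream.binance.com:9443/ws',
                on_open=self._s_on_open,
                on_message=self._s_on_message,
                on_error=self._s_on_error,
                on_close=self._s_on_close
                )
            # run_forever() usually reports errors through on_error and then
            # returns, so reaching the print below means the connection dropped
            s_ws.run_forever()
            print("spot websocket disconnected, reconnecting")
        except Exception as e:
            content="%s the spot websocket is broken, check whether it restarts"%e
            try:
                self._email(content)
            except Exception as mail_error:
                # a failed email must not kill the reconnect loop
                print("email failed: %s"%mail_error)
        gc.collect()
        time.sleep(5)  # brief pause before reconnecting (the interval is arbitrary)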
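
The reconnect pattern can also be sketched independently of the WebSocket library. The version below treats both a normal return and an exception from the connection loop as a disconnect, because as far as I know run_forever() tends to return after reporting the error to on_error instead of raising. The names `run_with_reconnect`, `run_once`, `notify` and `max_restarts` are hypothetical; `max_restarts` exists only so the example terminates, and a long-running process would loop without a limit.

```python
import time
from typing import Callable


def run_with_reconnect(
    run_once: Callable[[], object],
    notify: Callable[[str], None],
    max_restarts: int,
    pause_seconds: float = 5.0,
) -> int:
    """Call run_once() repeatedly and notify after every return or exception.

    Returns the number of times run_once() was started.
    """
    starts = 0
    while starts < max_restarts:
        starts += 1
        try:
            run_once()  # for example: lambda: make_app().run_forever()
            reason = "connection loop returned"
        except Exception as exc:
            reason = "connection loop raised %r" % (exc,)
        try:
            notify("%s (start %d)" % (reason, starts))
        except Exception as exc:
            # A failing notifier must not stop the reconnect loop.
            print("notification failed: %s" % exc)
        time.sleep(pause_seconds)
    return starts


if __name__ == "__main__":
    # Stand-in for a real connection: it simply returns immediately.
    run_with_reconnect(lambda: None, print, max_restarts=2, pause_seconds=0.1)
```

Keeping the notifier inside its own try/except matters here: if sending the email raised inside the handler, the thread would exit and the quotes would freeze without any message.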
                
           

Problem 2: quotes are empty during startup

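Because several coins are involved, quotes arrive in no fixed order right after the connections are established. The btc quote may already be there while the bnb quote only arrives 10 or even 20 seconds later, and the program then stops with an error.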

Solution: define the error type to retry on and use the @retry decorator, so that the program keeps running when it hits that specific type of error.

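def retry_if_data_not_ready(exception):
    # a price that is still None makes the division raise TypeError
    return isinstance(exception,TypeError)

@retry(retry_on_exception=retry_if_data_not_ready,wait_fixed=1000)
def get_basis(self):
    ...  # body as in the full listing below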
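
A price that is still None makes the division raise TypeError rather than IndexError, so a retry predicate has to match TypeError to cover this case. An alternative that avoids exceptions altogether is to check for missing quotes before dividing. The sketch below is only an illustration under that assumption; `compute_basis`, `wait_for_basis` and the timeout values are hypothetical names and numbers.

```python
import time
from typing import Callable, Optional, Tuple

Quote = Optional[float]


def compute_basis(future_price: Quote, spot_price: Quote) -> Optional[float]:
    """Return future/spot - 1, or None while either quote is still missing."""
    if future_price is None or spot_price is None:
        return None
    return future_price / spot_price - 1


def wait_for_basis(
    get_prices: Callable[[], Tuple[Quote, Quote]],
    timeout_seconds: float = 30.0,
    poll_seconds: float = 0.5,
) -> float:
    """Poll get_prices() until both quotes exist, then return the basis."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        value = compute_basis(*get_prices())
        if value is not None:
            return value
        if time.monotonic() >= deadline:
            raise TimeoutError("quotes did not arrive within %s seconds" % timeout_seconds)
        time.sleep(poll_seconds)


if __name__ == "__main__":
    # Simulated feed: the first two polls have no data yet.
    feed = iter([(None, None), (103.5, None), (103.5, 100.0)])
    print("basis %.4f" % wait_for_basis(lambda: next(feed), poll_seconds=0.1))
```

The timeout turns a feed that never delivers into a visible error instead of an endless silent retry.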
           
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Sun Dec  5 20:15:41 2021
It seems to work really well; the price and basis data are still correct after running for 13 days.
2021/12/13 16:30 begin
@author: yszhu
"""


from websocket import WebSocketApp
import json
import threading
import time
import smtplib
from email.mime.text import MIMEText
from retrying import retry
import gc

class basis_dif(object):
    
    """
    Store the delivery contract prices, the spot prices and the basis differences.
    """
    def __init__(self,contract_date=220325):
        #delivery contract price
        self.d_eth_price=None
        self.d_eth_time=None
        self.d_btc_price=None
        self.d_btc_time=None
        self.d_bnb_price=None
        self.d_bnb_time=None
        
        
        #spot price

        self.eth_price=None
        self.eth_time=None
        self.btc_price=None
        self.btc_time=None
        self.bnb_price=None
        self.bnb_time=None
        #delivery and spot basis difference
        self.btc_basis_dif=None
        self.eth_basis_dif=None
        self.bnb_basis_dif=None
        self.contract_date=contract_date
        self.basis_dif_threshold=0.035
        
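        #email
        # placeholder values: fill in your own SMTP account and addresses,
        # and never publish real credentials

        self._mail_host="smtp.qq.com"
        self._mail_user="your_smtp_user"
        self._mail_pwd="your_smtp_authorization_code"
        
        self._sender="sender@example.com"
        self._receivers=["receiver@example.com"]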
        
        # self._break_threshold avoids sending the same email repeatedly.
        # True means the next break above the threshold has not been reported
        # yet: when the basis difference exceeds the threshold, one email is
        # sent and the flag is set to False. Once the basis difference falls
        # back below the threshold, the flag is reset to True, so the next
        # break is reported again and I can then operate my account.
        self._break_threshold = True
            
        
    #websocket for delivery contract price
    def _d_on_open(self,d_ws):
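        # "@aggTrade" is an assumed stream suffix (the handler below reads the
        # 's', 'p' and 'T' fields of each event); adjust it if you subscribe
        # to a different stream
        data={
            "method":"SUBSCRIBE",
            "params":
            [
            "btcusd_%s@aggTrade"%self.contract_date,
            "ethusd_%s@aggTrade"%self.contract_date,
            "bnbusd_%s@aggTrade"%self.contract_date
            ],
            "id": 1
            }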
        d_ws.send(json.dumps(data))
    
    def _d_on_message(self,d_ws,d_msg):
        d_msg=json.loads(d_msg)
        
        if 's' in d_msg and d_msg['s']=="BTCUSD_%s"%self.contract_date:
            self.d_btc_price=float(d_msg['p'])
            self.d_btc_time=d_msg['T']
            self.d_btc_time=int(self.d_btc_time/1000)
            self.d_btc_time=time.localtime(self.d_btc_time)
            self.d_btc_time=time.strftime("%Y-%m-%d %H:%M:%S",self.d_btc_time)
            

        if 's' in d_msg and d_msg['s']=="ETHUSD_%s"%self.contract_date:
            self.d_eth_price=float(d_msg['p'])
            self.d_eth_time=d_msg['T']
            self.d_eth_time=int(self.d_eth_time/1000)
            self.d_eth_time=time.localtime(self.d_eth_time)
            self.d_eth_time=time.strftime("%Y-%m-%d %H:%M:%S",self.d_eth_time)
            
            
        if 's' in d_msg and d_msg['s']=="BNBUSD_%s"%self.contract_date:
            self.d_bnb_price=float(d_msg['p'])
            self.d_bnb_time=d_msg['T']
            self.d_bnb_time=int(self.d_bnb_time/1000)
            self.d_bnb_time=time.localtime(self.d_bnb_time)
            self.d_bnb_time=time.strftime("%Y-%m-%d %H:%M:%S",self.d_bnb_time)
        
    def _d_on_close(self,d_ws,*args):
        # newer websocket-client releases also pass a close status code and message
        print("##connection closed##")
        
    def _d_on_error(self,d_ws,error):
        print(f"on error:{error}")
    
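    def d_start(self):
        while True:
            try:
                d_ws=WebSocketApp(
                    url='wss://dstream.binance.com/ws',
                    on_open=self._d_on_open,
                    on_message=self._d_on_message,
                    on_error=self._d_on_error,
                    on_close=self._d_on_close
                    )
                # run_forever() usually reports errors through on_error and then
                # returns, so reaching the print below means the connection dropped
                d_ws.run_forever()
                print("future websocket disconnected, reconnecting")
            except Exception as e:
                content="%s the future websocket is broken, check whether it restarts"%e
                try:
                    self._email(content)
                except Exception as mail_error:
                    # a failed email must not kill the reconnect loop
                    print("email failed: %s"%mail_error)
            gc.collect()
            time.sleep(5)  # brief pause before reconnecting (the interval is arbitrary)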


    #websocket for spot price 
    def _s_on_open(self,s_ws):
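        # "@aggTrade" is an assumed stream suffix (the handler below reads the
        # 's', 'p' and 'T' fields of each event); adjust it if you subscribe
        # to a different stream
        data={
              "method": "SUBSCRIBE",
              "params": [
                "btcusdt@aggTrade",
                "ethusdt@aggTrade",
                "bnbusdt@aggTrade"
              ],
              "id": 2
             }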
        s_ws.send(json.dumps(data))
    
    def _s_on_message(self,s_ws,s_msg):
        s_msg=json.loads(s_msg)
        
        if 's' in s_msg and s_msg['s']=="BTCUSDT":
            self.btc_price=float(s_msg['p'])
            self.btc_time=s_msg['T']
            self.btc_time=int(self.btc_time/1000)
            self.btc_time=time.localtime(self.btc_time)
            self.btc_time=time.strftime("%Y-%m-%d %H:%M:%S",self.btc_time)


        if 's' in s_msg and s_msg['s']=="ETHUSDT":
            self.eth_price=float(s_msg['p'])
            self.eth_time=s_msg['T']
            self.eth_time=int(self.eth_time/1000)
            self.eth_time=time.localtime(self.eth_time)
            self.eth_time=time.strftime("%Y-%m-%d %H:%M:%S",self.eth_time)

            
            
        if 's' in s_msg and s_msg['s']=="BNBUSDT":
            self.bnb_price=float(s_msg['p'])
            self.bnb_time=s_msg['T']
            self.bnb_time=int(self.bnb_time/1000)
            self.bnb_time=time.localtime(self.bnb_time)
            self.bnb_time=time.strftime("%Y-%m-%d %H:%M:%S",self.bnb_time)


        
    def _s_on_close(self,s_ws,*args):
        # newer websocket-client releases also pass a close status code and message
        print("##connection closed##")
        
    def _s_on_error(self,s_ws,error):
        print(f"on error:{error}")
        
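    def s_start(self):
        while True:
            try:
                s_ws=WebSocketApp(
                    url='wss://stream.binance.com:9443/ws',
                    on_open=self._s_on_open,
                    on_message=self._s_on_message,
                    on_error=self._s_on_error,
                    on_close=self._s_on_close
                    )
                # run_forever() usually reports errors through on_error and then
                # returns, so reaching the print below means the connection dropped
                s_ws.run_forever()
                print("spot websocket disconnected, reconnecting")
            except Exception as e:
                content="%s the spot websocket is broken, check whether it restarts"%e
                try:
                    self._email(content)
                except Exception as mail_error:
                    # a failed email must not kill the reconnect loop
                    print("email failed: %s"%mail_error)
            gc.collect()
            time.sleep(5)  # brief pause before reconnecting (the interval is arbitrary)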
                
    
    #Several coins are tracked, each with a spot and a futures price, so right
    #after startup a price such as self.bnb_price may still be None, for
    #example because of low liquidity.
    #A division involving None raises TypeError, so get_basis is retried
    #(once per second) whenever TypeError is raised.
    def retry_if_data_not_ready(exception):
        return isinstance(exception,TypeError)
    
    @retry(retry_on_exception=retry_if_data_not_ready,wait_fixed=1000)
    def get_basis(self):
        while True:
            self.btc_basis_dif = self.d_btc_price/self.btc_price-1
            self.eth_basis_dif = self.d_eth_price/self.eth_price-1
            self.bnb_basis_dif = self.d_bnb_price/self.bnb_price-1
            
            print("btc_basis_dif is %f" % self.btc_basis_dif)
            print("btc_d_price is %f %s"%(self.d_btc_price,self.d_btc_time))
            print("btc_s_price is %f %s"%(self.btc_price,self.btc_time))

            print("eth_basis_dif is %f"%self.eth_basis_dif)
            print("eth_d_price is %f %s"%(self.d_eth_price,self.d_eth_time))
            print("eth_s_price is %f %s"%(self.eth_price,self.eth_time))
            
            print("bnb_basis_dif is %f"%self.bnb_basis_dif)
            print("bnb_d_price is %f %s"%(self.d_bnb_price,self.d_bnb_time))
            print("bnb_s_price is %f %s"%(self.bnb_price,self.bnb_time))
            
            # each value is [basis difference, spot price, futures price]
            basis_dif_dict={
                "btc":[self.btc_basis_dif,self.btc_price,self.d_btc_price],
                "eth":[self.eth_basis_dif,self.eth_price,self.d_eth_price],
                "bnb":[self.bnb_basis_dif,self.bnb_price,self.d_bnb_price],
                }
            
            # rank the coins by basis difference, largest first
            ranked=sorted(basis_dif_dict.items(),key=lambda x:x[1][0],reverse=True)
            greatest_coin,(greatest_basis_dif,spot_price,future_price)=ranked[0]
            summary="the greatest basis is %s %f,Spot price %f,future price %f"%(
                greatest_coin,greatest_basis_dif,spot_price,future_price)
            print(summary)

            # send one email per break above the threshold
            if greatest_basis_dif>self.basis_dif_threshold:
                if self._break_threshold:
                    self._email(summary)
                    self._break_threshold = False
                    
            # re-arm the alert once the basis falls back below the threshold
            if greatest_basis_dif<self.basis_dif_threshold:
                self._break_threshold= True

            time.sleep(1)  # throttle the loop; the interval is an arbitrary choice
            
            
            
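    def _email(self,content):
        '''
        Send a notification email, for example when the basis_dif reaches the threshold.

        param content: which coin's basis_dif has reached the threshold, at what price and when
        
        '''
        
        # accept either a single address or a list of addresses
        receivers=[self._receivers] if isinstance(self._receivers,str) else list(self._receivers)
        message=MIMEText(content,'plain',"utf-8")
        message['Subject']="from new greatest basis dif:basis difference reached the threshold"
        message['From']=self._sender
        message['To']=",".join(receivers)
        # plain SMTP on port 25; if the provider requires SSL,
        # smtplib.SMTP_SSL on its SSL port is the usual alternative
        smtpObj=smtplib.SMTP()
        smtpObj.connect(self._mail_host,25)
        smtpObj.login(self._mail_user,self._mail_pwd)
        smtpObj.sendmail(self._sender,receivers,message.as_string())
        smtpObj.quit()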
    

if __name__=="__main__":
    test=basis_dif()
    # I want to receive spot and contract prices at the same time, so I create two threads
    t1=threading.Thread(target=test.d_start)
    t2=threading.Thread(target=test.s_start)
    t1.start()
    t2.start()
    test.get_basis()
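
Since the notification emails were the part that never arrived, here is a small sketch of the mail step on its own. Indexing a plain string address with [0] yields only its first character, so the sketch takes the recipients as a list. Using smtplib.SMTP_SSL on port 465 is an assumption about the mail provider, and `build_message`, `send_message` and the example.com addresses are hypothetical.

```python
import smtplib
from email.mime.text import MIMEText
from typing import List


def build_message(sender: str, receivers: List[str], subject: str, content: str) -> MIMEText:
    """Build a plain-text UTF-8 message addressed to every receiver."""
    message = MIMEText(content, "plain", "utf-8")
    message["Subject"] = subject
    message["From"] = sender
    message["To"] = ", ".join(receivers)
    return message


def send_message(host: str, port: int, user: str, password: str,
                 message: MIMEText, receivers: List[str]) -> None:
    """Send over an SSL connection (assumes the provider offers SMTP over SSL)."""
    with smtplib.SMTP_SSL(host, port, timeout=10) as server:
        server.login(user, password)
        server.sendmail(message["From"], receivers, message.as_string())


if __name__ == "__main__":
    msg = build_message(
        "sender@example.com",
        ["receiver@example.com"],
        "basis difference reached the threshold",
        "btc basis 0.036",
    )
    print(msg["To"])
    # send_message("smtp.example.com", 465, "user", "password", msg, ["receiver@example.com"])
```

Building the message separately from sending it makes the headers easy to inspect without a network connection.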