我的方法是利用存儲過程和游標:先根據條件擷取要刪除的記錄的主鍵,然後依據主鍵逐條刪除。考慮到刪除 50 億條記錄耗費將近 7 天的時間(事後得出),必須放在背景執行。因此使用 Python 寫一個腳本,可以針對多個伺服器進行並行操作。
1. 在各個伺服器上建立存儲過程:
delimiter //
-- proc_del_tab: delete rows from `tab` one primary key at a time, committing
-- in batches so that no single transaction grows too large.
--   com_num   : number of deleted rows between two COMMITs.
--   push_time : cutoff datetime used by the cursor's WHERE clause.
CREATE PROCEDURE `proc_del_tab`(in com_num int , in push_time datetime )
begin
    declare curid bigint ;
    declare rowid bigint ;
    declare no_more_departments int ;
    -- NOTE(review): the WHERE predicate was truncated in the published text
    -- (only the column name `v3` survived). `v3 < push_time` is a
    -- reconstruction based on the otherwise unused push_time parameter;
    -- confirm the real condition before running this against production data.
    declare curs cursor for
        select id
        from tab
        where v3 < push_time ;
    declare continue handler for not found set no_more_departments = 1 ;

    set no_more_departments = 0 ;
    set rowid = 0 ;
    -- Batch the deletes manually instead of committing after every row.
    set autocommit = 0 ;

    open curs ;
    del_loop: loop
        fetch curs into curid ;
        -- Leave as soon as the cursor is exhausted; otherwise the stale
        -- curid from the previous iteration would be deleted a second time.
        if no_more_departments then
            leave del_loop ;
        end if ;
        delete from tab where id = curid ;
        set rowid = rowid + 1 ;
        -- Commit once every com_num deleted rows.
        if rowid % com_num = 0 then
            commit ;
        end if ;
    end loop del_loop ;
    -- Flush the last, possibly partial, batch.
    commit ;
    close curs ;
end;
//
delimiter ;
2. 部署 Python 腳本:
#!/usr/bin/env python
from MySQLdb import *
import sys
import threading
import time
import os
def now() :
    """Return the current local time as a 'YYYY-MM-DD HH:MM:SS' string."""
    stamp_format = '%Y-%m-%d %H:%M:%S'
    # strftime() formats the current local time when no time tuple is given.
    return str( time.strftime( stamp_format ) )
def log( strs , logs ) :
    """Append one timestamped line to a log file.

    strs -- the message; converted with str() before writing.
    logs -- path of the log file, opened in append mode.
    """
    # The with-block closes (and therefore flushes) the file even when the
    # write raises, so the handle never leaks and each entry reaches the
    # file before the function returns.
    with open( logs , 'a' ) as f :
        f.write( now() + ' ' + str(strs) + '\n' )
def delining( cur , logs , com_num = 3000 , push_time = '2011-01-31 00:00:00' ) :
    """Run the batched-delete stored procedure on one server.

    cur       -- dict with 'dsn' (an open database cursor) and 'addr'
                 (the server address, used only in log messages).
    logs      -- path of the log file passed on to log().
    com_num   -- rows deleted between commits inside the procedure.
    push_time -- cutoff datetime string handed to the procedure.

    Every statement is executed on a best-effort basis: a failure is logged
    and the next statement still runs, so SQL_LOG_BIN is always switched
    back on even if the procedure call fails.
    """
    def run( label , sql , args = None ) :
        # Execute a single statement; log the error instead of raising.
        try :
            cur['dsn'].execute( sql , args )
        except Exception as e :
            log( label + str(e) , logs )

    # Disable binary logging for this session while the deletes run.
    run( 'Set SQL_LOG_BIN OFF' , "SET SQL_LOG_BIN=0" )
    log( 'starting process %s' % ( cur['addr'] ) , logs )
    # The procedure name matches the CREATE PROCEDURE statement shown
    # earlier in this document (proc_del_tab).
    run( 'Execute Procedure ' , "call proc_del_tab( %s , %s )" , ( com_num , push_time ) )
    run( 'Set SQL_LOG_BIN ON' , "SET SQL_LOG_BIN=1" )
    log( 'process %s End' % ( cur['addr'] ) , logs )
def main() :
    """Connect to every server, run delining() on each in parallel, clean up.

    A server that cannot be connected to, or whose cursor cannot be created,
    is logged and skipped; the remaining servers are still processed.
    """
    logs = "/root/yangql/python/del_test_tab.log"
    server_list = ['10.250.7.110']
    luser = "yang"
    lpasswd = "yang"

    # One connection per reachable server.
    con = []
    for addr in server_list :
        try :
            cons = connect( host = addr , user = luser , passwd = lpasswd , port = 3307 , db = 'newcloudapp' )
        except Exception as e :
            log( 'On Connect %s ' % ( addr ) + str(e) , logs )
            continue
        con.append( { 'dsn':cons , 'addr':addr } )

    # One cursor per connection.
    cur = []
    for cons in con :
        try :
            cur.append( { 'dsn':cons['dsn'].cursor( cursorclass = cursors.DictCursor ) , 'addr':cons['addr'] } )
        except Exception as e :
            log( 'On Cusros %s ' % ( cons['addr'] ) + str(e) , logs )

    # Start every worker before joining any of them; joining right after
    # each start would make the servers run one after another.
    thpool = [ threading.Thread( target = delining , args = ( curs , logs ) ) for curs in cur ]
    for th in thpool :
        th.start()
    # join() blocks until each worker has finished, so no separate polling
    # of the active thread count is needed.
    for th in thpool :
        th.join()

    # Close every cursor and connection, not only the last one created.
    for curs in cur :
        try :
            curs['dsn'].close()
        except Exception as e :
            log( 'On Close Cusros %s ' % ( curs['addr'] ) + str(e) , logs )
    for cons in con :
        try :
            cons['dsn'].close()
        except Exception as e :
            log( 'On Close Connect %s ' % ( str(e) ) , logs )
# Run the deletion job only when this file is executed as a script.
if '__main__' == __name__ :
    main()
歡迎大家提出更好的方法。