天天看點

【MySQL】删除大量資料的具體實作

我的方法是利用存儲過程,遊标,先根據條件擷取要删除的主鍵,然後依據主鍵删除,考慮到删除50億條記錄耗費将近7天的時間(事後得出),必須背景執行。使用python 工具寫一個腳本,可以針對多個伺服器進行并行操作。

1 在各個伺服器上建立存過!

delimiter //

CREATE  PROCEDURE `proc_del_tab`(in com_num int , in push_time datetime ) 

begin

    declare curid bigint ;

    DECLARE rowid bigint ;

    declare no_more_departments int ;

    declare curs cursor for

        select id

        from

            tab

        WHERE

            v3

    DECLARE CONTINUE HANDLER FOR NOT FOUND SET no_more_departments = 1;

    SET no_more_departments=0;

    set rowid = 1 ;

    set autocommit = 0 ;

    open curs ;

    REPEAT

        fetch curs into curid ;

        delete from tab where id = curid ;

        set rowid = rowid + 1 ;

        if rowid % com_num = 0

        then

            commit;

        end if ;

    UNTIL no_more_departments

    END REPEAT;

    commit ;

    close curs ;

end;

//

delimiter ; 

2 部署python 腳本:

#!/usr/bin/env python

from MySQLdb import *

import sys

import threading

import time

import os

def now() :

        #return str('2011-01-31 00:00:00')

        return str( time.strftime( '%Y-%m-%d %H:%M:%S' , time.localtime() ) )

def log( strs , logs ) :

        f = file( logs , 'a' , 0 )

        f.write( now() + ' ' + str(strs) + '\n' )

        f.close()

def delining( cur , logs ) :

        sql = "SET SQL_LOG_BIN=0"

        try :

                cur['dsn'].execute( sql )

        except Exception , e :

                log( 'Set SQL_LOG_BIN OFF' + str(e) , logs )

        sql = "call proc_del_tab_yang( 3000 , '%s' )" % ('2011-01-31 00:00:00')

        log( 'starting process %s' % ( cur['addr'] ) , logs )

                log( 'Execute Procedure ' + str(e) , logs )

        sql = "SET SQL_LOG_BIN=1"

                log( 'Set SQL_LOG_BIN ON' + str(e) , logs )

        log( 'process %s End' % ( cur['addr'] ) , logs )

def main() :

        logs = "/root/yangql/python/del_test_tab.log"

        server_list=['10.250.7.110']

        luser="yang"

        lpasswd="yang"

        con = []

        for addr in server_list :

                cons = None

                try :

                        cons = connect( host = addr , user = luser , passwd = lpasswd , port = 3307 , db = 'newcloudapp' )

                except Exception , e :

                        log( 'On Connect %s ' % ( addr ) + str(e) , logs )

                        continue

                con.append(  { 'dsn':cons , 'addr':addr } )

        cur = []

        for cons in con :

                        cur.append( { 'dsn':cons['dsn'].cursor( cursorclass = cursors.DictCursor ) , 'addr':cons['addr'] } )

                        log( 'On Cusros %s ' % ( cons['addr'] ) + str(e)  , logs )

        thpool = []

        for curs in cur :

                th = threading.Thread(target = delining ,args=( curs , logs ) )

                thpool.append( th )

        for th in thpool :

                th.start()

                threading.Thread.join( th )

        while True :

                if threading.activeCount()

                        break

                else :

                        time.sleep(1)

                        curs['dsn'].close()

                        log( 'On Close Cusros %s ' % ( curs['addr'] ) + str(e)  , logs )

                        cons['dsn'].close()

                        log( 'On Close Connect %s ' % ( str(e)  ) , logs )

if __name__ == '__main__' :

        main()

歡迎大家提出更好的方法。。