天天看點

Redis實作類似同步方法調用的功能(二)

接上一篇,這麼幹純粹是為了好玩。

上一篇的部落格中的例子隻能處理一個Server對一個Client的情況,今天修改了一版,可以支援一個Server對多個Client。實作方式就是Server每派發一個動作就扔到一個線程裡去,Client也類似每收到一個資料,就起一個線程去做自己的邏輯。這樣看起來就有點像socket變成了。

import redis
import time
import json
import threading

host = 'localhost'
port = 6322
queue = 'myqueue'


class Server(threading.Thread):

    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        pool = redis.BlockingConnectionPool(host=host, port=port, db='0')
        conn = redis.Redis(connection_pool=pool)
        idx = 0

        while True:
            idx = idx + 1

            key = str(idx)
            data = "request_" + key

            threading.Thread(target=ServerHandler(conn, key, data).handle).start()

            time.sleep(1)


class ServerHandler(object):

    def __init__(self, conn, key, data):
        self.conn = conn
        self.key = key
        self.data = data

    def handle(self):
        request = {'id': self.key, 'data': self.data}
        print 'Server: Send request: %s' % request
        self.conn.lpush(queue, json.dumps(request))

        response = self.conn.brpop(self.key, 2)
        if response:
            print 'Server: Receive response: %s' % response[1]
        else:
            print "Server: Timeout!!!"


class Client(threading.Thread):

    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        pool = redis.BlockingConnectionPool(host=host, port=port, db='0')
        conn = redis.Redis(connection_pool=pool)

        while True:
            msg = conn.brpop(queue)[1]
            threading.Thread(target=ClientHandler(conn, msg).handle).start()


class ClientHandler(object):

    def __init__(self, conn, msg):
        self.conn = conn
        self.msg = msg

    def handle(self):
        print 'Client: Receive request: %s' % self.msg

        time.sleep(0.1)

        d = json.loads(self.msg)
        key = d.get('id')
        d['data'] = "response_" + key
        print 'Client: Send response: %s' % d
        self.conn.lpush(key, json.dumps(d))
        self.conn.expire(key, 5)


server = Server()
server.start()

client = Client()
client.start()           

複制