天天看点

41. Python Queue 多进程的消息队列 PIPE

消息队列:

消息队列是在消息传输过程中保存消息的容器。

消息队列最经典的用法就是消费者和生产者之间通过消息管道来传递消息,消费者和生产生是不通的进程。生产者往管道中写消息,消费者从管道中读消息。

相当于水管,有一个入口和出口,水从入口流入出口流出,这就是一个消息队列

线程或进程往队列里面添加数据,出口从队列里面读数据

左侧多线程往入口处添加完数据,任务就结束了;右侧只要依次从水管里取数据就行了。

异步完成的任务

比如京东下单,下单后付完钱,相当于把消息堆在了水管里,后台会有线程去接收这个单的消息,然后去库房,发货,走物流,直到接收货物并签收完,点击完成,整个流程才走完。

客户交完钱后,丢了个消息在这个队列中,会给客户返回一个结果,告知你已经买了这个商品;而后面接收订单消息,发货,物流都是后面的"进程"或"线程"干的事情。

所以,一般在异步处理问题时候,都会用到消息队列处理的这种思想。

使用multiprocessing里面的Queue来实现消息队列。

语法:

<code>from</code> <code>mutliprocessing </code><code>import</code> <code>Queue</code>

<code>q </code><code>=</code> <code>Queue</code>

<code>q.put(data)</code>

<code>data </code><code>=</code> <code>q.get(data)</code>

举例:

<code>from</code> <code>multiprocessing </code><code>import</code> <code>Queue, Process</code>

<code>def</code> <code>write(q):</code>

<code>    </code><code>for</code> <code>i </code><code>in</code> <code>[</code><code>'a'</code><code>,</code><code>'b'</code><code>,</code><code>'c'</code><code>,</code><code>'d'</code><code>]:</code>

<code>        </code><code>q.put(i)</code>

<code>        </code><code>print</code> <code>(</code><code>'put {0} to queue'</code><code>.</code><code>format</code><code>(i))</code>

<code>def</code> <code>read(q):</code>

<code>    </code><code>while</code> <code>1</code><code>:</code>

<code>        </code><code>result </code><code>=</code> <code>q.get()</code>

<code>        </code><code>print</code> <code>(</code><code>"get {0} from queue"</code><code>.</code><code>format</code><code>(result))</code>

<code>def</code> <code>main():</code>

<code>    </code><code>q </code><code>=</code> <code>Queue()</code>

<code>    </code><code>pw </code><code>=</code> <code>Process(target</code><code>=</code><code>write,args</code><code>=</code><code>(q,))</code>

<code>    </code><code>pr </code><code>=</code> <code>Process(target</code><code>=</code><code>read,args</code><code>=</code><code>(q,))</code>

<code>    </code><code>pw.start()</code>

<code>    </code><code>pr.start()</code>

<code>    </code><code>pw.join()</code>

<code>    </code><code>pr.terminate()  </code><code>#停止</code>

<code>    </code><code># 相当于join,等pr完成以后,当whlie没有任何执行后,结束。</code>

<code>if</code> <code>__name__ </code><code>=</code><code>=</code> <code>'__main__'</code><code>:</code>

<code>    </code><code>main()</code>

返回结果:

<code>put a to queue</code>

<code>get a from queue</code>

<code>put b to queue</code>

<code>get b from queue</code>

<code>put c to queue</code>

<code>get c from queue</code>

<code>put d to queue</code>

<code>get d from queue</code>

PIPE:

多进程里面有个pipe的方法来实现消息队列:

1. Pipe 方法返回(conn1, conn2)代表一个管道的两端。PIPE方法有个deplex参数,如果deplex参数为True(默认值),那么这个管道是全双工模式,也就是说conn1和conn2均可收发。duplex为False,conn1只负责接收消息,conn2负责发送消息。

2.send 和recv方法分别是发送和接受消息的方法。close方法表示关闭管道,当消息接收结束以后,关闭管道。

<code>import</code> <code>time</code>

<code>from</code> <code>multiprocessing </code><code>import</code> <code>Pipe, Process</code>

<code>def</code> <code>proc1(pipe):</code>

<code>    </code><code>for</code> <code>i </code><code>in</code> <code>xrange</code><code>(</code><code>1</code><code>, </code><code>10</code><code>):</code>

<code>        </code><code>pipe.send(i)</code>

<code>        </code><code>print</code> <code>(</code><code>"send {0} to pipe"</code><code>.</code><code>format</code><code>(i))</code>

<code>        </code><code>time.sleep(</code><code>1</code><code>)</code>

<code>def</code> <code>proc2(pipe):</code>

<code>    </code><code>n </code><code>=</code> <code>9</code>

<code>    </code><code>while</code> <code>n &gt; </code><code>0</code><code>:</code>

<code>        </code><code>result </code><code>=</code> <code>pipe.recv()</code>

<code>        </code><code>print</code> <code>(</code><code>"recv {0} from pipe"</code><code>.</code><code>format</code><code>(result))</code>

<code>        </code><code>n </code><code>-</code><code>=</code> <code>1</code>

<code>    </code><code>pipe </code><code>=</code> <code>Pipe(duplex</code><code>=</code><code>False</code><code>)</code>

<code>    </code><code>print</code> <code>(</code><code>type</code><code>(pipe))</code>

<code>    </code><code>p1 </code><code>=</code> <code>Process(target</code><code>=</code><code>proc1, args</code><code>=</code><code>(pipe[</code><code>1</code><code>],))</code>

<code>    </code><code>p2 </code><code>=</code> <code>Process(target</code><code>=</code><code>proc2, args</code><code>=</code><code>(pipe[</code><code>0</code><code>],)) </code><code>#接收写0</code>

<code>    </code><code>p1.start()</code>

<code>    </code><code>p2.start()</code>

<code>    </code><code>p1.join()</code>

<code>    </code><code>p2.join()</code>

<code>    </code><code>pipe[</code><code>0</code><code>].close()</code>

<code>    </code><code>pipe[</code><code>1</code><code>].close()</code>

返回结果(逐行打印):

本文转自 听丶飞鸟说 51CTO博客,原文链接:http://blog.51cto.com/286577399/2051155