這裡我使用redis的釋出、訂閱功能實作簡單的消息隊列,基本的指令有publish、subscribe等。
在jedis中,有對應的java方法,但是隻能釋出字元串消息。為了傳輸對象,需要将對象進行序列化,并封裝成字元串進行處理。
1.封裝一個消息對象
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
<code>public</code> <code>class</code> <code>message </code><code>implements</code> <code>serializable{</code>
<code>private</code> <code>static</code> <code>final</code> <code>long</code> <code>serialversionuid = 1l;</code>
<code>private</code> <code>string titile;</code>
<code>private</code> <code>string info;</code>
<code>public</code> <code>message(string titile,string info){</code>
<code>this</code><code>.titile=titile;</code>
<code>this</code><code>.info=info;</code>
<code>}</code>
<code>public</code> <code>string gettitile() {</code>
<code>return</code> <code>titile;</code>
<code>public</code> <code>void</code> <code>settitile(string titile) {</code>
<code>this</code><code>.titile = titile;</code>
<code>public</code> <code>string getinfo() {</code>
<code>return</code> <code>info;</code>
<code>public</code> <code>void</code> <code>setinfo(string info) {</code>
<code>this</code><code>.info = info;</code>
2.為這個消息對象提供序列化方法
<code>public</code> <code>class</code> <code>messageutil {</code>
<code>//convert to string</code>
<code>public</code> <code>static</code> <code>string converttostring(object obj,string charset) </code><code>throws</code> <code>ioexception{</code>
<code>bytearrayoutputstream bo = </code><code>new</code> <code>bytearrayoutputstream();</code>
<code>objectoutputstream oo = </code><code>new</code> <code>objectoutputstream(bo);</code>
<code>oo.writeobject(obj);</code>
<code>string str = bo.tostring(charset);</code>
<code>bo.close();</code>
<code>oo.close();</code>
<code>return</code> <code>str;</code>
<code>//convert to message</code>
<code>public</code> <code>static</code> <code>object converttomessage(</code><code>byte</code><code>[] bytes) </code><code>throws</code> <code>exception{</code>
<code>bytearrayinputstream in = </code><code>new</code> <code>bytearrayinputstream(bytes);</code>
<code>objectinputstream sin = </code><code>new</code> <code>objectinputstream(in);</code>
<code>return</code> <code>sin.readobject();</code>
3.從jedis連接配接池中擷取連接配接
<code>public</code> <code>class</code> <code>redisutil {</code>
<code>/**</code>
<code>* jedis connection pool</code>
<code>* @title: config</code>
<code>*/</code>
<code>public</code> <code>static</code> <code>jedispool getjedispool(){</code>
<code>resourcebundle bundle=resourcebundle.getbundle(</code><code>"redis"</code><code>);</code>
<code>string host=bundle.getstring(</code><code>"host"</code><code>);</code>
<code>int</code> <code>port=integer.valueof(bundle.getstring(</code><code>"port"</code><code>));</code>
<code>int</code> <code>timeout=integer.valueof(bundle.getstring(</code><code>"timeout"</code><code>));</code>
<code>// string password=bundle.getstring("password");</code>
<code>jedispoolconfig config=</code><code>new</code> <code>jedispoolconfig();</code>
<code>config.setmaxactive(integer.valueof(bundle.getstring(</code><code>"maxactive"</code><code>)));</code>
<code>config.setmaxwait(integer.valueof(bundle.getstring(</code><code>"maxwait"</code><code>)));</code>
<code>config.settestonborrow(boolean.valueof(bundle.getstring(</code><code>"testonborrow"</code><code>)));</code>
<code>config.settestonreturn(boolean.valueof(bundle.getstring(</code><code>"testonreturn"</code><code>)));</code>
<code>jedispool pool=</code><code>new</code> <code>jedispool(config, host, port, timeout);</code>
<code>return</code> <code>pool;</code>
<code>public</code> <code>class</code> <code>producer {</code>
<code>private</code> <code>jedis jedis;</code>
<code>private</code> <code>jedispool pool;</code>
<code>public</code> <code>producer(){</code>
<code>pool=redisutil.getjedispool();</code>
<code>jedis = pool.getresource();</code>
<code>public</code> <code>void</code> <code>provide(string channel,message message) </code><code>throws</code> <code>ioexception{</code>
<code>string str1=messageutil.converttostring(channel,</code><code>"utf-8"</code><code>);</code>
<code>string str2=messageutil.converttostring(message,</code><code>"utf-8"</code><code>);</code>
<code>jedis.publish(str1, str2);</code>
<code>//close the connection</code>
<code>public</code> <code>void</code> <code>close() </code><code>throws</code> <code>ioexception {</code>
<code>//将jedis對象歸還給連接配接池,關閉連接配接</code>
<code>pool.returnresource(jedis);</code>
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
<code>public</code> <code>class</code> <code>consumer {</code>
<code>public</code> <code>consumer(){</code>
<code>public</code> <code>void</code> <code>consum(string channel) </code><code>throws</code> <code>ioexception{</code>
<code>jedispubsub jedispubsub = </code><code>new</code> <code>jedispubsub() {</code>
<code>// 取得訂閱的消息後的處理</code>
<code>public</code> <code>void</code> <code>onmessage(string channel, string message) {</code>
<code>system.out.println(</code><code>"channel:"</code><code>+channel);</code>
<code>system.out.println(</code><code>"message:"</code><code>+message.tostring());</code>
<code>// 初始化訂閱時候的處理</code>
<code>public</code> <code>void</code> <code>onsubscribe(string channel, </code><code>int</code> <code>subscribedchannels) {</code>
<code>system.out.println(</code><code>"onsubscribe:"</code><code>+channel);</code>
<code>// 取消訂閱時候的處理</code>
<code>public</code> <code>void</code> <code>onunsubscribe(string channel, </code><code>int</code> <code>subscribedchannels) {</code>
<code>system.out.println(</code><code>"onunsubscribe:"</code><code>+channel);</code>
<code>// 初始化按表達式的方式訂閱時候的處理</code>
<code>public</code> <code>void</code> <code>onpsubscribe(string pattern, </code><code>int</code> <code>subscribedchannels) {</code>
<code>// system.out.println(pattern + "=" + subscribedchannels);</code>
<code>// 取消按表達式的方式訂閱時候的處理</code>
<code>public</code> <code>void</code> <code>onpunsubscribe(string pattern, </code><code>int</code> <code>subscribedchannels) {</code>
<code>// 取得按表達式的方式訂閱的消息後的處理</code>
<code>public</code> <code>void</code> <code>onpmessage(string pattern, string channel, string message) {</code>
<code>system.out.println(pattern + </code><code>"="</code> <code>+ channel + </code><code>"="</code> <code>+ message);</code>
<code>};</code>
<code>jedis.subscribe(jedispubsub, channel);</code>
<code>//将jedis對象歸還給連接配接池</code>
<code>public</code> <code>static</code> <code>void</code> <code>main(string[] args){</code>
<code>message msg=</code><code>new</code> <code>message(</code><code>"hello!"</code><code>, </code><code>"this is the first message!"</code><code>);</code>
<code>producer producer=</code><code>new</code> <code>producer();</code>
<code>consumer consumer=</code><code>new</code> <code>consumer();</code>
<code>try</code> <code>{</code>
<code>producer.provide(</code><code>"chn1"</code><code>,msg);</code>
<code>consumer.consum(</code><code>"chn1"</code><code>);</code>
<code>} </code><code>catch</code> <code>(ioexception e) {</code>
<code>e.printstacktrace();</code>