天天看點

Redis筆記(七)Java實作Redis消息隊列

這裡我使用redis的釋出、訂閱功能實作簡單的消息隊列,基本的指令有publish、subscribe等。

在jedis中,有對應的java方法,但是隻能釋出字元串消息。為了傳輸對象,需要将對象進行序列化,并封裝成字元串進行處理。

1.封裝一個消息對象

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

<code>public</code> <code>class</code> <code>message </code><code>implements</code> <code>serializable{</code>

<code>private</code> <code>static</code> <code>final</code> <code>long</code> <code>serialversionuid = 1l;</code>

<code>private</code> <code>string titile;</code>

<code>private</code> <code>string info;</code>

<code>public</code> <code>message(string titile,string info){</code>

<code>this</code><code>.titile=titile;</code>

<code>this</code><code>.info=info;</code>

<code>}</code>

<code>public</code> <code>string gettitile() {</code>

<code>return</code> <code>titile;</code>

<code>public</code> <code>void</code> <code>settitile(string titile) {</code>

<code>this</code><code>.titile = titile;</code>

<code>public</code> <code>string getinfo() {</code>

<code>return</code> <code>info;</code>

<code>public</code> <code>void</code> <code>setinfo(string info) {</code>

<code>this</code><code>.info = info;</code>

  

2.為這個消息對象提供序列化方法

<code>public</code> <code>class</code> <code>messageutil {</code>

<code>//convert to string</code>

<code>public</code> <code>static</code> <code>string converttostring(object obj,string charset) </code><code>throws</code> <code>ioexception{</code>

<code>bytearrayoutputstream bo = </code><code>new</code> <code>bytearrayoutputstream();</code>

<code>objectoutputstream oo = </code><code>new</code> <code>objectoutputstream(bo);</code>

<code>oo.writeobject(obj);</code>

<code>string str = bo.tostring(charset);</code>

<code>bo.close();</code>

<code>oo.close();</code>

<code>return</code> <code>str;</code>

<code>//convert to message</code>

<code>public</code> <code>static</code> <code>object converttomessage(</code><code>byte</code><code>[] bytes) </code><code>throws</code> <code>exception{</code>

<code>bytearrayinputstream in = </code><code>new</code> <code>bytearrayinputstream(bytes);</code>

<code>objectinputstream sin = </code><code>new</code> <code>objectinputstream(in);</code>

<code>return</code> <code>sin.readobject();</code>

3.從jedis連接配接池中擷取連接配接

<code>public</code> <code>class</code> <code>redisutil {</code>

<code>/**</code>

<code>* jedis connection pool</code>

<code>* @title: config</code>

<code>*/</code>

<code>public</code> <code>static</code> <code>jedispool getjedispool(){</code>

<code>resourcebundle bundle=resourcebundle.getbundle(</code><code>"redis"</code><code>);</code>

<code>string host=bundle.getstring(</code><code>"host"</code><code>);</code>

<code>int</code> <code>port=integer.valueof(bundle.getstring(</code><code>"port"</code><code>));</code>

<code>int</code> <code>timeout=integer.valueof(bundle.getstring(</code><code>"timeout"</code><code>));</code>

<code>//  string password=bundle.getstring("password");</code>

<code>jedispoolconfig config=</code><code>new</code> <code>jedispoolconfig();</code>

<code>config.setmaxactive(integer.valueof(bundle.getstring(</code><code>"maxactive"</code><code>)));</code>

<code>config.setmaxwait(integer.valueof(bundle.getstring(</code><code>"maxwait"</code><code>)));</code>

<code>config.settestonborrow(boolean.valueof(bundle.getstring(</code><code>"testonborrow"</code><code>)));</code>

<code>config.settestonreturn(boolean.valueof(bundle.getstring(</code><code>"testonreturn"</code><code>)));</code>

<code>jedispool pool=</code><code>new</code> <code>jedispool(config, host, port, timeout);</code>

<code>return</code> <code>pool;</code>

<code>public</code> <code>class</code> <code>producer {</code>

<code>private</code> <code>jedis jedis;</code>

<code>private</code> <code>jedispool pool;</code>

<code>public</code> <code>producer(){</code>

<code>pool=redisutil.getjedispool();</code>

<code>jedis = pool.getresource();</code>

<code>public</code> <code>void</code> <code>provide(string channel,message message) </code><code>throws</code> <code>ioexception{</code>

<code>string str1=messageutil.converttostring(channel,</code><code>"utf-8"</code><code>);</code>

<code>string str2=messageutil.converttostring(message,</code><code>"utf-8"</code><code>);</code>

<code>jedis.publish(str1, str2);</code>

<code>//close the connection</code>

<code>public</code> <code>void</code> <code>close() </code><code>throws</code> <code>ioexception {</code>

<code>//将jedis對象歸還給連接配接池,關閉連接配接</code>

<code>pool.returnresource(jedis);</code>

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

<code>public</code> <code>class</code> <code>consumer {</code>

<code>public</code> <code>consumer(){</code>

<code>public</code> <code>void</code> <code>consum(string channel) </code><code>throws</code> <code>ioexception{</code>

<code>jedispubsub jedispubsub = </code><code>new</code> <code>jedispubsub() {</code>

<code>// 取得訂閱的消息後的處理</code>

<code>public</code> <code>void</code> <code>onmessage(string channel, string message) {</code>

<code>system.out.println(</code><code>"channel:"</code><code>+channel);</code>

<code>system.out.println(</code><code>"message:"</code><code>+message.tostring());</code>

<code>// 初始化訂閱時候的處理</code>

<code>public</code> <code>void</code> <code>onsubscribe(string channel, </code><code>int</code> <code>subscribedchannels) {</code>

<code>system.out.println(</code><code>"onsubscribe:"</code><code>+channel);</code>

<code>// 取消訂閱時候的處理</code>

<code>public</code> <code>void</code> <code>onunsubscribe(string channel, </code><code>int</code> <code>subscribedchannels) {</code>

<code>system.out.println(</code><code>"onunsubscribe:"</code><code>+channel);</code>

<code>// 初始化按表達式的方式訂閱時候的處理</code>

<code>public</code> <code>void</code> <code>onpsubscribe(string pattern, </code><code>int</code> <code>subscribedchannels) {</code>

<code>// system.out.println(pattern + "=" + subscribedchannels);</code>

<code>// 取消按表達式的方式訂閱時候的處理</code>

<code>public</code> <code>void</code> <code>onpunsubscribe(string pattern, </code><code>int</code> <code>subscribedchannels) {</code>

<code>// 取得按表達式的方式訂閱的消息後的處理</code>

<code>public</code> <code>void</code> <code>onpmessage(string pattern, string channel, string message) {</code>

<code>system.out.println(pattern + </code><code>"="</code> <code>+ channel + </code><code>"="</code> <code>+ message);</code>

<code>};</code>

<code>jedis.subscribe(jedispubsub, channel);</code>

<code>//将jedis對象歸還給連接配接池</code>

<code>public</code> <code>static</code> <code>void</code> <code>main(string[] args){</code>

<code>message msg=</code><code>new</code> <code>message(</code><code>"hello!"</code><code>, </code><code>"this is the first message!"</code><code>);</code>

<code>producer producer=</code><code>new</code> <code>producer();</code>

<code>consumer consumer=</code><code>new</code> <code>consumer();</code>

<code>try</code> <code>{</code>

<code>producer.provide(</code><code>"chn1"</code><code>,msg);</code>

<code>consumer.consum(</code><code>"chn1"</code><code>);</code>

<code>} </code><code>catch</code> <code>(ioexception e) {</code>

<code>e.printstacktrace();</code>