天天看點

java beep core入門

1.[url=http://www.beepcore.org/]BEEP[/url],全稱 塊可擴充交換協定(Blocks Extensible Exchange Protocol),是一個P2P應用程式協定架構(RFC3080, RFC3081),用于面向連接配接的、異步請求/響應消息。支援在一個傳輸連接配接上使用多路複用的消息流。支援二進制和文本消息, TLS, SASL/Anonymous, SASL/OTP。

2.有一個java的實作java beep core,目前最新版本是 0.9.08,可在[url]http://sourceforge.net/projects/beepcore-java/[/url]下載下傳。

3.一些基本概念

Peer 對等連接配接,端點,可以是用戶端或服務端

Session 會話,代表了兩個peer之間的連接配接

Channel 通道,用來收發消息。當Channel啟動時會被關聯一個Profile。可以有多個Channel共存在同一個Session中。這些端隻需要一個IP連接配接,這個連接配接随後被多路複用以建立通道。

Message 消息,是從一個peer發到另一個peer的request或reply。這個類封裝 MSG、RPY、ANS、ERR和NULL消息類型

Reply 回應,用于處理來自對等點的異步回答

Profile 概要檔案,定義了合法的request或reply格式

BEEP中有兩種角色,發起人和監聽者。

BEEP采用request-reply模型,request的消息類型是MSG,reply的消息類型有RPY、ANS、ERR和NULL。每個請求必須得有一個響應。

4.好處

[list]

[*]簡化TCP程式設計,關于這點我暫時還沒有體會到。

[*]另外為什麼不用web service呢?

BEEP 和web service的不同點是,BEEP是直接建立在TCP/IP 層的,而web service是建立在HTTP層之上的。

[/list]

5.一個例子

用戶端發MSG消息,說我要添加一條Person記錄到資料庫裡,服務端回應RPY消息,給出了使用者存到表裡的id

C: MSG <person>
         <name>Edd</name>
         <tel>1-234-567-7890</tel>
       </person>
S: RPY <person uid="123"/>
           

5.1用戶端

import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;

import org.beepcore.beep.core.BEEPError;
import org.beepcore.beep.core.BEEPException;
import org.beepcore.beep.core.BEEPInterruptedException;
import org.beepcore.beep.core.Channel;
import org.beepcore.beep.core.ProfileRegistry;
import org.beepcore.beep.core.Session;
import org.beepcore.beep.core.StringOutputDataStream;
import org.beepcore.beep.lib.Reply;
import org.beepcore.beep.transport.tcp.TCPSessionCreator;

public class Client {

	public static void main(String[] args) {
		Session session = null;
		try {
			//1.建立Session
			session = TCPSessionCreator.initiate("localhost", 6000,
					new ProfileRegistry());
		} catch (BEEPException e) {
			e.printStackTrace();
		}

		Channel channel = null;
		try {
			//2.啟動Channel
			channel = session.startChannel("http://example.org/profiles/ADDRESSBOOK");
		} catch (BEEPError e) {
			e.printStackTrace();
		} catch (BEEPException e) {
			e.printStackTrace();
		}

		String message = "<person><name>Edd</name>"
				+ "<tel>1-234-567-7890</tel></person>";
		StringOutputDataStream request = new StringOutputDataStream(message);
		request.setContentType("text/xml");
		Reply reply = new Reply();
		try {
			//3.發送請求消息
			channel.sendMSG(request, reply);
		} catch (BEEPException e) {
			e.printStackTrace();
		}

		try {
			//4.接收響應消息
            InputStream is = reply.getNextReply().getDataStream().getInputStream();
			StringBuilder reptxt = new StringBuilder();
			byte inputbuf[] = new byte[1024];
			int inputlen;
            while ((inputlen = is.read(inputbuf)) != -1) {
            	reptxt.append(new String(inputbuf, 0, inputlen, "utf-8"));
            }
			System.out.println("got back:" + reptxt);
		} catch (BEEPInterruptedException e) {
			e.printStackTrace();
		} catch (UnsupportedEncodingException e) {
			e.printStackTrace();
		} catch (IOException e) {
			e.printStackTrace();
		}

		try {
			//5.最後不要忘了關閉資源,不然服務端會報錯。
			channel.close();
			session.close();
		} catch (BEEPException e) {
			e.printStackTrace();
		}
	}

}
           

5.2 服務端

import org.beepcore.beep.core.BEEPError;
import org.beepcore.beep.core.BEEPException;
import org.beepcore.beep.core.Channel;
import org.beepcore.beep.core.CloseChannelException;
import org.beepcore.beep.core.MessageMSG;
import org.beepcore.beep.core.ProfileRegistry;
import org.beepcore.beep.core.RequestHandler;
import org.beepcore.beep.core.Session;
import org.beepcore.beep.core.StartChannelException;
import org.beepcore.beep.core.StartChannelListener;
import org.beepcore.beep.core.StringOutputDataStream;
import org.beepcore.beep.profile.Profile;
import org.beepcore.beep.profile.ProfileConfiguration;
import org.beepcore.beep.transport.tcp.TCPSessionCreator;

public class Server implements Profile, StartChannelListener, RequestHandler {

	@Override
	public StartChannelListener init(String uri, ProfileConfiguration config)
			throws BEEPException {
		System.out.println("Server.init");
		return this;
	}

	@Override
	public boolean advertiseProfile(Session session) throws BEEPException {
		System.out.println("Server.advertiseProfile");
		return true;
	}

	@Override
	public void startChannel(Channel channel, String encoding, String data)
			throws StartChannelException {
		System.out.println("Server.startChannel");
		channel.setRequestHandler(this);
	}

	@Override
	public void closeChannel(Channel channel) throws CloseChannelException {
		System.out.println("Server.closeChannel");
		channel.setRequestHandler(null);
	}

	@Override
	public void receiveMSG(MessageMSG message) {
		//Server的業務方法就是實作receiveMSG
		String replytxt = "<person uid=\"123\"/>";
		StringOutputDataStream reply = new StringOutputDataStream(replytxt);
		reply.setContentType("text/xml");
		try {
			//發送回應
			message.sendRPY(reply);
		} catch (BEEPException e) {
            try {
                message.sendERR(BEEPError.CODE_REQUESTED_ACTION_ABORTED,
                                "Error sending RPY");
            } catch (BEEPException x) {
                message.getChannel().getSession().terminate(x.getMessage());
            }
		}
	}

	public static void main(String[] args) {

		Server server = new Server();
		ProfileRegistry reg = new ProfileRegistry();
		String uri = "http://example.org/profiles/ADDRESSBOOK";
        // Initialize the profile and add it to the advertised profiles
        try {
			reg.addStartChannelListener(uri,
					server.init(uri, new ProfileConfiguration()),
					null);
			while (true) {
				//死循環用來監聽多個用戶端的連接配接
				System.out.println("Server listening.....");
				TCPSessionCreator.listen(6000, reg);
			}
		} catch (BEEPException e) {
			e.printStackTrace();
		}


	}

}
           

6.你也可以研究官方example裡自帶的Ping的例子。這個例子實作了用戶端ping服務端。

運作方法和結果如下

服務端

D:\eclipse\workspace\beep\bin>java -cp .;../lib/* org.beepcore.beep.example.Beepd -config ../src/org/beepcore/beep/example/beepd-config.xml
Beepd: started
           

用戶端

D:\eclipse\workspace\beep\bin>java -cp .;../lib/* org.beepcore.beep.example.Bing localhost
Reply from localhost: bytes=1024 time=2ms
Reply from localhost: bytes=1024 time=1ms
Reply from localhost: bytes=1024 time=3ms
Reply from localhost: bytes=1024 time=1ms
           

7.依賴的jar包

[list]

[*]commons-logging.jar

[*]concurrent.jar

[*]beeptls-jsse.jar (如果用到[url=http://zh.wikipedia.org/wiki/TLS]TLS[/url]的話需要)

[/list]

8.一些好的參考資料

XML 觀察: 剖析 BEEP http://www.ibm.com/developerworks/cn/xml/x-watch/part2/index.html 我第5節的代碼就是根據這個頁面改的,因為原文曆史比較久遠了,一些代碼已經和目前的beep core對不上了,是以我改了一下以通過測試。

繼續閱讀