1.[url=http://www.beepcore.org/]BEEP[/url],全稱 塊可擴充交換協定(Blocks Extensible Exchange Protocol),是一個P2P應用程式協定架構(RFC3080, RFC3081),用于面向連接配接的、異步請求/響應消息。支援在一個傳輸連接配接上使用多路複用的消息流。支援二進制和文本消息, TLS, SASL/Anonymous, SASL/OTP。
2.有一個java的實作java beep core,目前最新版本是 0.9.08,可在[url]http://sourceforge.net/projects/beepcore-java/[/url]下載下傳。
3.一些基本概念
Peer 對等連接配接,端點,可以是用戶端或服務端
Session 會話,代表了兩個peer之間的連接配接
Channel 通道,用來收發消息。當Channel啟動時會被關聯一個Profile。可以有多個Channel共存在同一個Session中。這些端隻需要一個IP連接配接,這個連接配接随後被多路複用以建立通道。
Message 消息,是從一個peer發到另一個peer的request或reply。這個類封裝 MSG、RPY、ANS、ERR和NULL消息類型
Reply 回應,用于處理來自對等點的異步回答
Profile 概要檔案,定義了合法的request或reply格式
BEEP中有兩種角色,發起人和監聽者。
BEEP采用request-reply模型,request的消息類型是MSG,reply的消息類型有RPY、ANS、ERR和NULL。每個請求必須得有一個響應。
4.好處
[list]
[*]簡化TCP程式設計,關于這點我暫時還沒有體會到。
[*]另外為什麼不用web service呢?
BEEP 和web service的不同點是,BEEP是直接建立在TCP/IP 層的,而web service是建立在HTTP層之上的。
[/list]
5.一個例子
用戶端發MSG消息,說我要添加一條Person記錄到資料庫裡,服務端回應RPY消息,給出了使用者存到表裡的id
C: MSG <person>
<name>Edd</name>
<tel>1-234-567-7890</tel>
</person>
S: RPY <person uid="123"/>
5.1用戶端
import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import org.beepcore.beep.core.BEEPError;
import org.beepcore.beep.core.BEEPException;
import org.beepcore.beep.core.BEEPInterruptedException;
import org.beepcore.beep.core.Channel;
import org.beepcore.beep.core.ProfileRegistry;
import org.beepcore.beep.core.Session;
import org.beepcore.beep.core.StringOutputDataStream;
import org.beepcore.beep.lib.Reply;
import org.beepcore.beep.transport.tcp.TCPSessionCreator;
public class Client {
public static void main(String[] args) {
Session session = null;
try {
//1.建立Session
session = TCPSessionCreator.initiate("localhost", 6000,
new ProfileRegistry());
} catch (BEEPException e) {
e.printStackTrace();
}
Channel channel = null;
try {
//2.啟動Channel
channel = session.startChannel("http://example.org/profiles/ADDRESSBOOK");
} catch (BEEPError e) {
e.printStackTrace();
} catch (BEEPException e) {
e.printStackTrace();
}
String message = "<person><name>Edd</name>"
+ "<tel>1-234-567-7890</tel></person>";
StringOutputDataStream request = new StringOutputDataStream(message);
request.setContentType("text/xml");
Reply reply = new Reply();
try {
//3.發送請求消息
channel.sendMSG(request, reply);
} catch (BEEPException e) {
e.printStackTrace();
}
try {
//4.接收響應消息
InputStream is = reply.getNextReply().getDataStream().getInputStream();
StringBuilder reptxt = new StringBuilder();
byte inputbuf[] = new byte[1024];
int inputlen;
while ((inputlen = is.read(inputbuf)) != -1) {
reptxt.append(new String(inputbuf, 0, inputlen, "utf-8"));
}
System.out.println("got back:" + reptxt);
} catch (BEEPInterruptedException e) {
e.printStackTrace();
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
try {
//5.最後不要忘了關閉資源,不然服務端會報錯。
channel.close();
session.close();
} catch (BEEPException e) {
e.printStackTrace();
}
}
}
5.2 服務端
import org.beepcore.beep.core.BEEPError;
import org.beepcore.beep.core.BEEPException;
import org.beepcore.beep.core.Channel;
import org.beepcore.beep.core.CloseChannelException;
import org.beepcore.beep.core.MessageMSG;
import org.beepcore.beep.core.ProfileRegistry;
import org.beepcore.beep.core.RequestHandler;
import org.beepcore.beep.core.Session;
import org.beepcore.beep.core.StartChannelException;
import org.beepcore.beep.core.StartChannelListener;
import org.beepcore.beep.core.StringOutputDataStream;
import org.beepcore.beep.profile.Profile;
import org.beepcore.beep.profile.ProfileConfiguration;
import org.beepcore.beep.transport.tcp.TCPSessionCreator;
public class Server implements Profile, StartChannelListener, RequestHandler {
@Override
public StartChannelListener init(String uri, ProfileConfiguration config)
throws BEEPException {
System.out.println("Server.init");
return this;
}
@Override
public boolean advertiseProfile(Session session) throws BEEPException {
System.out.println("Server.advertiseProfile");
return true;
}
@Override
public void startChannel(Channel channel, String encoding, String data)
throws StartChannelException {
System.out.println("Server.startChannel");
channel.setRequestHandler(this);
}
@Override
public void closeChannel(Channel channel) throws CloseChannelException {
System.out.println("Server.closeChannel");
channel.setRequestHandler(null);
}
@Override
public void receiveMSG(MessageMSG message) {
//Server的業務方法就是實作receiveMSG
String replytxt = "<person uid=\"123\"/>";
StringOutputDataStream reply = new StringOutputDataStream(replytxt);
reply.setContentType("text/xml");
try {
//發送回應
message.sendRPY(reply);
} catch (BEEPException e) {
try {
message.sendERR(BEEPError.CODE_REQUESTED_ACTION_ABORTED,
"Error sending RPY");
} catch (BEEPException x) {
message.getChannel().getSession().terminate(x.getMessage());
}
}
}
public static void main(String[] args) {
Server server = new Server();
ProfileRegistry reg = new ProfileRegistry();
String uri = "http://example.org/profiles/ADDRESSBOOK";
// Initialize the profile and add it to the advertised profiles
try {
reg.addStartChannelListener(uri,
server.init(uri, new ProfileConfiguration()),
null);
while (true) {
//死循環用來監聽多個用戶端的連接配接
System.out.println("Server listening.....");
TCPSessionCreator.listen(6000, reg);
}
} catch (BEEPException e) {
e.printStackTrace();
}
}
}
6.你也可以研究官方example裡自帶的Ping的例子。這個例子實作了用戶端ping服務端。
運作方法和結果如下
服務端
D:\eclipse\workspace\beep\bin>java -cp .;../lib/* org.beepcore.beep.example.Beepd -config ../src/org/beepcore/beep/example/beepd-config.xml
Beepd: started
用戶端
D:\eclipse\workspace\beep\bin>java -cp .;../lib/* org.beepcore.beep.example.Bing localhost
Reply from localhost: bytes=1024 time=2ms
Reply from localhost: bytes=1024 time=1ms
Reply from localhost: bytes=1024 time=3ms
Reply from localhost: bytes=1024 time=1ms
7.依賴的jar包
[list]
[*]commons-logging.jar
[*]concurrent.jar
[*]beeptls-jsse.jar (如果用到[url=http://zh.wikipedia.org/wiki/TLS]TLS[/url]的話需要)
[/list]
8.一些好的參考資料
XML 觀察: 剖析 BEEP http://www.ibm.com/developerworks/cn/xml/x-watch/part2/index.html 我第5節的代碼就是根據這個頁面改的,因為原文曆史比較久遠了,一些代碼已經和目前的beep core對不上了,是以我改了一下以通過測試。