1.[url=http://www.beepcore.org/]BEEP[/url],全称 块可扩展交换协议(Blocks Extensible Exchange Protocol),是一个P2P应用程序协议框架(RFC3080, RFC3081),用于面向连接的、异步请求/响应消息。支持在一个传输连接上使用多路复用的消息流。支持二进制和文本消息, TLS, SASL/Anonymous, SASL/OTP。
2.有一个java的实现java beep core,目前最新版本是 0.9.08,可在[url]http://sourceforge.net/projects/beepcore-java/[/url]下载。
3.一些基本概念
Peer 对等连接,端点,可以是客户端或服务端
Session 会话,代表了两个peer之间的连接
Channel 通道,用来收发消息。当Channel启动时会被关联一个Profile。可以有多个Channel共存在同一个Session中。这些端只需要一个IP连接,这个连接随后被多路复用以创建通道。
Message 消息,是从一个peer发到另一个peer的request或reply。这个类封装 MSG、RPY、ANS、ERR和NULL消息类型
Reply 回应,用于处理来自对等点的异步回答
Profile 概要文件,定义了合法的request或reply格式
BEEP中有两种角色,发起人和监听者。
BEEP采用request-reply模型,request的消息类型是MSG,reply的消息类型有RPY、ANS、ERR和NULL。每个请求必须得有一个响应。
4.好处
[list]
[*]简化TCP编程,关于这点我暂时还没有体会到。
[*]另外为什么不用web service呢?
BEEP 和web service的不同点是,BEEP是直接建立在TCP/IP 层的,而web service是建立在HTTP层之上的。
[/list]
5.一个例子
客户端发MSG消息,说我要添加一条Person记录到数据库里,服务端回应RPY消息,给出了用户存到表里的id
C: MSG <person>
<name>Edd</name>
<tel>1-234-567-7890</tel>
</person>
S: RPY <person uid="123"/>
5.1客户端
import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import org.beepcore.beep.core.BEEPError;
import org.beepcore.beep.core.BEEPException;
import org.beepcore.beep.core.BEEPInterruptedException;
import org.beepcore.beep.core.Channel;
import org.beepcore.beep.core.ProfileRegistry;
import org.beepcore.beep.core.Session;
import org.beepcore.beep.core.StringOutputDataStream;
import org.beepcore.beep.lib.Reply;
import org.beepcore.beep.transport.tcp.TCPSessionCreator;
public class Client {
public static void main(String[] args) {
Session session = null;
try {
//1.创建Session
session = TCPSessionCreator.initiate("localhost", 6000,
new ProfileRegistry());
} catch (BEEPException e) {
e.printStackTrace();
}
Channel channel = null;
try {
//2.启动Channel
channel = session.startChannel("http://example.org/profiles/ADDRESSBOOK");
} catch (BEEPError e) {
e.printStackTrace();
} catch (BEEPException e) {
e.printStackTrace();
}
String message = "<person><name>Edd</name>"
+ "<tel>1-234-567-7890</tel></person>";
StringOutputDataStream request = new StringOutputDataStream(message);
request.setContentType("text/xml");
Reply reply = new Reply();
try {
//3.发送请求消息
channel.sendMSG(request, reply);
} catch (BEEPException e) {
e.printStackTrace();
}
try {
//4.接收响应消息
InputStream is = reply.getNextReply().getDataStream().getInputStream();
StringBuilder reptxt = new StringBuilder();
byte inputbuf[] = new byte[1024];
int inputlen;
while ((inputlen = is.read(inputbuf)) != -1) {
reptxt.append(new String(inputbuf, 0, inputlen, "utf-8"));
}
System.out.println("got back:" + reptxt);
} catch (BEEPInterruptedException e) {
e.printStackTrace();
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
try {
//5.最后不要忘了关闭资源,不然服务端会报错。
channel.close();
session.close();
} catch (BEEPException e) {
e.printStackTrace();
}
}
}
5.2 服务端
import org.beepcore.beep.core.BEEPError;
import org.beepcore.beep.core.BEEPException;
import org.beepcore.beep.core.Channel;
import org.beepcore.beep.core.CloseChannelException;
import org.beepcore.beep.core.MessageMSG;
import org.beepcore.beep.core.ProfileRegistry;
import org.beepcore.beep.core.RequestHandler;
import org.beepcore.beep.core.Session;
import org.beepcore.beep.core.StartChannelException;
import org.beepcore.beep.core.StartChannelListener;
import org.beepcore.beep.core.StringOutputDataStream;
import org.beepcore.beep.profile.Profile;
import org.beepcore.beep.profile.ProfileConfiguration;
import org.beepcore.beep.transport.tcp.TCPSessionCreator;
public class Server implements Profile, StartChannelListener, RequestHandler {
@Override
public StartChannelListener init(String uri, ProfileConfiguration config)
throws BEEPException {
System.out.println("Server.init");
return this;
}
@Override
public boolean advertiseProfile(Session session) throws BEEPException {
System.out.println("Server.advertiseProfile");
return true;
}
@Override
public void startChannel(Channel channel, String encoding, String data)
throws StartChannelException {
System.out.println("Server.startChannel");
channel.setRequestHandler(this);
}
@Override
public void closeChannel(Channel channel) throws CloseChannelException {
System.out.println("Server.closeChannel");
channel.setRequestHandler(null);
}
@Override
public void receiveMSG(MessageMSG message) {
//Server的业务方法就是实现receiveMSG
String replytxt = "<person uid=\"123\"/>";
StringOutputDataStream reply = new StringOutputDataStream(replytxt);
reply.setContentType("text/xml");
try {
//发送回应
message.sendRPY(reply);
} catch (BEEPException e) {
try {
message.sendERR(BEEPError.CODE_REQUESTED_ACTION_ABORTED,
"Error sending RPY");
} catch (BEEPException x) {
message.getChannel().getSession().terminate(x.getMessage());
}
}
}
public static void main(String[] args) {
Server server = new Server();
ProfileRegistry reg = new ProfileRegistry();
String uri = "http://example.org/profiles/ADDRESSBOOK";
// Initialize the profile and add it to the advertised profiles
try {
reg.addStartChannelListener(uri,
server.init(uri, new ProfileConfiguration()),
null);
while (true) {
//死循环用来监听多个客户端的连接
System.out.println("Server listening.....");
TCPSessionCreator.listen(6000, reg);
}
} catch (BEEPException e) {
e.printStackTrace();
}
}
}
6.你也可以研究官方example里自带的Ping的例子。这个例子实现了客户端ping服务端。
运行方法和结果如下
服务端
D:\eclipse\workspace\beep\bin>java -cp .;../lib/* org.beepcore.beep.example.Beepd -config ../src/org/beepcore/beep/example/beepd-config.xml
Beepd: started
客户端
D:\eclipse\workspace\beep\bin>java -cp .;../lib/* org.beepcore.beep.example.Bing localhost
Reply from localhost: bytes=1024 time=2ms
Reply from localhost: bytes=1024 time=1ms
Reply from localhost: bytes=1024 time=3ms
Reply from localhost: bytes=1024 time=1ms
7.依赖的jar包
[list]
[*]commons-logging.jar
[*]concurrent.jar
[*]beeptls-jsse.jar (如果用到[url=http://zh.wikipedia.org/wiki/TLS]TLS[/url]的话需要)
[/list]
8.一些好的参考资料
XML 观察: 剖析 BEEP http://www.ibm.com/developerworks/cn/xml/x-watch/part2/index.html 我第5节的代码就是根据这个页面改的,因为原文历史比较久远了,一些代码已经和目前的beep core对不上了,所以我改了一下以通过测试。