广告处理
在我们建立了发现监听器之后, 它将不停的加入一些新发现的module说明通告到我们的本地缓冲中。 每次processPrimes()方法被调用的时候, 客户peer将尝试连接module说明通告代表的peer, 接入到他们的输入通道中,传递一个消息去初始化这个peer的质数发现服务。)
这个方法的第一个元素就是决定我们能够委托以工作的peer集合, 应该记得一个通告有一个期限限制, 因此我们要消除那些不在有效的通告。
Public int[] processPrimes(int low, int high) {
Set setCopy = null;
synchronized(adverts) {
Set setCopy = (Set) adverts.clone();
}
ArrayList workingList = new ArrayList();
ArrayList expired = new ArrayList();
long currentTime = System.getCurrentTimeMillis();
Iterator it = workingSet.iterator();
while (it.hasNext()) {
ModuleSpecAdvertisement ad = (ModuleSpecAdvertisement)it.next();
if (ad.getLocalExpirationTime() > currentTime + (2 * 60 *1000)) {
workingList.addElement(ad);
} else {
expired.addElement(ad);
}
}
removeExpired(expired);
前述的程序段执行了对一个简单缓冲中被发现的服务的管理, 让removeExpired()方法去删除那些过期和即将过期的通告(这里没有详细说明removeExpired())。
当我们有了一个有效通告的集合之后,我们能够开始对它们进行操作以获得我们需要用来发送消息的管道通告。 .
注意这个工作的分发是人为的: 一些peer可能比其它peer更有能力一些, 一些又可能 网络连接方面更好些。 这些不同应该在分配工作的时候都被考虑到, 事实上, 也不会故意把工作分配为过多的片段, 因为网络通信时间可能比花在实际计算质数的时间更多。 (这个例子的目的是说明怎么从一个ModuleSpecAdvertisement得到一个管道通告, 这样创建一个新的消息, 然后怎样通过管道发送一个消息。)
Listing 16.14展示了这些自然数列是如何被分为一个一个的子列的,每个子列又能和一个将送往别的peer的消息相对应。 消息被插入到一个hash表映射中, 映射的键值显示了消息的状态: 是否已经发送了? 我们是否收到了它的结果?
Listing 16.14 Creating New Messages
Map messageMap = new HashMap();
int size = workingList.size()
int mod = high % size;
high -= mod;
int perPiece = high / size;
for (int i=0; i < size; i++) {
//create a new message
Message msg = pipeSvc.createMessage();
msg.setString(ServiceConstants.LOW_INT, low);
//last message will get to compute a bit more
if (i == size-1) {
high = low + perPiece ?1 + mod;
} else {
high = low + perPiece -1;
}
msg.setString(ServiceConstants.HIGH_INT, high);
low += perPiece;
//we neither sent the message, nor did we get a response
StatusMap statusMap = new StatusMap(false, false);
StatusMap statusMap = new StatusMap(false, false);
messageMap.put(statusMap, msg);
}
StatusMap 就是一对布尔值,这里并没有列出
我们的最后一步就是从每个ModuleSpecAdvertisement中提取管道通告, 打开每个管道, 发送一个消息到那管道, 然后将这个消息标记为“已经发送”。
应记得一个通告就是一个结构化的文档, 类似于XML文档, 它能轻易的被转换为一个文本文档然后打印出来, 在开发和测试的时候查阅通告的内容是非常有好处的。
Listing 16.15 Printing an Advertisement
Collection ads = messageMap.values();
Iterator it = ads.iterator();
while (it.hasNext()) {
ModuleSpecAdvertisement ad = (ModuleSpecAdvertisement)it.next();
//First, print out ModuleSpec advertisement on standard output
StructuredTextDocument doc =
(StructuredTextDocument)ad.getDocument(new MimeMediaType ("text/plain"));
try {
StringWriter out = new StringWriter();
doc.sendToWriter(out);
System.out.println(out);
out.close();
} catch (IOException e) {
}
...
正如我们先前讨论的, 一个StructuredTextDocument类包括了很多元素,其中一个是一个参数。 当我们为我们的服务构造了ModuleSpecAdvertisement的时候,我们将它作为参数进入这个服务的管道通告。这个参数正好是另外一个StructuredDocument元素, 我们可以用操作XML文档一样的方法 操纵它。
在解析通告的参数元素的过程中,我们首先获得这个通告的ID 和类型, 通道的ID与URN说明相统一, 在JXTA 说明中有概述, JXTA说明将管道的特殊鉴定器用128-bit编码, 下面是这个URN的一个例子。
urn:jxta:uuid-59616261646162614E5047205032503382CCB236202640F5A242ACE15A8F9D7C04
IDFactory类有能力根据每个URN建造一个PipeID对象。 这就是我们用pipe ID去关联一个pipe通告的机制。
Listing 16.16 Working with Advertisement Parameters
StructuredTextDocument param = (StructuredTextDocument)ad.getParam();
String pipeID = null;
String pipeType = null;
Enumeration en = null;
if (param != null) {
en = param.getChildren("jxta:PipeAdvertisement");
}
Enumeration child = null;
if (en != null) {
child = ((TextElement)en.nextElement()).getChildren();
}
if (child != null) {
while (child.hasMoreElements()) {
TextElement el = (TextElement)child.nextElement();
String elementName = el.getName();
if (elementName.equals("Id")) {
pipeID = el.getTextValue();
}
if (elementName.equals("Type")) {
pipeType = el.getTextValue();
}
}
}
if (pipeID != null || pipeType != null) {
PipeAdvertisement pipeAdvert = (PipeAdvertisement)
AdvertisementFactory.newAdvertisement(
PipeAdvertisement.getAdvertisementType());
try {
URL pidURL = new URL(pipeID);
PipeID pid = (PipeID)IDFactory.fromURL(pidURL);
pipeAdvert.setPipeID(pid);
} catch (MalformedURLException e) {
System.out.println("Wrong URL: " + e.getMessage());
return;
} catch (UnknownServiceException e) {
System.out.println("Unknown Service: " + e.getMessage());
return;
}
}
基于这个PipeAdvertisement, 我们现在有能力去构造一个输出管道去连接远程peer的输入管道了。 如Listing 16.17, 应该记得一个管道是一个单向的沟通渠道,因此我们没有期望从这个管道上获得远程peer的回执, 远程peer进行一个必要的类似的工作, 打开一个管道连接回客户端, 然后连同结果一起发回消息。
Listing 16.17 Creating an Output Pipe
try {
outputPipe = pipeSvc.createOutputPipe(pipeAdvert, 30000);
outputPipe.send(msg);
System.out.println("Sent message on output pipe");
} catch (IOException e) {
System.out.println("Can't send message through pipe: " + e.getMessage());
}
}
有趣的是,这个管道创建机制是:一个peer可能在发送ModuleSpecAdvertisement和一个客户连接到它之间改变网络身份, 虽然如此,这个peer的虚拟身份在JXTA网络上将不会改变, 运行时的服务保证被ModuleSpecAdvertisement所通告的管道一直处于连接状态。
为了开启应用服务, 确定所有的JXTA类都在classpath里面, 然后输入下面这个命令。
java primecruncher.PrimePeer
你也可以运行下面的参数, 这些参数允许你饶过JXTA的登入界面。
java –D net.jxta.tls.principal=USERNAME
-Dnet.jxta.tls.password=PASSWORD primecruncher.PrimePeer
By substituting your JXTA username and password you can run the client similarly:(取代你的JXTA用户名和密码,类似的你可以运行客户端。)
java -Dnet.jxta.tls.princincipal=USERNAME
-Dnet.jxta.tls.password=PASSWORD primecruncher.PrimeClient
这是介绍P2P的简单开发的文档, 包括了一些大概意思,希望对大家有一点点帮助,我的翻译水平也有限,希望得到谅解。