JLiteSpider
A lite distributed Java spider framework.
這是一個輕量級的分布式java爬蟲架構
https://github.com/luohaha/jlitespider/blob/master/README.md?hmsr=toutiao.io&utm_medium=toutiao.io&utm_source=toutiao.io#%E7%89%B9%E7%82%B9 特點
這是一個強大,但又輕量級的分布式爬蟲架構。jlitespider天生具有分布式的特點,各個worker之間需要通過一個或者多個消息隊列來連接配接。消息隊列我的選擇是
rabbitmq。worker和消息之間可以是一對一,一對多,多對一或多對多的關系,這些都可以自由而又簡單地配置。消息隊列中存儲的消息分為四種:url,頁面源碼,解析後的結果以及自定義的消息。同樣的,worker的工作也分為四部分:下載下傳頁面,解析頁面,資料持久化和自定義的操作。
使用者隻需要在配置檔案中,規定好worker和消息隊列之間的關系。接着在代碼中,定義好worker的四部分工作。即可完成爬蟲的編寫。
總體的使用流程如下:
- 啟動rabbitmq。
- 在配置檔案中定義worker和消息隊列之間的關系。
- 在代碼中編寫worker的工作。
- 最後,啟動爬蟲。
https://github.com/luohaha/jlitespider/blob/master/README.md?hmsr=toutiao.io&utm_medium=toutiao.io&utm_source=toutiao.io#%E5%AE%89%E8%A3%85 安裝
使用maven:
<dependency>
<groupId>com.github.luohaha</groupId>
<artifactId>jlitespider</artifactId>
<version>0.4.3</version>
</dependency>
直接下載下傳jar包:
點選
下載下傳。
https://github.com/luohaha/jlitespider/blob/master/README.md?hmsr=toutiao.io&utm_medium=toutiao.io&utm_source=toutiao.io#%E8%AE%BE%E8%AE%A1%E6%80%9D%E6%83%B3 設計思想
雖然JLiteSpider将抓取流程抽象成了幾個部分,但這并不意味着你就必須遵從這種抽象,你應該根據自己的應用場景,來作出最符合效率最大化的使用決策。比如,如果你抓取的網頁源碼較大,如果把網頁源碼也存入消息隊列,會導緻消息隊列負擔過大。是以這個時候比較好的做法是将下載下傳和解析的流程合并,直接向消息隊列輸出解析後的結果。
是以,雖然JLiteSpider幫你抽象出了抓取過程中的不同階段,但這完全是選擇性的,使用者完全是自由的。我在設計JLiteSpider的時候,盡力保障了自由。後面要介紹到的Worker和消息隊列的自由配置,以及添加了
freeman
,同樣是這種設計思路的展現。
說到這裡,也給大家推薦一個架構交流學習群:835544715,裡面會分享一些資深架構師錄制的視訊錄像:有Spring,MyBatis,Netty源碼分析,高并發、高性能、分布式、微服務架構的原理,JVM性能優化這些成為架構師必備的知識體系。還能領取免費的學習資源,相信對于已經工作和遇到技術瓶頸的碼友,在這個群裡會有你需要的内容。
https://github.com/luohaha/jlitespider/blob/master/README.md?hmsr=toutiao.io&utm_medium=toutiao.io&utm_source=toutiao.io#worker%E5%92%8C%E6%B6%88%E6%81%AF%E9%98%9F%E5%88%97%E4%B9%8B%E9%97%B4%E5%85%B3%E7%B3%BB Worker和消息隊列之間關系
worker和消息隊列之間的關系可以是一對一,多對一,一對多,多對多,都是可以配置的。在配置檔案中,寫上要監聽的消息隊列和要發送的消息隊列。例如:
{ "workerid" : 2, "mq" : [{ "name" : "one", "host" : "localhost", "port" : 5672, "qos" : 3 , "queue" : "url"
},
{ "name" : "two", "host" : "localhost", "port" : 5672, "qos" : 3 , "queue" : "hello"
}], "sendto" : ["two"], "recvfrom" : ["one", "two"]
}
workerid : worker的id号
mq : 各個消息隊列所在的位置,和配置資訊。
字段為這個消息隊列的唯一辨別符,供消息隊列的擷取使用。
name
為消息隊列所在的主機ip,
host
為消息隊列的監聽端口号(rabbitmq中預設為5672)。
port
為消息隊列每次将消息發給worker時的消息個數。
qos
為消息隊列的名字。
queue
+
host
port
queue
可以了解為是消息隊列的唯一位址。
sendto : 要發送到的消息隊列,填入的資訊為
中的
mq
name
字段中的辨別符。
recvfrom : 要監聽的消息隊列,消息隊列會把消息分發到這個worker中。填入的資訊同樣為
mq
name
https://github.com/luohaha/jlitespider/blob/master/README.md?hmsr=toutiao.io&utm_medium=toutiao.io&utm_source=toutiao.io#%E6%B6%88%E6%81%AF%E7%9A%84%E8%AE%BE%E8%AE%A1 消息的設計
在消息隊列中,消息一共有四種類型。分别是url,page,result和自定義類型。在worker的程式中,可以通過messagequeue的四種方法(sendUrl, sendPage, sendResult, send)來插入消息。worker的downloader會處理url消息,processor會處理page消息,saver會處理result消息,freeman會處理所有的自定義的消息。我們所要做的工作,就是實作好worker中的這四個函數。
https://github.com/luohaha/jlitespider/blob/master/README.md?hmsr=toutiao.io&utm_medium=toutiao.io&utm_source=toutiao.io#worker%E6%8E%A5%E5%8F%A3%E7%9A%84%E8%AE%BE%E8%AE%A1 Worker接口的設計
JLiteSpider将整個的爬蟲抓取流程抽象成四個部分,由四個接口來定義。分别是downloader,processor,saver和freeman。它們分别處理上述提到的四種消息。
你所需要做的是,實作這個接口,并将想要抓取的url連結清單傳回。具體的實作細節,可以由你高度定制。
https://github.com/luohaha/jlitespider/blob/master/README.md?hmsr=toutiao.io&utm_medium=toutiao.io&utm_source=toutiao.io#1-downloader 1. Downloader:
這部分實作的是頁面下載下傳的任務,将想要抓取的url連結清單,轉化(下載下傳後存儲)為相應的頁面資料連結清單。
接口設計如下:
public interface Downloader { /** * 下載下傳url所指定的頁面。 * @param url * 收到的由消息隊列傳過來的消息 * @param mQueue * 提供把消息發送到各個消息隊列的方法 * @throws IOException */
public void download(Object url, Map<String, MessageQueue> mQueue) throws IOException;
}
你同樣可以實作這個接口,具體的實作可由你自由定制,隻要實作
download
函數。
url
是消息隊列推送過來的消息,裡面不一定是一條
url
,具體是什麼内容,是由你當初傳入消息隊列時決定的。
mQueue
提供了消息發送到各個消息隊列的方法,通過
mQueue.get("...")
選取消息隊列,然後執行messagequeue的四種方法(sendUrl, sendPage, sendResult, send)來插入消息。
https://github.com/luohaha/jlitespider/blob/master/README.md?hmsr=toutiao.io&utm_medium=toutiao.io&utm_source=toutiao.io#2-processor 2. Processor:
Processor
是解析器的接口,這裡會從網頁的原始檔案中提取出有用的資訊。
接口設計:
public interface Processor{ /** * 處理下載下傳下來的頁面源代碼 * @param page * 消息隊列推送過來的頁面源代碼資料消息 * @param mQueue * 提供把消息發送到各個消息隊列的方法 * @throws IOException */
public void process(Object page, Map<String, MessageQueue> mQueue) throws IOException;
}
實作這個接口,完成對頁面源碼的解析處理。
page
是由消息隊列推送過來的消息,具體格式同樣是由你在傳入時決定好的。
mQueue
使用同上。
https://github.com/luohaha/jlitespider/blob/master/README.md?hmsr=toutiao.io&utm_medium=toutiao.io&utm_source=toutiao.io#3-saver 3. Saver:
Saver
實作的是對解析得到結果的處理,可以将你解析後得到的資料存入資料庫,檔案等等。或者将url重新存入消息隊列,實作疊代抓取。
接口的設計:
public interface Saver { /** * 處理最終解析得到的結果 * @param result * 消息隊列推送過來的結果消息 * @param mQueue * 提供把消息發送到各個消息隊列的方法 * @throws IOException */
public void save(Object result, Map<String, MessageQueue> mQueue) throws IOException;
}
通過實作這個接口,可以完成對結果的處理。你同樣可以實作這個接口,具體的實作可由你自由定制,隻要實作
download
result
是消息隊列推送過來的結果消息,具體的格式是由你當初傳入消息隊列時決定的。
mQueue
的使用同上。
https://github.com/luohaha/jlitespider/blob/master/README.md?hmsr=toutiao.io&utm_medium=toutiao.io&utm_source=toutiao.io#4-freeman 4. Freeman:
通過上述的三個流程,可以實作爬蟲抓取的一個正常流程。但是同樣提供了自定義的功能,你可以完善,加強,改進甚至颠覆上述的抓取流程。
jlitespider
就是一個處理自定義消息格式的接口,實作它就可以定義自己的格式,以至于定義自己的流程。
freeman
public interface Freeman { /** * 自定義的處理函數 * @param key * key為自定義的消息标記 * @param msg * 消息隊列推送的消息 * @param mQueue * 提供把消息發送到各個消息隊列的方法 * @throws IOException */
public void doSomeThing(String key, Object msg, Map<String, MessageQueue> mQueue) throws IOException;
}
通過實作
doSomeThing
函數,你就可以處理來自消息隊列的自定義消息。
key
為消息的标記,
msg
為消息的内容。同樣,通過
mQueue
的
send
方法,可以實作向消息隊列發送自定義消息的操作。(需要注意,自定義的消息标記不能為:
url
,
page
result
。否則會被認為是
jlitespider
的保留消息,也就是由上述的三個接口函數來處理。)
https://github.com/luohaha/jlitespider/blob/master/README.md?hmsr=toutiao.io&utm_medium=toutiao.io&utm_source=toutiao.io#%E6%80%BB%E7%BB%93%E8%AF%B4%E6%98%8E 總結說明
jlitespider
的設計可能會讓您有些疑惑,不過等您熟悉這一整套的設計之後,您就會發現
jlitespider
是多麼的靈活和易于使用。
###使用方法
JLiteSpider使用:
//worker的啟動Spider.create() //建立執行個體
.setDownloader(...) //設定實作了Downloader接口的下載下傳器
.setProcessor(...) //設定實作了Processor接口的解析器
.setSaver(...) //設定實作了Saver接口的資料持久化方法
.setFreeman(...) //設定自定義消息的處理函數
.setSettingFile(...) //設定配置檔案
.begin(); //開始爬蟲//消息隊列中初始消息添加器的使用。隻有向消息隊列中添加初始的消息後,整個爬蟲系統才能啟動,是以稱其為spider的lighter(點火器)。SpiderLighter.locateMQ("localhost", 5672, "MQ's name") // 定位到要通路的消息隊列
.addUrl(...) //向消息隊列添加url類型的消息
.addPage(...) //向消息隊列添加page類型的消息
.addResult(...) //向消息隊列添加result類型的消息
.add(..., ...) //向消息隊列添加自定義類型的消息
.close() //關閉連接配接,一定要記得在最後調用!
以豆瓣電影的頁面為例子,假設我們要抓取豆瓣電影的愛情分類中的所有電影名稱,并存入txt檔案中:
- 首先,需要設計消息隊列和worker之間的關系。我的設計是有兩個worker和兩個消息隊列,其中一個worker在main消息隊列上,負責下載下傳,解析并把最終結果傳入data消息隊列。第二個worker從data消息隊列中取資料,并存入txt檔案中。兩個worker的配置檔案如下:
第一個worker:
{ "workerid" : 1, "mq" : [{ "name" : "main", "host" : "localhost", "port" : 5672, "qos" : 3 , "queue" : "main"
}, { "name" : "data", "host" : "localhost", "port" : 5672, "qos" : 3 , "queue" : "data"
}], "sendto" : ["main", "data"], "recvfrom" : ["main"]
}
第二個worker:
{ "workerid" : 2, "mq" : [{ "name" : "main", "host" : "localhost", "port" : 5672, "qos" : 3 , "queue" : "main"
}, { "name" : "data", "host" : "localhost", "port" : 5672, "qos" : 3 , "queue" : "data"
}], "sendto" : [], "recvfrom" : ["data"]
}
- 接着,編寫第一個worker的代碼,如下:
//下載下傳頁面資料,并存入main隊列。public class DoubanDownloader implements Downloader { private Logger logger = Logger.getLogger("DoubanDownloader"); @Override
public void download(Object url, Map<String, MessageQueue> mQueue) throws IOException { // TODO Auto-generated method stub
String result = ""; try {
result = Network.create()
.setUserAgent("...")
.setCookie("...")
.downloader(url.toString()); //下載下傳成功,将頁面資料放入main消息隊列
mQueue.get("main").sendPage(result);
} catch (IOException e) {
logger.info("本次下載下傳失敗!重新下載下傳!"); //因為下載下傳失敗,是以将url重新放入main隊列中
mQueue.get("main").sendUrl(url);
}
}
}
//解析頁面資料,将結果放入main消息隊列。同時,後面頁面的url資訊同樣需要放入隊列,以便疊代抓取。public class DoubanProcessor implements Processor {//url去重複
private Set<String> urlset = new HashSet<>(); @Override
public void process(Object page, Map<String, MessageQueue> mQueue) throws IOException { // TODO Auto-generated method stub
String path = "//[@id=content]/div/div[1]/div[2]/table/tbody/tr/td[1]/a/@title"; List<String> result = Xsoup.compile(path).evaluate(Jsoup.parse(page.toString())).list(); //将結果放入main消息隊列
mQueue.get("main").sendResult(result);
path = "//[@id=content]/div/div[1]/div[3]/a/@href"; List<String> url = Xsoup.compile(path).evaluate(Jsoup.parse(page.toString())).list(); for (String each : url) { if (!urlset.contains(each)) { //如果url之前并未抓取過,則加入main隊列,作為接下來要抓取的url
mQueue.get("main").sendUrl(each);
urlset.add(each);
}
}
}
}
//把最終的資料放入data消息隊列public class DoubanSaver implements Saver { @Override
public void save(Object result, Map<String, MessageQueue> mQueue) throws IOException { // TODO Auto-generated method stub
List<String> rList = (List<String>) result; for (String each : rList) { //把資料發往data消息隊列
mQueue.get("data").send("cc", each);
}
}
}
//啟動worker的主程式public class DoubanSpider { public static void main(String[] args) { try { Spider.create().setDownloader(new DoubanDownloader())
.setProcessor(new DoubanProcessor())
.setSaver(new DoubanSaver())
.setSettingFile("./conf/setting.json")
.begin();
} catch (ShutdownSignalException e) { // TODO Auto-generated catch block
e.printStackTrace();
} catch (ConsumerCancelledException e) { // TODO Auto-generated catch block
e.printStackTrace();
} catch (IOException e) { // TODO Auto-generated catch block
e.printStackTrace();
} catch (TimeoutException e) { // TODO Auto-generated catch block
e.printStackTrace();
} catch (InterruptedException e) { // TODO Auto-generated catch block
e.printStackTrace();
} catch (SpiderSettingFileException e) { // TODO Auto-generated catch block
e.printStackTrace();
}
}
}
- 接下來,還要寫第二個worker的代碼。
//接收data消息隊列中的資料,寫入txtpublic class SaveToFile implements Freeman { @Override
public void doSomeThing(String key, Object msg, Map<String, MessageQueue> mQueue) throws IOException { // TODO Auto-generated method stub
File file = new File("./output/name.txt"); FileWriter fileWriter = new FileWriter(file, true);
fileWriter.write(msg.toString() + "\n");
fileWriter.flush();
fileWriter.close();
}
}
//第二個worker的啟動主程式public class SaveToFileSpider { public static void main(String[] args) { try { Spider.create().setFreeman(new SaveToFile())
.setSettingFile("./conf/setting2.json")
.begin();
} catch (ShutdownSignalException e) { // TODO Auto-generated catch block
e.printStackTrace();
} catch (ConsumerCancelledException e) { // TODO Auto-generated catch block
e.printStackTrace();
} catch (IOException e) { // TODO Auto-generated catch block
e.printStackTrace();
} catch (TimeoutException e) { // TODO Auto-generated catch block
e.printStackTrace();
} catch (InterruptedException e) { // TODO Auto-generated catch block
e.printStackTrace();
} catch (SpiderSettingFileException e) { // TODO Auto-generated catch block
e.printStackTrace();
}
}
}
- 還要編寫一個main消息隊列的初始化程式(點火程式),把第一個入口url放入main消息隊列中。
//把入口url放入main消息隊列public class AddUrls { public static void main(String[] args) { try { // 首先定位到要通路的消息隊列,隊列在localhost:5672/main
// 然後向這個消息隊列添加url
// 最後關閉lighter
SpiderLighter.locateMQ("localhost", 5672, "main")
.addUrl("https://movie.douban.com/tag/%E7%88%B1%E6%83%85?start=0&type=T")
.close();
} catch (IOException e) { // TODO Auto-generated catch block
e.printStackTrace();
} catch (TimeoutException e) { // TODO Auto-generated catch block
e.printStackTrace();
}
}
}
- 最後,依次啟動程式。啟動的順序是:rabbitmq -> worker1/2 -> 初始化消息程式。關于rabbitmq的使用,它的官方網站上有詳細的安裝和使用文檔,可用于快速搭建rabbitmq的server。
https://github.com/luohaha/jlitespider/blob/master/README.md?hmsr=toutiao.io&utm_medium=toutiao.io&utm_source=toutiao.io#%E8%BE%85%E5%8A%A9%E5%B7%A5%E5%85%B7 輔助工具
目前版本的能提供的輔助工具并不多,您在使用
jlitespider
的過程中,可以将您實作的輔助工具合并到
jlitespider
中來,一起來完善
jlitespider
的功能。輔助工具在包
jlitespider
中。
com.github.luohaha.jlitespider.extension
- Network
簡單的網絡下載下傳器,輸入url,傳回頁面源代碼。使用如下:
String result = Network.create()
.setCookie("...")
.setProxy("...")
.setTimeout(...)
.setUserAgent("...")
.downloader(url);
不推薦使用這個網絡下載下傳器,因為它是同步的,會阻塞程序。
- AsyncNetwork
異步非阻塞的網絡下載下傳器,推薦使用這個作為頁面下載下傳器,因為它不會阻塞程序。
// 建立下載下傳器AsyncNetwork asyncNetwork = new AsyncNetwork();// 設定cookieasyncNetwork.setCookie(cookies);// 設定代理asyncNetwork.setProxy("...");// 設定agentasyncNetwork.setUserAgent("...");// 啟動下載下傳器asyncNetwork.begin();
在異步下載下傳器啟動後,可以随時往下載下傳器中添加url,和對應的回調處理對象。
// 添加要下載下傳的頁面的url,和下載下傳完成後的處理函數。asyncNetwork.addUrl("...", new DownloadCallback() {
@Override
public void onReceived(String result, String url) { // 下載下傳成功後,執行這個函數。result為下載下傳下來的頁面資訊,url為對應的url連結。
}
@Override
public void onFailed(Exception exception, String url) { // 下載下傳失敗時,執行這個函數。exception為失敗原因。
}
});
- 解析工具
項目中依賴了兩個很常用的解析工具:xsoup 和 jsoup。
想要學習Java高架構、分布式架構、高可擴充、高性能、高并發、性能優化、Spring boot、Redis、ActiveMQ、Nginx、Mycat、Netty、Jvm大型分布式項目實戰學習架構師視訊免費擷取 架構群:835544715
點選連結加入群聊【JAVA進階架構】:https://jq.qq.com/?_wv=1027&k=5dbERkY