天天看點

一個分布式java爬蟲架構JLiteSpiderJLiteSpider

JLiteSpider

A lite distributed Java spider framework.

這是一個輕量級的分布式java爬蟲架構

https://github.com/luohaha/jlitespider/blob/master/README.md?hmsr=toutiao.io&utm_medium=toutiao.io&utm_source=toutiao.io#%E7%89%B9%E7%82%B9 特點

這是一個強大,但又輕量級的分布式爬蟲架構。jlitespider天生具有分布式的特點,各個worker之間需要通過一個或者多個消息隊列來連接配接。消息隊列我的選擇是

rabbitmq

。worker和消息之間可以是一對一,一對多,多對一或多對多的關系,這些都可以自由而又簡單地配置。消息隊列中存儲的消息分為四種:url,頁面源碼,解析後的結果以及自定義的消息。同樣的,worker的工作也分為四部分:下載下傳頁面,解析頁面,資料持久化和自定義的操作。

使用者隻需要在配置檔案中,規定好worker和消息隊列之間的關系。接着在代碼中,定義好worker的四部分工作。即可完成爬蟲的編寫。

總體的使用流程如下:

  • 啟動rabbitmq。
  • 在配置檔案中定義worker和消息隊列之間的關系。
  • 在代碼中編寫worker的工作。
  • 最後,啟動爬蟲。

https://github.com/luohaha/jlitespider/blob/master/README.md?hmsr=toutiao.io&utm_medium=toutiao.io&utm_source=toutiao.io#%E5%AE%89%E8%A3%85 安裝

使用maven:
<dependency>
  <groupId>com.github.luohaha</groupId>
  <artifactId>jlitespider</artifactId>
  <version>0.4.3</version>
</dependency>      
直接下載下傳jar包:

點選

下載下傳

https://github.com/luohaha/jlitespider/blob/master/README.md?hmsr=toutiao.io&utm_medium=toutiao.io&utm_source=toutiao.io#%E8%AE%BE%E8%AE%A1%E6%80%9D%E6%83%B3 設計思想

雖然JLiteSpider将抓取流程抽象成了幾個部分,但這并不意味着你就必須遵從這種抽象,你應該根據自己的應用場景,來作出最符合效率最大化的使用決策。比如,如果你抓取的網頁源碼較大,如果把網頁源碼也存入消息隊列,會導緻消息隊列負擔過大。是以這個時候比較好的做法是将下載下傳和解析的流程合并,直接向消息隊列輸出解析後的結果。

是以,雖然JLiteSpider幫你抽象出了抓取過程中的不同階段,但這完全是選擇性的,使用者完全是自由的。我在設計JLiteSpider的時候,盡力保障了自由。後面要介紹到的Worker和消息隊列的自由配置,以及添加了

freeman

,同樣是這種設計思路的展現。

說到這裡,也給大家推薦一個架構交流學習群:835544715,裡面會分享一些資深架構師錄制的視訊錄像:有Spring,MyBatis,Netty源碼分析,高并發、高性能、分布式、微服務架構的原理,JVM性能優化這些成為架構師必備的知識體系。還能領取免費的學習資源,相信對于已經工作和遇到技術瓶頸的碼友,在這個群裡會有你需要的内容。

https://github.com/luohaha/jlitespider/blob/master/README.md?hmsr=toutiao.io&utm_medium=toutiao.io&utm_source=toutiao.io#worker%E5%92%8C%E6%B6%88%E6%81%AF%E9%98%9F%E5%88%97%E4%B9%8B%E9%97%B4%E5%85%B3%E7%B3%BB Worker和消息隊列之間關系

worker和消息隊列之間的關系可以是一對一,多對一,一對多,多對多,都是可以配置的。在配置檔案中,寫上要監聽的消息隊列和要發送的消息隊列。例如:

{    "workerid" : 2,    "mq" : [{        "name" : "one",        "host" : "localhost",        "port" : 5672,        "qos" : 3  ,        "queue" : "url"
    },
    {        "name" : "two",        "host" : "localhost",        "port" : 5672,        "qos" : 3  ,        "queue" : "hello"
    }],    "sendto" : ["two"],    "recvfrom" : ["one", "two"]
}      

workerid : worker的id号

mq : 各個消息隊列所在的位置,和配置資訊。

name

字段為這個消息隊列的唯一辨別符,供消息隊列的擷取使用。

host

為消息隊列所在的主機ip,

port

為消息隊列的監聽端口号(rabbitmq中預設為5672)。

qos

為消息隊列每次将消息發給worker時的消息個數。

queue

為消息隊列的名字。

host

+

port

queue

可以了解為是消息隊列的唯一位址。

sendto : 要發送到的消息隊列,填入的資訊為

mq

中的

name

字段中的辨別符。

recvfrom : 要監聽的消息隊列,消息隊列會把消息分發到這個worker中。填入的資訊同樣為

mq

name

https://github.com/luohaha/jlitespider/blob/master/README.md?hmsr=toutiao.io&utm_medium=toutiao.io&utm_source=toutiao.io#%E6%B6%88%E6%81%AF%E7%9A%84%E8%AE%BE%E8%AE%A1 消息的設計

在消息隊列中,消息一共有四種類型。分别是url,page,result和自定義類型。在worker的程式中,可以通過messagequeue的四種方法(sendUrl, sendPage, sendResult, send)來插入消息。worker的downloader會處理url消息,processor會處理page消息,saver會處理result消息,freeman會處理所有的自定義的消息。我們所要做的工作,就是實作好worker中的這四個函數。

https://github.com/luohaha/jlitespider/blob/master/README.md?hmsr=toutiao.io&utm_medium=toutiao.io&utm_source=toutiao.io#worker%E6%8E%A5%E5%8F%A3%E7%9A%84%E8%AE%BE%E8%AE%A1 Worker接口的設計

JLiteSpider将整個的爬蟲抓取流程抽象成四個部分,由四個接口來定義。分别是downloader,processor,saver和freeman。它們分别處理上述提到的四種消息。

你所需要做的是,實作這個接口,并将想要抓取的url連結清單傳回。具體的實作細節,可以由你高度定制。

https://github.com/luohaha/jlitespider/blob/master/README.md?hmsr=toutiao.io&utm_medium=toutiao.io&utm_source=toutiao.io#1-downloader 1. Downloader:

這部分實作的是頁面下載下傳的任務,将想要抓取的url連結清單,轉化(下載下傳後存儲)為相應的頁面資料連結清單。

接口設計如下:

public interface Downloader {	/**	 * 下載下傳url所指定的頁面。	 * @param url 	 * 收到的由消息隊列傳過來的消息	 * @param mQueue 	 * 提供把消息發送到各個消息隊列的方法  * @throws IOException	 */
public void download(Object url, Map<String, MessageQueue> mQueue) throws IOException;
}      

你同樣可以實作這個接口,具體的實作可由你自由定制,隻要實作

download

函數。

url

是消息隊列推送過來的消息,裡面不一定是一條

url

,具體是什麼内容,是由你當初傳入消息隊列時決定的。

mQueue

提供了消息發送到各個消息隊列的方法,通過

mQueue.get("...")

選取消息隊列,然後執行messagequeue的四種方法(sendUrl, sendPage, sendResult, send)來插入消息。

https://github.com/luohaha/jlitespider/blob/master/README.md?hmsr=toutiao.io&utm_medium=toutiao.io&utm_source=toutiao.io#2-processor 2. Processor:

Processor

是解析器的接口,這裡會從網頁的原始檔案中提取出有用的資訊。

接口設計:

public interface Processor{	/**	 * 處理下載下傳下來的頁面源代碼	 * @param page	 * 消息隊列推送過來的頁面源代碼資料消息	 * @param mQueue	 * 提供把消息發送到各個消息隊列的方法  * @throws IOException	 */
public void process(Object page, Map<String, MessageQueue> mQueue) throws IOException;
}      

實作這個接口,完成對頁面源碼的解析處理。

page

是由消息隊列推送過來的消息,具體格式同樣是由你在傳入時決定好的。

mQueue

使用同上。

https://github.com/luohaha/jlitespider/blob/master/README.md?hmsr=toutiao.io&utm_medium=toutiao.io&utm_source=toutiao.io#3-saver 3. Saver:

Saver

實作的是對解析得到結果的處理,可以将你解析後得到的資料存入資料庫,檔案等等。或者将url重新存入消息隊列,實作疊代抓取。

接口的設計:

public interface Saver {	/**	 * 處理最終解析得到的結果	 * @param result 	 * 消息隊列推送過來的結果消息	 * @param mQueue 	 * 提供把消息發送到各個消息隊列的方法  * @throws IOException	 */
public void save(Object result, Map<String, MessageQueue> mQueue) throws IOException;
}      

通過實作這個接口,可以完成對結果的處理。你同樣可以實作這個接口,具體的實作可由你自由定制,隻要實作

download

result

是消息隊列推送過來的結果消息,具體的格式是由你當初傳入消息隊列時決定的。

mQueue

的使用同上。

https://github.com/luohaha/jlitespider/blob/master/README.md?hmsr=toutiao.io&utm_medium=toutiao.io&utm_source=toutiao.io#4-freeman 4. Freeman:

通過上述的三個流程,可以實作爬蟲抓取的一個正常流程。但是

jlitespider

同樣提供了自定義的功能,你可以完善,加強,改進甚至颠覆上述的抓取流程。

freeman

就是一個處理自定義消息格式的接口,實作它就可以定義自己的格式,以至于定義自己的流程。
public interface Freeman {	/**	 * 自定義的處理函數	 * @param key	 * key為自定義的消息标記	 * @param msg	 * 消息隊列推送的消息	 * @param mQueue	 * 提供把消息發送到各個消息隊列的方法	 * @throws IOException	 */
public void doSomeThing(String key, Object msg, Map<String, MessageQueue> mQueue) throws IOException;
}      

通過實作

doSomeThing

函數,你就可以處理來自消息隊列的自定義消息。

key

為消息的标記,

msg

為消息的内容。同樣,通過

mQueue

send

方法,可以實作向消息隊列發送自定義消息的操作。(需要注意,自定義的消息标記不能為:

url

page

result

。否則會被認為是

jlitespider

的保留消息,也就是由上述的三個接口函數來處理。)

https://github.com/luohaha/jlitespider/blob/master/README.md?hmsr=toutiao.io&utm_medium=toutiao.io&utm_source=toutiao.io#%E6%80%BB%E7%BB%93%E8%AF%B4%E6%98%8E 總結說明

jlitespider

的設計可能會讓您有些疑惑,不過等您熟悉這一整套的設計之後,您就會發現

jlitespider

是多麼的靈活和易于使用。

###使用方法

JLiteSpider使用:

//worker的啟動Spider.create() //建立執行個體
      .setDownloader(...) //設定實作了Downloader接口的下載下傳器
      .setProcessor(...) //設定實作了Processor接口的解析器
      .setSaver(...) //設定實作了Saver接口的資料持久化方法
      .setFreeman(...) //設定自定義消息的處理函數
      .setSettingFile(...) //設定配置檔案
      .begin(); //開始爬蟲//消息隊列中初始消息添加器的使用。隻有向消息隊列中添加初始的消息後,整個爬蟲系統才能啟動,是以稱其為spider的lighter(點火器)。SpiderLighter.locateMQ("localhost", 5672, "MQ's name") // 定位到要通路的消息隊列
                 .addUrl(...) //向消息隊列添加url類型的消息
                 .addPage(...) //向消息隊列添加page類型的消息
                 .addResult(...) //向消息隊列添加result類型的消息
                 .add(..., ...) //向消息隊列添加自定義類型的消息
                 .close() //關閉連接配接,一定要記得在最後調用!      

以豆瓣電影的頁面為例子,假設我們要抓取豆瓣電影的愛情分類中的所有電影名稱,并存入txt檔案中:

  • 首先,需要設計消息隊列和worker之間的關系。我的設計是有兩個worker和兩個消息隊列,其中一個worker在main消息隊列上,負責下載下傳,解析并把最終結果傳入data消息隊列。第二個worker從data消息隊列中取資料,并存入txt檔案中。兩個worker的配置檔案如下:

第一個worker:

{    "workerid" : 1,    "mq" : [{        "name" : "main",        "host" : "localhost",        "port" : 5672,        "qos" : 3  ,        "queue" : "main"
    }, {        "name" : "data",        "host" : "localhost",        "port" : 5672,        "qos" : 3  ,        "queue" : "data"
    }],    "sendto" : ["main", "data"],    "recvfrom" : ["main"]
}      

第二個worker:

{    "workerid" : 2,    "mq" : [{        "name" : "main",        "host" : "localhost",        "port" : 5672,        "qos" : 3  ,        "queue" : "main"
    }, {        "name" : "data",        "host" : "localhost",        "port" : 5672,        "qos" : 3  ,        "queue" : "data"
    }],    "sendto" : [],    "recvfrom" : ["data"]
}      
  • 接着,編寫第一個worker的代碼,如下:
//下載下傳頁面資料,并存入main隊列。public class DoubanDownloader implements Downloader { private Logger logger = Logger.getLogger("DoubanDownloader");	@Override
public void download(Object url, Map<String, MessageQueue> mQueue) throws IOException { // TODO Auto-generated method stub
String result = "";	try {
result = Network.create()
            .setUserAgent("...")
            .setCookie("...")
            .downloader(url.toString());	//下載下傳成功,将頁面資料放入main消息隊列
mQueue.get("main").sendPage(result);
} catch (IOException e) {
logger.info("本次下載下傳失敗!重新下載下傳!");	//因為下載下傳失敗,是以将url重新放入main隊列中
mQueue.get("main").sendUrl(url);
}
}

}      
//解析頁面資料,将結果放入main消息隊列。同時,後面頁面的url資訊同樣需要放入隊列,以便疊代抓取。public class DoubanProcessor implements Processor {//url去重複
private Set<String> urlset = new HashSet<>();	@Override
public void process(Object page, Map<String, MessageQueue> mQueue) throws IOException { // TODO Auto-generated method stub
String path = "//[@id=content]/div/div[1]/div[2]/table/tbody/tr/td[1]/a/@title"; List<String> result = Xsoup.compile(path).evaluate(Jsoup.parse(page.toString())).list(); //将結果放入main消息隊列
mQueue.get("main").sendResult(result);
path = "//[@id=content]/div/div[1]/div[3]/a/@href"; List<String> url = Xsoup.compile(path).evaluate(Jsoup.parse(page.toString())).list(); for (String each : url) {	if (!urlset.contains(each)) {	//如果url之前并未抓取過,則加入main隊列,作為接下來要抓取的url
mQueue.get("main").sendUrl(each);
urlset.add(each);
}
}
}

}      
//把最終的資料放入data消息隊列public class DoubanSaver implements Saver {	@Override
public void save(Object result, Map<String, MessageQueue> mQueue) throws IOException { // TODO Auto-generated method stub
List<String> rList = (List<String>) result;	for (String each : rList) {	//把資料發往data消息隊列
mQueue.get("data").send("cc", each);
}
}

}      
//啟動worker的主程式public class DoubanSpider { public static void main(String[] args) {	try { Spider.create().setDownloader(new DoubanDownloader())
               .setProcessor(new DoubanProcessor())
               .setSaver(new DoubanSaver())
               .setSettingFile("./conf/setting.json")
               .begin();
} catch (ShutdownSignalException e) {	// TODO Auto-generated catch block
e.printStackTrace();
} catch (ConsumerCancelledException e) {	// TODO Auto-generated catch block
e.printStackTrace();
} catch (IOException e) {	// TODO Auto-generated catch block
e.printStackTrace();
} catch (TimeoutException e) {	// TODO Auto-generated catch block
e.printStackTrace();
} catch (InterruptedException e) {	// TODO Auto-generated catch block
e.printStackTrace();
} catch (SpiderSettingFileException e) {	// TODO Auto-generated catch block
e.printStackTrace();
}
}
}      
  • 接下來,還要寫第二個worker的代碼。
//接收data消息隊列中的資料,寫入txtpublic class SaveToFile implements Freeman { @Override
public void doSomeThing(String key, Object msg, Map<String, MessageQueue> mQueue) throws IOException { // TODO Auto-generated method stub
File file = new File("./output/name.txt"); FileWriter fileWriter = new FileWriter(file, true);
fileWriter.write(msg.toString() + "\n");
fileWriter.flush();
fileWriter.close();
}
}      
//第二個worker的啟動主程式public class SaveToFileSpider { public static void main(String[] args) {	try { Spider.create().setFreeman(new SaveToFile())
               .setSettingFile("./conf/setting2.json")
               .begin();
} catch (ShutdownSignalException e) {	// TODO Auto-generated catch block
e.printStackTrace();
} catch (ConsumerCancelledException e) {	// TODO Auto-generated catch block
e.printStackTrace();
} catch (IOException e) {	// TODO Auto-generated catch block
e.printStackTrace();
} catch (TimeoutException e) {	// TODO Auto-generated catch block
e.printStackTrace();
} catch (InterruptedException e) {	// TODO Auto-generated catch block
e.printStackTrace();
} catch (SpiderSettingFileException e) {	// TODO Auto-generated catch block
e.printStackTrace();
}
}
}      
  • 還要編寫一個main消息隊列的初始化程式(點火程式),把第一個入口url放入main消息隊列中。
//把入口url放入main消息隊列public class AddUrls { public static void main(String[] args) {	try {	// 首先定位到要通路的消息隊列,隊列在localhost:5672/main
// 然後向這個消息隊列添加url
// 最後關閉lighter
SpiderLighter.locateMQ("localhost", 5672, "main")
             .addUrl("https://movie.douban.com/tag/%E7%88%B1%E6%83%85?start=0&type=T")
             .close();
} catch (IOException e) {	// TODO Auto-generated catch block
e.printStackTrace();
} catch (TimeoutException e) {	// TODO Auto-generated catch block
e.printStackTrace();
}
}
}      
  • 最後,依次啟動程式。啟動的順序是:rabbitmq -> worker1/2 -> 初始化消息程式。關于rabbitmq的使用,它的官方網站上有詳細的安裝和使用文檔,可用于快速搭建rabbitmq的server。

https://github.com/luohaha/jlitespider/blob/master/README.md?hmsr=toutiao.io&utm_medium=toutiao.io&utm_source=toutiao.io#%E8%BE%85%E5%8A%A9%E5%B7%A5%E5%85%B7 輔助工具

目前版本的

jlitespider

能提供的輔助工具并不多,您在使用

jlitespider

的過程中,可以将您實作的輔助工具合并到

jlitespider

中來,一起來完善

jlitespider

的功能。輔助工具在包

com.github.luohaha.jlitespider.extension

中。
  • Network

簡單的網絡下載下傳器,輸入url,傳回頁面源代碼。使用如下:

String result = Network.create()
.setCookie("...")
.setProxy("...")
.setTimeout(...)
.setUserAgent("...")
.downloader(url);      
不推薦使用這個網絡下載下傳器,因為它是同步的,會阻塞程序。
  • AsyncNetwork

異步非阻塞的網絡下載下傳器,推薦使用這個作為頁面下載下傳器,因為它不會阻塞程序。

// 建立下載下傳器AsyncNetwork asyncNetwork = new AsyncNetwork();// 設定cookieasyncNetwork.setCookie(cookies);// 設定代理asyncNetwork.setProxy("...");// 設定agentasyncNetwork.setUserAgent("...");// 啟動下載下傳器asyncNetwork.begin();      

在異步下載下傳器啟動後,可以随時往下載下傳器中添加url,和對應的回調處理對象。

// 添加要下載下傳的頁面的url,和下載下傳完成後的處理函數。asyncNetwork.addUrl("...", new DownloadCallback() {	
@Override
public void onReceived(String result, String url) {	// 下載下傳成功後,執行這個函數。result為下載下傳下來的頁面資訊,url為對應的url連結。

}	
@Override
public void onFailed(Exception exception, String url) {	// 下載下傳失敗時,執行這個函數。exception為失敗原因。

}
});      
  • 解析工具

項目中依賴了兩個很常用的解析工具:xsoup 和 jsoup。

想要學習Java高架構、分布式架構、高可擴充、高性能、高并發、性能優化、Spring boot、Redis、ActiveMQ、Nginx、Mycat、Netty、Jvm大型分布式項目實戰學習架構師視訊免費擷取   架構群:835544715

點選連結加入群聊【JAVA進階架構】:https://jq.qq.com/?_wv=1027&k=5dbERkY