
Python3: Getting Started with Crawlers - A Concurrent Crawler with a Tornado Task Queue

Table of Contents

Concurrent Crawling with Tornado in Python3 in a More Complex Environment

1. Background

2. Requirements Analysis

3. Code Implementation

Concurrent Crawling with Tornado in Python3 in a More Complex Environment

1. Background

The link below points to the Tornado project's client-side concurrent web-spider demo. That demo, however, does not cover the more complex case where you construct the request object yourself, so I adapted it into a concurrent batch crawler that carries request headers and authentication information.

https://github.com/tornadoweb/tornado/blob/master/demos/webspider/webspider.py
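
For comparison, the upstream demo fetches each page with a bare httpclient.AsyncHTTPClient().fetch(url) call. The adaptation in section 3 swaps that for an explicit httpclient.HTTPRequest so headers and a session cookie can be attached. A minimal sketch of just that change (the cookie value is a placeholder, not a real credential):

from tornado import httpclient

async def fetch_page(url):
    # Demo style: no control over headers
    # response = await httpclient.AsyncHTTPClient().fetch(url)

    # Adapted style: build an explicit request object that carries headers and cookies
    request = httpclient.HTTPRequest(
        url=url,
        method='GET',
        headers={'Cookie': 'sessionid=placeholder'},  # placeholder session cookie
        request_timeout=2.0,
    )
    response = await httpclient.AsyncHTTPClient().fetch(request)
    return response.body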

2. Requirements Analysis

2.1 Crawl a large volume of paginated data from a target site via GET requests

2.2 Handle login authentication

2.3 Preserve session information across requests

2.4 Convert the data returned by the API into JSON

2.5 Insert the results into the database in batches (a sketch of this step follows the list)

2.6 Stop the crawler immediately once everything has been fetched

2.7 Put the resources to be crawled into a queue, and report the final counts of successful and failed links

2.8 Measure the total elapsed time

2.9 Avoid multi-threading and multi-processing; use Tornado's asynchronous coroutines, which have lower overhead
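
Requirement 2.5 is the one step the script in section 3 leaves out. A minimal sketch of what batch insertion might look like, assuming the parsed JSON yields a list of article records and using sqlite3 purely as a stand-in for the real database:

import sqlite3

def batch_insert(records):
    # `records` is assumed to be a list of dicts with 'articleId' and 'title' keys;
    # adjust the schema and field names to match the actual API response.
    conn = sqlite3.connect('articles.db')
    conn.execute('CREATE TABLE IF NOT EXISTS articles (id TEXT PRIMARY KEY, title TEXT)')
    conn.executemany(
        'INSERT OR REPLACE INTO articles (id, title) VALUES (?, ?)',
        [(str(r.get('articleId')), r.get('title')) for r in records],
    )
    conn.commit()
    conn.close()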

3. Code Implementation

In the code below, the data has been anonymized. To test it, pick a target site of your own and replace the request headers and URL construction accordingly.

#!/usr/bin/env python3

import time
import json
from datetime import timedelta
from tornado import gen, httpclient, ioloop, queues

pageSize = 20

# Build the list of paginated URLs to crawl (requirement 2.1)
URLS = []
for page in range(1, 100):
    session_monitor = f'https://bizapi.csdn.net/blog-console-api/v1/article/list?page={page}&pageSize={pageSize}'
    URLS.append(session_monitor)

# Maximum number of concurrent worker coroutines
concurrency = 100


async def get_data_from_url(url):
    """擷取目前url傳回的資料
    """
    # Anonymized placeholders: replace with the real target host and session cookie value
    ip = '172.200.200.200'
    sessionid = 'cudjqtngdlnn76mugl6lghjo2n'
    header = {
        'Accept': 'application/json, text/javascript, */*; q=0.01',
        'Accept-Encoding': 'gzip, deflate',
        'Accept-Language': 'zh-CN,zh;q=0.9',
        'Connection': 'keep-alive',
        'Content-Length': '0',
        'Cookie': sessionid,
        'Host': ip,
        'Origin': 'http://' + ip,
        'Referer': 'http://' + ip + '/index.html',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.132 Safari/537.36',
        # 'X-Requested-With': 'XMLHttpRequest'
    }
    request = httpclient.HTTPRequest(url=url,
                                     method='GET',
                                     headers=header,
                                     connect_timeout=2.0,
                                     request_timeout=2.0,
                                     follow_redirects=False)

    response = await httpclient.AsyncHTTPClient().fetch(request)

    print("fetched %s" % url)
    html = response.body.decode(errors="ignore")
    json_ret = json.loads(html)
    print(json_ret)
    return json_ret


async def main():
    q = queues.Queue()
    start = time.time()
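    # Track URLs that are in flight, fetched successfully, and failed (requirement 2.7)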
    fetching, fetched, dead = set(), set(), set()

    for url in URLS:
        await q.put(url)

    async def fetch_url(current_url):
        if current_url in fetching:
            return

        print("fetching %s" % current_url)
        fetching.add(current_url)
        data = await get_data_from_url(current_url)
        # `data` holds the parsed JSON; batch insertion (requirement 2.5) could hook in here
        fetched.add(current_url)

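    # Each worker keeps pulling URLs from the queue until it receives the None sentinel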
    async def worker():
        async for url in q:
            if url is None:
                return
            try:
                await fetch_url(url)
            except Exception as e:
                print("Exception: %s %s" % (e, url))
                dead.add(url)
            finally:
                q.task_done()

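    # Launch a fixed pool of coroutine workers and wait (up to 300 s) for the queue to drain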
    workers = gen.multi([worker() for _ in range(concurrency)])
    await q.join(timeout=timedelta(seconds=300))
    assert fetching == (fetched | dead)
    print("Done in %d seconds, fetched %s URLs." % (time.time() - start, len(fetched)))
    print("Unable to fetch %s URLS." % len(dead))

    # Signal all the workers to exit.
    for _ in range(concurrency):
        await q.put(None)
    await workers


if __name__ == "__main__":
    io_loop = ioloop.IOLoop.current()
    io_loop.run_sync(main)
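
The script above satisfies requirements 2.2 and 2.3 by pasting a session cookie captured from the browser into the request headers. If you would rather obtain the cookie programmatically, the sketch below shows one way to do it; the /api/login path and the form field names are hypothetical and must be replaced with whatever the target site actually uses.

import urllib.parse
from tornado import httpclient

async def login_and_get_cookie(base_url, username, password):
    # Log in once and return the Set-Cookie value for later requests.
    # The endpoint path and form fields below are hypothetical placeholders.
    body = urllib.parse.urlencode({'username': username, 'password': password})
    request = httpclient.HTTPRequest(
        url=base_url + '/api/login',  # hypothetical login endpoint
        method='POST',
        headers={'Content-Type': 'application/x-www-form-urlencoded'},
        body=body,
        follow_redirects=False,
    )
    response = await httpclient.AsyncHTTPClient().fetch(request, raise_error=False)
    # The session identifier normally arrives in the Set-Cookie response header
    return response.headers.get('Set-Cookie')

The returned cookie string can then replace the hardcoded sessionid at the top of get_data_from_url.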
