
<Random notes> An example of multithreading

'''
	While one thread is using a shared resource, the other threads must wait for it to finish.
	This is enforced with a "lock", which keeps multiple threads from touching the same piece of memory at once (see the sketch after the imports below).
	Process: one execution of a program.
	Thread: the basic unit of CPU scheduling.
	Multithreading: for heavy, I/O-bound work; while one thread waits on a response, the others keep working.
	Multiprocessing: for heavy, CPU-bound parallel computation.
	scrapy: an asynchronous networking framework (many coroutines doing the processing).
	Pipeline: page-number queue -- threads fetch pages (crawl threads, network I/O) -- data queue (the responses) -- threads parse the pages (parse threads, disk I/O) -- the parsed data is stored.
'''
# HTTP requests
import requests
# queues: use queue.Queue for threads (multiprocessing.Queue is meant for processes)
from queue import Queue, Empty
# threads
from threading import Thread
import threading
# parsing
from lxml import etree
# storage
import json
import time
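
# A minimal standalone sketch of the "lock" idea from the notes above: two
# threads increment a shared counter, and the Lock makes each read-modify-write
# step atomic. Illustrative only; the crawler below never calls this.
def _lock_demo():
	counter = 0
	counter_lock = threading.Lock()

	def add_many():
		nonlocal counter
		for _ in range(100000):
			with counter_lock:  # only one thread may hold the lock at a time
				counter += 1

	workers = [Thread(target=add_many) for _ in range(2)]
	for w in workers:
		w.start()
	for w in workers:
		w.join()
	print(counter)  # 200000 every time with the lock; unpredictable without it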


class ThreadCrawl(Thread):
	def __init__(self, threadName, pageQueue, dataQueue):
		# call the parent class's initializer
		super(ThreadCrawl, self).__init__()
		self.threadName = threadName
		self.pageQueue = pageQueue
		self.dataQueue = dataQueue
		self.headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/66.0.3359.170 Safari/537.36"}

	# thread.start() invokes run()
	def run(self):
		print("Starting " + self.threadName)
		while not CRAWL_EXIT:
			try:
				# take one page number off the page queue
				# optional parameter block (default True):
				# 1. if the queue is empty and block is True, the call blocks
				#    until a new value arrives in the queue
				# 2. if the queue is empty and block is False, the call raises
				#    the Queue.Empty exception immediately
				page = self.pageQueue.get(False)
				url = "https://www.qiushibaike.com/text/page/" + str(page) + "/"
				content = requests.get(url, headers=self.headers).text
				# put the page source onto the data queue
				self.dataQueue.put(content)
			except Empty:
				pass
		print("Exiting " + self.threadName)

class ThreadParse(Thread):
	def __init__(self,threadName,dataQueue,filename,lock):
		super(ThreadParse,self).__init__()
		self.threadName = threadName
		self.dataQueue = dataQueue
		self.filename = filename
		self.lock = lock

	def run(self):
		while not PARSE_EXIT:
			try:
				html = self.dataQueue.get(False)
				self.parse(html)
			except Empty:
				pass

	def parse(self, html):
		# parse the page source into an element tree
		root = etree.HTML(html)
		# the XPath below is only a guess at the page structure; swap in
		# whatever selector matches the fields you actually want
		texts = [t.strip() for t in root.xpath('//div[contains(@class, "content")]//text()') if t.strip()]

		# `with` guarantees two operations: __enter__ and __exit__.
		# Whatever happens inside the block, acquire and release both run:
		# acquire the lock, process the content, release the lock.
		with self.lock:
			# write out the parsed data (the file is in text mode, so no .encode())
			for text in texts:
				self.filename.write(json.dumps(text, ensure_ascii=False) + "\n")
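
# Roughly what `with self.lock:` expands to: a sketch of the __enter__/__exit__
# protocol mentioned above (illustrative only; the names here are made up).
def _manual_lock(lock, do_work):
	lock.acquire()  # __enter__: take the lock
	try:
		do_work()  # the protected section
	finally:
		lock.release()  # __exit__: always runs, even if do_work() raises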


CRAWL_EXIT = False
PARSE_EXIT = False

def main():
	# page-number queue, capped at 20 entries
	pageQueue = Queue(20)
	# enqueue the numbers 1 through 20; first in, first out
	for i in range(1, 21):
		pageQueue.put(i)

	# data queue for the HTML source; no size argument means unbounded
	dataQueue = Queue()

	# create the lock
	lock = threading.Lock()

	# names of the crawl threads
	crawlList = ["Crawl thread 1", "Crawl thread 2", "Crawl thread 3"]

	# keep track of the crawl threads
	thread_crawl = []
	for threadName in crawlList:
		# create and start one crawl thread
		thread = ThreadCrawl(threadName, pageQueue, dataQueue)
		thread.start()
		thread_crawl.append(thread)

	filename = open("duanzi.json", "a", encoding="utf-8")
	# names of the parse threads
	parseList = ["Parse thread 1", "Parse thread 2", "Parse thread 3"]
	threadparse = []
	for threadName in parseList:
		thread = ThreadParse(threadName, dataQueue, filename, lock)
		thread.start()
		threadparse.append(thread)



	# busy-wait here until the page queue is drained
	# (see the queue.join() sketch after main() for a cleaner alternative)
	while not pageQueue.empty():
		pass
	# once the queue is empty, set CRAWL_EXIT = True so the crawl threads exit
	global CRAWL_EXIT
	CRAWL_EXIT = True

	# join() blocks: the main thread continues only after each crawl thread finishes
	for thread in thread_crawl:
		thread.join()
		print(thread)

	while not dataQueue.empty():
		pass

	global PARSE_EXIT
	PARSE_EXIT = True

	for thread in threadparse:
		thread.join()
		print(thread)

	with lock:
		# close the output file
		filename.close()
	print("Done. Thanks for using!")

if __name__ == '__main__':
	main()