Clickhouse 資料入庫
前面我們已經部署好了clickhouse 了,這一節搞點點資料寫入clickhouse,以供我們後續繼續學習使用,這裡我們的資料某站的視訊基本資訊和視訊的評論資訊
clickhouse 建表
建庫
create database bzhan;
視訊基本資訊表
CREATE TABLE bzhan.video
(
`id` Int64 comment '視訊ID',
`title` String comment '标題',
`arcurl` String comment '播放url',
`pic` String comment '封面URL',
`play` Int64 comment '播放次數',
`favorites` Int64 comment '收藏次數',
`likes` Int64 comment '點贊次數',
`duration` String comment '時長',
`author` String comment 'up主'
)
ENGINE = MergeTree
ORDER BY id
SETTINGS index_granularity = 8192
評論表
CREATE TABLE bzhan.comments
(
`oid` Int64 comment'視訊ID',
`mid` Int64 comment '使用者ID' ,
`uname` String '使用者昵稱',
`message` String '評論内容',
`ctime` String '建立時間'
)
ENGINE = MergeTree
ORDER BY oid
SETTINGS index_granularity = 8192
;
爬蟲
下面這個爬蟲的功能大緻如下:
- 根據輸入關鍵字搜尋相關視訊
- 判斷搜尋出來的視訊是否入庫
- 如果沒有入庫則入庫視訊基本資訊,然後爬取該視訊的評論資訊并且入庫
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import json
import requests
import datetime
from clickhouse_driver import Client
client = Client(host='localhost', port=9000, database='bzhan', user='root', password='www1234')
headers = {
'User-Agent': "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/105.0.0.0 Safari/537.36",
'cookie': "_uuid=DBCB679D-4C6C-C75F-DB5E-E987135E7D4F77264infoc; buvid3=AB9C34B3-BC6E-421A-9439-A7C34F3515F4148805infoc; video_page_version=v_old_home_14; rpdid=|(J~lkkkkJYR0J'uYJ~k)muJm; PVID=1; fingerprint3=056b1a5882e909b12ebdf0b6bde4a79a; fingerprint_s=1312c99928a31fef5f1bb2eb0f79f263; buvid4=754E9B20-75D2-9EB2-A828-7EFBF0FEE85727000-022012712-BHUs2ZoAtmga1zxje3LRHw==; CURRENT_QUALITY=80; i-wanna-go-back=-1; CURRENT_BLACKGAP=0; blackside_state=0; buvid_fp_plain=undefined; b_nut=100; b_ut=7; b_lsid=4E6C10DE4_1831D85FAC0; bsource=search_bing; nostalgia_conf=-1; innersign=1; sid=6n7z99fi; fingerprint=dc0de3a15978e2e40e6072e5050b1723; buvid_fp=dc0de3a15978e2e40e6072e5050b1723; CURRENT_FNVAL=16",
'Content-Type': 'application/json',
'Accept-Encoding': '',
'Connection': 'keep-alive'
}
# 抓取視訊的評論 oid視屏ID
def comments(oid):
cnt = 0
baseUrl = "https://api.bilibili.com/x/v2/reply/main?mode=3&next={}&oid={}&plat=1&seek_rpid=&type=1"
url = baseUrl.format(cnt, oid)
response = requests.get(url, headers=headers)
replies = response.json()['data']['replies']
while replies is not None:
replyInfo2Database(replies)
cnt = cnt + 1
url = baseUrl.format(cnt, oid)
response = requests.get(url, headers=headers)
replies = response.json()['data']['replies']
# 擷取評論清單的資訊
def replyInfo2Database(replies):
for reply in replies:
message = reply['content']['message']
member = reply['member']
mid = member['mid']
uname = member['uname']
oid = reply['oid']
try:
ctime = datetime.datetime.utcfromtimestamp(reply['ctime']).strftime("%Y-%m-%d %H:%M:%S")
insert_sql = "insert into comments(oid,mid,uname,message,ctime) values ('%s','%s','%s','%s','%s')" % \
(oid, mid, uname.replace("'", ""), message.replace("'", ""), ctime)
client.execute(insert_sql)
# 評論本身也可以被再次評論,是以這裡應該遞歸去取
except Exception as e:
print(insert_sql)
print(e)
if (reply['replies'] is not None):
replyInfo2Database(reply['replies'])
# 輸入關鍵詞搜尋視屏
def searchVideo(keyword):
page = 1
baseUrl = "https://api.bilibili.com/x/web-interface/search/type?__refresh__=true&_extra=&context=&page={}&page_size=30&from_source=&from_spmid=333.337&platform=pc&highlight=1&single_column=0&keyword={}&qv_id=XaRt9mncU6G0bkxgx6XwOzKOeUmXy95f&category_id=&search_type=video&dynamic_offset=0&preload=true&com2co=true"
url = baseUrl.format(page, keyword)
response = requests.get(url, headers=headers)
if response.status_code == 200:
numPages = int(response.json()['data']['numPages'])
# 循環爬取
while response.status_code == 200 and page <= numPages:
videoInfoList = response.json()['data']['result']
videoInfo(videoInfoList)
page = page + 1
url = baseUrl.format(page, keyword)
response = requests.get(url, headers=headers)
# 輸出視屏清單裡面視屏的基本資訊
def videoInfo(videoInfoList):
for videoInfo in videoInfoList:
# 判斷是否已經入庫
if (exists(videoInfo['id'])):
print("視屏已存在:%s" % videoInfo['id'])
else:
# 視屏資料入庫
videoInfo2DadaBase(videoInfo)
# 擷取視屏的評論
print("comments {}".format(videoInfo['id']))
comments(videoInfo['id'])
def videoInfo2DadaBase(videoInfo):
oid = videoInfo['id']
title = videoInfo['title']
arcurl = videoInfo['arcurl']
pic = videoInfo['pic']
play = videoInfo['play']
favorites = videoInfo['favorites']
like = videoInfo['like']
duration = videoInfo['duration']
author = videoInfo['author']
print(oid, title, arcurl, pic, play, favorites, like, duration, author)
insert_sql = "insert into video(id,title,arcurl,pic,play,favorites,likes,duration,author) values ('%s','%s','%s','%s','%s','%s','%s','%s','%s')" % \
(oid, title, arcurl, pic, play, favorites, like, duration, author)
client.execute(insert_sql)
def exists(id):
ans = client.query_dataframe("select count(1) as cnt from video where id={}".format(id))
if (ans.iat[0, 0] > 0):
return True
else:
return False
searchVideo("美國")
代碼寫了相關的注釋,就不做解釋了,這個直接寫庫的操作我們後面做優化的時候再講它存在的問題
資料展示
視訊資訊
評論資訊
彙總資訊