ClickHouse Data Ingestion

In the previous section we deployed ClickHouse. In this section we load some data into it so we have something to work with in later lessons. The data consists of basic video information and video comments crawled from Bilibili.

Creating tables in ClickHouse

Create the database

create database bzhan;      

Video basic information table

CREATE TABLE bzhan.video
(
    `id` Int64 comment 'video ID',
    `title` String comment 'title',
    `arcurl` String comment 'playback URL',
    `pic` String comment 'cover image URL',
    `play` Int64 comment 'play count',
    `favorites` Int64 comment 'favorite count',
    `likes` Int64 comment 'like count',
    `duration` String comment 'duration',
    `author` String comment 'uploader'
)
ENGINE = MergeTree
ORDER BY id
SETTINGS index_granularity = 8192;

Comments table

CREATE TABLE bzhan.comments
(
    `oid` Int64 comment 'video ID',
    `mid` Int64 comment 'user ID',
    `uname` String comment 'user nickname',
    `message` String comment 'comment content',
    `ctime` String comment 'creation time'
)
ENGINE = MergeTree
ORDER BY oid
SETTINGS index_granularity = 8192;

Crawler

The crawler below works roughly as follows:

  1. Search for related videos based on an input keyword
  2. Check whether each returned video has already been stored in the database
  3. If not, store the video's basic information, then crawl that video's comments and store them as well
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import json

import requests
import datetime

from clickhouse_driver import Client

client = Client(host='localhost', port=9000, database='bzhan', user='root', password='www1234')

headers = {
    'User-Agent': "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/105.0.0.0 Safari/537.36",
    'cookie': "_uuid=DBCB679D-4C6C-C75F-DB5E-E987135E7D4F77264infoc; buvid3=AB9C34B3-BC6E-421A-9439-A7C34F3515F4148805infoc; video_page_version=v_old_home_14; rpdid=|(J~lkkkkJYR0J'uYJ~k)muJm; PVID=1; fingerprint3=056b1a5882e909b12ebdf0b6bde4a79a; fingerprint_s=1312c99928a31fef5f1bb2eb0f79f263; buvid4=754E9B20-75D2-9EB2-A828-7EFBF0FEE85727000-022012712-BHUs2ZoAtmga1zxje3LRHw==; CURRENT_QUALITY=80; i-wanna-go-back=-1; CURRENT_BLACKGAP=0; blackside_state=0; buvid_fp_plain=undefined; b_nut=100; b_ut=7; b_lsid=4E6C10DE4_1831D85FAC0; bsource=search_bing; nostalgia_conf=-1; innersign=1; sid=6n7z99fi; fingerprint=dc0de3a15978e2e40e6072e5050b1723; buvid_fp=dc0de3a15978e2e40e6072e5050b1723; CURRENT_FNVAL=16",
    'Content-Type': 'application/json',
    'Accept-Encoding': '',
    'Connection': 'keep-alive'
}


# Crawl the comments of a video; oid is the video ID
def comments(oid):
    cnt = 0
    baseUrl = "https://api.bilibili.com/x/v2/reply/main?mode=3&next={}&oid={}&plat=1&seek_rpid=&type=1"
    url = baseUrl.format(cnt, oid)
    response = requests.get(url, headers=headers)
    replies = response.json()['data']['replies']
    while replies is not None:
        replyInfo2Database(replies)
        cnt = cnt + 1
        url = baseUrl.format(cnt, oid)
        response = requests.get(url, headers=headers)
        replies = response.json()['data']['replies']


# Walk the reply list and insert each reply into the comments table
def replyInfo2Database(replies):
    for reply in replies:
        message = reply['content']['message']
        member = reply['member']
        mid = member['mid']
        uname = member['uname']
        oid = reply['oid']
        ctime = datetime.datetime.utcfromtimestamp(reply['ctime']).strftime("%Y-%m-%d %H:%M:%S")
        insert_sql = "insert into comments(oid,mid,uname,message,ctime) values ('%s','%s','%s','%s','%s')" % \
                     (oid, mid, uname.replace("'", ""), message.replace("'", ""), ctime)
        try:
            client.execute(insert_sql)
        except Exception as e:
            print(insert_sql)
            print(e)
        # Replies can themselves be replied to, so recurse into nested replies
        if reply['replies'] is not None:
            replyInfo2Database(reply['replies'])


# Search videos by keyword
def searchVideo(keyword):
    page = 1
    baseUrl = "https://api.bilibili.com/x/web-interface/search/type?__refresh__=true&_extra=&context=&page={}&page_size=30&from_source=&from_spmid=333.337&platform=pc&highlight=1&single_column=0&keyword={}&qv_id=XaRt9mncU6G0bkxgx6XwOzKOeUmXy95f&category_id=&search_type=video&dynamic_offset=0&preload=true&com2co=true"
    url = baseUrl.format(page, keyword)
    response = requests.get(url, headers=headers)
    numPages = 0
    if response.status_code == 200:
        numPages = int(response.json()['data']['numPages'])
    # Crawl the result pages one by one
    while response.status_code == 200 and page <= numPages:
        videoInfoList = response.json()['data']['result']
        videoInfo(videoInfoList)
        page = page + 1
        url = baseUrl.format(page, keyword)
        response = requests.get(url, headers=headers)


# Store basic info for each video in the search result list
def videoInfo(videoInfoList):
    for videoInfo in videoInfoList:
        # Check whether the video has already been stored
        if exists(videoInfo['id']):
            print("video already exists: %s" % videoInfo['id'])
        else:
            # Insert the video's basic info
            videoInfo2Database(videoInfo)
            # Crawl and store the video's comments
            print("comments {}".format(videoInfo['id']))
            comments(videoInfo['id'])


# Insert a video's basic info into the video table
def videoInfo2Database(videoInfo):
    oid = videoInfo['id']
    title = videoInfo['title']
    arcurl = videoInfo['arcurl']
    pic = videoInfo['pic']
    play = videoInfo['play']
    favorites = videoInfo['favorites']
    like = videoInfo['like']
    duration = videoInfo['duration']
    author = videoInfo['author']
    print(oid, title, arcurl, pic, play, favorites, like, duration, author)
    insert_sql = "insert into video(id,title,arcurl,pic,play,favorites,likes,duration,author) values ('%s','%s','%s','%s','%s','%s','%s','%s','%s')" % \
                 (oid, title, arcurl, pic, play, favorites, like, duration, author)
    client.execute(insert_sql)


# Check whether a video with the given id is already in the video table
def exists(id):
    ans = client.query_dataframe("select count(1) as cnt from video where id={}".format(id))
    return ans.iat[0, 0] > 0


searchVideo("美國")

The code is commented throughout, so I won't explain it line by line. The problems with this direct, row-by-row write to the database will be covered later when we optimize.
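
As a preview of that optimization: clickhouse_driver also accepts a list of rows for a single INSERT, which removes both the string splicing and the one-row-per-statement pattern used above. A minimal sketch, assuming the client object, the datetime import, and an already-fetched replies list from the script above:

# Collect the rows first, then insert them in one batch.
rows = []
for reply in replies:
    rows.append((
        reply['oid'],
        reply['member']['mid'],
        reply['member']['uname'],
        reply['content']['message'],
        datetime.datetime.utcfromtimestamp(reply['ctime']).strftime("%Y-%m-%d %H:%M:%S"),
    ))

# The driver sends the values separately from the SQL text,
# so quotes in messages no longer need to be stripped.
client.execute('INSERT INTO comments (oid, mid, uname, message, ctime) VALUES', rows)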

Data preview

Video information

Comment information

Summary information
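
The exact queries behind these views are not included in the post, but something along the following lines reproduces them with the same client (illustrative only; the column lists and the join are assumptions based on the tables above):

# A few sample rows from the video table.
print(client.execute('SELECT id, title, play, likes, author FROM bzhan.video LIMIT 5'))

# A few sample comments.
print(client.execute('SELECT oid, uname, message, ctime FROM bzhan.comments LIMIT 5'))

# Summary: comment count per video, joined back to the video title.
summary_sql = '''
SELECT v.id, v.title, count() AS comment_cnt
FROM bzhan.video AS v
INNER JOIN bzhan.comments AS c ON c.oid = v.id
GROUP BY v.id, v.title
ORDER BY comment_cnt DESC
LIMIT 10
'''
print(client.execute(summary_sql))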

Summary