天天看点

从丁香园获取疫情的最新资讯

import requests
import time
import re
from bs4 import BeautifulSoup
import pymysql


class Virus(object):
    def __init__(self):
        super().__init__()
        self.url = "https://3g.dxy.cn/newh5/view/pneumonia"
        self.header = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.79 Safari/537.36"}

    def get_html(self) -> str:
        """获取网站的html"""
        with requests.session() as s:
            response = s.get(self.url, headers=self.header, timeout=5)
            response.encoding = response.apparent_encoding
            if response:
                # with open("index.html", 'w',encoding='utf-8') as file:
                #     file.write(response.text)
                return response.text

    def covert_timestamp(self, timestamp) -> str:
        """对时间戳进行转化"""
        timestamp = int(timestamp[:-3])  # 这里是因为这个网站中的时间戳后面的三位是可以忽略的
        print(timestamp)
        localtime = time.localtime(timestamp)
        date = time.strftime("%Y-%m-%d %H:%M:%S", localtime)
        return date

    def get_picture(self):
        flag = True
        pattern = re.compile(r'<img class="mapImg___3LuBG" src="(.*?)">', re.S)
        src = re.findall(pattern, self.get_html())

        if not src:  # 如果没有找到
            print("未查找到相关内容,请联系作者更新")
            return

        if not os.path.exists('疫情地图.png'):
            with open("疫情地图.png", "wb") as png:
                png.write(requests.get(src[0]).content)
        else:
            while flag:
                choice = input("原疫情地图已存在,是否覆盖: (y/n)  ")
                if choice in ["Y", "y"]:
                    with open("疫情地图.png", "wb") as png:
                        png.write(requests.get(src[0]).content)
                        flag = False
                elif choice in ["N", 'n']:
                    return
                else:
                    print("输入错误,请重新输入")

    def get_des(self) -> dict:
        pattern = re.compile(r'<div class="mapTop___2VZCl">(.*?)</div>')
        des_div = re.findall(pattern, self.get_html())
        # print(des_div[0])
        soup = BeautifulSoup(des_div[0], 'lxml')
        p_list = soup.findAll('p')
        des_text_list = []
        for p in p_list:
            des_text_list.append(p.get_text())
        return {"description": des_text_list}

    def get_detail(self) -> dict:
        div = BeautifulSoup(self.get_html(), 'lxml').findAll(
            "div", {"class": "descBox___3dfIo"})[0]
        detail_text_list = []
        for p in div.find_all("p"):
            detail_text_list.append(p.get_text())
        return {"detail": detail_text_list}

    def get_broadcast(self) -> list:
        """"获取实时播报"""
        div_list = BeautifulSoup(self.get_html(), 'lxml').find_all(
            "div", {"class": "block___wqUAz"})
        info_list = []
        for div in div_list:

            time = div.find_all("span", {"class": "leftTime___2zf53"})[
                0]
            if time.find("span"):
                time = time.find("span").get_text()
            else:
                time = time.get_text()

            title = div.find_all("p", {"class": "topicTitle___2ovVO"})[
                0].get_text()

            if "最新" in title:
                title = title[2:]

            content = div.findAll("p", {"class": "topicContent___1KVfy"})[
                0].get_text()

            topicFrom = div.find_all("p", {"class": "topicFrom___3xlna"})[
                0].get_text()
            data = {
                "time": time,
                "title": title,
                "content": content,
                "from": topicFrom
            }
            info_list.append(data)
        return info_list

    def get_all_time(self):
        """"获取之前存在数据库中所有的时间数据"""
        connect = pymysql.connect("localhost", 'root', 'Edgar', 'edgar')
        cursor = connect.cursor()
        sql = "SELECT time FROM detail"
        cursor.execute(sql)
        result = cursor.fetchall()
        connect.commit()
        cursor.close()
        connect.close()
        return [i[0] for i in result]

    def check_lastest(self):
        """"判断是否存在最新的消息"""
        new = [i for i in self.get_broadcast()][0]
        new_time = new.get("time")
        if new_time not in self.get_all_time():
            print("有新的实时播报")
            for i in new:
                print(new[i])
            self.insert(new.get("time"), new.get("title"), new.get("content"),
                        new.get("from"))

            print("\n目前 湖南,上海的情况如下:")
            hunan = self.get_city_detail("湖南")
            shanghai = self.get_city_detail("上海")
            print(hunan, shanghai, sep="\n")
        else:
            print("暂无最新消息")

    @staticmethod
    def create_database():
        connect = pymysql.connect("localhost", 'root', 'Edgar', 'edgar')
        cursor = connect.cursor()
        sql = "CREATE TABLE IF NOT EXISTS detail(time TEXT, title TEXT, content TEXT, contentFrom TEXT)"
        cursor.execute(sql)
        connect.commit()
        cursor.close()
        connect.close()

    @staticmethod
    def insert(time, title, content, contentFrom):
        connect = pymysql.connect("localhost", 'root', 'Edgar', 'edgar')
        cursor = connect.cursor()
        sql = 'INSERT INTO detail(time, title,content,contentFrom)\
         VALUES("%s","%s","%s","%s")' % (time, title, content, contentFrom)
        # 需要加引号,否则不能通过
        # cursor.execute(sql, (time, title, content, contentFrom))
        cursor.execute(sql)
        connect.commit()
        cursor.close()
        connect.close()

    def upload_data(self):
        info = self.get_broadcast()
        for data in info[::-1]:
            self.insert(data["time"], data["title"], data["content"],
                        data["from"])

    @staticmethod
    def get_first_data():
        connect = pymysql.connect("localhost", 'root', 'Edgar', 'edgar')
        cursor = connect.cursor()
        sql = "SELECT * FROM detail LIMIT 1"
        data = cursor.execute(sql)
        data = data.fetchall()
        connect.commit()
        cursor.close()
        connect.close()
        print(data)

    def get_city_detail(self, city):
        for detail in self.get_detail()["detail"]:
            if city in detail:
                return detail


if __name__ == "__main__":
	"""just for test"""
    virus = Virus()
    # virus.get_picture()

    # virus.upload_data()
    # city = virus.get_city_detail("湖南")
    # print(city)
    virus.check_lastest()

           

在获取最新消息的同时,可以发送邮件到自己的邮箱(参考这篇文章)之中,然后将程序在服务其中不断的跑,便可以及时获取网页中最新的消息

!!!目前网站的结构发生变化,此方法不能正确获取信息!!!

更新之后采取下面代码:

import requests
import os
import time
import re
from bs4 import BeautifulSoup
import pymysql
import json


class Virus(object):
    def __init__(self):
        super().__init__()
        self.url = "https://3g.dxy.cn/newh5/view/pneumonia"
        self.header = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.79 Safari/537.36"}
        self._html = None
        self.home_html()

    def get_html(self, url) -> str:
        """获取网站的html"""
        with requests.session() as s:
            response = s.get(url, headers=self.header, timeout=5)
            response.encoding = response.apparent_encoding
            if response:
                return response.text

    def home_html(self):
        """将网页的内容存储起来,方便之后引用"""
        self._html = self.get_html(self.url)

    def get_picture(self):
        flag = True
        pattern = re.compile(r'<img class="mapImg___3LuBG" src="(.*?)">', re.S)
        src = re.findall(pattern, self._html)

        if not src:  # 如果没有找到
            print("未查找到相关内容,请联系作者更新")
            return

        if not os.path.exists('疫情趋势图.png'):
            with open("疫情趋势图.png", "wb") as png:
                png.write(requests.get(src[0]).content)
        else:
            while flag:
                choice = input("原疫情趋势图已存在,是否覆盖: (y/n)  ")
                if choice in ["Y", "y"]:
                    with open("疫情趋势图.png", "wb") as png:
                        png.write(requests.get(src[0]).content)
                        flag = False
                elif choice in ["N", 'n']:
                    return
                else:
                    print("输入错误,请重新输入")

    def get_des(self) -> dict:
        pattern = re.compile(r'<div class="mapTop___2VZCl">(.*?)</div>')
        des_div = re.findall(pattern, self._html)
        # print(des_div[0])
        soup = BeautifulSoup(des_div[0], 'lxml')
        p_list = soup.findAll('p')
        des_text_list = []
        for p in p_list:
            des_text_list.append(p.get_text())
        return {"description": des_text_list}

    def get_area_stat(self):
        pattern = re.compile(r'window.getAreaStat = (.*?)}catch')
        json_text = re.findall(pattern, self._html)[0]
        return json.loads(json_text)

    def upload_area_stat(self) -> None:
        """"将获取到的省份城市的信息存储在数据库中"""
        state = self.get_area_stat()
        for info in state:
            cities = info.popitem()
            self.insert_to_province(info)
            for city in cities[1]:
                city["provinceShortName"] = info.get("provinceShortName")
                self.insert_to_city(city)

    def get_broadcast(self) -> list:
        """"获取实时播报"""
        url = "https://assets.dxycdn.com/gitrepo/bbs-mobile/dist/p__Pneumonia__timeline.async.4363ba04.js"
        html = self.get_html(url)
        pattern = re.compile(r"JSON.parse\('(.*?)'\)}", re.M)
        json_text = re.findall(pattern, html)[0].encode('utf-8').decode(
            "unicode_escape")
        return json.loads(json_text)

    def get_left_broadcast(self):
        pattern = re.compile(r"window.getTimelineService =(.*?)}catch")
        json_text = re.findall(pattern, self._html)[0]
        return json.loads(json_text)

    @staticmethod
    def get_all_id():
        """"获取之前存在数据库中所有的时间数据"""
        connect = pymysql.connect("localhost", 'root', 'Edgar', 'edgar')
        cursor = connect.cursor()
        sql = "SELECT id FROM broadcast"
        cursor.execute(sql)
        result = cursor.fetchall()
        connect.commit()
        cursor.close()
        connect.close()
        return [i[0] for i in result]

    def check_latest(self) -> None:
        """"判断是否存在最新的消息,在这里假设的是处于不断的运行之中,
        所以只需要每次判断第一个便可以了"""
        new = [i for i in self.get_left_broadcast()][0]
        new_time = new.get("id")
        if new_time not in self.get_all_id():
            print("有新的实时播报")
            print("title: {}".format(new.get("title")))
            print("summary: {}".format(new.get("summary")))
            self.insert(new)

            print("\n目前 湖南,上海的情况如下:")
            hunan = self.get_city_detail("湖南")
            shanghai = self.get_city_detail("上海")
            print(hunan, shanghai, sep="\n")
        else:
            print("暂无最新消息")

    @staticmethod
    def create_database() -> None:
        connect = pymysql.connect("localhost", 'root', 'Edgar', 'edgar')
        cursor = connect.cursor()
        sql = "CREATE TABLE IF NOT EXISTS broadcast(id INTEGER NOT NULL , " \
              "pubDate varchar(20) , title TEXT, summary TEXT, infoSource " \
              "varchar(40), sourceUrl TEXT, provinceId varchar(20), " \
              "provinceName varchar(20), createTime varchar(20), modifyTime " \
              "varchar(20)); "
        cursor.execute(sql)

        sql = "create table province(provinceName varchar(20)," \
              "provinceShortName varchar(20),confirmedCount INT," \
              "suspectedCount INTEGER, curedCount INTEGER," \
              "deadCount INTEGER,comment TEXT);"
        cursor.execute(sql)

        sql = "create table city(provinceShortName varchar(20),cityName " \
              "varchar(20),confirmedCount INTEGER,suspectedCount INT," \
              "curedCount INT,deadCount INTEGER); "
        cursor.execute(sql)

        connect.commit()
        cursor.close()
        connect.close()

    def insert(self, info) -> None:
        connect = pymysql.connect("localhost", 'root', 'Edgar', 'edgar')
        cursor = connect.cursor()
        sql = 'INSERT INTO broadcast(id, pubDate, title, summary, infoSource, ' \
              'sourceUrl, provinceId, provinceName, createTime, modifyTime) ' \
              'VALUES("%s","%s","%s","%s","%s","%s","%s","%s","%s","%s")' % (
                  info.get("id"), self.convert_timestamp(info.get("pubDate")),
                  info.get("title"),
                  info.get("summary"),
                  info.get("infoSource"), info.get("sourceUrl"),
                  info.get("provinceId"),
                  info.get("provinceName"),
                  self.convert_timestamp(info.get('createTime')),
                  self.convert_timestamp(info.get("modifyTime")))
        # 需要加引号,否则can not通过
        cursor.execute(sql)
        connect.commit()
        cursor.close()
        connect.close()

    def upload_data(self) -> None:
        info = self.get_broadcast()
        for data in info[::-1]:
            self.insert(data)

    def upload_left_data(self) -> None:
        """"主页和实时播报页面的并没有同步,另外插入"""
        left_info = self.get_left_broadcast()
        id_list = self.get_all_id()
        for info in left_info[::-1]:
            if info.get("id") not in id_list:
                self.insert(info)

    @staticmethod
    def convert_timestamp(timestamp) -> str:
        """对时间戳进行转化"""
        timestamp = timestamp / 1000  # 这里是因为这个网站中的时间戳后面的三位是可以忽略的
        localtime = time.localtime(timestamp)
        date = time.strftime("%Y-%m-%d %H:%M:%S", localtime)
        return date

    def insert_to_province(self, info) -> None:
        connect = pymysql.connect("localhost", 'root', 'Edgar', 'edgar')
        cursor = connect.cursor()
        sql = 'INSERT INTO province(provinceName, provinceShortName, ' \
              'confirmedCount, suspectedCount,curedCount, deadCount, comment) ' \
              'VALUES("%s","%s","%s","%s","%s","%s", "%s")' % (
                  info.get("provinceName"),
                  info.get("provinceShortName"),
                  info.get("confirmedCount"),
                  info.get("suspectedCount"),
                  info.get("curedCount"),
                  info.get("deadCount"),
                  info.get("comment"))
        cursor.execute(sql)
        connect.commit()
        cursor.close()
        connect.close()

    def get_city_detail(self, city) -> str:
        connect = pymysql.connect("localhost", 'root', 'Edgar', 'edgar')
        cursor = connect.cursor()
        sql = 'SELECT * FROM province where provinceShortName="%s"' % city
        cursor.execute(sql)
        result = cursor.fetchone()
        connect.commit()
        cursor.close()
        connect.close()
        result = "在%s 确诊的有 %d 人,疑似 %d 人,死亡 %d 人,成功治愈 %d 人" % (
            result[1], result[2], result[3], result[4], result[6])
        return result

    @staticmethod
    def insert_to_city(info):
        connect = pymysql.connect("localhost", 'root', 'Edgar', 'edgar')
        cursor = connect.cursor()
        sql = 'INSERT INTO city(provinceShortName, cityName, confirmedCount, ' \
              'suspectedCount, curedCount, deadCount) ' \
              'VALUES("%s","%s","%s","%s","%s","%s")' % (
                  info.get("provinceShortName"),
                  info.get("cityName"),
                  info.get("confirmedCount"),
                  info.get("suspectedCount"),
                  info.get("curedCount"),
                  info.get("deadCount"))
        cursor.execute(sql)
        connect.commit()
        cursor.close()
        connect.close()

    def refresh_province_city(self):
        connect = pymysql.connect("localhost", 'root', 'Edgar', 'edgar')
        cursor = connect.cursor()
        sql = 'truncate table city;'
        cursor.execute(sql)
        sql = 'truncate table province;'
        cursor.execute(sql)
        connect.commit()
        cursor.close()
        connect.close()
        self.upload_area_stat()



if __name__ == "__main__":
    virus = Virus()

    # virus.get_picture()

    virus.create_database()

    # print(virus.get_des())

    # print(virus.get_area_stat())

    # virus.upload_area_stat()

    # print(virus.get_broadcast())

    # print(virus.get_left_broadcast())

    # print(virus.get_all_id())

    # virus.check_latest()

    # virus.upload_data()
    # virus.upload_left_data()

    # print(virus.get_city_detail("上海"))

    # virus.check_latest()

    virus.refresh_province_city()



           

运行程序之后部分结果如下:

provinceName provinceShortName confirmedCount suspectedCount curedCount deadCount comment
湖北省 湖北 549 31 24
广东省 广东 53 2
浙江省 浙江 43 1
北京市 北京 36 1
重庆市 重庆 27
湖南省 湖南 24
上海市 上海 20 1
四川省 四川 15
山东省 山东 15
安徽省 安徽 15
广西壮族自治区 广西 13
福建省 福建 10 2 福建地区新增疑似 2 例(漳州 1 例;三明 1 例)
河南省 河南 9
江苏省 江苏 9
海南省 海南 8
天津市 天津 8
江西省 江西 7
陕西省 陕西 5
黑龙江省 黑龙江 4 1
辽宁省 辽宁 4
贵州省 贵州 3
吉林省 吉林 3
云南省 云南 2
宁夏回族自治区 宁夏 2
香港 香港 2
澳门 澳门 2
河北省 河北 2 1
甘肃省 甘肃 2
新疆维吾尔自治区 新疆 2
台湾 台湾 1
山西省 山西 1
内蒙古自治区 内蒙古 1
青海省 青海 1 西宁新增疑似 1 例
provinceShortName cityName confirmedCount suspectedCount curedCount deadCount
湖北 武汉 495 31 23
湖北 孝感 22
湖北 黄冈 12
湖北 荆州 8
湖北 荆门 8
湖北 仙桃 2
湖北 宜昌 1 1
湖北 十堰 1
广东 深圳 15 2
广东 珠海 8
广东 佛山 7
广东 广州 7
广东 惠州 5
广东 韶关 3
广东 阳江 3
广东 湛江 2
广东 中山 2
广东 肇庆 1
广东 清远 1
浙江 台州 18
浙江 杭州 6
浙江 温州 6 1
浙江 宁波 5
浙江 嘉兴 3
浙江 衢州 2
浙江 舟山 1
浙江 绍兴 1
浙江 金华 1
北京 外地来京人员 10
北京 海淀 6
北京 朝阳 5
北京 西城 4
北京 昌平 3
北京 大兴 2 1
北京 丰台 2
北京 通州 2
北京 石景山 1
北京 顺义 1
重庆 万州区 3
重庆 巫山县 3
重庆 长寿区 3
重庆 垫江县 2
重庆 永川区 2
重庆 九龙坡区 2
重庆 渝北区 2
重庆 开州区 2
重庆 涪陵区 1
重庆 大渡口区 1
重庆 忠县 1
重庆 云阳县 1
重庆 奉节县 1
重庆 巫溪县 1
重庆 秀山县 1
重庆 两江新区 1
湖南 长沙 8
湖南 永州 4
湖南 怀化 3
湖南 岳阳 3
湖南 娄底 3
湖南 郴州 1
湖南 株洲 1
湖南 湘潭 1
四川 成都 7
四川 广安 2
四川 绵阳 2
四川 达州 1
四川 德阳 1
四川 遂宁 1
四川 雅安 1
山东 青岛 4
山东 威海 2
山东 临沂 2
山东 济南 2
山东 烟台 2
山东 潍坊 1
山东 日照 1
山东 济宁 1
安徽 合肥 6
安徽 六安 2
安徽 阜阳 2
安徽 滁州 1
安徽 亳州 1
安徽 安庆 1
安徽 池州 1
安徽 蚌埠 1
广西 北海 6
广西 柳州 2
广西 桂林 2
广西 梧州 1
广西 百色 1
广西 河池 1
福建 福州 5
福建 厦门 3
福建 泉州 1
福建 宁德 1
河南 郑州 3
河南 巩义 2
河南 洛阳 1
河南 三门峡 1
河南 信阳 1
河南 周口 1
江苏 南京 3
江苏 苏州 2
江苏 连云港 1
江苏 扬州 1
江苏 南通 1
江苏 无锡 1
海南 海口 3
海南 三亚 2
海南 万宁 2
海南 临高县 1
江西 南昌 2
江西 抚州 1
江西 萍乡 1
江西 九江 1
江西 新余 1
江西 吉安 1
陕西 西安 2
陕西 咸阳 1
陕西 安康 1
陕西 延安 1
黑龙江 牡丹江 1
黑龙江 哈尔滨 1
黑龙江 大庆 1
黑龙江 绥化 1 1
辽宁 沈阳 2
辽宁 大连 1
辽宁 朝阳 1
贵州 贵阳 1
贵州 铜仁 1
贵州 黔南州 1
吉林 长春 1
吉林 吉林 1
吉林 松原 1
云南 昆明 2
宁夏 银川 1
宁夏 中卫 1
河北 石家庄 1
河北 沧州 1 1
甘肃 兰州 1
甘肃 白银 1
山西 太原 1
内蒙古 满洲里 1

最新代码见 GitHub