天天看點

python es 操作

# coding:utf-8
import datetime

import urllib3
from elasticsearch import Elasticsearch, exceptions
import config as conf
import re

es_client = Elasticsearch("http://%s:%s@%s:%d" % (conf.es_user, conf.es_passwd, conf.es_host, conf.es_port))


def get_date(index_name):
    """
    提取索引中的日期值
    :param index_name: 索引名稱
    :return: 日期
    """
    return int(re.sub(r'\D', "", index_name))


def get_indices(n):
    """
    擷取沒有别名并且超出保留範圍的索引清單
    :param n: 保留近n天的資料
    :return: list
    """
    index_dict = es_client.indices.get_alias()
    recommend_ches_hot = []
    recommend_ches = []
    recommend_hot_article = []
    recommend_hot_video = []
    recommend_hot_bbs = []
    recommend_article = []
    recommend_bbs = []
    result = []

    for index_name in index_dict:

        # 擷取别名為空的索引
        if not index_dict[index_name]["aliases"]:

            # 首頁推薦冷啟動資料索引
            if index_name.startswith("recommend_ches_hot"):
                recommend_ches_hot.append(index_name)
            elif index_name.startswith("recommend_ches") and not index_name.startswith("recommend_ches_update") and not index_name.startswith("recommend_ches_vid_update"):
            # 首頁推薦資料索引
                recommend_ches.append(index_name)

            # 文章推薦冷啟動資料索引
            if index_name.startswith("recommend_hot_article"):
                recommend_hot_article.append(index_name)
            # 文章推薦離線資料索引
            if index_name.startswith("recommend_article"):
                recommend_article.append(index_name)

            # 社群推薦冷啟動資料索引
            if index_name.startswith("recommend_hot_bbs"):
                recommend_hot_bbs.append(index_name)
            # 社群推薦離線資料索引
            if index_name.startswith("recommend_bbs_scatter"):
                recommend_bbs.append(index_name)

            # 視訊推薦冷啟動資料索引
            if index_name.startswith("recommend_hot_video"):
                recommend_hot_video.append(index_name)

    # 排序擷取需要删除的索引
    recommend_ches_hot.sort(key=get_date, reverse=True)
    recommend_ches.sort(key=get_date, reverse=True)
    recommend_hot_article.sort(key=get_date, reverse=True)
    recommend_article.sort(key=get_date, reverse=True)
    recommend_hot_bbs.sort(key=get_date, reverse=True)
    recommend_bbs.sort(key=get_date, reverse=True)
    recommend_hot_video.sort(key=get_date, reverse=True)

    result.extend(recommend_ches_hot[n:])
    result.extend(recommend_ches[n:])
    result.extend(recommend_hot_article[n:])
    result.extend(recommend_article[n:])
    result.extend(recommend_hot_bbs[n:])
    result.extend(recommend_bbs[n:])
    result.extend(recommend_hot_video[n:])

    return result


def delete_indices(indices):
    """
    批量删除索引
    :param indices: 索引名稱清單
    """
    es_client.indices.delete(",".join(indices))


def create_indices(index_name, body=None):
    """
    建立索引
    :param index_name: 索引名稱
    :param body: 可以配置`settings`和`mappings`
    """
    es_client.indices.create(index_name, body)


def delete_by_day(index_name, n):
    """
    删除索引中前第n天的資料。
    :param index_name: 索引名稱
    :param n: 前第n天
    """
    now_time = datetime.datetime.now()
    day = (now_time + datetime.timedelta(days=-n)).strftime("%Y%m%d")
    delete_options = {
        "query": {
            "bool": {
                "must": [{"term": {"day": day}}]
            }
        }
    }
    result = ""
    try:
        result = es_client.delete_by_query(index=index_name, body=delete_options, scroll_size=3000, slices=5, conflicts="proceed")
    # delete_by_query會抛出逾時異常,但删除指令已經送出到服務端并會執行完畢,隻是用戶端擷取不到指令執行的傳回結果
    except (urllib3.exceptions.ReadTimeoutError, exceptions.ConnectionTimeout):
        print("連接配接逾時,未擷取到執行結果。")
    print(result)

def delete_log_expire():
    es_client_log = Elasticsearch("http://%s:%s@%s:%d" % (conf.es_user_log, conf.es_passwd_log, conf.es_host_log, conf.es_port_log))
    import time
    expire = time.time()
    delete_options = {
        "query": {
            "range": {
                "expire": {"lte": int(expire)}
            }
        }
    }
    result = ""
    try:
        result = es_client_log.delete_by_query(index="webapi", body=delete_options, scroll_size=3000, slices=5, conflicts="proceed")
        print(result)
        result = es_client_log.delete_by_query(index="buss", body=delete_options, scroll_size=3000, slices=5, conflicts="proceed")
        print(result)
    # delete_by_query會抛出逾時異常,但删除指令已經送出到服務端并會執行完畢,隻是用戶端擷取不到指令執行的傳回結果
    except (urllib3.exceptions.ReadTimeoutError, exceptions.ConnectionTimeout):
        print("連接配接逾時,未擷取到執行結果。")



if __name__ == '__main__':

    # 保留近兩天資料
    index_name_list = get_indices(2)
    if index_name_list:
        delete_indices(index_name_list)

    print("已删除索引:", index_name_list)
    #統一日志服務
    delete_log_expire()
    # 增量更新的ES資料生命周期維護
    delete_by_day("recommend_ches", 30)
    delete_by_day("recommend_ches_vid", 30)      
# coding:utf-8

from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk

import config as conf
import time

from HiveClient import HiveClient

es_client = Elasticsearch("http://%s:%s@%s:%d" % (conf.es_user, conf.es_passwd, conf.es_host, conf.es_port))

def fetch(index, size=10):
    """
    查詢線上ES資料

    :param index: 傳入想要查詢的index名稱,可以使用别名
             一次查詢多個index時,可以傳入一個包含多個index的list集合, 或者傳入一個用逗号分隔多個index的字元串
             傳入空串或者`_all`将查詢所有
    :param size: 傳回的記錄條數 (預設: 10)
    :return: 疊代器
    """
    result = es_client.search(index=index, params={"size": size}, sort="rank:asc")
    for hit in result['hits']['hits']:
        yield hit['_source']


def queryById(index, uid, size=10):

    # q = '{"query":{"bool":{"must":[{"term":{"uid":"' + uid + '"}}],"must_not":[],"should":[]}},"from":0,"size":10,"sort":[],"aggs":{}}'
    result = es_client.search(index=index, params={"size": size}, q="uid:"+uid, sort="rank:asc")#, q="uid:"+uid
    for hit in result['hits']['hits']:
        yield hit['_source']


def saveBulk(datas, _index, _type):

    ACTIONS = []
    for data in datas:
        action = {
            "_index": _index,
            "_type": _type,
            "_id": data.pop("id"),
            "_source": data
        }
        ACTIONS.append(action)
    res, _ = bulk(es_client, ACTIONS, index=_index, raise_on_error=True)
    print(res)


def saveBulkByNum(datas, _index, _type):

    actions = []
    num = 0
    res = 0
    for data in datas:
        data["rank"]=0
        data["typeid"]=0
        action = {
            "_index": _index,
            "_type": _type,
            "_id": data.pop("id"),
            "_source": data
        }
        actions.append(action)
        num += 1
        if num % 10000 == 0:
            res, _ = bulk(es_client, actions, index=_index, raise_on_error=True)
            print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), "寫入了"+str(res)+"條資料", "總共寫入了"+str(num)+"條")
            actions = []
    res, _ = bulk(es_client, actions, index=_index, raise_on_error=True)
    print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), "寫入了"+str(res)+"條資料", "總共寫入了"+str(num)+"條")


def hiveToES(table_name, day, _index, _type):

    # 查詢Hive資料
    hc = HiveClient(conf.hive_host, conf.hive_port, conf.hive_user)
    hql = "select * from {} where `day`='{}' limit 20".format(table_name, day)
    datas = hc.query(hql)
    # print(hql)
    saveBulkByNum(datas, _index, _type)


def deleteByQuery(index_name, uid, day):

    # 多個條件限制的删除
    delete_options = {
        "query": {
            "bool": {
                "must": [{"term": {"uid": uid}}, {"range": {"day": {"lte": day}}}]
            }
        }
    }

    result = es_client.delete_by_query(index=index_name, body=delete_options)

    print(result)


if __name__ == '__main__':

    count = 0
    # from queue import Queue
    # q = Queue()
    # s = set()
    # for x in queryById("recommend_ches", "2107547", 500): # 1760110
    #     # print(x['itemid'])
    #     print(x['itemid'], x['rank'], x['typeid'], x['day'])
    #     count += 1
    #     # if int(x['itemid']) in [102385 ,106933 ,107265 ,110662 ,110662 ,110662 ,120216 ,110662 ,120216 ,102385 ,117524 ,105200 ,103147 ,114137 ,117494 ,108929 ,126150 ,114303 ,118683 ,114137 ,103147 ,114303 ,118683 ,123332 ,100745 ,108929 ,100745 ,108929 ,117494 ,114137 ,118683 ,123332 ,100745 ,100485 ,101930 ,136657 ,124968 ,103850 ,124968 ,103850 ,121481 ,101756 ,106281 ,120118 ,146702 ,128497 ,120202 ,100195 ,119058 ,120118 ,106281 ,100195 ,119058 ,116477 ,102479 ,152162 ,120202 ,128497 ,146702 ,120118 ,106281 ,101756 ,121481 ,103850 ,124968 ,136657 ,101930 ,100485 ,100745 ,123332 ,118683 ,114303 ,126150 ,108929 ,117494 ,114137 ,103147 ,105200 ,117524 ,102269 ,100734 ,149832 ,111684 ,138572 ,103246 ,151511 ,100734 ,102269 ,102269 ,100734 ,149832 ,111684 ,100734 ,100734 ,102269 ,111684 ,149832]:
    #     #     count += 1
    #     #     print(x['itemid'], x['rank'], x['typeid'], x['day'])
    #     #     print(x['day'])
    #     #     q.put(x['itemid'])
    #     # s.add(x['itemid'])
    # print(count)
    # print(s)
    # print(len(s))
    # for _ in range(100):
    #     print(q.get())

    # for _ in range(100):
    #     st = time.time()
    #     count = 0
    #     for _ in queryById("recommend_article", "1760110", 1000):
    #         count += 1
    #     # print(count)
    #     pt = time.time() - st
    #     if pt > 1:
    #         print(pt)

    for x in fetch("recommend_hot_video", 500):
        print(x)
        count += 1
    print(count)

    # deleteByQuery("recommend_ches")