天天看点

Python Elasticsearch 操作

# coding:utf-8
import datetime

import urllib3
from elasticsearch import Elasticsearch, exceptions
import config as conf
import re

# Shared module-level Elasticsearch client used by the functions in this script.
# User, password, host and port come from the local `config` module; the
# credentials are embedded in the URL as HTTP basic auth and `es_port` must be
# an int (it is formatted with %d).
es_client = Elasticsearch("http://%s:%s@%s:%d" % (conf.es_user, conf.es_passwd, conf.es_host, conf.es_port))


def get_date(index_name):
    """
    Return the date embedded in an index name as an integer.

    Every decimal digit of ``index_name`` is kept, in order, and the result is
    parsed as an int, so ``"recommend_ches_20200101"`` gives ``20200101``.
    Digits that are not part of the date are concatenated as well.

    :param index_name: index name
    :return: the concatenated digits as an int
    :raises ValueError: if the name contains no digits
    """
    digits = re.findall(r"\d", index_name)
    return int("".join(digits))


def get_indices(n):
    """
    Collect unaliased indices that fall outside the retention window.

    Every index without an alias is assigned to a family by name prefix.
    Within each family the indices are ordered newest-first by the digits in
    their name (``get_date``), the newest ``n`` are kept and the rest are
    returned.

    :param n: number of most recent indices to keep per family
    :return: list of index names to delete, family by family in the order of
        the ``families`` table below
    :raises ValueError: (from ``get_date``) if a matched index name has no digits
    """
    # (prefix, excluded prefixes) for each family, in the order results are emitted:
    #   recommend_ches_hot    - home-feed recommendation cold-start data
    #   recommend_ches        - home-feed recommendation data (not *_update / *_vid_update)
    #   recommend_hot_article - article recommendation cold-start data
    #   recommend_article     - article recommendation offline data
    #   recommend_hot_bbs     - community recommendation cold-start data
    #   recommend_bbs_scatter - community recommendation offline data
    #   recommend_hot_video   - video recommendation cold-start data
    # recommend_ches also excludes recommend_ches_hot so that every index
    # belongs to at most one family.
    families = (
        ("recommend_ches_hot", ()),
        ("recommend_ches", ("recommend_ches_hot", "recommend_ches_update", "recommend_ches_vid_update")),
        ("recommend_hot_article", ()),
        ("recommend_article", ()),
        ("recommend_hot_bbs", ()),
        ("recommend_bbs_scatter", ()),
        ("recommend_hot_video", ()),
    )
    buckets = [[] for _ in families]

    alias_map = es_client.indices.get_alias()
    for index_name in alias_map:
        if alias_map[index_name]["aliases"]:
            continue  # only indices with no alias are candidates
        for (prefix, excluded), bucket in zip(families, buckets):
            # str.startswith(()) is False, so an empty exclusion tuple excludes nothing.
            if index_name.startswith(prefix) and not index_name.startswith(excluded):
                bucket.append(index_name)

    expired = []
    for bucket in buckets:
        expired.extend(sorted(bucket, key=get_date, reverse=True)[n:])
    return expired


def delete_indices(indices):
    """
    Delete several indices with a single request.

    The names are joined with commas into one ``index`` argument.

    :param indices: iterable of index names; an empty iterable is a no-op and
        sends no request (joining it would produce an empty index name)
    """
    names = list(indices)
    if not names:
        return
    es_client.indices.delete(index=",".join(names))


def create_indices(index_name, body=None):
    """
    Create a single index.

    :param index_name: name of the index to create
    :param body: optional request body carrying ``settings`` and/or ``mappings``
    """
    es_client.indices.create(index=index_name, body=body)


def delete_by_day(index_name, n):
    """
    Delete the documents of one day from an index.

    Removes every document whose ``day`` field equals the local date exactly
    ``n`` days before now, formatted as ``%Y%m%d``. The server response is
    printed; after a client-side timeout an empty string is printed instead.

    :param index_name: index (or alias) to delete from
    :param n: how many days back the target day is
    """
    target_day = (datetime.datetime.now() - datetime.timedelta(days=n)).strftime("%Y%m%d")
    query = {"query": {"bool": {"must": [{"term": {"day": target_day}}]}}}

    outcome = ""
    try:
        outcome = es_client.delete_by_query(
            index=index_name,
            body=query,
            scroll_size=3000,
            slices=5,
            conflicts="proceed",
        )
    except (urllib3.exceptions.ReadTimeoutError, exceptions.ConnectionTimeout):
        # Per the original author: delete_by_query may time out on the client,
        # but the request has already been submitted and the server finishes
        # it; only the response is lost.
        print("连接超时,未获取到执行结果。")
    print(outcome)

def delete_log_expire(indices=("webapi", "buss")):
    """
    Purge expired documents from the unified-logging Elasticsearch cluster.

    Connects with the ``*_log`` settings from ``config`` and, for each index in
    ``indices``, deletes every document whose ``expire`` field is <= the
    current time as integer epoch seconds (assumes ``expire`` is stored in
    epoch seconds — confirm against the log schema). Each index is handled
    independently: a client-side timeout on one index is reported and the
    next index is still processed.

    :param indices: names of the log indices to purge
    """
    import time

    es_client_log = Elasticsearch("http://%s:%s@%s:%d" % (conf.es_user_log, conf.es_passwd_log, conf.es_host_log, conf.es_port_log))
    # One cut-off shared by all indices so they are purged consistently.
    delete_options = {
        "query": {
            "range": {
                "expire": {"lte": int(time.time())}
            }
        }
    }
    for index in indices:
        try:
            result = es_client_log.delete_by_query(index=index, body=delete_options, scroll_size=3000, slices=5, conflicts="proceed")
        # Per the original author: delete_by_query may time out on the client,
        # but the request has already reached the server and will complete;
        # only the response is lost.
        except (urllib3.exceptions.ReadTimeoutError, exceptions.ConnectionTimeout):
            print("连接超时,未获取到执行结果。")
        else:
            print(result)



if __name__ == '__main__':
    # Drop unaliased snapshot indices, keeping the two newest of each family.
    index_name_list = get_indices(2)
    if len(index_name_list) > 0:
        delete_indices(index_name_list)
    print("已删除索引:", index_name_list)

    # Unified logging service: purge expired log documents.
    delete_log_expire()

    # Lifecycle maintenance for incrementally updated indices:
    # remove the data of the day 30 days ago.
    for incremental_index in ("recommend_ches", "recommend_ches_vid"):
        delete_by_day(incremental_index, 30)
# coding:utf-8

from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk

import config as conf
import time

from HiveClient import HiveClient

# Module-level Elasticsearch client shared by the query and bulk helpers of
# this script. Connection settings come from the local `config` module;
# credentials are embedded in the URL as HTTP basic auth and `es_port` must be
# an int (formatted with %d).
es_client = Elasticsearch("http://%s:%s@%s:%d" % (conf.es_user, conf.es_passwd, conf.es_host, conf.es_port))

def fetch(index, size=10):
    """
    Yield documents from the live ES cluster, ordered by ``rank`` ascending.

    The search is issued lazily, on the first iteration.

    :param index: index name or alias to query. Several indices can be given
        as a list or as one comma-separated string; an empty string or
        ``_all`` searches every index.
    :param size: maximum number of hits to return (default: 10)
    :return: generator over the ``_source`` of each hit
    """
    response = es_client.search(index=index, params={"size": size}, sort="rank:asc")
    hits = response['hits']['hits']
    yield from (hit['_source'] for hit in hits)


def queryById(index, uid, size=10):
    """
    Yield the documents of one user, ordered by ``rank`` ascending.

    Runs a Lucene query-string search ``uid:<uid>``. The value is converted
    with ``str()``, so both ``"2107547"`` and ``2107547`` work. It is not
    escaped, so it must not contain Lucene query syntax.

    :param index: index name or alias to query
    :param uid: user id (str or int)
    :param size: maximum number of hits to return (default: 10)
    :return: generator over the ``_source`` of each hit
    """
    query_string = "uid:{}".format(uid)
    result = es_client.search(index=index, params={"size": size}, q=query_string, sort="rank:asc")
    for hit in result['hits']['hits']:
        yield hit['_source']


def saveBulk(datas, _index, _type):
    """
    Bulk-index records into ``_index`` with mapping type ``_type``.

    Each record's ``"id"`` value becomes the document ``_id``, and the
    remaining keys become ``_source``. Records are shallow-copied first, so
    the caller's dicts are not modified. Prints the number of documents
    indexed.

    :param datas: iterable of dicts, each containing an ``"id"`` key
    :param _index: target index name
    :param _type: document (mapping) type
    :raises KeyError: if a record has no ``"id"`` key
    """
    actions = []
    for data in datas:
        source = dict(data)  # copy: popping "id" must not alter the caller's record
        actions.append({
            "_index": _index,
            "_type": _type,
            "_id": source.pop("id"),
            "_source": source,
        })
    # raise_on_error=True makes the helper raise if any document fails.
    res, _ = bulk(es_client, actions, index=_index, raise_on_error=True)
    print(res)


def saveBulkByNum(datas, _index, _type, batch_size=10000):
    """
    Bulk-index records in fixed-size batches, logging progress after each batch.

    Every document is written with ``rank`` and ``typeid`` set to 0. The
    record's ``"id"`` value becomes the document ``_id``. Records are
    shallow-copied, so the caller's dicts are not modified. A timestamped line
    with the batch count and the running total is printed after every full
    batch and once more at the end.

    :param datas: iterable of dicts, each containing an ``"id"`` key
    :param _index: target index name
    :param _type: document (mapping) type
    :param batch_size: number of records sent per ``bulk`` call (default 10000)
    :raises ValueError: if ``batch_size`` is not positive
    :raises KeyError: if a record has no ``"id"`` key
    """
    if batch_size <= 0:
        raise ValueError("batch_size must be positive, got %r" % (batch_size,))

    def _flush(batch, total):
        # Send one batch (skipped when empty) and log its size plus the running total.
        written = 0
        if batch:
            written, _ = bulk(es_client, batch, index=_index, raise_on_error=True)
        print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), "写入了"+str(written)+"条数据", "总共写入了"+str(total)+"条")

    actions = []
    num = 0
    for data in datas:
        source = dict(data)  # copy so the caller's record is left untouched
        source["rank"] = 0
        source["typeid"] = 0
        actions.append({
            "_index": _index,
            "_type": _type,
            "_id": source.pop("id"),
            "_source": source,
        })
        num += 1
        if num % batch_size == 0:
            _flush(actions, num)
            actions = []
    # Remaining partial batch; still logs the final total when nothing is left.
    _flush(actions, num)


def hiveToES(table_name, day, _index, _type, limit=20):
    """
    Copy one day of a Hive table into Elasticsearch.

    Selects the rows of ``table_name`` whose `day` column equals ``day`` and
    writes them with ``saveBulkByNum``.

    NOTE(review): ``table_name`` and ``day`` are interpolated into the HQL
    unescaped, so they must come from trusted code, never from user input.
    NOTE(review): the 20-row default looks like a debugging leftover. Confirm
    whether callers expect the full partition; pass ``limit=None`` for that.

    :param table_name: Hive table to read
    :param day: value matched against the table's `day` column
    :param _index: target ES index
    :param _type: target ES document type
    :param limit: maximum number of rows to copy (default 20), or None for no limit
    """
    hc = HiveClient(conf.hive_host, conf.hive_port, conf.hive_user)
    hql = "select * from {} where `day`='{}'".format(table_name, day)
    if limit is not None:
        hql += " limit %d" % limit
    datas = hc.query(hql)
    saveBulkByNum(datas, _index, _type)


def deleteByQuery(index_name, uid, day):
    """
    Delete one user's documents up to a given day and print the response.

    Both conditions must hold: ``uid`` equals ``uid`` (term query) and
    ``day`` <= ``day`` (range query).

    :param index_name: index (or alias) to delete from
    :param uid: user id to match exactly
    :param day: inclusive upper bound for the ``day`` field
    """
    must_clauses = [
        {"term": {"uid": uid}},
        {"range": {"day": {"lte": day}}},
    ]
    response = es_client.delete_by_query(index=index_name, body={"query": {"bool": {"must": must_clauses}}})
    print(response)


if __name__ == '__main__':
    # Smoke check: print up to 500 documents of the video cold-start index
    # (ordered by rank), then the number of documents printed.
    count = 0
    for count, doc in enumerate(fetch("recommend_hot_video", 500), start=1):
        print(doc)
    print(count)