# es 查询更新操作 # _*_ coding: utf-8 _*_ import time import datetime import pymysql from elasticsearch import Elasticsearch from urllib3.connectionpool import xrange # class EsClient(): es_host = "192.168.8.190" port = 9200 timeout = 15000 global index global CLIENT index = "content-2019.12.30" CLIENT = Elasticsearch(hosts=es_host,port=port,timeout=timeout) # 数据格式化 def get_page_data(result): res = [] for hit in result[‘hits‘][‘hits‘]: # print(hit) res.append({hit["_source"]["clientip"]: hit["_source"]["version"]}) print(res) return res # 数据的分页操作 def load_all_data(scroll_id): if scroll_id: page = CLIENT.scroll(scroll_id=scroll_id, scroll=‘10m‘) scroll_id = page[‘_scroll_id‘] else: page = CLIENT.search(index=index, scroll=‘10m‘, size=10000) scroll_id = page[‘_scroll_id‘] # print(page) data = page["hits"][‘hits‘] total = page[‘hits‘][‘total‘][‘value‘] print(len(data), total) return {"data": data, "scroll_id": scroll_id, "length": len(data), "total": total} # 数据的查询操作 # 默认只给10000条数据 def query(body): page = CLIENT.search(index=index, body=body) data = page["hits"][‘hits‘] total = page[‘hits‘][‘total‘][‘value‘] print(len(data), total) return {"data": data, "length": len(data), "total": total} #update:更新指定index、type、id所对应的文档 #更新的主要点: #1. 需要指定 id #2. body={"doc": <xxxx>} , 这个doc是必须的 # es.update(index="my_index",doc_type="test_type",id=1,body={"doc":{"name":"python1","addr":"深圳1"}}) # 更新一条数据 def run_update_1(): print("开始修改") body={"doc":{"site": "小武的测试站-+++++"}} CLIENT.update(index=‘content-2019.12.30‘, id=‘y-O4nG8Bpdw5Z6bISieJ‘, body=body) print("修改成功") pass # 批量更新, 跟新满足条件的数据 def run_update_all(): print("开始修改") query = { "script": {"source": "ctx._source[‘site‘]=‘小武的测试站‘" }, # 更新语句,将数据集合更改 ‘query‘: {‘match‘: {‘site‘: ‘生意地‘}} # 查询语句,将满足条件的值进行筛选出来 } CLIENT.update_by_query(index=‘content-2019.12.30‘, body=query) print("修改成功") pass if __name__ == "__main__": run_update_all()
原文地址:https://www.cnblogs.com/xiao-xue-di/p/12187531.html
时间: 2024-10-08 23:03:51