python 查询Elasticsearch的小例子

#!/usr/bin/env python# -*- coding: utf-8 -*-

from sfo_common.agent import Agentfrom sfo_common.import_common import *

class ElkLog(object):    """    处理ELK数据类    """    def __init__(self):        pass

def get_elk_log_json(self):        """        通过调用elasticsearch接口查询指定索引数据,计算集群的平均响应时间        :return:        """        try:            day = time.strftime("%Y.%m.%d",time.localtime(time.time()))            clusters = config.elk_index_name.split(‘,‘)            if clusters:                for cluster in clusters:                    index_name="{}-swift-proxy-{}".format(cluster,day)                    req_url = ‘{}{}/_search?pretty‘.format(config.elk_server_url,index_name)                    headers = {‘content-type‘: "application/json"}                    l_time = datetime.datetime.now() + datetime.timedelta(minutes=-5)                    now_time = util.local2utc(datetime.datetime.now().strftime(‘%Y-%m-%d %H:%M:%S.%f‘))                    now_time_5m = util.local2utc(l_time.strftime(‘%Y-%m-%d %H:%M:%S.%f‘))                    body = {                        "query": {                            "bool":{                                "must":{                                    "match_all":{}                                },                                "filter":{                                    "range":{                                        "@timestamp":{                                            "gte":now_time_5m,                                            "lte":now_time                                        }                                    }                                }                            }                        },                        "size": 10000,                        "sort": {                            "@timestamp": { "order": "asc" }                        },                        "_source": ["status", "method","client_ip","remote_ip","timestamp","request_time","@timestamp"]                    }                    #print req_url,body,headers                    response = requests.post(req_url,data=json.dumps(body),headers=headers)                    total_time=head_total_time=get_total_time=put_total_time=post_total_time=delete_total_time = 0.0                    head_count=get_count=put_count=post_count=delete_count = 0

if response.status_code == 200:                        tps = SfoClusterTps()                        res_data = json.loads(response.text,encoding=‘UTF-8‘)                        if res_data and res_data.has_key(‘hits‘):                            hits = res_data[‘hits‘]                            total = hits[‘total‘]                            list = hits[‘hits‘]                            if list and total > 0:                                for obj in list:                                    if isinstance(obj,dict) and obj.has_key(‘_source‘):                                        source = obj[‘_source‘]                                        if source.has_key(‘request_time‘):                                            total_time += float(source[‘request_time‘])                                        if source.has_key(‘method‘) and str(source[‘method‘]).strip().upper()==‘HEAD‘:                                            head_count += 1                                            if source.has_key(‘request_time‘):                                                head_total_time += float(source[‘request_time‘])                                        if source.has_key(‘method‘) and str(source[‘method‘]).strip().upper()==‘GET‘:                                            get_count += 1                                            if source.has_key(‘request_time‘):                                                get_total_time += float(source[‘request_time‘])                                        if source.has_key(‘method‘) and str(source[‘method‘]).strip().upper()==‘PUT‘:                                            put_count += 1                                            if source.has_key(‘request_time‘):                                                put_total_time += float(source[‘request_time‘])                                        if source.has_key(‘method‘) and str(source[‘method‘]).strip().upper()==‘POST‘:                                            post_count += 1                                            if source.has_key(‘request_time‘):                                                post_total_time += float(source[‘request_time‘])                                        if source.has_key(‘method‘) and str(source[‘method‘]).strip().upper()==‘DELETE‘:                                            delete_count += 1                                            if source.has_key(‘request_time‘):                                                delete_total_time += float(source[‘request_time‘])                                tps.guid = str(uuid.uuid1())                                tps.cluster_name = cluster                                if total > 0:                                    tps.avg_time = ‘%.2f‘%(total_time/total*1000)                                else:                                    tps.avg_time = 0                                if head_count > 0:                                    tps.head_time = ‘%.2f‘%(head_total_time/head_count*1000)                                else:                                    tps.head_time = 0                                if get_count > 0:                                    tps.get_time = ‘%.2f‘%(get_total_time/get_count*1000)                                else:                                    tps.get_time = 0                                if put_count > 0:                                    tps.put_time = ‘%.2f‘%(put_total_time/put_count*1000)                                else:                                    tps.put_time = 0                                if post_count > 0:                                    tps.post_time = ‘%.2f‘%(post_total_time/post_count*1000)                                else:                                    tps.post_time = 0                                if delete_count > 0:                                    tps.delete_time = ‘%.2f‘%(delete_total_time/delete_count*1000)                                else:                                    tps.delete_time = 0                                tps.add_time = time.strftime(‘%Y-%m-%d %H:%M:%S‘, time.localtime(time.time()))                                db.session.add(tps)                                db.session.commit()                            else:                                pass                        else:                            pass                    else:                        pass        except Exception as ex:            logger.exception("get_elk_log_json function execute exception:" + str(ex))        finally:            db.session.close()            db.session.remove()

#schedule tasksdef get_elklog_json_schl(executor):    """    起线程执行日志分析    :param executor:    :return:    """    try:        el = ElkLog()        executor.submit(el.get_elk_log_json)        #threading.Thread(target=el.get_elk_log_json).start()    except Exception as ex:        logger.exception("get_elklog_json_schl function execute exception:" + str(ex))

class ElklogUnitAgnet(Agent):    def __init__(self, pidfile):        Agent.__init__(self, pidfile)

def run(self):        try:            sys.stdout.flush()            hostname = socket.getfqdn()            hostip = socket.gethostbyname(hostname)            logger.info("hostname is {}, ip is {}".format(hostname, hostip))            #use schedule            with ThreadPoolExecutor(config.thread_workers) as executor:                schedule.every(config.upload_refresh).seconds.do(get_elklog_json_schl,executor)                schedule.run_all(0)                while True:                    schedule.run_pending()                    time.sleep(0.1)        except Exception as ex:            logger.exception("elk log agent run exception:" + str(ex))

def main():    agent = ElklogUnitAgnet(config.elklog_agnet_pfile)    try:        if len(sys.argv) == 3:            if ‘elklog‘ == sys.argv[1]:                if ‘start‘ == sys.argv[2]:                    agent.start()                if ‘stop‘ == sys.argv[2]:                    agent.stop()            else:                print("Unknown command")                sys.exit(2)        else:            print("usage: %s" % (sys.argv[0],))            sys.exit(2)    except Exception as ex:        logger.exception("elk log process run exception:" + str(ex))

if __name__ == ‘__main__‘:    main()

###########################################################################################更多查询方式接口:

查询一条记录
curl -H "Content-Type: application/json" -X POST ‘http://192.168.1.1:9200/swift-nginx-2018.08.31/_search?pretty‘ -d ‘{"query": { "match_all": {} },"size": 1}‘
查询offset为20的记录
curl -H "Content-Type: application/json" -X POST ‘http://192.168.1.1:9200/swift-nginx-2018.08.31/_search?pretty‘ -d ‘{"query": { "match": { "offset": 20 } }}‘
查询结果只返回指定的字段
curl -H "Content-Type: application/json" -X POST ‘http://192.168.1.1:9200/swift-nginx-2018.08.31/_search?pretty‘ -d ‘{"query": { "match_all": {} },"_source": ["host", "message"]}‘
查询结果排序
curl -H "Content-Type: application/json" -X POST ‘http://192.168.1.1:9200/swift-nginx-2018.08.31/_search?pretty‘ -d ‘{"query": { "match_all": {} },"_source": ["offset","host", "message"]},"sort": { "offset": { "order": "desc" } }‘
返回从10开始的10条记录
curl -H "Content-Type: application/json" -X POST ‘http://192.168.1.1:9200/swift-nginx-2018.08.31/_search?pretty‘ -d ‘{"query": { "match_all": {} },"from": 10,"size": 10,"_source": ["offset","host", "message"]},"sort": { "offset": { "order": "desc" } }‘

全部查询接口实例请参考:

https://www.cnblogs.com/pilihaotian/p/5830754.html

原文地址:https://www.cnblogs.com/chmyee/p/9907676.html

时间: 2024-10-17 02:09:13

python 查询Elasticsearch的小例子的相关文章

一个与Linq延迟查询有关的小例子

提出问题 下面所给代码编译时正常,但是执行时会出错,请指出程序在执行时能够执行到编号为(1)(2)(3)的代码行中的哪一行. using System; using System.Collections.Generic; using System.Linq; namespace DeferredExecutionExp { class Program { static void Main(string[] args) { List<Student> studentList = new List

【python】由一个小例子看出python的灵活性,IF ELSE一例

temp = input("请输入1到100之间的数字:") num = int(temp) if 1 <= num <= 100:                         # 这一点给赞 省得用 and 或 &了 print('你妹好漂亮^_^')                    # tab键,还有":" else: print('你大爷好丑T_T')                     # tab键   还有":&

用百度地图做了一个输入地址查询经纬度的小例子

<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Title</title> <script src="jquery.js"></script> </head> <body> <input type="text"

?python 的zip 函数小例子

In [57]: name = ('Tome','Rick','Stephon') In [58]: age = (45,23,55) In [59]: for a,n in zip (name,age): ....: print a,n ....: Tome 45Rick 23Stephon 55 In [60]:

python try小例子

#!/usr/bin/python import telnetlib import socket try: tn=telnetlib.Telnet('10.67.21.29',60000) except socket.error, e: print e exit(1) tn.set_debuglevel(1) tn.write('quit'+'\n') print 'ok' socket.error为错误类型 e为对象 python try小例子,布布扣,bubuko.com

IbLpJnJErT分页查询的小例子

锓锩苘院 IbLpJnJErT分页查询的小例子

python速成第二篇(小爬虫+文件操作+socket网络通信小例子+oop编程)

大家好,由于前天熬夜写完第一篇博客,然后昨天又是没休息好,昨天也就不想更新博客,就只是看了会资料就早点休息了,今天补上我这两天的所学,先记录一笔.我发现有时候我看的话会比较敷衍,而如果我写出来(无论写到笔记本中还是博客中,我都有不同的感觉)就会有不同的想法,我看书或者看资料有时候感觉就是有一种惰性,得过且过的感觉,有时候一个知识想不通道不明,想了一会儿,就会找借口给自己说这个知识不重要,不需要太纠结了,还是去看下一个吧,然后就如此往复下去,学习就会有漏洞,所以这更加坚定了我写博客来记录的想法.

文本查询小例子---涉及多态 句柄类

最近实现了c++prime上的文本查询的小例子,见第四版15章最后一节.涉及到了多态,句柄类,故在此给出实现的源代码,以便后续查看. 一:系统要求: 图片无法上传---待传 二:代码 (1)queryWord.h ------------queryWorld是真正用来实现保存的数据结构及查询操作的. #ifndef QUERYWORD #define QUERYWORD #include <map> #include <set> #include <vector> #i

[Python]Python 使用 for 循环的小例子

[Python]Python 使用 for 循环的小例子: In [7]: for i in range(5): ...: print "xxxx" ...: print "yyyy" ...: xxxxyyyyxxxxyyyyxxxxyyyyxxxxyyyyxxxxyyyy