不具有通用性，留作纪念。
#!/usr/bin/env python
# -*- coding:utf-8 -*-
"""insert_uv.py - copy yesterday's visitor sessions from Elasticsearch to MySQL.

The script asks Elasticsearch for every (visit_tenant_id, client_type,
sessionId) combination seen between yesterday 00:00 and today 00:00 (local
time) and stores one row per combination in the ``uv_stat`` table.  Rows
already stored for that day are deleted first, so the script can be re-run
on the same day without producing duplicates.
"""
from contextlib import closing
from datetime import date, timedelta
import json
import time

try:
    import urllib2  # Python 2, which the original script targets
except ImportError:
    import urllib.request as urllib2  # Python 3 home of Request/urlopen

ES_SERVER = 'http://elk.xkops.com:9200/'
ES_INDEX = 'client-*'
MYSQL_DB = 'logstash'

# Values are passed separately from the statement text so that data coming
# out of the logs (session ids in particular) can never alter the SQL.
DELETE_SQL = "delete from uv_stat where create_time = %s"
INSERT_SQL = ("insert into uv_stat(tenant_id,create_time,session_id,client_type) "
              "values(%s,%s,%s,%s)")


def day_start_millis(day):
    """Return local midnight at the start of ``day`` as epoch milliseconds."""
    return int(time.mktime(day.timetuple())) * 1000


def terms_agg(field, size, sub_aggs=None):
    """Build a terms aggregation on ``field`` ordered by summed ``count``.

    Every level of the query uses the same shape: a terms bucket whose
    metric "1" is ``sum(count)`` and which is sorted by that metric,
    descending.  ``sub_aggs`` adds nested aggregations next to the metric.
    """
    aggs = {"1": {"sum": {"field": "count"}}}
    if sub_aggs:
        aggs.update(sub_aggs)
    return {
        "terms": {"field": field, "size": size, "order": {"1": "desc"}},
        "aggs": aggs,
    }


def build_query(start_ms, end_ms):
    """Return the search body for the window ``start_ms``..``end_ms``.

    Aggregation "4" buckets by tenant, "3" by client type (at most two
    buckets) and "2" by session id.  ``size`` is 0 because only the
    aggregations are used, not the hits.

    NOTE: the ``filtered`` query was deprecated in Elasticsearch 2.0 and
    removed in 5.0; it is kept here because that is what the script sends.
    NOTE(review): both bounds are inclusive, so an event stamped exactly at
    midnight falls into two consecutive daily runs - confirm that is wanted.
    """
    sessions = terms_agg("sessionId", 1000000)
    client_types = terms_agg("client_type", 2, {"2": sessions})
    tenants = terms_agg("visit_tenant_id", 1000000, {"3": client_types})
    return {
        "query": {
            "filtered": {
                "query": {
                    "query_string": {"analyze_wildcard": True, "query": "*"},
                },
                "filter": {
                    "bool": {
                        "must": [
                            {
                                "range": {
                                    "@timestamp": {
                                        "gte": start_ms,
                                        "lte": end_ms,
                                        "format": "epoch_millis",
                                    },
                                },
                            },
                        ],
                        "must_not": [],
                    },
                },
            },
        },
        "size": 0,
        "aggs": {"4": tenants},
    }


def fetch_aggregations(start_ms, end_ms):
    """POST the query to Elasticsearch and return the decoded JSON reply."""
    url = ES_SERVER + ES_INDEX + "/_search?pretty=true"
    body = json.dumps(build_query(start_ms, end_ms)).encode('utf-8')
    # closing() releases the HTTP response on both Python 2 and Python 3.
    with closing(urllib2.urlopen(urllib2.Request(url, body))) as response:
        return json.loads(response.read().decode('utf-8'))


def extract_rows(result, day):
    """Flatten the aggregation reply into ``uv_stat`` rows.

    Returns a list of ``(tenant_id, create_time, session_id, client_type)``
    tuples, in the order tenant -> client type -> session.  All client-type
    buckets that are present are used, so a tenant with no client-type
    bucket simply contributes no rows instead of raising ``IndexError``.
    """
    create_time = str(day)
    rows = []
    for tenant in result['aggregations']['4']['buckets']:
        for client in tenant['3']['buckets']:
            for session in client['2']['buckets']:
                rows.append(
                    (tenant['key'], create_time, session['key'], client['key'])
                )
    return rows


def store_rows(day, rows):
    """Replace the ``uv_stat`` rows of ``day`` with ``rows``."""
    # Imported here so the pure helpers above can be imported without the
    # project's database module being available.
    from with_conn_to_db import conn_to_mysql

    # NOTE(review): assumes conn_to_mysql yields a DB-API style cursor whose
    # execute() accepts a parameter sequence and may be called repeatedly -
    # confirm against with_conn_to_db.
    with conn_to_mysql(MYSQL_DB) as db:
        # Delete before inserting so a second run on the same day does not
        # duplicate rows.
        db.execute(DELETE_SQL, (str(day),))
        for row in rows:
            db.execute(INSERT_SQL, row)


def main():
    """Fetch yesterday's sessions and store them."""
    today = date.today()
    yesterday = today - timedelta(days=1)
    # Query first: if Elasticsearch fails, the existing rows stay untouched.
    result = fetch_aggregations(
        day_start_millis(yesterday), day_start_millis(today)
    )
    store_rows(yesterday, extract_rows(result, yesterday))


if __name__ == "__main__":
    main()
时间: 2024-10-18 13:50:20