定时采集bigdesk中的Elasticsearch性能参数,并保存到数据库或ELK,以便于进行长期监控。
基于python脚本实现,脚本如下:
#coding=gbk
import httplib
import json
import time
import es_savelog
import ConfigHelper
import MQHelper

# Suffixes of the per-node counters remembered between polling cycles;
# they are stored in the history dict under "<node name>_<suffix>".
_GC_KEYS = ("GCYoungCount", "GCOldCount", "GCYoungTime", "GCOldTime")


def _fetch_json(conn, path):
    """Send a GET for *path* over *conn* and return the decoded JSON body."""
    conn.request("GET", path)
    return json.loads(conn.getresponse().read())


def _build_node_info(esNode, esVersion, health, sleeptime, dictLastNodeInfo):
    """Build the metrics record for one node of the node-stats response.

    esNode           -- one entry of the "nodes" mapping of /_nodes/stats
    esVersion        -- version string reported by the cluster root endpoint
    health           -- status string reported by /_cluster/health
    sleeptime        -- configured polling interval; used as the default
                        "Interval" value
    dictLastNodeInfo -- history dict holding the previous sample's counters;
                        updated in place with the current counters

    Returns a new dict. Delta fields (GC*, IndicesQueryPS, IndicesFetchPS)
    are only present when a previous sample exists for this node.
    Raises KeyError/ValueError when an expected field is missing or cannot
    be parsed; the caller reports the error and skips the node.
    """
    nodename = str(esNode["name"])

    # A fresh dict per node: fields from a node that was skipped or failed
    # half-way must never end up in another node's record.
    info = {}
    info["EsVersion"] = esVersion
    info["Health"] = health
    info["NodeName"] = nodename
    info["Interval"] = sleeptime
    # CPU usage
    info["OSUserCpu"] = esNode["os"]["cpu"]["user"]
    # Active search threads
    info["ThreadpoolCount"] = esNode["thread_pool"]["search"]["active"]
    # JVM heap usage.
    # NOTE(review): the unit suffix is simply stripped, so a "gb" value and an
    # "mb" value produce numbers in different units -- confirm what the
    # consumers of this field expect.
    info["HeapMem"] = float(esNode["jvm"]["mem"]["heap_used"].replace("gb", "").replace("mb", ""))

    # GC counters are cumulative; report the difference to the last sample.
    collectors = esNode["jvm"]["gc"]["collectors"]
    young = collectors["young"]
    old = collectors["old"]
    curGC = {
        "GCYoungCount": int(young["collection_count"]),
        "GCOldCount": int(old["collection_count"]),
        "GCYoungTime": int(young["collection_time_in_millis"]),
        "GCOldTime": int(old["collection_time_in_millis"]),
    }
    lastGC = {}
    for key in _GC_KEYS:
        lastGC[key] = int(dictLastNodeInfo.get(nodename + "_" + key, -1))
    # -1 marks "no previous sample"; all four counters must be known.
    if min(lastGC.values()) >= 0:
        for key in _GC_KEYS:
            info[key] = curGC[key] - lastGC[key]
        if lastGC["GCOldCount"] > 0:
            # NOTE(review): the ratio is computed from the previous sample's
            # cumulative totals, not from the deltas -- confirm intent.
            info["GCYOCountRate"] = lastGC["GCYoungCount"] / lastGC["GCOldCount"]
    for key in _GC_KEYS:
        dictLastNodeInfo[nodename + "_" + key] = curGC[key]

    # Open connection counts
    info["ChannelTransport"] = esNode["transport"]["server_open"]
    info["ChannelHttp"] = esNode["http"]["current_open"]

    # Search statistics of this node (cumulative totals)
    objSearch = esNode["indices"]["search"]
    curQueryTotal = objSearch["query_total"]
    curFetchTotal = objSearch["fetch_total"]
    curTimestamp = esNode["timestamp"]
    lastQueryTotal = dictLastNodeInfo.get(nodename + "_QueryTotal", -1)
    lastFetchTotal = dictLastNodeInfo.get(nodename + "_FetchTotal", -1)
    lastTimestamp = dictLastNodeInfo.get(nodename + "_Timestamp", -1)
    if lastQueryTotal > 0 and curQueryTotal > 0:
        curQueryCount = curQueryTotal - lastQueryTotal
        curFetchCount = curFetchTotal - lastFetchTotal
        # Timestamps are divided by 1000 to get the elapsed time between the
        # two samples. NOTE(review): with integer inputs these divisions
        # truncate under Python 2 -- confirm that is acceptable.
        curQueryTime = (curTimestamp - lastTimestamp) / 1000
        info["Interval"] = curQueryTime
        if curQueryTime > 0:
            # Queries / fetches per second
            info["IndicesQueryPS"] = curQueryCount / curQueryTime
            info["IndicesFetchPS"] = curFetchCount / curQueryTime
    # Remember the current totals for the next cycle
    dictLastNodeInfo[nodename + "_QueryTotal"] = curQueryTotal
    dictLastNodeInfo[nodename + "_FetchTotal"] = curFetchTotal
    dictLastNodeInfo[nodename + "_Timestamp"] = curTimestamp

    # Cache sizes.
    # NOTE(review): only "mb"/"kb" suffixes are stripped; any other unit makes
    # float() raise and the node is skipped for this cycle.
    info["FilterCache"] = float(esNode["indices"]["filter_cache"]["memory_size"].replace("mb", "").replace("kb", ""))
    info["FieldCache"] = float(esNode["indices"]["fielddata"]["memory_size"].replace("mb", "").replace("kb", ""))
    return info


def _collect_cluster(url, sleeptime, dictLastNodeInfo):
    """Poll one Elasticsearch endpoint and publish a record per node.

    Errors while talking to the endpoint are printed and the endpoint is
    skipped for this cycle, so one unreachable cluster does not stop the
    monitor. Errors while processing a single node only skip that node.
    """
    conn = httplib.HTTPConnection(url)
    try:
        # ES version
        esVersion = str(_fetch_json(conn, "")["version"]["number"])
        # Cluster health
        health = str(_fetch_json(conn, "/_cluster/health")["status"])
        # Per-node statistics
        nodes = _fetch_json(conn, "/_nodes/stats?human=true")["nodes"].values()
    except Exception as ex:
        print("%s : %s" % (Exception, ex))
        return
    finally:
        # Always release the socket; a new connection is opened every cycle.
        conn.close()

    for esNode in nodes:
        try:
            dictNodeInfo = _build_node_info(esNode, esVersion, health, sleeptime, dictLastNodeInfo)
            # Without both deltas (first sample of a node, or a negative GC
            # delta) the record is not published.
            if dictNodeInfo.get("IndicesQueryPS", -1) < 0 or dictNodeInfo.get("GCYoungCount", -1) < 0:
                continue
            # Save to the database
            es_savelog.SaveLog(dictNodeInfo)
            # Push the ELK message
            dictNodeInfo["IndexName"] = "esbigdesk"
            dictNodeInfo["LogTime"] = time.strftime("%Y-%m-%d %H:%M:%S.000", time.localtime())
            message = json.dumps(dictNodeInfo)
            print(message)
            MQHelper.SendMessage(message)
        except Exception as ex:
            print("%s : %s" % (Exception, ex))


def main():
    """Poll the configured Elasticsearch endpoints until the "Flag" config is not 1.

    Every cycle re-reads "Flag", "EsUrl" ('|'-separated host list) and
    "SleepTime" (seconds to sleep after each cycle) from ConfigHelper.
    """
    # Counters of the previous sample, keyed by "<node name>_<counter>"
    dictLastNodeInfo = {}
    print("start...")
    while True:
        flag = ConfigHelper.GetIntConfig("Flag")
        if flag != 1:
            # Exit condition reached
            print("终止"+str(flag))
            break
        urlarray = ConfigHelper.GetStringConfig("EsUrl").split('|')
        # Sleep duration after each cycle, in seconds
        sleeptime = ConfigHelper.GetFloatConfig("SleepTime")
        for url in urlarray:
            _collect_cluster(url, sleeptime, dictLastNodeInfo)
        # Sleep until the next cycle
        time.sleep(sleeptime)


# Entry point
if __name__ == "__main__":
    main()
    print("over")
时间: 2024-10-13 14:16:07