python 拉取rds 审计日志

此脚本可以拉取 RDS 审计日志,并插入本地数据库中。

#!/usr/bin/env  python2.6
#coding=utf-8
import os
from aliyunsdkcore import client
from aliyunsdkrds.request.v20140815 import DescribeSQLLogRecordsRequest
import json
import urllib
import datetime,time
import subprocess
from   subprocess import call
import warnings
import MySQLdb
from math import ceil
from retrying import retry

# ---- Local audit database connection settings ----
dbserver="192.168.0.94"
dbuser="root"
dbpwd="[email protected]"   # NOTE(review): credentials hard-coded; consider moving to a config file
dbport=3306
dbname="audit"

warnings.filterwarnings("ignore")

# Make sure external commands (e.g. `mail`) resolve when run from cron.
os.environ["PATH"]="/usr/lib64/qt-3.3/bin:/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin:/root/bin"

# RDS instance ids whose SQL audit logs will be pulled.
# (Fixed: the original used curly quote characters, a syntax error.)
dblist = ['rm-1111', 'rm-2222', 'rr-333', 'rm-444', 'rm-555']
# Instance id -> human-readable remark stored alongside each log row.
# NOTE(review): this shadows the builtin `list`; renaming would touch the rest
# of the script, so it is kept as-is. Also note "rm-555" has no remark entry,
# so list.get("rm-555") returns None downstream — verify whether that is intended.
list = {"rm-1111": "beizhu1", "rm-2222": "beizhu2", "rr-333": "beizhu3", "rm-444": "beizhu4"}

# mysqldb 操作类
# Thin wrapper around a MySQLdb connection/cursor pair used by this script.
class db_operate(object):
        def __init__(self, _hostname, _user, _pwd, _port, _db):
                """Open a connection to the local audit database."""
                self.conn = MySQLdb.connect(host=_hostname, user=_user, passwd=_pwd, port=_port, db=_db)
                # Audit rows contain UTF-8 SQL text.
                # (Fixed: the original used curly quotes around 'utf8', a syntax error.)
                self.conn.set_character_set('utf8')
                self.cur = self.conn.cursor()

        def execsql_fetchall(self, sqlcmd):
                """Run a query and return every row as a tuple of tuples."""
                self.cur.execute(sqlcmd)
                return self.cur.fetchall()

        def execsql_fetchone(self, sqlcmd):
                """Run a query and return only the first row (None when empty).

                Fixed: the original called fetchall() here, so this method
                behaved identically to execsql_fetchall despite its name.
                """
                self.cur.execute(sqlcmd)
                return self.cur.fetchone()

        def execsql_dml(self, sqlcmd):
                """Execute a DML statement; the caller is responsible for commit()."""
                self.cur.execute(sqlcmd)

        def is_connection_usable(self):
                """Return True when the underlying connection still answers ping()."""
                try:
                        self.conn.ping()
                except Exception:
                        return False
                return True
                        
                        
# 获得时间需要备份的时间范围
def getdate():
        global start_date
        global end_date
        current_time = datetime.datetime.now()
        end_date = localtrfutc(current_time)            # 当前时间
        start_date = localtrfutc(current_time - datetime.timedelta(minutes=5))     #拉取指定时间范围内的日志
        print start_date,end_date
        return 0
        
        
#本地 时间转utc
def localtrfutc(local_time, utc_offset_hours=8):
        """Convert a naive local datetime to the RDS-API UTC timestamp format.

        The host is assumed to run in UTC+8 (China Standard Time) by default;
        the offset is now a parameter (default preserves the old behavior) so
        the helper works for other zones too.

        :param local_time: naive datetime in local wall time
        :param utc_offset_hours: hours local time is ahead of UTC
        :return: string formatted as "%Y-%m-%dT%H:%M:%SZ"
        """
        utc_time = local_time - datetime.timedelta(hours=utc_offset_hours)
        return utc_time.strftime("%Y-%m-%dT%H:%M:%SZ")
        
        
@retry(wait_fixed=5000)
def geterrorlog(db_instanceid,pageNum,Flag):
        print db_instanceid,pageNum,Flag
        clt = client.AcsClient(‘1234‘,‘123456‘,‘cn-hangzhou‘)
        request = DescribeSQLLogRecordsRequest.DescribeSQLLogRecordsRequest()
        request.set_accept_format(‘json‘)
        request.set_action_name(‘DescribeSQLLogRecords‘)
        request.set_DBInstanceId(db_instanceid)
        request.set_StartTime(start_date)
        request.set_EndTime(end_date)
        request.set_PageNumber(int(pageNum))
        result = clt.do_action(request)
        s=json.loads(result)
        if Flag == 0:
                PageRecordCount = float(s["PageRecordCount"])
                TotalRecordCount = float(s["TotalRecordCount"])
                if TotalRecordCount == 0:
                        return 0
                result = int(ceil(TotalRecordCount/PageRecordCount))
        else:
                result = s[‘Items‘][‘SQLRecord‘]
        return result
                   
                        
def installlog(dbid,auditlog):
        for log in auditlog:
                TotalExecutionTimes = log["TotalExecutionTimes"]
                ExecuteTime = datetime.datetime.strptime(log["ExecuteTime"],‘%Y-%m-%dT%H:%M:%SZ‘) + datetime.timedelta(hours=8)
                AccountName = log["AccountName"]
                HostAddress = log["HostAddress"]
                ThreadID = log["ThreadID"]
                DBName = log["DBName"]
                ReturnRowCounts = log["ReturnRowCounts"]
                sqltext = ‘/*  ‘+log["SQLText"].replace("‘",‘"‘)+‘  */‘
                serverid = dbid
                servermark = unicode(list.get(dbid),"utf-8")
                sqlcmd=‘‘‘ insert into yw_business(TotalExecutionTimes,ExecuteTime,AccountName,HostAddress,ThreadID,DBName,ReturnRowCounts,SQLText,serverid,servermark) values(‘%s‘,‘%s‘,‘%s‘,‘%s‘,‘%s‘,‘%s‘,‘%s‘,‘%s‘,‘%s‘,‘%s‘) ‘‘‘ % (TotalExecutionTimes,ExecuteTime,AccountName,HostAddress,ThreadID,DBName,ReturnRowCounts,sqltext,serverid,servermark)
#               sqlcmd=‘‘‘ insert into yw_business(TotalExecutionTimes,ExecuteTime,AccountName,HostAddress,ThreadID,DBName,ReturnRowCounts,SQLText) values(‘%s‘,‘%s‘,‘%s‘,‘%s‘,‘%s‘,‘%s‘,‘%s‘,‘%s‘) ‘‘‘ % (TotalExecutionTimes,ExecuteTime,AccountName,HostAddress,ThreadID,DBName,ReturnRowCounts,sqltext)             
                try:
                        fo.execsql_dml(sqlcmd)
                except Exception,e:
                        print sqlcmd
                        print e
        fo.conn.commit()
        
        
# 插入其他页面的数据
# Fetch every result page for one instance and load it into the local DB.
def insertdata(dbid, totalpageNum):
        for pageNum in range(1, totalpageNum + 1):
                result = geterrorlog(dbid, pageNum, 1)
                # Skip empty pages. (Fixed: the original compared against the
                # string "0", which geterrorlog never returns with Flag=1, and
                # the `if` line was mis-indented by one space — a SyntaxError.)
                if not result:
                        continue
                installlog(dbid, result)
                
                
try:
        if __name__ == "__main__":
                getdate()
                global fo
                fo=db_operate(dbserver,dbuser,dbpwd,int(dbport),dbname)
                for i in dblist:
                        print i
                        totalpages=geterrorlog(i,1,0)
                        print totalpages
                        insertdata(i,totalpages)
                fo.conn.close()
except Exception,e:
        cmd = ‘‘‘ echo  -e  ‘审计日志插入异常‘ |  mail -s  ‘ %s  审计日志异常 ‘  [email protected] ‘‘‘ % (list.get("%s" % i)) 
        call(cmd,shell=True)
        print e
时间: 2024-10-18 11:31:35

python 拉取rds 审计日志的相关文章

Python 拉取日志文件paramiko

paramiko的另一篇博文:http://467754239.blog.51cto.com/4878013/1619166 场景: 在游戏行业的集群中,日志分析或许是必不可少的,那么为了更方便的管理日志,就是统一存放日志,然后入库数据库 #!/usr/bin/env python #coding:utf8 from multiprocessing import Process from datetime import * import paramiko import string import

mysql5.6和mariadb远程拉取二进制日志

从mysql5.6开始支持远程拉取二进制日志 mysqlbinlog --raw --stop-never --result-file=/tmp/log/ --read-from-remote-server -h 192.168.10.17 -P 3306 -u liuwei -p123456 mysql-bin.000475 选项说明: -h --host:远程主机的IP地址 -P --port:远程主机的端口 -u --user:远程数据库的用户 -p --password:远程数据库用户的

python自动拉取备份压缩包并删除3天前的旧备份

业务场景,异地机房自动拉取已备份好的tar.gz数据库压缩包,并且只保留3天内的压缩包文件,用python实现 #!/usr/bin/env python import requests,time,os,datetime,platform from threading import Thread #cd backup dir if platform.system() == 'Windows': os.chdir('D:\python\mysqlbackup_all') elif platform

PostgreSQL构建流复制拉取日志的起始位置在哪里

WaitForWALToBecomeAvailable: if (!InArchiveRecovery) currentSource = XLOG_FROM_PG_WAL; else if (currentSource == 0) currentSource = XLOG_FROM_ARCHIVE; for (;;){ int oldSource = currentSource; if (lastSourceFailed){ switch (currentSource){ case XLOG_F

使用Windows Live Writer拉取之前写的博客

因为之前写的博客有错误需要修改,但是在Windows Live Writer中找了半天也没找到怎么拉取之前的博客,在[打开本地草稿]或者[打开最近使用过的日志]中,由于存储的项数有限,所以就找不到那篇博客了,在网上百度了,也没找到解决方案,最后还是拉取到了之前写的博客了,很简单的步骤. 选中打开,然后点击其图标,接下来就是下面这个图了,就完成了.简单吧.

无比强大!Python抓取cssmoban网站的模版并下载

Python实现抓取http://www.cssmoban.com/cssthemes网站的模版并下载 实现代码 # -*- coding: utf-8 -*- import urlparse import urllib2 import re import os import os.path URL='http://www.cssmoban.com/cssthemes' #全局超时设置 urllib2.socket.setdefaulttimeout(500) #根据url获取内容 def ge

hadoop异常之 reduce拉取数据失败  (error in shuffle in fetcher)

主要错误信息:Error: org.apache.hadoop.mapreduce.task.reduce.Shuffle$ShuffleError: error in shuffle in fetcher#43 解决办法:限制reduce的shuffle内存使用 hive:set mapreduce.reduce.shuffle.memory.limit.percent=0.1; MR:job.getConfiguration().setStrings("mapreduce.reduce.sh

用python 抓取B站视频评论,制作词云

python 作为爬虫利器,与其有很多强大的第三方库是分不开的,今天说的爬取B站的视频评论,其实重点在分析得到的评论化作嵌套的字典,在其中取出想要的内容.层层嵌套,眼花缭乱,分析时应细致!步骤分为以下几点: F12进入开发者选项进入B站你想观看的视频页面,例如我看的是咬人猫的一个视频,进入开发者选项后,向下拉取视频评论,这时评论内容才被加载出来,此刻在开发者选项中网络那里就可以看到从网站获取的很多信息,仔细查找,发现我们想要的如下图:可以看到评论区的内容,点开消息头中的请求网址(https://

rsync推送和拉取

rsync格式: # 拷贝本地文件.当SRC和DES路径信息都不包含有单个冒号":"分隔符时就启动这种工作模式.如:rsync -a /data /backup rsync [OPTION]... SRC DEST # 使用一个远程shell程序(如rsh.ssh)来实现将本地机器的内容拷贝到远程机器.当DST路径地址包含单个冒号":"分隔符时启动该模式.如:rsync -avz *.c foo:src rsync [OPTION]... SRC [US[email