python expect 工作的使用代码

#!/usr/bin/env python
#coding=utf-8
‘‘‘
Created on 2013-8-22

@author: *****
‘‘‘
import MySQLdb
import time
import pexpect
import subprocess
import os
import socket
import urllib2
import sys
mintor_url = ‘http://**************************&msg=‘#   短息接口
News_Result_=‘News_Result_‘
tempdir= ‘datas/‘
tmpexedir= ‘exedatas/‘
reasultdir=‘redatas/‘

##########oline###########
pubid=int(14219)
dbip=‘********‘

global ipath
ipath=os.getcwd()+‘/‘
def telme(info):
    global mintor_url
    tmp_url = mintor_url+info
    socket.setdefaulttimeout(10)
    response = urllib2.urlopen(tmp_url)
    html = response.read()
    print html
    response.close()

def getlogname(hour):
    return News_Result_+str(time.strftime(‘%Y%m%d%H‘,time.localtime(time.time()-hour*60*60)))+".log"  

def dbexe(filename):
    ida =str(time.strftime(‘%Y-%m-%d‘,time.localtime(time.time()-24*60*60)))
    db = DB()
    data_list = db.getpubnews(pubid ,ida)
    newsower = db.getnewsowerinfo(ida)
    print ‘open result file:‘,filename
    file = open(filename)
    k=200
    dbresult=[];
    while 1:
        print ‘k:‘,k
        if k <=0:
            break
        lines = file.readlines(200)
        if not lines:
            break
        for line in lines:
            try:
                if k <=0:
                    break
                line = line.strip()
                lists = line.split(‘|‘)
                if(lists[0].isdigit() and lists[1].isdigit()  and newsower.count(int(lists[0]))<=0):
                    datainfo = []
                    datainfo.append(int(lists[0]))
                    datainfo.append(int(k))
                    dbresult.append(datainfo)
                    k-=1
            except Exception, ex:
                print ex
    dbresult.reverse()
    if k>100:
        print ‘data is to small,so return:‘,filename
        return
    if db.addpubnews(pubid,dbresult):
            if data_list[0] is not None:
                db.delpubnews(pubid, int(data_list[0]), int(data_list[1]))
    file.close()
    db.destory()

def checkexistsyn(name,num):
    if os.path.exists(name) == False:
        syndata(num)

def awkexe():
    log1= ipath+tempdir+getlogname(2)

    log2= ipath+tempdir+getlogname(3)
    log3= ipath+tempdir+getlogname(4)
    checkexistsyn(log1,2)
    checkexistsyn(log2,3)
    checkexistsyn(log3,4)

    reultname = ipath+reasultdir+str(time.strftime(‘%Y%m%d%H%M%S‘,time.localtime(time.time())))+‘.log‘
    cmd =‘ cat ‘+log1+‘ ‘+log2+‘ ‘+log3+‘ | awk \‘$1 !~/^NULL/\‘  | awk -F "|"  \‘{a[$1]+=$2}END{for(x in a)print x"|"a[x]}\‘  | sort -n  -r -k 2 -t "|" >> ‘+reultname
    print ‘cmd:‘,cmd
    re = subprocess.call([cmd],shell=True)
    if re==0:
        print ‘success‘
        dbexe(reultname)
    else:
        telme(‘awk-exe-fail-oa‘)

def syndata(hour):
    logn = getlogname(hour)
    command = ‘rsync -v [email protected]**.**.***:*************t/‘+logn +"  "+ipath+tempdir
    print ‘command:‘,command
    path = ipath+tempdir+logn
    foo = pexpect.spawn(command)
    index = foo.expect(["(?i)yes/no", "(?i)password", pexpect.EOF, pexpect.TIMEOUT])

    if (index == 0):
            print ‘ need---------------yes‘
            foo.sendline(‘yes‘)
            foo.expect("(?i)password")
            foo.sendline(‘*******‘)
            foo.expect(pexpect.EOF)
            foo.close()
            print ‘syndata w success‘
    elif(index == 1):
            print ‘ need---------------pass‘
            foo.sendline(‘*******‘)
            foo.expect(pexpect.EOF)
            foo.close()

            print ‘syndata q success‘
    else :
            print ‘has error‘

    return os.path.exists(path)

def gethour():
    d4= time.localtime(time.time())
    return int(d4.tm_hour)

def checkDate():

    hour = gethour()
    #if hour==9 or hour==12 or hour==17 or hour==20  or hour==0:
    #    return True

    return True

def startjob():
    k = 0
    while k < 10:
        k+=1
        if(syndata(2)):
            if checkDate():
                awkexe()
                return
            else:
                return
        else:
            time.sleep(60*5)

    telme(‘-ryn-data-error-oa-‘+str(k))

def teststartjob():
    pass

    k = 0
    while k < 1:
        k+=1
        if(syndata(2)) and (syndata(3)) and syndata(4):
          #  if checkDate():
                awkexe()
                return
        else:
            time.sleep(60*5)

    telme(‘-ryn-data-error-oa‘)

class DB(object):

    def destory(self):
        if self.cur != None:
            print ‘close db cur‘
            self.cur.close()
        if self.conn != None:
            print ‘close db conn‘
            self.conn.close()
        print ‘close db ‘

    def __init__(self):

        self.conn = MySQLdb.connect(host=dbip, user=‘***‘,passwd=‘*****‘, db=‘****‘, port=3306, charset=‘utf8‘)
        self.cur = self.conn.cursor()

    def getnewsowerinfo(self,startTime):

        sql = "select newsid from ****** where         (ownertype =3 or (ownertype=1 &&(ownerid in(1,2))) or (ownertype=2 && ownerid = 13793)) and createTime >=‘%s‘"
        print sql
        list=(startTime)
        self.cur.execute(sql%list)
        data_list=[]
        data_list.extend( self.cur.fetchall())
        result = []
        for data in data_list:
            try:
                result.append(int(data[0]))
            except Exception, eo:
                print "a date has error:",eo
        return result
    def addpubnews(self,ipubid,datas):

        idate=str(time.strftime(‘%Y-%m-%d %H:%M:%S‘,time.localtime(time.time())))

        sql = "insert into ********** (pubid, newsid,weight,newscreatetime,createtime,publishtime,createuser) values(%d,%d,%d,‘%s‘,‘%s‘,‘%s‘,‘%s‘)"
        for data in datas:
            list=(ipubid,data[0],data[1],idate,idate,idate,‘lixuan‘)
           # print "add sql:",sql%list
            self.cur.execute(sql%list)
        #self.cur.executemany(sql,relist)
        self.conn.commit()
      return True

    def delpubnews(self,ipubid, maxid,minid):

        print ‘start dele data , maxid:%d,pubid:%d‘%(maxid,ipubid)
        sql = ‘delete from ********** where id <= %d and id >=%d and pubid = %d‘
        list=(maxid,minid,ipubid)
        self.cur.execute(sql%list)
        self.conn.commit()
       def getpubnews(self, ipubid,idate):
        #and createtime < ‘%s‘
        sql = "select max(id),min(id) ,count(1) from ***********  where pubid =%d  "
        list=(ipubid)
        self.cur.execute(sql%list)
        data_list=[]
        data_list.extend( self.cur.fetchall())
        return data_list[0];

if __name__ == ‘__main__‘:
     #global ipath
     if len(sys.argv) >1:
         ipath = sys.argv[1]

     idate=str(time.strftime(‘%Y-%m-%d %H:%M:%S‘,time.localtime(time.time())))
     print ‘start service...,‘,idate
     #teststartjob()
     startjob()
    
时间: 2024-10-04 12:39:41

python expect 工作的使用代码的相关文章

Python自学之旅 #新手#MacBook #《“笨办法”学Python》#第六章:常用的简易Python命令、符号、代码、格式化字符串

第六章:常用的简易Python命令.符号.代码.字符串 <“笨办法”学Python>这本书中,确实用了较多篇幅来介绍Python的一些常用简单的命令.符号.代码和字符串等,对于像我这样的自学新手,真的是非常棒,因为它们可以帮我建立接着学下去的信心和兴趣.但我在这个系列的博客当中,不打算写的这么精细,首先因为这不符合我写博的初衷和习惯,其次因为我不打算靠这写书来挣钱,最后因为我确实没有那个实力去挖掘简单东西中更深奥复杂的应用.所以,我写的这个博客,只适合像我这样的自学新手,如果想要成为大神,还是

python __slots__ 使你的代码更加节省内存

在默认情况下,Python的新类和旧类的实例都有一个字典来存储属性值.这对于那些没有实例属性的对象来说太浪费空间了,当需要创建大量实例的时候,这个问题变得尤为突出. 因此这种默认的做法可以通过在新式类中定义了一个__slots__属性从而得到了解决.__slots__声明中包含若干实例变量,并为每个实例预留恰好足够的空间来保存每个变量,因此没有为每个实例都创建一个字典,从而节省空间. 现在来说说python中dict为什么比list浪费内存? 和list相比,dict 查找和插入的速度极快,不会

python的工作记录A

马上进入工作自动化: [[email protected] ~]# cat svn_bbs.py import os,sys,commands,subprocess import re,time svnUrl = "svn://xxx" svnExportCmdPre = "svn export svn://xxx/" sitePath = "/xxx" updateFolder = "/srv/salt/xxx/" salt

深入一步探索Python计算身份证校验位的代码写法【map -&gt; imap】

根据身份证末位的校验位计算方法,写出了python版的. 根据别人写的一个修改得到: 用到map, zip, sum #!/usr/bin/env python def check_bit( string ): s = map( int, string ) a = [7, 9, 10, 5, 8, 4, 2, 1, 6, 3, 7, 9, 10, 5, 8, 4, 2] b = sum( map( lambda x: x[0]*x[1], zip(a, s) ) ) c = b % 11 d =

Python连接MySQL的实例代码

Python连接MySQL的实例代码 MySQLdb下载地址:http://sourceforge.net/projects/mysql-python/ 下载解压缩后放到%Python_HOME%/Lib/site-packages目录中,python会自动找到此包. MySQLdb基本上是MySQL C API的Python版,遵循Python Database API Specification v2.0. 其他: 1. 平台及版本 linux 内核2.6,gcc 3.4.4,glibc 2

python 遍历文件夹文件代码

import os def tree(top): for path, names, fnames in os.walk(top): for fname in fnames: yield os.path.join(path, fname) for name in tree('C:\Users\XXX\Downloads\Test'): print name python 遍历文件夹文件代码

用python做自动化测试--对Java代码做单元测试 (1)

大多数时间我们说的python,指的是C实现的python, 在这篇文章里,我们要说的是java实现的python,她的名字叫Jython, 大家可以到到官方网站http://www.jython.org/ 看看,最近2年很活跃,发布新版本比较多,其实我在jython 2.1版本的时候就开始接触过,但当时看到社区不活跃,没继续学习应用下去.最近公司有几个项目需要对外发布java 实现的API, java毕竟没那么熟悉,写起API接口测试的代码来速度还是没那么快, 还是高大上的python来的快,

python批量同步web服务器代码核心程序

#!/usr/bin/env python #coding:utf8 import os,sys import md5,tab from mysql_co.my_db import set_mysql from ssh_co.ssh_connect import sshd from ssh_co.cfg.config import ssh_message,item_path from file import findfile def my_mysql(): db_file={} my_conne

用python做自动化测试--对Java代码做单元测试 (2)-导入第三方jar包

用Jython对Java做单元测试,当然是为了测试公司开发的java代码,这样就涉及到导入第三包的问题,怎么导入第三方包? 可以利用http://blog.csdn.net/powerccna/article/details/37739207 这里的实现的函数,扫描指定目录下的jar包,然后通过sys.path.append()函数加入到jython的sys.path里面. for jar_file in scan_files("/home/jim/java_jar",postfix=