Python实现cmd命令连续执行

之前是想写一个微信控制程序,通过登录网页微信,可以直接执行命令行代码。也不用ssh登录了,想法很方便。

但是现实很残酷,微信登录这块基本没有问题,已经有大佬写好了,但是命令行执行遇到问题了。

运行cmd

开始时,使用os.popen()执行命令,但是该命令需要手动修改运行目录。此方案被我直接丢弃了。

单开进程

那么自然想到通过启动进程的方式来实现,Python有对进程的封装subprocess,可以通过创建Popen对象来实现。我只要单开一个bash,与它进行交互就好啦。

简单实现如下:

?
p = subprocess.Popen(‘/bin/bash‘, shell=True, stdin=subprocess.PIPE, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
?
while True:
    c = input()
    c += os.linesep
    p.stdin.write(c.encode(‘utf8‘))
    print(out_s.decode(‘utf8‘), end=‘‘)
然后,马上就有遇到问题了,输出流一直拿不到内容,被阻塞了。

刷新缓冲区

被阻塞有两种情况,一输入流阻塞,所以没有输出,二输出流阻塞。看到网上有的将输入流关闭就可以了:

p.stdin.close()

但是关闭后就不能再次运行命令了,通过查看其对象方法,发现可以直接刷新缓冲区,很好

p.stdin.flush()

但是发现读取到的文件只有一行,很明显,没有读完

循环读取

需要循环读取输出缓冲区的内容。

while True:
    out_s = p.stdout.readline()
    print(out_s.decode(‘utf8‘), end=‘‘)

新的问题出现了,循环怎么结束啊?当缓冲区没有内容时,readline方法会阻塞等待。

读取阻塞

很好,找了半天也没找到解决阻塞的办法。那就只能靠自己了,既然它要阻塞,那就随他阻塞好了,我单开一个线程去读取,让它一直阻塞去吧。

解决后的完整测试代码:

import subprocess
import os
import threading
?
?
p = subprocess.Popen(‘/bin/bash‘, shell=True, stdin=subprocess.PIPE, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
?
def test():
    global p
    while True:
        print(p.stdout.readline().decode(‘utf8‘), end=‘‘)
?
threading.Thread(target=test).start()
?
while True:
    c = input()
    c += os.linesep
    p.stdin.write(c.encode(‘utf8‘))
    p.stdin.flush()

很好,问题解决了,简单封装一个工具类吧。

注意:如果输入一个不存在的命令,输出内容不在stdout流中,要到stderr中获取。此方案暂时还不支持sudo命令,回头在研究研究


至此,其实还有一个小问题,我怎么能知道哪些返回是同一条命令所返回的呢?就这个微信工具来说,自然可以直接通过时间判断,若超过1s没有,则认为是一组,统一返回。感觉有些牵强,暂时没有想到更好的解决办法。

最后奉上工具链接:

<https://gitee.com/hujingnb/python_demo>

原文地址:https://www.cnblogs.com/hujingnb/p/12112670.html

时间: 2024-10-08 13:50:31

Python实现cmd命令连续执行的相关文章

python程序在命令行执行提示ModuleNotFoundError: No module named &#39;XXX&#39; 解决方法

原文链接:https://www.cnblogs.com/dreamyu/p/7889959.html 在ide中执行python程序,都已经在默认的项目路径中,所以直接执行是没有问题的.但是在cmd中执行程序,所在路径是python的搜索路径,如果涉及到import引用就会报类似ImportError: No module named xxx这样的错误,解决方法: 在报错的模块中添加: import sys import os curPath = os.path.abspath(os.path

C#中,一个cmd命令窗口执行多条dos命令(有修改,加入执行等待)

原文章标题:C# 程序一个cmd命令窗口执行多条dos命令 原文章地址:http://www.cnblogs.com/visibleisfalse/p/3578886.html 以下代码有修改,标出的红色代码,表示执行一条dos命令后,等待执行完成. public void DoDos(string comd1, string comd2, string comd3) { Process p = new Process();//创建进程对象 try { p.StartInfo.FileName

shell脚本linux命令连续执行

shell命令连续执行的三种方式: 1.命令1:命令2:命令3 依次执行命令,无论前一条命令是否执行成功. 2.命令1 && 命令2 && 命令3 前一条命令执行成功后才会执行下一条命令. 3.命令1 || 命令2 || 命令3 前一条命令执行失败才会执行下一条命令. 原文地址:https://www.cnblogs.com/smallredness/p/9241775.html

python 控制 cmd 命令行颜色

基于win7 + python3.4 import ctypes import sys '''Windows CMD命令行颜色''' # 句柄号 STD_INPUT_HANDLE = -10 STD_OUTPUT_HANDLE= -11 STD_ERROR_HANDLE = -12 # 前景色 FOREGROUND_BLACK = 0x0 # 黑 FOREGROUND_BLUE = 0x01 # 蓝 FOREGROUND_GREEN = 0x02 # 绿 FOREGROUND_RED = 0x0

测试:python调用cmd命令三种方法

目前我使用到的python中执行cmd的方式有三种 使用os.system("cmd") 该方法在调用完shell脚本后,返回一个16位的二进制数,低位为杀死所调用脚本的信号号码,高位为脚本的退出状态码,即脚本中"exit 1"的代码执行后,os.system函数返回值的高位数则是1,如果低位数是0的情况下,则函数的返回值是0×100,换算为10进制得到256. 如果我们需要获得os.system的正确返回值,那使用位移运算可以还原返回值: >>>

测试:python调用cmd命令,让push包自动化

一.python与monkey script脚本相结合,达到修改.script脚本中的内容 1 #coding:utf-8 2 ''' 3 push脚本进室内机,并运行 4 5 ''' 6 import os 7 import re 8 import subprocess 9 10 #执行脚本 11 def run_monkey(path): 12 filelist=os.listdir(path) 13 for filename in filelist: 14 filepath=os.path

python cmd命令调用

关于python调用cmd命令: 主要介绍两种方式: 1.python的OS模块. OS模块调用CMD命令有两种方式:os.popen(),os.system(). 都是用当前进程来调用. os.system是无法获取返回值的.当运行结束后接着往下面执行程序.用法如:OS.system("ipconfig"). OS.popen带返回值的,如何获取返回值.如 p=os.popen(cmd) print p.read().得到的是个字符串. 这两个都是用当前进程来调用,也就是说它们都是阻

appium+python自动化44-appium命令行模式

前言 appium desktop有个客户端版本,每次运行脚本的时候都要去双击启动才能运行,很显然不太方便,影响效率.那么有没什么办法不启动桌面程序就能运行呢,比如cmd命令行执行? 环境: appium 命令行模式版本 1.8 windows环境 npm 1.NPM是随同NodeJS一起安装的包管理工具,能解决NodeJS代码部署上的很多问题,常见的使用场景有以下几种: 允许用户从NPM服务器下载别人编写的第三方包到本地使用. 允许用户从NPM服务器下载并安装别人编写的命令行程序到本地使用.

Python利用多线程定时执行cmd命令关机

利用os模块可以执行cmd命令,利用这一点可以实现定时关机,然而在等待关机的过程中也不能啥都不干,于是多线程派上了用场. #! /usr/bin/env python #coding=utf-8 #这里需要引入三个模块 import time, os, sched, easygui, thread # 第一个参数确定任务的时间,返回从某个特定的时间到现在经历的秒数 # 第二个参数以某种人为的方式衡量时间 schedule = sched.scheduler(time.time, time.sle