1 | ``` |
python-list排序,set的使用
list排序,和打印
1 | l = [2, 4, 3, 5] |
set的使用
1 | # 打印 |
1 | foo = "t1m3a3pp4mt" |
python-matplotlib多曲线
- matplotlib 多个曲线的练习
1 | import matplotlib.pyplot as pl |
- 结果
python-pytest全面解析
运行pytest的三种方式
1 | import pytest |
方式一
1 | pytest.main(["-s", "pt_test1.py"]) |
方式二
在pycharm中新建pytest
点击运行即可
使用命令执行
1
2
3
4
5
6
7
8
9D:\project\ut>pytest pt_test1.py
======================================================================= test session starts ========================================================================
platform win32 -- Python 3.6.6, pytest-5.4.3, py-1.9.0, pluggy-0.13.1
rootdir: D:\project\ut
collected 2 items
pt_test1.py .. [100%]
======================================================================== 2 passed in 0.02s =========================================================================
测试制定的函数
::
制定测试函数1
2
3pytest pt_test1.py::test_add
pytest.main(["pt_test1.py::test_add"])-k
模糊搜索,模糊搜索add的测试函数1
2pytest -k add pt_test1.py
pytest.main(["-s", "pt_test1.py", "-k", "add"])- 使用
pytest.mark
标注后,使用-m
参数1
2
3
4
5@pytest.mark.finished
def test_add():
print("测试函数:test_add")
pytest -m finished pt_test1.py - 一个函数可以打多个标记;多个函数也可以打相同的标记,运行逻辑:
pytest -m "finished and commit"
跳过测试
pytest.mark.skip
1
2
3
4
5
6# test_skip.py
@pytest.mark.skip(reason='out-of-date api')
def test_connect():
pass
pytest tests/test-function/test_skip.pypytest.mark.skipif
为测试函数指定被忽略的条件1
2
3
4
5
6@pytest.mark.skipif(conn.__version__ < '0.2.0',
reason='not supported until v0.2.0')
def test_api():
pass
pytest tests/test-function/test_skip.py
参数化
- 密码长度的测试函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18# test_parametrize.py
@pytest.mark.parametrize('passwd',
['123456',
'abcdefdfs',
'as52345fasdf4'])
def test_passwd_length(passwd):
assert len(passwd) >= 8
$ pytest tests/test-function/test_parametrize.py
============================= test session starts =============================
platform win32 -- Python 3.6.4, pytest-3.6.1, py-1.5.2, pluggy-0.6.0
rootdir: F:\self-repo\learning-pytest, inifile:
collected 3 items
tests\test-function\test_parametrize.py F.. [100%]
================================== FAILURES =================================== - 再看一个多参数的例子,用于校验用户密码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26# test_parametrize.py
@pytest.mark.parametrize('user, passwd',
[('jack', 'abcdefgh'),
('tom', 'a123456a')])
def test_passwd_md5(user, passwd):
db = {
'jack': 'e8dc4081b13434b45189a720b77b6818',
'tom': '1702a132e769a623c1adb78353fc9503'
}
import hashlib
assert hashlib.md5(passwd.encode()).hexdigest() == db[user]
$ pytest -v tests/test-function/test_parametrize.py::test_passwd_md5_id
============================= test session starts =============================
platform win32 -- Python 3.6.4, pytest-3.6.1, py-1.5.2, pluggy-0.6.0 -- c:\anaconda3\python.exe
cachedir: .pytest_cache
rootdir: F:\self-repo\learning-pytest, inifile:
collected 2 items
tests/test-function/test_parametrize.py::test_passwd_md5_id[User<Jack>] PASSED [ 50%]
tests/test-function/test_parametrize.py::test_passwd_md5_id[User<Tom>] PASSED [100%]
========================== 2 passed in 0.07 seconds ===========================
固件
- 固件(
Fixture
)是一些函数,pytest
会在执行测试函数之前(或之后)加载运行它们
-Pytest
使用pytest.fixture()
定义固件,下面是最简单的固件,只返回北京邮编1
2
3
4
5
6
7
8
@pytest.fixture()
def postcode():
return '010'
def test_postcode(postcode):
assert postcode == '010'
预处理和后处理
- 很多时候需要在测试前进行预处理(如新建数据库连接),并在测试完成进行清理(关闭数据库连接)。
Pytest
使用yield
关键词将固件分为两部分,yield 之前的代码属于预处理,会在测试前执行;yield 之后的代码属于后处理,将在测试完成后执行
1 | # test_db.py |
作用域
在定义固件时,通过 scope
参数声明作用域,可选项有:
function
: 函数级,每个测试函数都会执行一次固件;默认的作用域为 functionclass
: 类级别,每个测试类执行一次,所有方法都可以使用;module
: 模块级,每个模块执行一次,模块内函数和方法都可使用;session
: 会话级,一次测试只执行一次,所有被找到的函数和方法都可用。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18@pytest.fixture(scope='function')
def func_scope():
pass
@pytest.fixture(scope='module')
def mod_scope():
pass
@pytest.fixture(scope='session')
def sess_scope():
pass
@pytest.fixture(scope='class')
def class_scope():
pass- 对于类使用作用域,需要使用 pytest.mark.usefixtures (对函数和方法也适用):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21# test_scope.py
@pytest.mark.usefixtures('class_scope')
class TestClassScope:
def test_1(self):
pass
def test_2(self):
pass
$ pytest --setup-show tests/fixture/test_scope.py::TestClassScope
============================= test session starts =============================
platform win32 -- Python 3.6.4, pytest-3.6.1, py-1.5.2, pluggy-0.6.0
rootdir: F:\self-repo\learning-pytest, inifile:
collected 2 items
tests\fixture\test_scope.py
SETUP C class_scope
tests/fixture/test_scope.py::TestClassScope::()::test_1 (fixtures used: class_scope).
tests/fixture/test_scope.py::TestClassScope::()::test_2 (fixtures used: class_scope).
TEARDOWN C class_scope
使用命令行在pytest中传递多个参数
- 配置conftest.py
1
2
3
4
5
6
7
8
9
10
11
12
13# conftest.py
import pytest
def pytest_addoption(parser):
parser.addoption("--input1", action="store", default="default input1")
parser.addoption("--input2", action="store", default="default input2")
@pytest.fixture
def input1(request):
return request.config.getoption("--input1")
@pytest.fixture
def input2(request):
return request.config.getoption("--input2") - 编写测函数
1
2
3
4
5
6
7# test.py
import pytest
@pytest.mark.unit
def test_print_name(input1, input2):
print ("Displaying input1: %s" % input1)
print("Displaying input2: %s" % input2) - 执行命令
1
2
3
4
5
6
7
8
9
10
11>py.test -s test.py --input1 tt --input2 12
================================================= test session starts =================================================
platform win32 -- Python 3.7.0, pytest-4.1.1, py-1.7.0, pluggy-0.8.1
rootdir: pytest, inifile:
collected 1 item
test.py Displaying input1: tt
Displaying input2: 12
.
============================================== 1 passed in 0.04 seconds ====================================
其他的一些参数总结
-v, --verbose
详细结果
--q, --quiet
极简结果显示,简化控制台的输出,可以看出输出信息和之前不添加-q不信息不一样, 下图中有两个..点代替了pass结果
--s
输入我们用例中的调式信息,比如print的打印信息等,我们在用例中加上一句 print(driver.title),我们再运行一下我们的用例看看,调试信息输出
--V
可以输出用例更加详细的执行信息,比如用例所在的文件及用例名称等--junit-xml=path
输出xml文件格式,在与jenkins做集成时使用--result-log=path
将最后的结果保存到本地文件中
本文来自
兼容unittest中的setup
- 使用
setup
,setup_cass
,teardown_class
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27class TestCase():
def setup(self):
print("setup: 每个用例开始前执行")
def teardown(self):
print("teardown: 每个用例结束后执行")
def setup_class(self):
print("setup_class:所有用例执行之前")
def teardown_class(self):
print("teardown_class:所有用例执行之前")
def setup_method(self):
print("setup_method: 每个用例开始前执行")
def teardown_method(self):
print("teardown_method: 每个用例结束后执行")
def test_one(self):
print("正在执行----test_one")
x = "this"
assert 'h' in x
def test_three(self):
print("正在执行test_two")
a = "hello"
b = "hello word"
assert a in b
def add(self,a, b):
print("这是加减法")
return a + b
if __name__ == '__main__':
pytest.main(['-s', 'test_fixt_class']) - 来自https://www.cnblogs.com/tallme/p/11369791.html
conftest
- 多用例的数据可以共,比如selenium中的driver的
- conftest.py文件名字不能更改
- conftest.py与运行的用例要在同一个pakage下,并且有__init__.py文件
- 如下图,我可以放到和用例testcase同级文件夹目录,也可以放到testcase文件夹下面的用例目录
- 代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30# conftest.py
import pytest
from selenium import webdriver
import os
@pytest.fixture()
def driver():
PATH = lambda p: os.path.abspath(
os.path.join(os.path.dirname(__file__), p)
)
driver_path = PATH("../exe/chromedriver.exe")
driver = webdriver.Chrome(driver_path)
driver.get('https://www.baidu.com')
driver.maximize_window()
# 返回数据
yield driver
# 实现用例后置
driver.quit()
import pytest
# test_case001.pu
class TestClassName:
@pytest.mark.usefixtures("driver")
def test_func_name(self, driver):
driver.find_element_by_id("kw").click() - 执行:
pytest -s testcase/
共享session
在selenium中,可以实现driver共享,不用每个用例都重新打开和关闭浏览器
1 | # conftest.py |
批量执行z
1 | pytest -s TestCase\ |
单例模式
单例是一种设计模式,应用该模式的类只会生成一个实例,单例模式保证了在程序的不同位置都可以且仅可以取到同一个对象实例:如果实例不存在,会创建一个实例;如果已存在就会返回这个实例
单例函数
1 | # coding:utf-8 |
- selenium 的driver只有一个单例实例:
GetSeleniumDriver.py
1 | # -*- coding:utf-8 -*- |
- 编写用例
1 | # test_3.py |
1 | # test_4.py |
- 代码结构
html美化
在conftest.py
中的优化的代码
1 | import time |
- 编写用例,用例下面的注释:测试百度作为报告中的标题
1 | @pytest.mark.finished |
- 执行结果
1 | pytest -m finished webAuto\testcase --html=report.html --self-contained-html --capture=sys |
python-schematics的使用
- schematics代替model层的功能
- schematics自动的字段有验证功能
1 | from schematics.models import Model |
- 列表嵌套字典
- 你也可以嵌套其他的model层
1 | from schematics.models import Model |
- 字典
1 | from schematics.models import Model |
更多schematics高级的用法
- jsonschema 是验证json的正确性
1 | from jsonschema import validate |
python-unittest全面解析
1 | import unittest |
执行结果
1 | setUpClass_整个测试开始后执行,只执行一次 |
测试分析
结果分析
setUp初始化的值可以改变
1 | setUp_name_hi |
setUp
中的self.name
的值,在测试函数中是可以被改变的,一开始name
为hi,后面为test_add
setUpClass的的值无法改变
1 | 在def setUpClass赋值 |
setUpClass
初始化的值在函数中无法改变,应该是cls
和self
的指向不一样
setUp和setUpClass
- 每个用例都要独立,也就是每个用执行时,都会重开设备,就使用
setup
,比如appium
中的driver - 若想复用某个初始化条件,单不期望每个用例都重启再打开,可以使用
setUpClass
- 在使用
appium
时,用setUp
频繁初始化driver
造成执行用例时间太长,可以使用setUpClass
初始化driver,结合launch-app达到每个用例不重启,同时解决依赖关系的问题
- 在使用
不同的运行unittest的方式
方式一
1 | unittest.main() |
方式二
1 | # 构造测试集 |
方式三
1 | # TestLoader 用来加载TestCase到TestSuite中的,其中有几个 loadTestsFrom__()方法,就是从各个地方寻找TestCase,创建它们的实例,然后add到TestSuite中,再返回一个TestSuite实例 |
方式四
1 | # 进入到测试目录 |
方式五,直接到终端中输入命令
1 | # 测试某个类下的所有函数 |
其他扩展
unittest的条件装饰器的使用
1 | class MyTestCase(unittest.TestCase): |
@unittest.skip(reason)
跳过被此装饰器装饰的测试。 reason 为测试被跳过的原因。@unittest.skipIf(condition, reason)
当 condition 为真时,跳过被装饰的测试。@unittest.skipUnless(condition, reason)
跳过被装饰的测试,除非 condition 为真。
@unittest.expectedFailure
把测试标记为预计失败。如果测试不通过,会被认为测试成功;如果测试通过了,则被认为是测试失败。
exception unittest.SkipTest(reason)
引发此异常以跳过一个测试。
参数化
- 在使用
unittest
时,希望testcase
传参给unittest
,下面时用appium
的一个伪代码1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63class ParametrizedTestCase(unittest.TestCase):
""" TestCase classes that want to be parametrized should
inherit from this class.
"""
def __init__(self, methodName='runTest', param=None):
super(ParametrizedTestCase, self).__init__(methodName)
# 得到testcase传来的参数
global devicess
devicess = param
@classmethod
def setUpClass(cls):
pass
cls.driver = get_driver(devicess)
cls.logTest = myLog().getLog(cls.devicesName) # 每个设备实例化一个日志记录器
def setUp(self):
pass
@classmethod
def tearDownClass(cls):
cls.driver.close_app()
cls.driver.quit()
pass
def tearDown(self):
pass
@staticmethod
def parametrize(testcase_klass, param=None):
testloader = unittest.TestLoader()
testnames = testloader.getTestCaseNames(testcase_klass)
suite = unittest.TestSuite()
for name in testnames:
suite.addTest(testcase_klass(name, param=param))
return suite
class HomeTest(ParametrizedTestCase):
def testFirstOpen(self):
app = {"logTest": self.logTest, "driver": self.driver, "path": PATH("../yamls/home/firstOpen.yaml"),
"device": self.devicesName, "caseName": sys._getframe().f_code.co_name}
page = FirstOpenPage(app)
page.operate()
page.checkPoint()
@classmethod
def setUpClass(cls):
super(HomeTest, cls).setUpClass()
@classmethod
def tearDownClass(cls):
super(HomeTest, cls).tearDownClass()
if __name__ == '__main__':
devices = {"设备信息"}
suite = unittest.TestSuite()
suite.addTest(ParametrizedTestCase.parametrize(HomeTest, param=devices))
unittest.TextTestRunner(verbosity=2).run(suite)
ddt
pip install ddt
使用json
- 新建文件 test_data_list.json:
1
2
3
4[
"Hello",
"Goodbye"
] - 新建文件 test_data_dict.json:
1
2
3
4{
"unsorted_list": [ 10, 12, 15 ],
"sorted_list": [ 15, 12, 50 ]
} - 实例代码
1 | import unittest |
使用yaml文件
1 | import unittest |
python-多线程知识全面解析
非阻塞启动线程
1 | import threading |
- 得到值如下,线程启动函数后,非阻塞执行
1
2
3
4
5
6start thread
start....
111
222
end thread
end...
多线程并发处理
1 | import threading |
- 打印结果:每次运行三个线程,每个线程循环打印10次
1
2
3
4
5
6
7
8
9start run
start run
start run
0
0
...
thread_id=1:name=Thread-1
所有线程执行完毕
总循环次数为:30 - 多线程共享资源,可以使用全局变量
global
多线程加锁
- 对于那些需要每次只允许一个线程操作的数据,可以将其操作放到 acquire 和 release 方法之间
1 | # -*- coding: utf-8 -*- |
- 打印结果是每次只能运行一个线程
1 | start |
多线程与队列
- 我们经常会遇到这样的一个问题,这里有成千上万条数据,每次需要取出其中的一条数据进行处理,那么引入多线程该怎么进行任务分配?
- 我们可以将数据进行分割然后交给多个线程去跑,可是这并不是一个明智的做法。在这里我们可以使用队列与线程相结合的方式进行任务分配。
- 队列线程的思想: 首先创建一个全局共享的队列,队列中只存在有限个元素,并将所有的数据逐条加入到队列中,并调用队列的join函数进行等待。之后便可以开启若干线程,线程的任务就是不断的从队列中取数据进行处理就可以了。
1 | import threading |
- 这里必须要在判断
q.empty()
前加上线程锁,因为可能会出现这样的一种情况。 - 某一时刻,队列中还有一个元素,该元素正在被线程A取出,而与此同时线程B正在判断队列q是否为空,而此时线程B中队列q不为空进入后面的操作,但是待B去取元素时,最后一个元素已经被A取出,造成线程等待,显示出被挂起的状态。
- 我们也可以通过加入
q.get(timeout=10)
超时操作来弥补这一问题。 - 打印的结果
1
2
3
4
5
6
7[<myThread(Thread-0, started 6568)>, <myThread(Thread-1, started 7724)>, <myThread(Thread-2, started 7796)>]
Thread-1:Sat Aug 22 11:36:29 2020:Queue:0
Thread-1:Sat Aug 22 11:36:29 2020:Queue:1
...
Thread-1:Sat Aug 22 11:36:30 2020:Queue:9998
Thread-1:Sat Aug 22 11:36:30 2020:Queue:9999
Exiting Main Thread
ThreadPoolExecutor线程池的使用
- 锁依然可以运用到线程池
map
的使用,接受一个List的数据,会循环调用1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19from concurrent.futures.thread import ThreadPoolExecutor
import time
num = 0
def print_time(data):
global num
num += 1
time.sleep(2)
print("start_%s" % data)
print("============")
data = []
for i in range(50):
data.append(i)
with ThreadPoolExecutor(10) as pool:
result = pool.map(print_time, data)
# 等待所有线程执行完毕
for i in result:
pass
print("循环次数=%s" % num)- 打印结果为:每次启动10个线程,启动了5次
1
2
3
4
5
6============
start_46
start_49
============
============
循环次数=50 submit
接受list的数据,也可以接受字典1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19rom concurrent.futures.thread import ThreadPoolExecutor
from concurrent.futures import as_completed
import time
def print_time(data):
time.sleep(2)
print("start_%s" % data)
print("============")
data = []
for i in range(50):
data.append(i)
with ThreadPoolExecutor(10) as executor:
future_list = []
for i in range(10):
# future = executor.submit(print_time,data)
future = executor.submit(print_time, {"name": 111, "id": 222})
future_list.append(future)
for res in as_completed(future_list): # 这个futrure_list是你future对象的列表
print(res.result())
参考
python-文件,文件夹操作全面解析
读写文件
1 | with open('data1.txt', mode='w', encoding="utf-8") as f: |
- 打开多个文件
1
2
3with open("file_test","r",encoding="utf-8") as f, \
open("file_test2","r",encoding="utf-8") as f2:
...
flush方法
1 | f = open("file_test","w",encoding="utf-8") |
- 一般的文件流操作都包含缓冲机制,write方法并不直接将数据写入文件,而是先写入内存中特定的缓冲区。
- flush方法是用来刷新缓冲区的,即将缓冲区中的数据立刻写入文件,同时清空缓冲区。
- 正常情况下缓冲区满时,操作系统会自动将缓冲数据写入到文件中。
- 至于close方法,原理是内部先调用flush方法来刷新缓冲区,再执行关闭操作,这样即使缓冲区数据未满也能保证数据的完整性。
- 如果进程意外退出或正常退出时而未执行文件的close方法,缓冲区中的内容将会丢失。
- 但有时你需要在关闭前刷新到硬盘中,这时就可以使用 flush() 方法
操作文件夹
- 判断路径或文件
1 | os.path.isabs(...) # 判断是否绝对路径 |
- 工作目录及创建文件夹操作
1
2
3
4
5os.getcwd() # 获取当前工作目录 等同于os.path.dirname(__file__)
os.chdir(".\\test_mkdir") # 改变工作目录,. 表示本级目录; .. 表示上级目录
os.listdir(...) # 列出目录下的文件
os.mkdir(...) # 创建单个目录
os.makedirs(...) # 创建多级目 - 删除文件夹、文件
1
2
3os.rmdir(...) # 删除空文件夹
os.remove(...) # 删除单一文件
shutil.rmtree(...) # 删除文件夹及其下所有文 - 重命名文件夹/文件
1
os.rename(oldfileName, newFilename)
- 复制文件/文件夹
1
2
3
4shutil.copyfile("old","new") # 复制文件,都只能是文件
shutil.copytree("old","new") # 复制文件夹,都只能是目录,且new必须不存在
shutil.copy("old","new") # 复制文件/文件夹。经过测试,直接复制文件夹,报权限问题;直接复制文件正常
shutil.move("old","new") # 移动文件/文件夹至 new 文件夹中
实例
进入到相对文件目录
1 | # os.path.dirname(__file__) 表示得到当前目录路径等同于os.getcwd() |
引用公共模块
- 在不同的项目中,需要引用到当前项目的公用方法
1
2
3
4
5
6
7# 得到当前文件绝对路径
abs_path = os.path.abspath(os.path.dirname(__file__))
# 得到公共用例目录,采用切割项目名的方法
common_path = os.path.join(abs_path.split("airtest_auto")[0], "airtest_auto", "util")
# 导入到公用目录
sys.path.append(common_path)
from app_util import *
得到目录下的所有文件和文件夹
1 | for root, dirs, files in os.walk("t3", topdown=False): |
得到文件夹下的某些文件
1 | for i in os.listdir("t1"): |
得到文件夹下的修改时间排序的文件
1 | # 得到父目录的绝对路径 |
参考如下
python-执行外部命令的几种方式
os.system
- 执行的时候程序会打出cmd在linux上执行的信息
- 执行命令成功返回为0,否则为1
- 如果想获取在cmd输出的内容,是没办法获到的
1
2
3
4t2 = os.system("1adb devices")
t3 = os.system("adb devices")
print(t2) # 打印为1
print(t3) # 打印为0
subprocess.call
- 取代
os.system
,但是也是无法获取cmd输出的内容
1 | import subprocess |
os.popen
- popen返回的是一个file对象,跟open打开文件一样操作了,r是以读的方式打开
1 | output = os.popen('adb devices') |
subprocess.Popen
subprocess
模块代替os.system
、os.popen
,能够得到命令输出的值
shell
参数
- 在
linux
下,当shell=False
(默认)时,Popen
使用os.execvp()
来执行子程序。args
一般要是一个【列表】。如果args
是个字符串的
话,会被当做是可执行文件的路径,这样就不能传入任何参数了。1
2
3subprocess.Popen("cat test.txt", shell=True)
这是因为它相当于
subprocess.Popen(["/bin/sh", "-c", "cat test.txt"])
stdin
stdout
stderr
参数
- 分别表示程序的标准输入、输出、错误句柄。他们可以是
PIPE
,文件描述符或文件对象,也可以设置为None
表示从父进程继承 - 执行结果使用管道输出的实例:
1 | pipe=subprocess.Popen("adb devices",shell=True,stdout=subprocess.PIPE).stdout |
执行结果保存在文件实例
1
2
3
4cmd = "adb shell ls /sdcard/ | findstr aa.png"
fhandle = open(r"e:\aa.txt", "w")
pipe = subprocess.Popen(cmd, shell=True, stdout=fhandle).stdout
fhandle.close()子进程的文本流控制
1
2
3
4
5
6
7
8#!/usr/bin/env python
import subprocess
child1 = subprocess.Popen(["ls","-l"], stdout=subprocess.PIPE)
child2 = subprocess.Popen(["wc"], stdin=child1.stdout,stdout=subprocess.PIPE)
out = child2.communicate()
print outchild1.stdout–>subprocess.PIPE
child2.stdin<–subprocess.PIPE
child2.stdout–>subprocess.PIPE
相当于将child1.stdout–>child2.stdin->child2.stdout->subprocess.PIPE
subprocess.PIPE实际上为文本流提供一个缓存区。child1的stdout将文本输出到缓存区,随后child2的stdin从该PIPE中将文本读取走。child2的输出文本也被存放在PIPE中,直到
communicate()
方法从PIPE中读取出PIPE中的文本。要注意的是,
communicate()
是Popen对象的一个方法,该方法会阻塞父进程,直到子进程完成。我们还可以利用communicate()方法来使用PIPE给子进程输入:
1
2
3import subprocess
child = subprocess.Popen(["cat"], stdin=subprocess.PIPE)
child.communicate("vamei") //()不为空,则写入subprocess.PIPE,为空,则从subprocess.PIPE读取subprocess.PIPE–>child.stdin
commiuncate相当于写入subprocess.PIPE,然后child从subprocess.PIPE读取
利用python的subprocess模块执行外部命令, 并捕获stdout, stderr的输出
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27import subprocess
# print ’popen3:’
def external_cmd(cmd, msg_in=''):
try:
proc = subprocess.Popen(cmd,
shell=True,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
stdout_value, stderr_value = proc.communicate(msg_in)
return stdout_value, stderr_value
except ValueError as err:
# log("ValueError: %s" % err)
return None, None
except IOError as err:
# log("IOError: %s" % err)
return None, None
if __name__ == '__main__':
stdout_val, stderr_val = external_cmd('dir')
print ('Standard Output: %s' % stdout_val)
print ('Standard Error: %s' % stderr_val)得到命令的返回值,wait
1
2p=subprocess.Popen("dir", shell=True)
p.wait()但是Popen函数有一个缺陷,就是它是一个阻塞的方法。如果运行cmd时产生的内容非常多,函数非常容易阻塞住。解决办法是不使用wait()方法,但是也不能获得执行的返回值了。
commands.getstatusoutput
- 不推荐使用,因为在python3中被废弃
python-读写excel
1 | import sys |