0%

局限性。

  • Monkey测试使用的事件流数据流是随机的,不能进行自定义。
  • 可对MonkeyTest的对象,事件数量,类型,频率等进行设置。

Monkey的基本用法

  • 基本语法如下:
    • $ adb shell monkey [options]
    • 如果不指定options,Monkey将以无反馈模式启动,并把事件任意发送到安装在目标环境中的全部包。下面是一个更为典型的命令行示例,它启动指定的应用程序,并向其发送500个伪随机事件:
    • $ adb shell monkey -p your.package.name -v 500

分析日志

  • 通过Android trace文件分析死锁ANR实例过程
  • system/build.prop 日志文件主要记录手机系统信息,如版本,型号,品牌
  • adb logcat 导出日志文件

monkey.ini 配置文件

1
2
3
4
5
6
7
8
cmd=adb shell monkey -p com.dgm.user --throttle 500 --ignore-timeouts --ignore-crashes   --monitor-native-crashes -v -v
package_name=com.dgm.user
logdir=d:\android
remote_path=d:\android_server
phone_msg_log=d:\android_temp\phone.txt
sum = 100 -
activity = com.dgm.user.SplashActivity
exceptions=['NullPointer','IllegalState','IllegalArgument','ArrayIndexOutOfBounds','RuntimeException','SecurityException']
  • throttle 每次事件等待500毫秒
  • sum 定义随机事件数
  • exceptions 异常定义,用于后面扩展

结果生成为可视化图片 使用的是matplotlib

image-20210930181002746

  • 当然可以看日志文件

代码分析

  • 获得cpu-men
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# -*- coding: utf-8 -*-
import subprocess
pkg_name = "com.dgm.user"
cpu = []
men = []
def top_cpu(pkg_name):
cmd = "adb shell dumpsys cpuinfo | grep " + pkg_name
temp = []
# cmd = "adb shell top -n %s -s cpu | grep %s$" %(str(times), pkg_name)
top_info = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE).stdout.readlines()
for info in top_info:
temp.append(info.split()[2].decode()) # bytes转换为string
# print("cpu占用:%s" %cpu)
for i in temp:
if i != "0%":
cpu.append(i.split("%")[0])
return cpu

def get_men(pkg_name):
cmd = "adb shell dumpsys meminfo %s" %(pkg_name)
print(cmd)
temp = []
m = []
men_s = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE).stdout.readlines()
for info in men_s:
temp.append(info.split())
# print("内存占用:%s" %men[19][1].decode()+"K")
m.append(temp)
for t in m:
men.append(t[19][1].decode())
return men
  • 入口代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
import monkeyConfig
from adb_common import AndroidDebugBridge as ai
import matplotlibBase as mt
import MenCpu as m
import datetime as dt
CPU = [[],[]] # time,使用情况
MEN = [[],[]] #当前时间,和内存使用情况
# 得到手机信息
def getPhoneMsg(cmd_log):
l_list = []
f = open(cmd_log, "r")
lines = f.readlines()
for line in lines:
line = line.split('=')
#Android 系统,如anroid 4.0
if (line[0] == 'ro.build.version.release'):
l_list.append(line[1])
#手机名字
if (line[0]=='ro.product.model'):
l_list.append(line[1])
#手机品牌
if (line[0]=='ro.product.brand'):
l_list.append(line[1])
f.close()
return l_list

#开始脚本测试
def start_monkey(cmd, logdir, now1, logcatname):
print(cmd)
os.popen(cmd)
# os.kill()
#print"使用Logcat导出日志"
cmd2 = "adb logcat -d >%s" % logcatname
os.popen(cmd2)
#print"导出traces文件"
tracesname = logdir + "\\" + now1 + r"traces.log"
cmd3 = "adb shell cat /data/anr/traces.txt>%s" % tracesname
os.popen(cmd3)

if __name__ == '__main__':
ini_file = 'monkey.ini'
if os.path.isfile(ini_file):
if ai().attached_devices():
mc = monkeyConfig.baseReadnin(ini_file)
ai().open_app(mc.get_package_name(), mc.get_activity())
os.system('adb shell cat /system/build.prop >'+mc.get_phone_msg_log()) #存放的手机信息
ll_list = getPhoneMsg(mc.get_phone_msg_log())
# monkey开始测试
sum = mc.get_sum()
temp = ""
monkeylog = ""
start_monkey(mc.get_cmd(), mc.get_logdir(), mc.get_now(), mc.get_logcatname())
for i in range(sum):
time.sleep(1)
print(i)
dn = dt.datetime.now()
CPU[0].append(dn)
m.top_cpu(mc.get_package_name())
MEN[0].append(dn)
m.get_men(mc.get_package_name())
monkeylog = open(mc.get_logdir() + "\\" + mc.get_now()+"monkey.log")
temp = monkeylog.read()
monkeylog.close()
if temp.count('Monkey finished')>0:
print("测试完成咯")
CPU[1].append(m.cpu)
MEN[1].append(m.men)
# geterror(ll_list, mc.get_log(), mc.get_remote_path(), mc.now) 错误显示
mt.cpu_men_plots(CPU, MEN)
break
else:
print("设备不存在")
else:
print(u"配置文件不存在"+ini_file)
  • 结果以曲线图展示
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    def cpu_men_plots(cpu, men):
    import matplotlib.pyplot as pl
    import matplotlib.dates as mdates
    import datetime

    # 处理异常数据,有时候得到数据(占用情况)会比时间多一次循环的数据,造成xy的数据不一致,而引起报错
    if len(cpu[0]) != len(cpu[1][0]):
    cpu[1][0]= cpu[1][0][0:len(cpu[0])]

    if len(men[0]) != len(men[1][0]):
    men[1][0]= men[1][0][0:len(men[0])]
    print(men[0])
    print(men[1][0])
    a1 = pl.subplot(311)
    a1.set_title("CPU")
    a1.set_ylabel("占用情况%")
    a1.plot(cpu[0], cpu[1][0])
    a1.xaxis.set_major_locator(mdates.SecondLocator(interval=1))
    a1.xaxis.set_major_formatter(mdates.DateFormatter('%H:%M:%S'))

    a2 = pl.subplot(312)
    a2.set_title("内存")
    a2.set_ylabel("使用情况 K")
    a2.plot(men[0], men[1][0])
    a2.xaxis.set_major_locator(mdates.SecondLocator(interval=2))
    a2.xaxis.set_major_formatter(mdates.DateFormatter('%H:%M:%S'))

    # a3 = pl.subplot(313)
    # a3.set_title("流量")
    # a3.set_ylabel("使用情况 K")
    # a3.plot(x,list2)
    # a3.xaxis.set_major_formatter(mdates.DateFormatter('%H:%M:%S'))

    # a1.margins(x=0.2)
    pl.tight_layout()
    pl.show()

更多请参考我的源码

例子一

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import asyncio
from aiohttp import ClientSession
#  你使用async以及await关键字将函数异步化
async def fetch(url):
async with ClientSession() as session:
async with session.get(url) as response:
return await response.read()
async def run(loop, r):
url = "http://gc.ditu.aliyun.com/geocoding?a=苏州市"
tasks = []
for i in range(r):
task = asyncio.ensure_future(fetch(url.format(i)))
tasks.append(task)

responses = await asyncio.gather(*tasks)
# 注意asyncio.gather()的用法,它搜集所有的Future对象,然后等待他们返回。
# print(json.loads(responses[0].decode()))
print(len(responses))

loop = asyncio.get_event_loop()
future = asyncio.ensure_future(run(loop, 800))
loop.run_until_complete(future)

  • future = asyncio.ensure_future(run(loop, 800)) 这里 构造1000个请求时,就报too many open files loop is not close 作者也遇到此问题,说是本机的socket 端口用光了?很是怀疑

例子二 asyncio.Semaphore解决报错问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import asyncio
from aiohttp import ClientSession

async def fetch(url):
async with ClientSession() as session:
async with session.get(url) as response:
return await response.read()


async def bound_fetch(sem, url):
async with sem:
await fetch(url)

async def run(loop, r):
url = "http://gc.ditu.aliyun.com/geocoding?a=苏州市"
tasks = []
# create instance of Semaphore
sem = asyncio.Semaphore(100)
for i in range(r):
# pass Semaphore to every GET request
task = asyncio.ensure_future(bound_fetch(sem, url.format(i)))
tasks.append(task)

responses = await asyncio.gather(*tasks)
print(responses)
number = 100000
loop = asyncio.get_event_loop()

future = asyncio.ensure_future(run(loop, number))
loop.run_until_complete(future)
  • 然而我用asyncio.Semaphore时,发现请求不成功,已经发了邮件给作者,没有回我。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
​```
import asyncio
import aiohttp
import json
def fetch_page(url):
try:
response = yield from aiohttp.request('GET', url)
string = (yield from response.read()).decode('utf-8')
if response.status == 200:
data = json.loads(string)
print(data)
else:
print("data fetch failed for")
print(response.content, response.status)
except asyncio.TimeoutError:
print("访问失败")
if __name__ == '__main__':
url = 'http://gc.ditu.aliyun.com/geocoding?a=%E8%8B%8F%E5%B7%9E%E5%B8%82'
# urls = [url] * 4
loop = asyncio.get_event_loop()
tasks = []
for i in range(5):
tasks.append(asyncio.async(fetch_page(url)))
# tasks = [asyncio.async(fetch_page(z, i)) for i, z in enumerate(urls)]
print(tasks)
loop.run_until_complete(asyncio.wait(tasks, timeout=5))
loop.close()
​```

封装一次

​```
class fetch():
def __init__(self, dict_http):
'''
http请求的封装,传入dict
:param dict_http:
'''
self.dict_http = dict_http
def get(self, url, param):
data = {}
url = self.dict_http["protocol"] + self.dict_http["host"] + ":" + str(self.dict_http["port"]) + url
print(url)
try:
response = yield from aiohttp.request("GET", url, headers=self.dict_http["header"], params=param)
string = (yield from response.read()).decode('utf-8')
if response.status == 200:
data = json.loads(string)
else:
print("data fetch failed for")
print(response.content, response.status)
data["status_code"] = response.status
print(data)
except asyncio.TimeoutError:
print("访问失败")
return data
def post(self,url, param):
data = {}
url = self.dict_http["protocol"] + self.dict_http["host"] + ':' + str(self.dict_http["port"]) + url
try:
response = yield from aiohttp.request('POST', url, data=param)
string = (yield from response.read()).decode('utf-8')
if response.status == 200:
data = json.loads(string)
else:
print("data fetch failed for")
print(response.content, response.status)
data["status_code"] = response.status
print(data)
except asyncio.TimeoutError:
print("访问失败")
return data
if __name__ == '__main__':
url = '/iplookup/iplookup.php'
loop = asyncio.get_event_loop()
tasks = []
dict_http = {}
dict_http["protocol"] = "http://"
dict_http["host"] = "int.dpool.sina.com.cn"
dict_http["port"] = 80
dict_http["header"] = {"Accept":"text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8","User-Agent":"Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.59 Safari/537.36"}
f = fetch(dict_http)
for i in range(2):
tasks.append(asyncio.async(f.get(url, param="format=json&ip=218.4.255.255")))
loop.run_until_complete(asyncio.wait(tasks, timeout=5))
loop.close()

​```

list排序,和打印

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
l = [2, 4, 3, 5]
str = "bcdea"
def badIterable():
i = 0
iterable = ["a", "b", "c"]
for item in iterable:
print(i, item)
i += 1
def goodIterable():
iterable = ["a", "b", "c"]
for i, item in enumerate(iterable):
print(i, item)

#结果:[(0, 'a'), (1, 'b'), (2, 'c')]
def listIterable():
le = list(enumerate('abc'))
return le
#结果:[(1, 'a'), (2, 'b'), (3, 'c')]
def listIterable1():
le = list(enumerate('abc', 1))
# 正序
def sortList(li):
li.sort()
return li
# 倒序
def reverseList(li):
# li.reverse()
temp = li[::-1]
print(temp)
return temp

set的使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# 打印
def testSet():
my_set = {i * 15 for i in range(100)}
print(my_set)
# 交集
def intersection():
x = set("span")
y = set("pta")
print(x&y)
# 合集
def collection():
x = set("span")
y = set("pta")
print(x|y)
# 差集
def subtract():
x = set("span")
y = set("pta")
print(x-y)

#去掉重复数据
def removeRepeat():
a = [11,22,33,44,11,22]
b = set(a)
print(b)
c = [i for i in b]
print(c)
1
2
3
foo = "t1m3a3pp4mt"    
print("".join(list(set(foo)))[::-1]) # 去掉重复字符串,倒序排列
print("".join(list(set(foo)))[::1]) # 去掉重复自父亲,正序排列

  • matplotlib 多个曲线的练习
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import matplotlib.pyplot as pl
import matplotlib.dates as mdates
import datetime
x = [
datetime.datetime(2011,1,1,1,1,2),
datetime.datetime(2011,1,1,1,1,3),
datetime.datetime(2011,1,1,1,1,4),
datetime.datetime(2011,1,1,1,1,5),
datetime.datetime(2011,1,1,1,1,6),
datetime.datetime(2011,1,1,1,1,7),
]

list1 = [20,10,90,10,50,3]
list2 = [1000,3000,2899,1922,16000,89222]
a1 = pl.subplot(311) # 曲线图一
a1.set_title("CPU")
a1.set_ylabel("占用情况%")
a1.plot(x,list1)
# a1.xaxis.set_major_locator(mdates.SecondLocator(interval=2))
a1.xaxis.set_major_formatter(mdates.DateFormatter('%H:%M:%S'))

a2 = pl.subplot(312) #曲线图二
a3 = pl.subplot(313) #曲线图三

a2.set_title("内存")
a2.set_ylabel("使用情况 K")
a2.plot(x,list2)
# a1.xaxis.set_major_locator(mdates.SecondLocator(interval=2))
a2.xaxis.set_major_formatter(mdates.DateFormatter('%H:%M:%S'))


a3.set_title("流量")
a3.set_ylabel("使用情况 K")
a3.plot(x,list2)
# a1.xaxis.set_major_locator(mdates.SecondLocator(interval=2))
a3.xaxis.set_major_formatter(mdates.DateFormatter('%H:%M:%S'))

# a1.margins(x=0.2)
pl.tight_layout()
pl.show()
  • 结果

Paste_Image.png

运行pytest的三种方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import pytest

@pytest.mark.finished
def test_add():
print("测试函数:test_add")


@pytest.mark.finished
def test_subtract():
print("测试函数:test_subtract")


@pytest.mark.unfinished
def test_no_finish():
pass
if __name__ == "__main__":
pytest.main(["-s", "pt_test1.py"])

方式一

1
pytest.main(["-s", "pt_test1.py"])

方式二

  • 在pycharm中新建pytest
    image.png

  • 点击运行即可
    image.png

  • 使用命令执行

    1
    2
    3
    4
    5
    6
    7
    8
    9
    D:\project\ut>pytest pt_test1.py
    ======================================================================= test session starts ========================================================================
    platform win32 -- Python 3.6.6, pytest-5.4.3, py-1.9.0, pluggy-0.13.1
    rootdir: D:\project\ut
    collected 2 items

    pt_test1.py .. [100%]

    ======================================================================== 2 passed in 0.02s =========================================================================

测试制定的函数

  • ::制定测试函数
    1
    2
    3
    pytest pt_test1.py::test_add
    pytest.main(["pt_test1.py::test_add"])

  • -k模糊搜索,模糊搜索add的测试函数
    1
    2
    pytest -k add pt_test1.py
    pytest.main(["-s", "pt_test1.py", "-k", "add"])
  • 使用pytest.mark标注后,使用-m参数
    1
    2
    3
    4
    5
    @pytest.mark.finished
    def test_add():
    print("测试函数:test_add")

    pytest -m finished pt_test1.py
  • 一个函数可以打多个标记;多个函数也可以打相同的标记,运行逻辑:

pytest -m "finished and commit"

跳过测试

  • pytest.mark.skip
    1
    2
    3
    4
    5
    6
    # test_skip.py
    @pytest.mark.skip(reason='out-of-date api')
    def test_connect():
    pass

    pytest tests/test-function/test_skip.py
  • pytest.mark.skipif 为测试函数指定被忽略的条件
    1
    2
    3
    4
    5
    6
    @pytest.mark.skipif(conn.__version__ < '0.2.0',
    reason='not supported until v0.2.0')
    def test_api():
    pass

    pytest tests/test-function/test_skip.py

参数化

  • 密码长度的测试函数
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    # test_parametrize.py

    @pytest.mark.parametrize('passwd',
    ['123456',
    'abcdefdfs',
    'as52345fasdf4'])
    def test_passwd_length(passwd):
    assert len(passwd) >= 8

    $ pytest tests/test-function/test_parametrize.py
    ============================= test session starts =============================
    platform win32 -- Python 3.6.4, pytest-3.6.1, py-1.5.2, pluggy-0.6.0
    rootdir: F:\self-repo\learning-pytest, inifile:
    collected 3 items

    tests\test-function\test_parametrize.py F.. [100%]

    ================================== FAILURES ===================================
  • 再看一个多参数的例子,用于校验用户密码:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    # test_parametrize.py

    @pytest.mark.parametrize('user, passwd',
    [('jack', 'abcdefgh'),
    ('tom', 'a123456a')])
    def test_passwd_md5(user, passwd):
    db = {
    'jack': 'e8dc4081b13434b45189a720b77b6818',
    'tom': '1702a132e769a623c1adb78353fc9503'
    }

    import hashlib

    assert hashlib.md5(passwd.encode()).hexdigest() == db[user]

    $ pytest -v tests/test-function/test_parametrize.py::test_passwd_md5_id
    ============================= test session starts =============================
    platform win32 -- Python 3.6.4, pytest-3.6.1, py-1.5.2, pluggy-0.6.0 -- c:\anaconda3\python.exe
    cachedir: .pytest_cache
    rootdir: F:\self-repo\learning-pytest, inifile:
    collected 2 items

    tests/test-function/test_parametrize.py::test_passwd_md5_id[User<Jack>] PASSED [ 50%]
    tests/test-function/test_parametrize.py::test_passwd_md5_id[User<Tom>] PASSED [100%]

    ========================== 2 passed in 0.07 seconds ===========================

固件

  • 固件(Fixture)是一些函数,pytest 会在执行测试函数之前(或之后)加载运行它们
    - Pytest 使用pytest.fixture()定义固件,下面是最简单的固件,只返回北京邮编
    1
    2
    3
    4
    5
    6
    7
    8

    @pytest.fixture()
    def postcode():
    return '010'


    def test_postcode(postcode):
    assert postcode == '010'

预处理和后处理

  • 很多时候需要在测试前进行预处理(如新建数据库连接),并在测试完成进行清理(关闭数据库连接)。
  • Pytest 使用 yield 关键词将固件分为两部分,yield 之前的代码属于预处理,会在测试前执行;yield 之后的代码属于后处理,将在测试完成后执行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# test_db.py

@pytest.fixture()
def db():
print('Connection successful')

yield

print('Connection closed')


def search_user(user_id):
d = {
'001': 'xiaoming'
}
return d[user_id]


def test_search(db):
assert search_user('001') == 'xiaoming

============================= test session starts =============================
platform win32 -- Python 3.6.4, pytest-3.6.1, py-1.5.2, pluggy-0.6.0
rootdir: F:\self-repo\learning-pytest, inifile:
collected 1 item

tests\fixture\test_db.py Connection successful
.Connection closed

作用域

在定义固件时,通过 scope 参数声明作用域,可选项有:

  • function: 函数级,每个测试函数都会执行一次固件;默认的作用域为 function
  • class: 类级别,每个测试类执行一次,所有方法都可以使用;
  • module: 模块级,每个模块执行一次,模块内函数和方法都可使用;
  • session: 会话级,一次测试只执行一次,所有被找到的函数和方法都可用。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    @pytest.fixture(scope='function')
    def func_scope():
    pass


    @pytest.fixture(scope='module')
    def mod_scope():
    pass


    @pytest.fixture(scope='session')
    def sess_scope():
    pass


    @pytest.fixture(scope='class')
    def class_scope():
    pass
  • 对于类使用作用域,需要使用 pytest.mark.usefixtures (对函数和方法也适用):
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    # test_scope.py

    @pytest.mark.usefixtures('class_scope')
    class TestClassScope:
    def test_1(self):
    pass

    def test_2(self):
    pass

    $ pytest --setup-show tests/fixture/test_scope.py::TestClassScope
    ============================= test session starts =============================
    platform win32 -- Python 3.6.4, pytest-3.6.1, py-1.5.2, pluggy-0.6.0
    rootdir: F:\self-repo\learning-pytest, inifile:
    collected 2 items

    tests\fixture\test_scope.py
    SETUP C class_scope
    tests/fixture/test_scope.py::TestClassScope::()::test_1 (fixtures used: class_scope).
    tests/fixture/test_scope.py::TestClassScope::()::test_2 (fixtures used: class_scope).
    TEARDOWN C class_scope

使用命令行在pytest中传递多个参数

  • 配置conftest.py
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    # conftest.py
    import pytest
    def pytest_addoption(parser):
    parser.addoption("--input1", action="store", default="default input1")
    parser.addoption("--input2", action="store", default="default input2")

    @pytest.fixture
    def input1(request):
    return request.config.getoption("--input1")

    @pytest.fixture
    def input2(request):
    return request.config.getoption("--input2")
  • 编写测函数
    1
    2
    3
    4
    5
    6
    7
    # test.py
    import pytest

    @pytest.mark.unit
    def test_print_name(input1, input2):
    print ("Displaying input1: %s" % input1)
    print("Displaying input2: %s" % input2)
  • 执行命令
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    >py.test -s test.py --input1 tt --input2 12
    ================================================= test session starts =================================================
    platform win32 -- Python 3.7.0, pytest-4.1.1, py-1.7.0, pluggy-0.8.1
    rootdir: pytest, inifile:
    collected 1 item

    test.py Displaying input1: tt
    Displaying input2: 12
    .

    ============================================== 1 passed in 0.04 seconds ====================================

其他的一些参数总结

  • -v, --verbose
    详细结果
    - -q, --quiet
    极简结果显示,简化控制台的输出,可以看出输出信息和之前不添加-q不信息不一样, 下图中有两个..点代替了pass结果
    - -s
    输入我们用例中的调式信息,比如print的打印信息等,我们在用例中加上一句 print(driver.title),我们再运行一下我们的用例看看,调试信息输出
    - -V
    可以输出用例更加详细的执行信息,比如用例所在的文件及用例名称等
  • --junit-xml=path
    输出xml文件格式,在与jenkins做集成时使用
  • --result-log=path
    将最后的结果保存到本地文件中

本文来自

兼容unittest中的setup

  • 使用setup,setup_cass,teardown_class
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    class TestCase():
    def setup(self):
    print("setup: 每个用例开始前执行")
    def teardown(self):
    print("teardown: 每个用例结束后执行")
    def setup_class(self):
    print("setup_class:所有用例执行之前")
    def teardown_class(self):
    print("teardown_class:所有用例执行之前")
    def setup_method(self):
    print("setup_method: 每个用例开始前执行")
    def teardown_method(self):
    print("teardown_method: 每个用例结束后执行")
    def test_one(self):
    print("正在执行----test_one")
    x = "this"
    assert 'h' in x
    def test_three(self):
    print("正在执行test_two")
    a = "hello"
    b = "hello word"
    assert a in b
    def add(self,a, b):
    print("这是加减法")
    return a + b
    if __name__ == '__main__':
    pytest.main(['-s', 'test_fixt_class'])
  • 来自https://www.cnblogs.com/tallme/p/11369791.html

conftest

  • 多用例的数据可以共,比如selenium中的driver的
  • conftest.py文件名字不能更改
  • conftest.py与运行的用例要在同一个pakage下,并且有__init__.py文件
  • 如下图,我可以放到和用例testcase同级文件夹目录,也可以放到testcase文件夹下面的用例目录

image.png

  • 代码
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    # conftest.py
    import pytest
    from selenium import webdriver
    import os


    @pytest.fixture()
    def driver():
    PATH = lambda p: os.path.abspath(
    os.path.join(os.path.dirname(__file__), p)
    )
    driver_path = PATH("../exe/chromedriver.exe")
    driver = webdriver.Chrome(driver_path)
    driver.get('https://www.baidu.com')
    driver.maximize_window()

    # 返回数据
    yield driver

    # 实现用例后置
    driver.quit()

    import pytest

    # test_case001.pu

    class TestClassName:
    @pytest.mark.usefixtures("driver")
    def test_func_name(self, driver):
    driver.find_element_by_id("kw").click()
  • 执行: pytest -s testcase/

共享session

在selenium中,可以实现driver共享,不用每个用例都重新打开和关闭浏览器

image-20220823100335089

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# conftest.py 

import os
from selenium import webdriver
import pytest

@pytest.fixture(scope='session')
def driver(request):
PATH = lambda p: os.path.abspath(
os.path.join(os.path.dirname(__file__), p)
)
chromedriver = PATH("exe/chromedriver.exe")
os.environ["webdriver.chrome.driver"] = chromedriver
driver = webdriver.Chrome(chromedriver)
driver.maximize_window() # 将浏览器最大化
driver.implicitly_wait(2)
# openurl = "http://www.baidu.com"
# driver.get(openurl)

def end():
driver.quit()

request.addfinalizer(end) # 终结函数
return driver


# test_1.py
import pytest
def test_001(driver):
openurl = "http://www.baidu.com"
driver.get(openurl)
driver.find_element_by_id("kw").click()

if __name__ == '__main__':
pytest.main(['-v', 'test_1.py'])

# test_2.py
import pytest
import time

def test_001(driver):
# openurl = "http://www.baidu.com"
# driver.get(openurl)
driver.find_element_by_id("kw").send_keys("baidu")
time.sleep(5)
if __name__ == '__main__':
pytest.main(['-v', 'test_2.py'])

批量执行z

1
pytest -s TestCase\

单例模式

  • 单例是一种设计模式,应用该模式的类只会生成一个实例,单例模式保证了在程序的不同位置都可以且仅可以取到同一个对象实例:如果实例不存在,会创建一个实例;如果已存在就会返回这个实例

  • 单例函数

1
2
3
4
5
6
7
8
9
10
# coding:utf-8  
# singleton.py
#单例模式函数,用来修饰类
def singleton(cls,*args,**kw):
instances = {}
def _singleton():
if cls not in instances:
instances[cls] = cls(*args,**kw)
return instances[cls]
return _singleton
  • selenium 的driver只有一个单例实例:GetSeleniumDriver.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
# -*- coding:utf-8 -*-
from selenium import webdriver
from Utils.singleton import singleton
import os
@singleton
class GetSeleniumDriver(object):
def __init__(self):
PATH = lambda p: os.path.abspath(
os.path.join(os.path.dirname(__file__), p))
chromedriver = PATH("../exe/chromedriver.exe")
os.environ["webdriver.chrome.driver"] = chromedriver
self.driver = webdriver.Chrome(chromedriver)
self.driver.maximize_window() # 将浏览器最大化
self.driver.implicitly_wait(2)
  • 编写用例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# test_3.py

import pytest
import time
from Utils.GetSeleniumDriver import GetSeleniumDriver
class TestCase003(object):

# 初始化
def setup_class(self):
self.driver=GetSeleniumDriver().driver
self.driver.get("http://www.baidu.com")

def test_003(self):
self.driver.find_element_by_id("kw").send_keys("baidu")
time.sleep(5)
# 清理后,影响其他函数
def teardown_class(self):
# self.driver.close()
# self.driver.quit()
pass


if __name__ == '__main__':
pytest.main(['-v', 'test_3.py'])

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# test_4.py
import pytest
import time
from Utils.GetSeleniumDriver import GetSeleniumDriver


class TestCase004():

def setup_class(self):
self.driver = GetSeleniumDriver().driver
# openurl = "http://www.baidu.com"
# self.driver.get(openurl)

def test_004(self):
self.driver.find_element_by_id("kw").send_keys("baidu222")
time.sleep(5)

def teardown_class(self):
# self.driver.close()
# self.driver.quit()
pass

  • 代码结构

image-20220916152333834

html美化

conftest.py中的优化的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
import time

import pytest
from selenium import webdriver
from selenium.common.exceptions import WebDriverException
from selenium.webdriver.common.desired_capabilities import DesiredCapabilities
from py._xmlgen import html
import os

_driver = None


@pytest.fixture(scope='session', autouse=True)
def driver():
global _driver
chromedriver=r"D:\project\autoPlatform\webAuto\exe\chromedriver.exe"
os.environ["webdriver.chrome.driver"] = chromedriver
_driver = webdriver.Chrome(chromedriver)
_driver.maximize_window()
_driver.get("https://www.baidu.com/")
# 返回数据
yield _driver
# 实现用例后置
# _driver.close() # centos报错
_driver.quit()


@pytest.mark.optionalhook
def pytest_html_results_summary(prefix):
"""
#添加summary内容
"""
prefix.extend([html.p("所属部门: 测试组")])
prefix.extend([html.p("框架设计: XXX")])


@pytest.mark.hookwrapper
def pytest_runtest_makereport(item):
"""
当测试失败的时候,自动截图,展示到html报告中
:param item:
"""
if not _driver:
return
pytest_html = item.config.pluginmanager.getplugin('html')
outcome = yield
report = outcome.get_result()

report.description = str(item.function.__doc__)
extra = getattr(report, 'extra', [])

if report.when == 'call' or report.when == "setup":
xfail = hasattr(report, 'wasxfail')
if (report.skipped and xfail) or (report.failed and not xfail):
screen_img = _capture_screenshot()
if screen_img:
html = '<div><img src="data:image/png;base64,%s" alt="screenshot" style="width:1024px;height:768px;" ' \
'onclick="window.open(this.src)" align="right"/></div>' % screen_img
extra.append(pytest_html.extras.html(html))
report.extra = extra


def pytest_html_results_table_header(cells):
cells.insert(1, html.th('Description')) # 表头添加Description
cells.pop(-1) # 删除link


def pytest_html_results_table_row(report, cells):
'''新增用例描述内容,来自于用例的注释'''
cells.insert(1, html.td(report.description)) # 用例的描述
cells.pop(-1) # 删除link


def pytest_html_results_table_html(report, data):
"""
去除执行成功用例的log输出
"""
if report.passed:
del data[:]
data.append(html.div('通过的用例未捕获日志输出.', class_='empty log'))
pass


def pytest_html_report_title(report):
report.title = "pytest示例项目测试报告"

def pytest_terminal_summary(terminalreporter, exitstatus, config):
'''收集测试结果'''
# print(terminalreporter.stats)
passed = len([i for i in terminalreporter.stats.get('passed', []) if i.when != 'teardown'])
failed = len([i for i in terminalreporter.stats.get('failed', []) if i.when != 'teardown'])
error = len([i for i in terminalreporter.stats.get('error', []) if i.when != 'teardown'])
# skipped = len([i for i in terminalreporter.stats.get('skipped', []) if i.when != 'teardown'])
print("total:", passed+failed+error)
print('passed:', passed)
print('failed:', failed)
print('error:', error)
# print('成功率:%.2f' % (len(terminalreporter.stats.get('passed', []))/terminalreporter._numcollected*100)+'%')
# terminalreporter._sessionstarttime 会话开始时间
duration = time.time() - terminalreporter._sessionstarttime
print('total times:', duration, 'seconds')

def _capture_screenshot():
"""
截图保存为base64
:return:
"""
return _driver.get_screenshot_as_base64()



  • 编写用例,用例下面的注释:测试百度作为报告中的标题
1
2
3
4
5
6
7
8
@pytest.mark.finished
def test_baidu2(self, driver):
"""
测试百度下
"""
page = BaiduPage(driver, "test_baidu2")
page.operate()
page.check_point()
  • 执行结果
1
pytest -m finished webAuto\testcase --html=report.html --self-contained-html --capture=sys

image-20230508174717686

  • schematics代替model层的功能
  • schematics自动的字段有验证功能
1
2
3
4
5
6
7
8
9
10
11
12
13
from schematics.models import Model
from schematics.types import StringType, URLType
import json
class Person(Model):
name = StringType(required=True) #不能为空
website = URLType()
person = Person({'name':'sss',
'website': 'http://soundcloud.com/joestrummer'})
person.name=u"哈哈"
js = json.dumps(person.to_primitive())
print(js)
f = person.validate() ##验证字段是否正确,如果不正确会报错,如果正确返回的就是none
print(f)
  • 列表嵌套字典
    • 你也可以嵌套其他的model层
1
2
3
4
5
6
7
8
9
10
11
12
13
from schematics.models import Model
from schematics.types import StringType, IntType
from schematics.types.compound import ListType,MultiType
class Person(Model):
mls = ListType(MultiType())
shi = StringType()
code = IntType()
mls_test = Person({'mls': [{
'en_US': 'Hello, world!',
'fr_FR': 'Bonjour tout le monde!',
'es_MX': '¡Hola, mundo!',
}],'shi':'kun','code':0})
print(mls_test.mls[0]["en_US"])
  • 字典
1
2
3
4
5
6
7
8
9
10
11
12
from schematics.models import Model
from schematics.types import StringType, IntType,MultilingualStringType
class Person(Model):
mls = MultilingualStringType()
shi = StringType()
code = IntType()
mls_test = Person({'mls': {
'en_US': 'Hello, world!',
'fr_FR': 'Bonjour tout le monde!',
'es_MX': '¡Hola, mundo!',
},'shi':'kun','code':0})
print(mls_test.mls["en_US"])

更多schematics高级的用法

  • jsonschema 是验证json的正确性
1
2
3
4
5
6
7
8
9
10
11
12
13
from jsonschema import validate

schema = {
"type" : "object",
"properties" : {
"price" : {"type" : "number"},
"name" : {"type" : "string"},
"list":{"maxItems":2},
"address":{'regex':'bj'},
},
}

validate({"name" : "Eggs", "price" : "dfda",'list':[1,5],'address':'bj-jiuxianqiao'}, schema)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import unittest
import os


class MyUnittest(unittest.TestCase):

def setUp(self) -> None:
print("setUp_每运行一次用例都会执行一次")
self.name = "hi"
print("setUp_name_%s" % self.name)

def tearDown(self) -> None:
print("tearDown_每结束一次用例都会执行一次")
print("tearDown_name_%s" % self.name)

@classmethod
def setUpClass(cls) -> None:
print("setUpClass_整个测试开始后执行,只执行一次")
cls.func = "setUpClass"
print("setUpClass_func_%s" % cls.func)

@classmethod
def tearDownClass(cls) -> None:
print("tearDownClass_整个测试完成后执行,只执行一次")
print("tearDownClass_func_%s" % cls.func)

def test_add(self):
print("测试函数:test_add")
self.name = "test_add"
print("setUpClass_cls.func_%s" % self.func)

def test_subtract(self):
print("测试函数:test_subtract")
self.name = "test_subtract"
self.func = "test_subtract"


if __name__ == "__main__":
# 运行方式一
unittest.main()

执行结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
setUpClass_整个测试开始后执行,只执行一次
setUpClass_func_setUpClass
test_add (ut_module.ut1_test.MyUnittest) ... setUp_每运行一次用例都会执行一次
setUp_name_hi
测试函数:test_add
setUpClass_cls.func_setUpClass
tearDown_每结束一次用例都会执行一次
tearDown_name_test_add
ok
test_subtract (ut_module.ut1_test.MyUnittest) ... setUp_每运行一次用例都会执行一次
setUp_name_hi
测试函数:test_subtract
tearDown_每结束一次用例都会执行一次
tearDown_name_test_subtract
ok
tearDownClass_整个测试完成后执行,只执行一次
tearDownClass_func_setUpClass

----------------------------------------------------------------------
Ran 2 tests in 0.003s

OK

测试分析

结果分析

setUp初始化的值可以改变

1
2
3
4
5
setUp_name_hi
测试函数:test_add
setUpClass_cls.func_setUpClass
tearDown_每结束一次用例都会执行一次
tearDown_name_test_add
  • setUp中的self.name的值,在测试函数中是可以被改变的,一开始name为hi,后面为test_add

setUpClass的的值无法改变

1
2
3
4
5
6
7
8
9
10
在def setUpClass赋值
self.func = "setUpClass"
在 def test_subtract中赋值
self.func = "test_subtract"
# 执行结果
setUpClass_整个测试开始后执行,只执行一次
setUpClass_func_setUpClass
.....
tearDownClass_整个测试完成后执行,只执行一次
tearDownClass_func_setUpClass
  • setUpClass初始化的值在函数中无法改变,应该是clsself的指向不一样

setUp和setUpClass

  • 每个用例都要独立,也就是每个用执行时,都会重开设备,就使用setup,比如appium中的driver
  • 若想复用某个初始化条件,单不期望每个用例都重启再打开,可以使用setUpClass
    • 在使用appium时,用setUp频繁初始化driver造成执行用例时间太长,可以使用setUpClass初始化driver,结合launch-app达到每个用例不重启,同时解决依赖关系的问题

不同的运行unittest的方式

方式一

1
unittest.main()

方式二

1
2
3
4
5
6
7
# 构造测试集
suite = unittest.TestSuite()
suite.addTest(MyUnittest("test_add"))
suite.addTest(MyUnittest("test_subtract"))
# 执行测试
runner = unittest.TextTestRunner()
runner.run(suite)

方式三

1
2
3
4
5
6
7
8
# TestLoader 用来加载TestCase到TestSuite中的,其中有几个  loadTestsFrom__()方法,就是从各个地方寻找TestCase,创建它们的实例,然后add到TestSuite中,再返回一个TestSuite实例
# TestSuite 测试套件集合,最终将传递给testRunner进行测试执行
# TextTestRunner 用来执行测试用例,将测试结果保存在TextTestResult中
# 此用法可以同时测试多个类
suite1 = unittest.TestLoader().loadTestsFromTestCase(MyUnittest)
suite2 = unittest.TestLoader().loadTestsFromTestCase(MyUnittest1)
suite = unittest.TestSuite([suite1, suite2 ])
unittest.TextTestRunner(verbosity=2).run(suite)

方式四

1
2
3
# 进入到测试目录
os.chdir(".//ut_module")
os.system("python -m unittest -v ut1_test")

方式五,直接到终端中输入命令

1
2
3
4
5
6
7
8
9
# 测试某个类下的所有函数
D:\project\ut>python -m unittest -v ut_module.ut1_test
# 测试类中某一个函数
D:\project\ut>python -m unittest -v ut_module.ut1_test.MyUnittest.test_add
# 执行模块下所有test结尾的文件
D:\project\ut>python -m unittest discover D:\project\ut\ut_module "*_test.py"
# 同时执行多个测试类下的测试函数
D:\project\ut>python -m unittest ut_module.ut1_test.MyUnittest.test_add ut_module.ut2_test.MyUnittest.test_subtract1

其他扩展

unittest的条件装饰器的使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
class MyTestCase(unittest.TestCase):

@unittest.skip("demonstrating skipping")
def test_nothing(self):
self.fail("shouldn't happen")

@unittest.skipIf(mylib.__version__ < (1, 3),
"not supported in this library version")
def test_format(self):
# Tests that work for only a certain version of the library.
pass

@unittest.skipUnless(sys.platform.startswith("win"), "requires Windows")
def test_windows_support(self):
# windows specific testing code
pass

def test_maybe_skipped(self):
if not external_resource_available():
self.skipTest("external resource not available")
# test code that depends on the external resource
pass
@unittest.expectedFailure
def test_fail(self):
self.assertEqual(1, 0, "broken")

@unittest.skip("showing class skipping")
class MySkippedTestCase(unittest.TestCase):
def test_not_run(self):
pass
  • @unittest.skip(reason)
    跳过被此装饰器装饰的测试。 reason 为测试被跳过的原因。
  • @unittest.skipIf(condition, reason)
    当 condition 为真时,跳过被装饰的测试。
  • @unittest.skipUnless(condition, reason)
    跳过被装饰的测试,除非 condition 为真。

@unittest.expectedFailure
把测试标记为预计失败。如果测试不通过,会被认为测试成功;如果测试通过了,则被认为是测试失败。

  • exception unittest.SkipTest(reason)
    引发此异常以跳过一个测试。

参数化

  • 在使用unittest时,希望testcase传参给unittest,下面时用appium的一个伪代码
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    class ParametrizedTestCase(unittest.TestCase):
    """ TestCase classes that want to be parametrized should
    inherit from this class.
    """

    def __init__(self, methodName='runTest', param=None):
    super(ParametrizedTestCase, self).__init__(methodName)
    # 得到testcase传来的参数
    global devicess
    devicess = param

    @classmethod
    def setUpClass(cls):
    pass
    cls.driver = get_driver(devicess)
    cls.logTest = myLog().getLog(cls.devicesName) # 每个设备实例化一个日志记录器

    def setUp(self):
    pass

    @classmethod
    def tearDownClass(cls):
    cls.driver.close_app()
    cls.driver.quit()
    pass
    def tearDown(self):
    pass

    @staticmethod
    def parametrize(testcase_klass, param=None):
    testloader = unittest.TestLoader()
    testnames = testloader.getTestCaseNames(testcase_klass)
    suite = unittest.TestSuite()
    for name in testnames:
    suite.addTest(testcase_klass(name, param=param))
    return suite


    class HomeTest(ParametrizedTestCase):
    def testFirstOpen(self):
    app = {"logTest": self.logTest, "driver": self.driver, "path": PATH("../yamls/home/firstOpen.yaml"),
    "device": self.devicesName, "caseName": sys._getframe().f_code.co_name}

    page = FirstOpenPage(app)
    page.operate()
    page.checkPoint()



    @classmethod
    def setUpClass(cls):
    super(HomeTest, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
    super(HomeTest, cls).tearDownClass()

    if __name__ == '__main__':
    devices = {"设备信息"}
    suite = unittest.TestSuite()
    suite.addTest(ParametrizedTestCase.parametrize(HomeTest, param=devices))
    unittest.TextTestRunner(verbosity=2).run(suite)

ddt

  • pip install ddt

使用json

  • 新建文件 test_data_list.json:
    1
    2
    3
    4
    [
    "Hello",
    "Goodbye"
    ]
  • 新建文件 test_data_dict.json:
    1
    2
    3
    4
    {
    "unsorted_list": [ 10, 12, 15 ],
    "sorted_list": [ 15, 12, 50 ]
    }
  • 实例代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import unittest
from ddt import ddt, file_data
from ddt_demo.mycode import has_three_elements,is_a_greeting

@ddt
class FooTestCase(unittest.TestCase):

@file_data('test_data_dict.json')
def test_file_data_json_dict(self, value):
self.assertTrue(has_three_elements(value))

@file_data('test_data_list.json')
def test_file_data_json_list(self, value):
self.assertTrue(is_a_greeting(value))

if __name__=='__main__':
unittest.main(verbosity=2

使用yaml文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import unittest
from ddt import ddt, file_data
from ddt_demo.mycode import has_three_elements,is_a_greeting

@ddt
class FooTestCase(unittest.TestCase):

@file_data('test_data_dict.yaml')
def test_file_data_yaml_dict(self, value):
self.assertTrue(has_three_elements(value))

@file_data('test_data_list.yaml')
def test_file_data_yaml_list(self, value):
self.assertTrue(is_a_greeting(value))

if __name__=='__main__':
unittest.main(verbosity=2)

非阻塞启动线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import threading
import time
def one_thread(name,id):
print("start....")
print(name)
print(id)
time.sleep(5)
print("end...")

print("start thread")
threading.Thread(target=one_thread, args=(), kwargs={"name": 111, "id": 222}).start()
# args是一个list
# kwargs是一个字典,需要对应函数的key
print("end thread")
  • 得到值如下,线程启动函数后,非阻塞执行
    1
    2
    3
    4
    5
    6
    start thread
    start....
    111
    222
    end thread
    end...

多线程并发处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import threading
import time


class myThread(threading.Thread):
def __init__(self, threadID, name):
threading.Thread.__init__(self)
self.threadID = threadID
self.name = name

def run(self):
print_time(self.threadID, self.name)

num = 0
def print_time(threadID, name):
global num
# 每一个线程循环10次,最终总循环次数为30次
for i in range(10):
print("start run")
time.sleep(2)
print(i)
num += 1
print("thread_id=%s:name=%s" % (threadID, name))


if __name__ == '__main__':
threads = []
# 新增三个线程
for i in range(3):
name = "Thread-%d" % i
t = myThread(i, name)
t.start()
threads.append(t)
for t in threads:
t.join()
print("所有线程执行完毕")
print("总循环次数为:%s" % num)
  • 打印结果:每次运行三个线程,每个线程循环打印10次
    1
    2
    3
    4
    5
    6
    7
    8
    9
    start run
    start run
    start run
    0
    0
    ...
    thread_id=1:name=Thread-1
    所有线程执行完毕
    总循环次数为:30
  • 多线程共享资源,可以使用全局变量global

多线程加锁

  • 对于那些需要每次只允许一个线程操作的数据,可以将其操作放到 acquire 和 release 方法之间
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# -*- coding: utf-8 -*-
import time
import threading
# 创建锁对象
lock = threading.Lock()
num = 0

def run(n):
global num
for i in range(10):
# 加锁 为了确保下面代码只能由一个线程从头到尾的执行
# 会阻止多线程的并发执行,所以效率会大大降低
"""
lock.acquire()
try:
num = num - n
num = num + n
finally:
# 解锁
lock.release()
"""
with lock:
time.sleep(2)
print("start")
num = num + 1
print("==============")


if __name__ == '__main__':
t1 = threading.Thread(target=run,args=(6,))
t2 = threading.Thread(target=run,args=(9,))
t1.start()
t2.start()
t1.join()
t2.join()
print("num = %s"%(num))
  • 打印结果是每次只能运行一个线程
1
2
3
4
start
==============
...
num = 20

多线程与队列

  • 我们经常会遇到这样的一个问题,这里有成千上万条数据,每次需要取出其中的一条数据进行处理,那么引入多线程该怎么进行任务分配?
  • 我们可以将数据进行分割然后交给多个线程去跑,可是这并不是一个明智的做法。在这里我们可以使用队列与线程相结合的方式进行任务分配。
  • 队列线程的思想: 首先创建一个全局共享的队列,队列中只存在有限个元素,并将所有的数据逐条加入到队列中,并调用队列的join函数进行等待。之后便可以开启若干线程,线程的任务就是不断的从队列中取数据进行处理就可以了。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
import threading
import time
import queue

q = queue.Queue(10)

threadLock = threading.Lock()


class myThread(threading.Thread):
def __init__(self, threadID, name):
threading.Thread.__init__(self)
self.threadID = threadID
self.name = name
self.exitFlag = 0

def run(self):
while not self.exitFlag:
threadLock.acquire()
if not q.empty():
id = q.get()
print_time(self.name, id)
threadLock.release()
else:
threadLock.release()


def print_time(threadName, id):
print ("%s:%s:%s"%(threadName,time.ctime(time.time()),id))
# pass


# 创建3个线程
threads = []
for i in range(3):
name = "Thread-%d" % i
t = myThread(i, name)
t.start()
threads.append(t)
print(threads)

# 新增队列数据
for i in range(10000):
q_name = "Queue:%d" % i
q.put(q_name)

# 等待队列清空
while not q.empty():
pass

# 也可以join方法,与上同效
# q.join()

# 通知线程,处理完之后关闭
for t in threads:
t.exitFlag = 1

# 等待所有线程结束之后才退出
for t in threads:
t.join()

print("Exiting Main Thread")
  • 这里必须要在判断q.empty()前加上线程锁,因为可能会出现这样的一种情况。
  • 某一时刻,队列中还有一个元素,该元素正在被线程A取出,而与此同时线程B正在判断队列q是否为空,而此时线程B中队列q不为空进入后面的操作,但是待B去取元素时,最后一个元素已经被A取出,造成线程等待,显示出被挂起的状态。
  • 我们也可以通过加入q.get(timeout=10)超时操作来弥补这一问题。
  • 打印的结果
    1
    2
    3
    4
    5
    6
    7
    [<myThread(Thread-0, started 6568)>, <myThread(Thread-1, started 7724)>, <myThread(Thread-2, started 7796)>]
    Thread-1:Sat Aug 22 11:36:29 2020:Queue:0
    Thread-1:Sat Aug 22 11:36:29 2020:Queue:1
    ...
    Thread-1:Sat Aug 22 11:36:30 2020:Queue:9998
    Thread-1:Sat Aug 22 11:36:30 2020:Queue:9999
    Exiting Main Thread

ThreadPoolExecutor线程池的使用

  • 锁依然可以运用到线程池
  • map的使用,接受一个List的数据,会循环调用
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    from concurrent.futures.thread import ThreadPoolExecutor
    import time
    num = 0
    def print_time(data):
    global num
    num += 1
    time.sleep(2)
    print("start_%s" % data)
    print("============")
    data = []
    for i in range(50):
    data.append(i)
    with ThreadPoolExecutor(10) as pool:
    result = pool.map(print_time, data)
    # 等待所有线程执行完毕
    for i in result:
    pass
    print("循环次数=%s" % num)

  • 打印结果为:每次启动10个线程,启动了5次
    1
    2
    3
    4
    5
    6
    ============
    start_46
    start_49
    ============
    ============
    循环次数=50
  • submit接受list的数据,也可以接受字典
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    rom concurrent.futures.thread import ThreadPoolExecutor
    from concurrent.futures import as_completed

    import time
    def print_time(data):
    time.sleep(2)
    print("start_%s" % data)
    print("============")
    data = []
    for i in range(50):
    data.append(i)
    with ThreadPoolExecutor(10) as executor:
    future_list = []
    for i in range(10):
    # future = executor.submit(print_time,data)
    future = executor.submit(print_time, {"name": 111, "id": 222})
    future_list.append(future)
    for res in as_completed(future_list): # 这个futrure_list是你future对象的列表
    print(res.result())

参考

os.system

  • 执行的时候程序会打出cmd在linux上执行的信息
  • 执行命令成功返回为0,否则为1
  • 如果想获取在cmd输出的内容,是没办法获到的
    1
    2
    3
    4
    t2 = os.system("1adb devices")
    t3 = os.system("adb devices")
    print(t2) # 打印为1
    print(t3) # 打印为0

subprocess.call

  • 取代os.system,但是也是无法获取cmd输出的内容
1
2
3
import subprocess
t = subprocess.call('adb devices')
print(t) # 打印为0

os.popen

  • popen返回的是一个file对象,跟open打开文件一样操作了,r是以读的方式打开
1
2
output = os.popen('adb devices')
print(output.read()) # 得到List of devices attached

subprocess.Popen

  • subprocess模块代替os.systemos.popen,能够得到命令输出的值

shell参数

  • linux下,当shell=False(默认)时,Popen使用os.execvp()来执行子程序。args一般要是一个【列表】。如果args是个字符串的
    话,会被当做是可执行文件的路径,这样就不能传入任何参数了。
    1
    2
    3
    subprocess.Popen("cat test.txt", shell=True)
    这是因为它相当于
    subprocess.Popen(["/bin/sh", "-c", "cat test.txt"])

stdin stdout stderr 参数

  • 分别表示程序的标准输入、输出、错误句柄。他们可以是PIPE,文件描述符或文件对象,也可以设置为None表示从父进程继承
  • 执行结果使用管道输出的实例:
1
2
pipe=subprocess.Popen("adb devices",shell=True,stdout=subprocess.PIPE).stdout
print(pipe.read()) # 得到值b'List of devices attached\r\n\r\n'
  • 执行结果保存在文件实例

    1
    2
    3
    4
    cmd = "adb shell ls /sdcard/ | findstr aa.png"  
    fhandle = open(r"e:\aa.txt", "w")
    pipe = subprocess.Popen(cmd, shell=True, stdout=fhandle).stdout
    fhandle.close()
  • 子进程的文本流控制

    1
    2
    3
    4
    5
    6
    7
    8
    #!/usr/bin/env python

    import subprocess

    child1 = subprocess.Popen(["ls","-l"], stdout=subprocess.PIPE)
    child2 = subprocess.Popen(["wc"], stdin=child1.stdout,stdout=subprocess.PIPE)
    out = child2.communicate()
    print out
  • child1.stdout–>subprocess.PIPE

  • child2.stdin<–subprocess.PIPE

  • child2.stdout–>subprocess.PIPE

  • 相当于将child1.stdout–>child2.stdin->child2.stdout->subprocess.PIPE

  • subprocess.PIPE实际上为文本流提供一个缓存区。child1的stdout将文本输出到缓存区,随后child2的stdin从该PIPE中将文本读取走。child2的输出文本也被存放在PIPE中,直到communicate()方法从PIPE中读取出PIPE中的文本。

  • 要注意的是,communicate()是Popen对象的一个方法,该方法会阻塞父进程,直到子进程完成。

  • 我们还可以利用communicate()方法来使用PIPE给子进程输入:

    1
    2
    3
    import subprocess
    child = subprocess.Popen(["cat"], stdin=subprocess.PIPE)
    child.communicate("vamei") //()不为空,则写入subprocess.PIPE,为空,则从subprocess.PIPE读取
  • subprocess.PIPE–>child.stdin

  • commiuncate相当于写入subprocess.PIPE,然后child从subprocess.PIPE读取

  • 利用python的subprocess模块执行外部命令, 并捕获stdout, stderr的输出

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    import subprocess


    # print ’popen3:’

    def external_cmd(cmd, msg_in=''):
    try:
    proc = subprocess.Popen(cmd,
    shell=True,
    stdin=subprocess.PIPE,
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
    )
    stdout_value, stderr_value = proc.communicate(msg_in)
    return stdout_value, stderr_value
    except ValueError as err:
    # log("ValueError: %s" % err)
    return None, None
    except IOError as err:
    # log("IOError: %s" % err)
    return None, None


    if __name__ == '__main__':
    stdout_val, stderr_val = external_cmd('dir')
    print ('Standard Output: %s' % stdout_val)
    print ('Standard Error: %s' % stderr_val)
  • 得到命令的返回值,wait

    1
    2
    p=subprocess.Popen("dir", shell=True)  
    p.wait()
  • 但是Popen函数有一个缺陷,就是它是一个阻塞的方法。如果运行cmd时产生的内容非常多,函数非常容易阻塞住。解决办法是不使用wait()方法,但是也不能获得执行的返回值了。

commands.getstatusoutput

  • 不推荐使用,因为在python3中被废弃