0%

ab压力测试中,发现你一次最多只能启动1024个线程

  • 默认情况下,一个线程的栈要预留1M的内存空间
    而一个进程中可用的内存空间只有2G,所以理论上一个进程中最多可以开2048个线程
  • 但是内存当然不可能完全拿来作线程的栈,所以实际数目要比这个值要小。你也可以通过连接时修改默认栈大小,将其改的比较小,这样就可以多开一些线程。如将默认栈的大小改成512K,这样理论上最多就可以开4096个线程。即使物理内存再大,一个进程中可以起的线程总要受到2GB这个内存空间的限制。
  • 比方说你的机器装了64GB物理内存,但每个进程的内存空间还是4GB,其中用户态可用的还是2GB。
    • 内核态. CPU可以访问内存所有数据, 包括外围设备, 例如硬盘, 网卡. CPU也可以将自己从一个程序切换到另一个程序
    • 用户态:.只能受限的访问内存, 且不允许访问外围设备. 占用CPU的能力被剥夺, CPU资源可以被其他程序获取
  • 如果是同一台机器内的话,能起多少线程也是受内存限制的。每个线程对象都要站用非页面内存,而非页面内存也是有限的,当非页面内存被耗尽时,也就无法创建线程了。
  • 如果物理内存非常大,同一台机器内可以跑的线程数目的限制值会越来越大。在Windows下写个程序,一个进程Fork出2000个左右线程就会异常退出了,为什么?这个问题的产生是因为windows32位系统,一个进程所能使用的最大虚拟内存为2G,而一个线程的默认线程栈StackSize为1024K(1M),这样当线程数量逼近2000时,2000*1024K=2G(大约),内存资源就相当于耗尽

影响最大线程大小的因素

  • Java虚拟机本身
    • Xms 初始堆大小
    • Xmx 最大堆大小
    • Xss 每个线程的堆栈大小
  • 系统限制
    • /proc/sys/kernel/pid_max
    • /proc/sys/kernel/thread-max
    • max_user_process(ulimit -u)
    • /proc/sys/vm/max_map_count

其他突破线程问题

  • 多进程-启动多线程
  • 使用异步请求
    • 无论是使用多进程-多线程,还是异步请求,最主要的影响还是电脑本身的配置

下面是具体配置

Windows

httpd.exe -l 会看见 mpm_winnt.c windows默认执行 mpm_winnt_module方式 (暂未找到修改成其他方式的方法)

1.httpd.conf 文件去掉 Include conf/extra/httpd-mpm.conf 前面的#

2.修改extra/httpd-mpm.conf 最下面 或 查找 mpm_winnt_module 修改ThreadsPerChildMaxRequestsPerChild

1
2
3
4
5
6
7
#每个子进程建立的线程数
ThreadsPerChild 1 默认150

#指令设置每个子进程在其生存期内允许伺服的最大请求数量。
#到达MaxRequestsPerChild的限制后,子进程将会结束。
#如果MaxRequestsPerChild为"0",子进程将永远不会结束。
MaxRequestsPerChild 10 默认0

根据自己网站并发数量设置:

  • ThreadsPerChild 设置 网站平均在线人数
  • MaxRequestsPerChild 设置最高在线人数的值

Linux

  • ps -ef | grep httpd | wc -l 查看当前 httpd进程数

  • apachectl -l 会看见 prefork.c Linux默认执行 mpm_prefork_module

  • httpd.conf 文件去掉 Include conf/extra/httpd-mpm.conf 前面的#

  • 修改extra/httpd-mpm.conf 最上面 或 查找 mpm_prefork_module

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
StartServers          5 #默认启动线程数

#指令设置空闲子进程的最小数量。
#所谓空闲子进程是指没有正在处理请求的子进程。
#如果当前空闲子进程数少于MinSpareServers ,
#那么Apache将以最大每秒一个的速度产生新的子进程。
#只有在非常繁忙机器上才需要调整这个参数。将此参数设的太大通常是一个坏主意。
MinSpareServers 5 #

#指令设置空闲子进程的最大数量。
#所谓空闲子进程是指没有正在处理请求的子进程。
#如果当前有超过MaxSpareServers数量的空闲子进程,
#那么父进程将杀死多余的子进程。
#只有在非常繁忙机器上才需要调整这个参数。
#将此参数设的太大通常是一个坏主意。
#如果你将该指令的值设置为比MinSpareServers小,
#Apache将会自动将其修改成"MinSpareServers+1"。
MaxSpareServers 10 #
MaxClients 150 #apache可以同时处理的请求
MaxRequestsPerChild 0 #如windows MaxRequestsPerChild

其他参考

关于PC自动化

无论使用那种PC自动化工具,首先需要知道应用程序是win32 APIbackend「 win32 」,还是MS UI Automationbackend「 uia 」

WinAppDriver

  • 它支持 Appium,可以使用 Appium-Python-Client 依赖库完成对 Windows 桌面程序的自动化操作

  • 项目地址:https://github.com/Microsoft/WinAppDriver

  • 需要注意的是,要使用 WinAppDriver 服务框架完成 Windows 的自动化,需要满足 Windows10 或 Windows Server 2016 以上系统

    另外,它支持的应用程序包含:

    • UWP - Universal Windows Platform
    • WinForms - Windows Forms
    • WPF - Windows Presentation Foundation
    • Win32 - Classic Windows

应用检测

  • 打开Insights 下载定位工具,检测是否能定位到元素

image-20220927155045493

  • 正常定位到元素

image-20220927160349788

配置

  • 开启「 开发者模式 」

image-20220927160535821

下载WinAppDriver

  • 打开链接下载,截至到现在最新版本为:WindowsApplicationDriver-1.2.99-win-x86.exe
  • 运行WinAppDriver.exe
1
2
3
C:\Program Files (x86)\Windows Application Driver>WinAppDriver.exe 4727
Windows Application Driver listening for requests at: http://127.0.0.1:4727/
Press ENTER to exit.

代码测试

  • 安装selenium,不能用4的版本
1
pip install selenium==3.14.1
  • 测试代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

import unittest
from selenium import webdriver

class SimpleTests(unittest.TestCase):

@classmethod
def setUpClass(self):
desired_caps = {}
desired_caps['app'] = r"C:\Windows\System32\notepad.exe"
self.driver = webdriver.Remote(
command_executor='http://127.0.0.1:4727',
desired_capabilities=desired_caps)

@classmethod
def tearDownClass(self):
self.driver.quit()

def test_edit(self):
self.driver.find_element_by_name("文本编辑器").send_keys("helloword")


if __name__ == '__main__':
suite = unittest.TestLoader().loadTestsFromTestCase(SimpleTests)
unittest.TextTestRunner(verbosity=2).run(suite)

pywinauto

  • 主要使用到 Application 类,用于应用程序管理(打开与关闭应用等)、窗口管理(最小化、最大化、关闭窗口)

  • 同时支持控件操作和图像操作,支持Win32 APIMS UI Automation API

  • 安装

1
pip install pywinauto

应用程序

 确定应用程序的可访问技术,pywinauto的后端,支持控件的访问技术:

  • Win32 API(backend=”win32”)- 默认的backend,MFC,VB6,VCL。简单的WinForms控件和大多数旧的应用程序
  • MS UI Automation API ( backend = “ uia “ ):WinForms , WPF , Store apps , Qt5 , 浏览器

注:可以借助于GUI对象检查工具来确定程序到底适用于那种backend。eg:如果使用 inspect 的uia模式,可见的控件和属性更多的话,backend可选uia,反之,backend可选win32。

  • 常用的检查工具

    • Inspect(定位元素工具(uia))

    • Spy++ (定位元素工具(win32))

    • UI Spy (定位元素工具)

    • Swapy(可简单生成pywinauto代码)

定位元素

  • 下载Inspect后,定位桌面微信

image-20220930104310955

  • 用UISpy定位,需要下载.net 3.5

image-20220930105530130

image-20220930110003234

  • 用SPYXX.EXE定位,拖动到Finder Tool图标到微信上,识别到后点击OK

image-20220930110155030

  • 看到当期微信的句柄等信息

image-20220930110350697

  • 打开并启动app
1
2
3
from pywinauto.application import Application
app = Application(backend = "uia").start(r'C:\Program Files (x86)\Tencent\WeChat\WeChat.exe') # 启动微信

  • 连接app
1
2
3
4
5
app = Application(backend='uia').connect(process=8948)  # 进程号
app = Application().connect(handle=0x010f0c) # 句柄
app = Application().connect(path="D:\Office14\EXCEL.exe") # path
app = Application().connect(title_re=".*Notepad", class_name=“Notepad”) # 参数组合

注意:应用程序必须先准备就绪,才能使用connect(),当应用程序start()后没有超时和重连的机制。在pywinauto外再启动应用程序,需要sleep,等程序start。

常用方法

Application对象app的常用方法

1
2
3
4
5
6
7
8
9
10
11
12
13
app.top_window() # 返回应用程序当前顶部窗口,是WindowSpecification对象,可以继续使用对象的方法往下继续查找控件
# eg:如:app.top_window().child_window(title='地址和搜索栏', control_type='Edit')

app.window(**kwargs) # 根据筛选条件,返回一个窗口, 是WindowSpecification对象,可以继续适用对象的方法往下继续查找控件
# eg: 微信主界面 app.window(class_name='WeChatMainWndForPC')

app.windows(**kwargs) # 根据筛选条件返回一个窗口列表,无条件默认全部,列表项为wrapped(装饰器)对象,可以使用wrapped对象的方法,注意不是WindowSpecification对象
# eg:[<uiawrapper.UIAWrapper - '李渝的早报 - Google Chrome', Pane, -2064264099699444098>]

app.kill(soft=False) # 强制关闭
app.cpu_usage() # 返回指定秒数期间的CPU使用率百分比
app.wait_cpu_usage_lower(threshold=2.5, timeout=None, usage_interval=None) # 等待进程CPU使用率百分比小于指定的阈值threshold
app.is64bit() # 如果操作的进程是64-bit,返回True

控件的定位和可用方法

层级查找控件的方法:定位控件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# 通过层级查找控件相关方法
window(**kwargs) # 用于窗口的查找
child_window(**kwargs) # 可以无视层级的找后代中某个符合条件的元素===>【最常用】
parent() # 返回此元素的父元素,没有参数
children(**kwargs) # 返回符合条件的子元素列表,支持索引,是BaseWrapper对象(或子类)
iter_children(**kwargs) # 返回子元素的迭代器,是BaseWrapper对象(或子类)
descendants(**kwargs) # 返回符合条件的所有后代元素列表,是BaseWrapper对象(或子类)
iter_children(**kwargs) # 符合条件后代元素迭代器,是BaseWrapper对象(或子类)---> 存疑,是iter_descendants?

** kwargs的参数

# 常用的
class_name=None, # 类名
class_name_re=None, # 正则匹配类名
title=None, # 控件的标题文字,对应inspect中Name字段
title_re=None, # 正则匹配文字
control_type=None, # 控件类型,inspect界面LocalizedControlType字段的英文名
best_match=None, # 模糊匹配类似的title
auto_id=None, # inspect界面AutomationId字段,但是很多控件没有这个属性

# 不常用
parent=None,
process=None,# 这个基本不用,每次启动进程都会变化
top_level_only=True,
visible_only=True,
enabled_only=False,
handle=None,
ctrl_index=None,
found_index=None,
predicate_func=None,
active_only=False,
control_id=None,
framework_id=None,
backend=None,
  • 控件可用的方法属性
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# 以下几个只支持窗口模式的控件
dlg.close() # 关闭界面
dlg.minimize() # 最小化界面
dlg.maximize() # 最大化界面
dlg.restore() # 将窗口恢复为正常大小,比如最小化的让他正常显示在桌面
dlg.get_show_state() # 正常0,最大化1,最小化2
dlg.menu_select() # 菜单栏,eg:app.window.menu_select(Edit -> Replace)
dlg.exists(timeout=None, retry_interval=None) # 判断是否存在
#timeout:等待时间,一般默认5s
#retry_interval:timeout内重试时间
dlg.wait(wait_for, timeout=None, retry_interval=None) # 等待窗口处于特定状态
dlg.wait_not(wait_for_not, timeout=None, retry_interval=None) # 等待窗口不处于特定状态,即等待消失
# wait_for/wait_for_not:
# * 'exists' means that the window is a valid handle
# * 'visible' means that the window is not hidden
# * 'enabled' means that the window is not disabled
# * 'ready' means that the window is visible and enabled
# * 'active' means that the window is active
# timeout:等待多久
# retry_interval:timeout内重试时间
# eg: dlg.wait('ready')

# 鼠标键盘操作,只列举了常用形式,他们有很多默认参数但不常用,可以在源码中查看
ctrl.click_input() # 最常用的点击方法,一切点击操作的基本方法(底层调用只是参数不同),左键单击,使用时一般都使用默认不需要带参数
ctrl.right_click_input() # 鼠标右键单击
ctrl.type_keys(keys, pause = None, with_spaces = False,) # 键盘输入,底层还是调用keyboard.send_keys
# keys:要输入的文字内容
# pause:每输入一个字符后等待时间,默认0.01就行
# with_spaces:是否保留keys中的所有空格,默认去除0
ctrl.double_click_input(button ="left", coords = (None, None)) # 左键双击
ctrl.press_mouse_input(coords = (None, None)) # 指定坐标按下左键,不传坐标默认左上角
ctrl.release_mouse_input(coords = (None, None)) # 指定坐标释放左键,不传坐标默认左上角
ctrl.move_mouse_input(coords=(0, 0)) # 将鼠标移动到指定坐标,不传坐标默认左上角
ctrl.drag_mouse_input(dst=(0, 0)) # 将ctrl拖动到dst,是press-move-release操作集合

# 控件的常用属性
ctrl.children_texts() # 所有子控件的文字列表,对应inspect中Name字段
ctrl.window_text() # 控件的标题文字,对应inspect中Name字段
# ctrl.element_info.name
ctrl.class_name() # 控件的类名,对应inspect中ClassName字段,有些控件没有类名
# ctrl.element_info.class_name
ctrl.element_info.control_type # 控件类型,inspect界面LocalizedControlType字段的英文名
ctrl.is_child(parent) # ctrl是否是parent的子控件
ctrl.legacy_properties().get('Value') # 可以获取inspect界面LegacyIAccessible开头的一系列字段,在源码uiawraper.py中找到了这个方法,非常有用

# 控件常用操作
ctrl.draw_outline(colour='green') # 空间外围画框,便于查看,支持'red', 'green', 'blue'
ctrl.print_control_identifiers(depth=None, filename=None) # 以树形结构打印其包含的元素,详见打印元素
# depth:打印的深度,缺省时打印最大深度。
# filename:将返回的标识存成文件(生成的文件与当前运行的脚本在同一个路径下)
ctrl.scroll(direction, amount, count=1,) # 滚动
# direction :"up", "down", "left", "right"
# amount:"line" or "page"
# count:int 滚动次数
ctrl.capture_as_image() # 返回控件的 PIL image对象,可继续使用其方法如下:
# eg: ctrl.capture_as_image().save(img_path)
ret = ctrl.rectangle() # 控件上下左右坐标,(L430, T177, R1490, B941),可输出上下左右
# eg: ret.top=177
# ret.bottom=941
# ret.left=430
# ret.right=1490

实战

  • 用UISpy定位,记事本

不用微信演示,经过测试微信应该做了类似的屏蔽,很多内容获取不到,比如用打印当前窗口的所有controller(控件和属性)win_main_Dialog.print_control_identifiers(depth=None, filename=None),报错了

image-20220930153132566

  • 测试代码
1
2
3
4
5
6
from pywinauto.application import Application
# 启动记事本
app = Application(backend = "uia").start(r'notepad.exe')
# 找到记事本主窗口
win = app.window(title_re=".*记事本*.")
win.child_window(class_name="Edit").type_keys("hello")
  • 还有一种方法是可以打印主窗口控件列表,然后你可以通过控件操作这个对象
1
2
3
4
5
6
7
8
from pywinauto.application import Application
# 启动记事本
app = Application(backend = "uia").start(r'notepad.exe')
# 找到记事本主窗口
win = app.window(title_re=".*记事本*.")
print('-----------------')
# 打印主窗口控件列表,然后你可以通过控件id操作这个对象
win.print_control_identifiers()
  • 打印的值的格式大概如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
Control Identifiers:

Dialog - '无标题 - 记事本' (L267, T123, R1707, B794)
['无标题 - 记事本Dialog', 'Dialog', '无标题 - 记事本']
child_window(title="无标题 - 记事本", control_type="Window")
|
| Edit - '文本编辑器' (L278, T198, R1696, B749)
| ['', 'Edit', '0', '1']
| child_window(title="文本编辑器", auto_id="15", control_type="Edit")
| |
| ...
|
| Menu - '应用程序' (L278, T168, R1696, B197)
| ['应用程序Menu', '应用程序', 'Menu2']
| child_window(title="应用程序", auto_id="MenuBar", control_type="MenuBar")
| |
| | MenuItem - '文件(F)' (L278, T168, R350, B197)
| | ['文件(F)', 'MenuItem2', '文件(F)MenuItem']
| | child_window(title="文件(F)", control_type="MenuItem")
| |
| | MenuItem - '编辑(E)' (L350, T168, R422, B197)
| | ['编辑(E)MenuItem', 'MenuItem3', '编辑(E)']
| | child_window(title="编辑(E)", control_type="MenuItem")
| |
| | MenuItem - '格式(O)' (L422, T168, R499, B197)
| | ['MenuItem4', '格式(O)MenuItem', '格式(O)']
| | child_window(title="格式(O)", control_type="MenuItem")
| |
| | MenuItem - '查看(V)' (L499, T168, R573, B197)
| | ['MenuItem5', '查看(V)MenuItem', '查看(V)']
| | child_window(title="查看(V)", control_type="MenuItem")
| |
| | MenuItem - '帮助(H)' (L573, T168, R649, B197)
| | ['帮助(H)', '帮助(H)MenuItem', 'MenuItem6']
| | child_window(title="帮助(H)", control_type="MenuItem")
  • 调整后的代码为
1
2
3
4
5
6
7
8
9
10
11
12
from pywinauto.application import Application
# 启动记事本
app = Application(backend = "uia").start(r'notepad.exe')
# 找到记事本主窗口
win = app.window(title_re=".*记事本*.")
print('-----------------')
# 打印主窗口控件列表
win.print_control_identifiers()
# win.child_window(class_name="Edit").type_keys("hello")
# 操作对应窗口下面的组件
win.Edit.type_keys('hello')
win['Edit'].type_keys('world')

image-20220930154435316

  • 如果打印全部控件内容太多,也可以打印部分
1
2
3
4
5
6
7
8
from pywinauto.application import Application
# 启动记事本
app = Application(backend = "uia").start(r'notepad.exe')
# 找到记事本主窗口
win = app.window(title_re=".*记事本*.")
print('-----------------')
dlg = win['Edit']
dlg.print_control_identifiers()
  • 对应打印的内容
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
dit - '文本编辑器'    (L781, T200, R2199, B751)
['', 'Edit']
child_window(title="文本编辑器", auto_id="15", control_type="Edit")
|
| ScrollBar - '垂直滚动条' (L2173, T200, R2199, B751)
| ['垂直滚动条', '垂直滚动条ScrollBar', 'ScrollBar']
| child_window(title="垂直滚动条", auto_id="NonClientVerticalScrollBar", control_type="ScrollBar")
| |
| | Button - '上一行' (L2173, T200, R2199, B226)
| | ['上一行Button', 'Button', '上一行', 'Button0', 'Button1']
| | child_window(title="上一行", auto_id="UpButton", control_type="Button")
| |
| | Button - '下一行' (L2173, T725, R2199, B751)
| | ['下一行', '下一行Button', 'Button2']
| | child_window(title="下一行", auto_id="DownButton", control_type="Button")
  • 两者结合使用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import time

from pywinauto.application import Application
# 启动记事本
app = Application(backend = "uia").start(r'notepad.exe')
# 找到记事本主窗口
win = app.window(title_re=".*记事本*.")
print('-----------------')
# 打印主窗口控件列表
# win.print_control_identifiers()
# 记事本中输入内容
win.child_window(class_name="Edit").type_keys("hello")
# 选择菜单另存为
win.menu_select("文件->另存为")
time.sleep(2)
# 定位另存为窗口的输入框,并且输入内容;注意这里有三层,一层层往下面找
win.child_window(class_name='DUIViewWndClassName', control_type='Pane').\
child_window(class_name='AppControlHost',control_type='ComboBox', auto_id='FileNameControlHost').\
child_window(class_name='Edit').type_keys('hello')
# 点击保存
win[u"保存"].click_input()

image-20220930165315916

  • 优化下代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import time

from pywinauto.application import Application

def driver():
app = Application(backend="uia").start(r"notepad.exe")
return app

def test():
# 启动记事本
app = driver()
# 找到记事本主窗口
win = app.window(title_re=".*记事本*.")
print('-----------------')
# 打印主窗口控件列表
# win.print_control_identifiers()
# 记事本中输入内容
win.Edit.type_keys("hello")
# 选择菜单另存为
win.menu_select("文件->另存为")
time.sleep(2)
# 定位另存为窗口的输入框,并且输入内容;注意这里有三层,一层层往下面找
win.child_window(class_name='DUIViewWndClassName', control_type='Pane').\
child_window(class_name='AppControlHost',control_type='ComboBox', auto_id='FileNameControlHost').\
child_window(class_name='Edit').type_keys('hello')
# 点击保存
win[u"保存"].click_input()
time.sleep(2)
# 关闭窗口
win.close()

test()

pywin32

  • pywin32 包含 win32gui、win32api、win32con 3个子模块,主要用于窗口管理(定位窗口、显示和关闭窗口、窗口前置、窗口聚焦、获取窗口位置等),通常用的较多的是 win32gui,因此本文仅对此子模块进行介绍
  • 安装
1
pip install pywin32
  • 常用的事件
1
2
3
4
5
6
7
8
9
10
11
12
#查找窗口句柄
win32gui.FindWindow()
#查找指定窗口的菜单
win32gui.GetMenu()
#查找某个菜单的子菜单
win32gui.GetSubMenu()
#获得子菜单的ID
win32gui.GetMenuItemID()
#获得某个子菜单的内容
win32gui.GetMenuItemInfo()
#给句柄发送通知(点击事件)
win32gui.PostMessage() 
  • 实例代码打开记事本,点击文件另存为
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import win32gui, win32con, win32api
import time

# 打开记事本
win32api.ShellExecute(0,"open","notepad.exe","","",1)

# 等待记事本窗口打开
time.sleep(1)

# 寻找记事本窗口
notepad = win32gui.FindWindow("Notepad", None)

# 若为非 0 的值,代表有找到窗口
if (notepad != 0):

# 设置记事本标题
win32gui.SendMessage(notepad, win32con.WM_SETTEXT, None, "Hello")

# 寻找输入文字的地方
edit = win32gui.FindWindowEx(notepad, None, "Edit", None)

# 输入文字
win32gui.SendMessage(edit, win32con.WM_SETTEXT, None, "您好!Pythonn")
# 插入一个回车
win32gui.PostMessage(edit, win32con.WM_KEYDOWN, win32con.VK_RETURN, 0)

menu_handle = win32gui.GetMenu(notepad)
# 这里的代码分块逻辑,搞不明白
for index in [0,3]:
cmd_ID = win32gui.GetMenuItemID(menu_handle, 3)
menu_handle = win32gui.GetSubMenu(menu_handle, 0)

# 点击另存为
win32gui.PostMessage(notepad, win32con.WM_COMMAND, cmd_ID, 0)

放弃对这个模块的研究,写用例非常累

pyautogui

  • pyautogui 模块主要用于屏幕控制(获取屏幕尺寸、截屏等)、鼠标控制(移动鼠标、单击、双击、右击、拖拽、滚动等)、键盘控制(编辑、按键等)
  • 主要是图片识别

几个特有功能

  • 移动鼠标并在其他应用程序的窗口中单击
  • 向应用程序发送击键(例如,填写表单)。
  • 截取屏幕截图,并给出一个图像(例如,按钮或复选框的图像),然后在屏幕上找到它。
  • 找到应用程序的窗口,然后移动、调整大小、最大化、最小化或关闭它(目前仅限 Windows)。
  • 显示警报和消息框。

前置参数

1
2
3
4
5
import pyautogui 
# 停顿功能
pyautogui.PAUSE = 1 # 调用在执行动作后暂停的秒数,只能在执行一些pyautogui动作后才能使用,建议用time.sleep
# 自动 防故障功能
pyautogui.FAILSAFE = True # 启用自动防故障功能,左上角的坐标为(0,0),将鼠标移到屏幕的左上角,来抛出failSafeException异常

鼠标操作

获取屏幕的宽度和高度

1
2
width, height = pyautogui.size() # 获取屏幕的宽度和高度
print(width, height)

获取鼠标当前位置

1
2
currentMouseX, currentMouseY = pyautogui.position() # 鼠标当前位置
print(currentMouseX, currentMouseY)

鼠标移动类操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# pyautogui.moveTo(x,y,持续时间) 在持续时间内 将光标移动到目标位置(x,y)
pyautogui.moveTo(100, 100, duration=0.25) # 移动到 (100,100)

#pyautogui.moveRel(xOffset,yxOffset,持续时间) 在持续时间内 将光标偏移 距离原始位置 xOffset,yxOffset 的位置
pyautogui.moveRel(50, 0, duration=0.25) # 从当前位置右移50像素

# 实现拖拽效果
pyautogui.mouseDown(740,73) #鼠标按下指定位置
pyautogui.moveRel(100,0,2) #移动/可以使用其他移动方法
pyautogui.mouseUp() # 鼠标抬起
#或者
pyautogui.dragTo(100,300,duration=1)
#或者
pyautogui.dragRel(100,300,duration=4)

鼠标滚动类操作

1
2
3
4
5
6
7
# scroll函数控制鼠标滚轮的滚动,amount_to_scroll参数表示滚动的格数。正数则页面向上滚动,负数则向下滚动
# pyautogui.scroll(clicks=amount_to_scroll, x=moveToX, y=moveToY)
# 默认从当前光标位置进行滑动 amount_to_scroll是个数字 数字太小效果可能不明显, 正数表示往上划 负数表示往下化
pyautogui.scroll(500, 20, 2)
pyautogui.scroll(100) # 向上滚动100格
pyautogui.scroll(-100) # 向下滚动100格
pyautogui.scroll(100, x=100, y=100) # 移动到(100, 100)位置再向上滚动100格

鼠标点击类操作

1
2
3
4
5
6
7
8
9
# pyautogui.click(x,y,clicks=点击次数,interval=每次点击间隔频率,button=可以是left表示左击 可以是right表示右击 可以是middle表示中击)
pyautogui.click(10, 20, 2, 0.25, button='left')
pyautogui.click(x=100, y=200, duration=2) # 先移动到(100, 200)再单击
pyautogui.click() # 鼠标当前位置点击一下
pyautogui.doubleClick() # 鼠标当前位置左击两下
pyautogui.doubleClick(x=100, y=150, button="left") # 鼠标在(100,150)位置左击两下
pyautogui.tripleClick() # 鼠标当前位置左击三下
pyautogui.rightClick(10,10) # 指定位置,双击右键
pyautogui.middleClick(10,10) # 指定位置,双击中键

键盘操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# 相关操作
# pyautogui.typewrite(要输入的字符只能是英文,interval=输入每个字符的间隔频率)
pyautogui.typewrite('python', 1)
# typewrite 还可以传入单字母的列表
# 运行下面代码,编辑器里面就会输出 python 之后换行。
pyautogui.typewrite(['p','y','t','h','o','n','enter'])

# pyautogui.keyDown():模拟按键按下
# pyautogui.keyUP():模拟按键松开
# pyautogui.press(键盘按键字母) 模拟一次按键过程,即 keyDown 和 keyUP 的组合 按下指定的键盘按键
# pyautogui.hotkey("ctrl","a") 实现组合键功能

# 按住 shift 按键,然后再按住 1 按键,就可以了。用 pyautogui 控制就是
pyautogui.keyDown('shift')
pyautogui.press('1')
pyautogui.keyUp('shift')


# 输入中文字符的方法 借用 pyperclip模块
import pyperclip
pyperclip.copy("要书写的字符串") #复制字符串
time.sleep(2)
pyautogui.hotkey("ctrl","v") #实现复制


# pyautogui.KEYBOARD_KEYS数组中就是press(),keyDown(),keyUp()和hotkey()函数可以输入的按键名称
pyautogui.KEYBOARD_KEYS = ['\t', '\n', '\r', ' ', '!', '"', '#', '$', '%', '&', "'", '(', ')', '*', '+', ',', '-', '.',
'/', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', ':', ';', '<', '=', '>', '?', '@',
'[', '\\', ']', '^', '_', '`', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l',
'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', '{', '|', '}', '~',
'accept', 'add', 'alt', 'altleft', 'altright', 'apps', 'backspace', 'browserback',
'browserfavorites', 'browserforward', 'browserhome', 'browserrefresh', 'browsersearch',
'browserstop', 'capslock', 'clear', 'convert', 'ctrl', 'ctrlleft', 'ctrlright', 'decimal',
'del', 'delete', 'divide', 'down', 'end', 'enter', 'esc', 'escape', 'execute', 'f1', 'f10',
'f11', 'f12', 'f13', 'f14', 'f15', 'f16', 'f17', 'f18', 'f19', 'f2', 'f20', 'f21', 'f22',
'f23', 'f24', 'f3', 'f4', 'f5', 'f6', 'f7', 'f8', 'f9', 'final', 'fn', 'hanguel', 'hangul',
'hanja', 'help', 'home', 'insert', 'junja', 'kana', 'kanji', 'launchapp1', 'launchapp2',
'launchmail', 'launchmediaselect', 'left', 'modechange', 'multiply', 'nexttrack',
'nonconvert', 'num0', 'num1', 'num2', 'num3', 'num4', 'num5', 'num6', 'num7', 'num8', 'num9',
'numlock', 'pagedown', 'pageup', 'pause', 'pgdn', 'pgup', 'playpause', 'prevtrack', 'print',
'printscreen', 'prntscrn', 'prtsc', 'prtscr', 'return', 'right', 'scrolllock', 'select',
'separator', 'shift', 'shiftleft', 'shiftright', 'sleep', 'space', 'stop', 'subtract', 'tab',
'up', 'volumedown', 'volumemute', 'volumeup', 'win', 'winleft', 'winright', 'yen', 'command',
'option', 'optionleft', 'optionright']

弹窗操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import pyautogui

# 显示一个简单的带文字和OK按钮的消息弹窗。用户点击后返回button的文字。
pyautogui.alert(text='', title='', button='OK')
b = pyautogui.alert(text='要开始程序么?', title='请求框', button='OK')
print(b) # 输出结果为OK

# 显示一个简单的带文字、OK和Cancel按钮的消息弹窗,用户点击后返回被点击button的文字,支持自定义数字、文字的列表。
pyautogui.confirm(text='', title='', buttons=['OK', 'Cancel']) # OK和Cancel按钮的消息弹窗
pyautogui.confirm(text='', title='', buttons=range(10)) # 10个按键0-9的消息弹窗
a = pyautogui.confirm(text='', title='', buttons=range(10))
print(a) # 输出结果为你选的数字

# 可以输入的消息弹窗,带OK和Cancel按钮。用户点击OK按钮返回输入的文字,点击Cancel按钮返回None。
pyautogui.prompt(text='', title='', default='')

# 样式同prompt(),用于输入密码,消息用*表示。带OK和Cancel按钮。用户点击OK按钮返回输入的文字,点击Cancel按钮返回None。
pyautogui.password(text='', title='', default='', mask='*')

图像操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import pyautogui
im = pyautogui.screenshot() # 返回屏幕的截图,是一个Pillow的image对象
im.save('屏幕截图.png') #保存图片
# 或者
im = pyautogui.screenshot('屏幕截图.png') # 截全屏并设置保存图片的位置和名称
print(im) # 打印图片的属性

# 不截全屏,截取区域图片。截取区域region参数为:左上角XY坐标值、宽度和高度
pyautogui.screenshot('屏幕截图.png', region=(0, 0, 300, 400))


# 获得文件图片在现在的屏幕上面的坐标,返回的是一个元组(top, left, width, height)
# 如果截图没找到,pyautogui.locateOnScreen()函数返回None
a = pyautogui.locateOnScreen(r'目标图片路径')
print(a) # 打印结果为Box(left=0, top=0, width=300, height=400)
x, y = pyautogui.center(a) # 获得文件图片在现在的屏幕上面的中心坐标
print(x, y) # 打印结果为150 200
# 或者
x, y = pyautogui.locateCenterOnScreen(r'目标图片路径') # 这步与上面的四行代码作用一样
print(x, y) # 打印结果为150 200

# 匹配屏幕所有与目标图片的对象,可以用for循环和list()输出
for pos in pyautogui.locateAllOnScreen(r'C:\Users\ZDH\Desktop\PY\region_screenshot.png'):
print(pos)
# 打印结果为Box(left=0, top=0, width=300, height=400)
a = list(pyautogui.locateAllOnScreen(r'C:\Users\ZDH\Desktop\PY\region_screenshot.png'))
print(a) # 打印结果为[Box(left=0, top=0, width=300, height=400)]

分辨率适配

注意:pyautogui的图像识别是模板匹配算法 无法跨分辨率识别(图片放大缩小就无法识别) 提供以下图像识别算法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
# -*- coding: utf-8 -*-
"""
使用需求:
需要安装 airtest pip install airtest -i https://mirrors.aliyun.com/pypi/simple/
运行时如果出现以下错误:
import win32api
ImportError: DLL load failed: 找不到指定的程序。
重新安装win32api版本
pip install pywin32==227 # 安装 227版本
不行的话再试试
pip install pywin32==223 # 安装 223版本

"""


import sys
import types
from copy import deepcopy
from airtest import aircv
from airtest.aircv import cv2
from airtest.aircv.template_matching import TemplateMatching
from airtest.core.cv import MATCHING_METHODS, Predictor
from airtest.core.error import InvalidMatchingMethodError
from airtest.core.helper import logwrap, G
from airtest.core.win.screen import screenshot
from airtest.utils.transform import TargetPos
from six import PY3
from airtest.core.settings import Settings as ST # noqa

# # -*- encoding=utf8 -*-
import logging
logger = logging.getLogger("airtest")
logger.setLevel(logging.ERROR)
# 日志级别有[DEBUG]、[INFO]、[WARNING] 和 [ERROR]

class Template(object):
"""
picture as touch/swipe/wait/exists target and extra info for cv match
filename: pic filename
target_pos: ret which pos in the pic
record_pos: pos in screen when recording
resolution: screen resolution when recording
rgb: 识别结果是否使用rgb三通道进行校验.
scale_max: 多尺度模板匹配最大范围.
scale_step: 多尺度模板匹配搜索步长.
"""

def __init__(self, filename, threshold=None, target_pos=TargetPos.MID, record_pos=None, resolution=(), rgb=False, scale_max=800, scale_step=0.005):
self.filename = filename
# self.filename =os.path.join(Settings.Picture_Path,filename)
self._filepath = None
self.threshold = threshold or ST.THRESHOLD
self.target_pos = target_pos
self.record_pos = record_pos
self.resolution = resolution
self.rgb = rgb
self.scale_max = scale_max
self.scale_step = scale_step

@property
def filepath(self):

return self.filename

def __repr__(self):
filepath = self.filepath if PY3 else self.filepath.encode(sys.getfilesystemencoding())
return "Template(%s)" % filepath

def match_in(self, screen):
match_result = self._cv_match(screen)
G.LOGGING.debug("match result: %s", match_result)
if not match_result:
return None
focus_pos = TargetPos().getXY(match_result, self.target_pos)
return focus_pos

def match_all_in(self, screen):
image = self._imread()
image = self._resize_image(image, screen, ST.RESIZE_METHOD)
return self._find_all_template(image, screen)

@logwrap
def _cv_match(self, screen):
# in case image file not exist in current directory:
ori_image = self._imread()
image = self._resize_image(ori_image, screen, ST.RESIZE_METHOD)
ret = None
for method in ST.CVSTRATEGY:
# get function definition and execute:
func = MATCHING_METHODS.get(method, None)
if func is None:
raise InvalidMatchingMethodError("Undefined method in CVSTRATEGY: '%s', try 'kaze'/'brisk'/'akaze'/'orb'/'surf'/'sift'/'brief' instead." % method)
else:
if method in ["mstpl", "gmstpl"]:
ret = self._try_match(func, ori_image, screen, threshold=self.threshold, rgb=self.rgb, record_pos=self.record_pos,
resolution=self.resolution, scale_max=self.scale_max, scale_step=self.scale_step)
else:
ret = self._try_match(func, image, screen, threshold=self.threshold, rgb=self.rgb)
if ret:
break
return ret

@staticmethod
def _try_match(func, *args, **kwargs):
G.LOGGING.debug("try match with %s" % func.__name__)
try:
ret = func(*args, **kwargs).find_best_result()
except aircv.NoModuleError as err:
G.LOGGING.warning("'surf'/'sift'/'brief' is in opencv-contrib module. You can use 'tpl'/'kaze'/'brisk'/'akaze'/'orb' in CVSTRATEGY, or reinstall opencv with the contrib module.")
return None
except aircv.BaseError as err:
G.LOGGING.debug(repr(err))
return None
else:
return ret

def _imread(self):
return aircv.imread(self.filepath)

def _find_all_template(self, image, screen):
return TemplateMatching(image, screen, threshold=self.threshold, rgb=self.rgb).find_all_results()

def _find_keypoint_result_in_predict_area(self, func, image, screen):
if not self.record_pos:
return None
# calc predict area in screen
image_wh, screen_resolution = aircv.get_resolution(image), aircv.get_resolution(screen)
xmin, ymin, xmax, ymax = Predictor.get_predict_area(self.record_pos, image_wh, self.resolution, screen_resolution)
# crop predict image from screen
predict_area = aircv.crop_image(screen, (xmin, ymin, xmax, ymax))
if not predict_area.any():
return None
# keypoint matching in predicted area:
ret_in_area = func(image, predict_area, threshold=self.threshold, rgb=self.rgb)
# calc cv ret if found
if not ret_in_area:
return None
ret = deepcopy(ret_in_area)
if "rectangle" in ret:
for idx, item in enumerate(ret["rectangle"]):
ret["rectangle"][idx] = (item[0] + xmin, item[1] + ymin)
ret["result"] = (ret_in_area["result"][0] + xmin, ret_in_area["result"][1] + ymin)
return ret

def _resize_image(self, image, screen, resize_method):
"""模板匹配中,将输入的截图适配成 等待模板匹配的截图."""
# 未记录录制分辨率,跳过
if not self.resolution:
return image
screen_resolution = aircv.get_resolution(screen)
# 如果分辨率一致,则不需要进行im_search的适配:
if tuple(self.resolution) == tuple(screen_resolution) or resize_method is None:
return image
if isinstance(resize_method, types.MethodType):
resize_method = resize_method.__func__
# 分辨率不一致则进行适配,默认使用cocos_min_strategy:
h, w = image.shape[:2]
w_re, h_re = resize_method(w, h, self.resolution, screen_resolution)
# 确保w_re和h_re > 0, 至少有1个像素:
w_re, h_re = max(1, w_re), max(1, h_re)
# 调试代码: 输出调试信息.
G.LOGGING.debug("resize: (%s, %s)->(%s, %s), resolution: %s=>%s" % (
w, h, w_re, h_re, self.resolution, screen_resolution))
# 进行图片缩放:
image = cv2.resize(image, (w_re, h_re))
return image

if __name__ == '__main__':
"""
用法:
res = Template(目标图片路径,threshold=匹配阈值,target_pos=可以是123456789 分别对应图片的九个点).match_in(screenshot(None))
"""
res = Template("pppp.png",threshold=0.8,target_pos=5).match_in(screenshot(None))
print(res)

实战

  • 安装
1
pip install PyAutoGUI
  • 测试代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import time
import pyautogui
import os

# 打开记事本
os.popen("notepad.exe")
time.sleep(2)
# 查找图片
coords = pyautogui.locateOnScreen('1.png')
# 得到图片的中心坐标
pos = pyautogui.center(coords)
# 移动鼠标
pyautogui.moveTo(pos, duration=0.5)
# 点击
pyautogui.click()
# 输入内容
pyautogui.typewrite('I like Python.')
  • 查找的1.png图片如下

image-20221008161852609

  • 测试结果

image-20221008161814486

sikulix

  • 基于图像识别的开源工具,支持:

    • Windows XP, 7, 8 and 10 (development on Windows 10)

    • Mac OSX 10.10 and later (development on macOS 10.15)

    • Linux/Unix systems depending on the availability of the prerequisites

  • 有自己的IDE

环境搭建

  • java的环境
1
2
3
4
C:\Users\Administrator>java --version
java 17.0.2 2022-01-18 LTS
Java(TM) SE Runtime Environment (build 17.0.2+8-LTS-86)
Java HotSpot(TM) 64-Bit Server VM (build 17.0.2+8-LTS-86, mixed mode, sharing)
  • 打开官网地址下载sikuli的jar包,现在最新版本为sikulixide-2.0.5-win.jar

  • 运行jar,打开ide

1
2
3
java -jar d:\sikulixide-2.0.5-win.jar

java -jar d:\sikulixide-2.0.5-win.jar -v -c

实战

  • 打开ide后,写入测试的代码,运行正常,发现一个问题左侧并没有显示常用的api(包括使用快捷键ctrl+t都无反应)

image-20221008184205087

基于python使用sikuli图像识别

  • 由于Python不能直接调用Java的方法,需要借助一些第三方的库比如Jython、Jpype、Pyjnius等
  • 安装依赖包
1
2
3
4
5
6
7
C:\Users\Administrator>pip install JPype1
Collecting JPype1
Downloading JPype1-1.4.0-cp37-cp37m-win_amd64.whl (343 kB)
---------------------------------------- 343.9/343.9 kB 484.9 kB/s eta 0:00:00
Requirement already satisfied: typing-extensions in d:\app\python37\lib\site-packages (from JPype1) (3.7.4.3)
Installing collected packages: JPype1
Successfully installed JPype1-1.4.0
  • 测试代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# coding=utf-8
import time

from jpype import *
import jpype
# 启动java虚拟机,调用sikulixide
if not jpype.isJVMStarted():
jpype.startJVM(jpype.getDefaultJVMPath(),'-ea',r'-Djava.class.path=D:\sikulixide-2.0.5-win.jar')
# 调用java的方法
java.lang.System.out.println("hello world")
# 获取对应的类,对应api:https://sikulix-2014.readthedocs.io/en/latest/appclass.html#App
app =JClass('org.sikuli.script.App')
app.open(r"c:\windows\System32\notepad.exe")
# 对应api https://sikulix-2014.readthedocs.io/en/latest/screen.html#Screen
Screen = JClass('org.sikuli.script.Screen')
screen=Screen()
time.sleep(2)
screen.click(r"D:\project\pc-auto\1.png")
screen.type("hello")

jpype.shutdownJVM()
  • Python调用jar包需要借助于jvm虚拟机,然后需要指定sikulixapi的安装路径,然后将jvm的路径添加到系统的path中(否则会提示jvm无法启动),然后在下方声明一个screen,app类,都是api中的类,然后就可以根据上方的一些方法进行调用

总结

  • 如果使用sikuli和PyAutoGUI,最好配合cv2(opencv)进行不同分辨率适配(这里没有测试)

  • 也有人推荐使用Ranorex ,不过这个是收费的

  • 当然还有一些其他场景没有测试,比如嵌入浏览器的场景

说明

90%的单词都是词根+词缀组成,如下:

image-20220913094440926

秒记词根

元音互换,改变了词性

image-20220913094704642

拼音元音互换

image-20220913095347765

U=V=W

image-20220913100636138

image-20220913100725118

b=p=m=f=v

image-20220913100904732

g=k(c)=h

image-20220913101442142

m=n=l=r

image-20220913101714851

image-20220913101827379

d=t=S(c)=th

dt互换,S(c)和th互换

image-20220913102039924

image-20220913102243220

image-20220913102319133

英文元音互换

  • 英语中有26个字母,其中5个元音字母和21个辅音字母。5个元音字母分别为:a[ei]、e[i:]、i[ ai]、o[eu]、u[ju:]。元音发音时声带震动、气流通过口腔不受阻碍

  • 词源不变,语音改变

image-20220913095257600

image-20220913100438436

无词根

image-20220913102729501

image-20220913102822900

image-20220913102948406

其他

说明

  • 本次搭建的为在win10下搭建httprunner

步骤

  • 通过github下载最新的版本,进行编译,我现在下载的版本为:hrp-v4.2.0-windows-amd64.tar

  • 配置环境变量PATH中的hrp的exe路径,D:\app\hrp-v4.2.0-windows-amd64

  • 检查配置环境,打开cmd输入:hrp -h

1
2
3
4
5
6
7
8
9
C:\Users\Administrator>hrp -h

License: Apache-2.0
Website: https://httprunner.com
Github: https://github.com/httprunner/httprunner
Copyright 2017 debugtalk

Usage:
hrp [command]
  • 创建项目,选择py插件
1
2
3
4
5
D:\project>hrp startproject demo --py
...
1:23AM INF python package is ready name=httprunner version=v4.2.0
11:23AM INF set python3 executable path Python3Executable="C:\\Users\\Administrator\\.hrp\\venv\\Scripts\\python.exe"
11:23AM INF create scaffold success projectName=demo

本地python版本为3.7

hrp startproject demo –go 选择go插件

代码分析

  • 查看项目代码结构如下:

image-20220905153421725

har

用 Charles 等抓包工具,以及 Chrome 等浏览器均可以导出 HAR 格式的请求文件,查看这里

reports

存放测试报告的目录

testcases

放测试用例的目录,支持json和yml,看下demo.json的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
{
"config": {
"name": "demo with complex mechanisms",
"base_url": "https://postman-echo.com",
"variables": {
"a": "${sum(10, 2.3)}",
"b": 3.45,
"n": "${sum_ints(1, 2, 2)}",
"varFoo1": "${gen_random_string($n)}",
"varFoo2": "${max($a, $b)}"
}
},
"teststeps": [
{
"name": "transaction 1 start",
"transaction": {
"name": "tran1",
"type": "start"
}
},
{
"name": "get with params",
"request": {
"method": "GET",
"url": "/get",
"params": {
"foo1": "$varFoo1",
"foo2": "$varFoo2"
},
"headers": {
"User-Agent": "HttpRunnerPlus"
}
},
"variables": {
"b": 34.5,
"n": 3,
"name": "get with params",
"varFoo2": "${max($a, $b)}"
},
"setup_hooks": [
"${setup_hook_example($name)}"
],
"teardown_hooks": [
"${teardown_hook_example($name)}"
],
"extract": {
"varFoo1": "body.args.foo1"
},
"validate": [
{
"check": "status_code",
"assert": "equals",
"expect": 200,
"msg": "check response status code"
},
{
"check": "headers.\"Content-Type\"",
"assert": "startswith",
"expect": "application/json"
},
{
"check": "body.args.foo1",
"assert": "length_equals",
"expect": 5,
"msg": "check args foo1"
},
{
"check": "$varFoo1",
"assert": "length_equals",
"expect": 5,
"msg": "check args foo1"
},
{
"check": "body.args.foo2",
"assert": "equals",
"expect": "34.5",
"msg": "check args foo2"
}
]
},
{
"name": "transaction 1 end",
"transaction": {
"name": "tran1",
"type": "end"
}
},
{
"name": "post json data",
"request": {
"method": "POST",
"url": "/post",
"body": {
"foo1": "$varFoo1",
"foo2": "${max($a, $b)}"
}
},
"validate": [
{
"check": "status_code",
"assert": "equals",
"expect": 200,
"msg": "check status code"
},
{
"check": "body.json.foo1",
"assert": "length_equals",
"expect": 5,
"msg": "check args foo1"
},
{
"check": "body.json.foo2",
"assert": "equals",
"expect": 12.3,
"msg": "check args foo2"
}
]
},
{
"name": "post form data",
"request": {
"method": "POST",
"url": "/post",
"headers": {
"Content-Type": "application/x-www-form-urlencoded; charset=UTF-8"
},
"body": {
"foo1": "$varFoo1",
"foo2": "${max($a, $b)}",
"time": "${get_timestamp()}"
}
},
"extract": {
"varTime": "body.form.time"
},
"validate": [
{
"check": "status_code",
"assert": "equals",
"expect": 200,
"msg": "check status code"
},
{
"check": "body.form.foo1",
"assert": "length_equals",
"expect": 5,
"msg": "check args foo1"
},
{
"check": "body.form.foo2",
"assert": "equals",
"expect": "12.3",
"msg": "check args foo2"
}
]
},
{
"name": "get with timestamp",
"request": {
"method": "GET",
"url": "/get",
"params": {
"time": "$varTime"
}
},
"validate": [
{
"check": "body.args.time",
"assert": "length_equals",
"expect": 13,
"msg": "check extracted var timestamp"
}
]
}
]
}
  • 在json里面引用的函数如{sum(10, 2.3)},需要在debugtalk.py自定义,但是max应该为内置函数,不用自定义编写
  • 事务代码说明
1
2
3
4
type Transaction struct {
Name string `json:"name" yaml:"name"` // 事务名称,可定义为任意字符串
Type transactionType `json:"type" yaml:"type"` // 事务类型,仅包括 2 种类型,start(事务开始)和 end(结束事务)
}
  • 在测试用例中,transaction 应该成对出现,即必须同时定义 start 和 end;如果存在配对缺失的情况,会按照如下逻辑进行处理:
    • 仅设置开始事务,则会在测试用例最后一个测试步后添加结束事务
    • 仅设置结束事务,则会在测试用例第一个测试步前添加开始事务
  • 更多的手工编写用例介绍

其他文件

  • .env 配置环境变量信息,如登录的信息
  • debugtalk.py 自定义函数
  • proj.json 项目的信息

运行用例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
D:\project\demo>hrp run testcases\demo.json --gen-html-report

...
==================== response ====================
Connected via TLSv1.2
HTTP/1.1 200 OK
Content-Length: 406
Connection: keep-alive
Content-Type: application/json; charset=utf-8
Date: Mon, 05 Sep 2022 08:08:52 GMT
Etag: W/"196-F4bYgnayuG8mswnHB8kP+Hk/6J4"
Set-Cookie: sails.sid=s%3AF7CgL7_NwzZjvKsH7GKmjPHovUPl7kza.7RUa8O71oEJjVUKcWKAkLw0m0ztEnlmlVzJimZz5H2E; Path=/; HttpOnly
Vary: Accept-Encoding

{"args":{"time":"1662365339120"},"headers":{"x-forwarded-proto":"https","x-forwarded-port":"443","host":"postman-echo.com","x-amzn-trace-id":"Root=1-6315ae94-58d38b1f1c45c8b94273a5a3","user-agent":"Go-http-client/1.1","cookie":"sails.sid=s%3A9zczVs9TODPR3d-xi5F59muThb8LgjjY.bDP%2FcJnt8Ou9QPjs7Y9v15utVqCU%2FrK3EddVu6ThODc","accept-encoding":"gzip"},"url":"https://postman-echo.com/get?time=1662365339120"}
--------------------------------------------------
4:08PM INF validate body.args.time assertMethod=length_equals checkExpr=body.args.time checkValue=1662365339120 checkValueType=string expectValue=13 expectValueType=int64 result=true
4:08PM INF run step end exportVars=null step="get with timestamp" success=true type=request
4:08PM INF run testcase end testcase="demo with complex mechanisms"
4:08PM INF quit hashicorp plugin process
2022-09-05T16:09:01.733+0800 [WARN] grpc-py: plugin failed to exit gracefully

pytest也支持,测试 case 的文件应该以 _test 结尾

1
>hrp pytest testcases/py_test.py --html=/report/index.html  --junit-xml=/report/report.xml
  • 执行完成后,在reports目录发现了测试报告的html文件

image-20220905161310265

关于登录

很多用例都是需要登录后才能处理,如果比较简单可以类似这样处理,下面取token

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
- config:
name: logincase
variables: {}


teststeps:
-
name: login case1
request:
url: http://127.0.0.1:8000/api/v1/login/
method: POST
headers:
Content-Type: application/json
User-Agent: python-requests/2.18.4
json:
username: test
password: 123456
extract:
- token: content.token # 提取token
-
name: get user info case1
request:
url: http://127.0.0.1:8000/api/v1/user/info/
method: GET
headers:
Content-Type: application/json
User-Agent: python-requests/2.18.4
Authorization: Token $token # 引用token
  • cookie
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
- config:
name: logincase
variables: {}


teststeps:
-
name: login case1
request:
url: http://127.0.0.1:8000/api/v1/login/
method: POST
headers:
Content-Type: application/json
User-Agent: python-requests/2.18.4
json:
username: test
password: 123456
extract:
- cookievalue: headers.Set-Cookie # 提取Cookie
-
name: get user info case1
request:
url: http://127.0.0.1:8000/api/v1/user/info/
method: GET
headers:
Content-Type: application/json
User-Agent: python-requests/2.18.4
cookie: $cookievalue # 把提取到的 cookies 附加到本次请求头域
  • 如果是比较复杂的登录校验,可以采用setup_hooks调用自定义函数(debugtalk.py中编写)完成登录校验

性能压测

  • 关于性能测试,可以参考这里,需要用go
  • 本地的项目需要采用hrp startproject demo --go来新建go插件的项目

说明

  • 由于某原因,把node和hexo升级后,造成无法发生文章到服务器了,记录解决方法
  • 迁移博客到腾讯云

本地环境

  • 准备空的文件夹,用hexo进行初始化
1
hexo init XXXXX/
  • 安装依赖文件
1
2
3
4
5
6
npm install hexo-helper-live2d --save
npm install hexo-asset-image --save
npm install hexo-tag-cloud --save
npm install hexo-abbrlink --save
npm install hexo-deployer-git --save
npm install hexo-generator-searchdb --save
  • hexo-asset-imagehexo-abbrlink 有冲突,造成图片无法展示,修改asset-imageindex.js中代码
1
2
3
4
# E:\moon-full-blog\node_modules\hexo-asset-image\index.js

//var endPos = link.lastIndexOf('.');
var endPos = link.lastIndexOf('/');
  • 创建本地的id_rsa.pub的密码
1
ssh-keygen -C "2847XXX@qq.com"
  • 服务器上新增文件,把本地id_rsa.pub的密码拷贝进去
1
2
vi ~/.ssh/authorized_keys

  • 修改权限
1
2
3
chmod 755 ~
chmod 700 ~/.ssh
chmod 600 ~/.ssh/authorized_keys
  • 测试本地连接服务器,提示输入密码,注意我的用的root用户名
1
2
3
4
5
ssh -v root@82.XX.xx.xx

debug1: client_input_global_request: rtype hostkeys-00@openssh.com want_reply 0
Last login: Fri Aug 26 17:59:47 2022 from 116.128.233.192
Last login: Fri Aug 26 17:59:47 2022 from 116.128.233.192
  • 然后把本地之前的博客的config.yml,主题文件夹,source 拷贝带最新的博客目录

  • 设置邮箱

1
git config --global user.email '28477XXX4@qq.com'
  • 提交到服务器
1
hexo cl &&  hexo g && hexo d

说明

本次主要记录win10_X64下搭建Sonic的使用步骤

软件安装

docker

  • 安装过程中出现很多问题,因为我的系统为家庭版本,所以需要安装wsl2Ubuntu 18.04.5 LTS等,重装系统了为专业版本,直接就可以安装

image-20220823170502758

  • 打开官网下载并安装,重启电脑,进入到bios,打开虚拟化

  • 打开Docker Desktop,正常启动后,打开cmd,输入命令

    1
    2
    3
    4
    5
    6
    7
    C:\Users\Administrator>docker run -d -p 80:80 docker/getting-started
    Unable to find image 'docker/getting-started:latest' locally
    latest: Pulling from docker/getting-started
    ....
    dsha256:b558be874169471bd4e65bd6eac8c303b271a7ee8553ba47481b73b2bf597aae
    Status: Downloaded newer image for docker/getting-started:latest
    8df47bc4fdb1ec2ac47a70f609e8e73dc6afb62298c570b27b1ae9032fdd036d

    进入 docker 里可以看到测试运行的容器。

    image-20220823170833149

MySQL

打开官网,下载8.0的版本

image-20220823171802303

  • 设置环境变量:D:\app\mysql-8.0.13-winx64\bin

  • 在mysql的安装目录新建my.ini

1
2
3
4
5
6
7
8
9
10
11
12
[client]
port=3306
# 字符集设置
default-character-set=utf8

[mysqld]
port=3306
character_set_server=utf8
# 设置安装目录
basedir=D:\app\mysql-8.0.13-winx64
# 设置数据存放位置
datadir=D:\app\mysql-8.0.13-winx64\data
  • 通过 mysqld --initialize --console 命令来重置密码。
1
2
3
C:\Users\Administrator>mysqld --initialize --console
2022-08-23T09:26:32.002602Z 0 [System] [MY-013169] [Server] D:\app\mysql-8.0.13-generated for root@localhost: QCHDW*9OpYQX
2022-08-23T09:26:37.215593Z 0 [System] [MY-013170] [Server] D:\app\mysql-8.0.13-winx64\bin\mysqld.exe (mysqld 8.0.13) initializing of server has completed
  • 登录并修改root用户的密码
1
2
3
4
5
6
7
8
9
10
11
C:\Users\Administrator>mysqld -install
Service successfully installed.

C:\Users\Administrator>net start mysql
MySQL 服务正在启动 .
MySQL 服务已经启动成功。

C:\Users\Administrator>mysql -u root -p

mysql> alter user 'root'@'localhost' identified by '123456';
Query OK, 0 rows affected (0.05 sec)
  • 退出重新登陆验证
1
2
3
4
5
6
7
mysql> exit;
Bye

C:\Users\Administrator>mysql -u root -p
Enter password: ******
Welcome to the MySQL monitor. Commands end with ; or \g.
Type 'help;' or '\h' for help. Type '\c' to
  • 修改可以支持远程用IP访问
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
use mysql;
mysql> select host, user from user;
+-----------+------------------+
| host | user |
+-----------+------------------+
| localhost | mysql.infoschema |
| localhost | mysql.session |
| localhost | mysql.sys |
| localhost | root |
+-----------+------------------+
4 rows in set (0.01 sec)
# 查看表格中 root 用户的 host,默认应该显示的 localhost,只支持本地访问,不允许远程访问

mysql> update user set host='%' where user ='root';
# 说明: % 代表任意的客户端,可替换成具体IP地址。

Query OK, 1 row affected (0.08 sec)
Rows matched: 1 Changed: 1 Warnings: 0

image-20220823175019789

本地配置

  • d:\sonic 目录中下载官网的最新的部署包,我下载的为sonic-server-v1.5.0-beta,根据实际情况修改env的内容,主要是 ip,数据库的密码

  • 手动建立数据库:sonic

  • 执行docker-compose

1
2
3
4
5
6
D:\sonic\sonic-server-v1.5.0-beta>docker-compose up -d
sonic-server-v150-beta_sonic-server-eureka_1 is up-to-date
sonic-server-v150-beta_sonic-server-controller_1 is up-to-date
sonic-server-v150-beta_sonic-server-gateway_1 is up-to-date
sonic-server-v150-beta_sonic-client-web_1 is up-to-date
Creating sonic-server-v150-beta_sonic-server-folder_1 ... done
  • 打开http://localhost:3000/,注册后,自动登录

image-20220824095329636

Sonic配置

本机环境-java

  • 下载并配置Java,我的版本为
1
2
3
4
C:\Users\Administrator>java -version
java version "17.0.2" 2022-01-18 LTS
Java(TM) SE Runtime Environment (build 17.0.2+8-LTS-86)
Java HotSpot(TM) 64-Bit Server VM (build 17.0.2+8-LTS-86, mixed mode, sharing)

appium安装

下载android sdk tools,选择exe文件安装后,并打开

image-20220824103828375

  • 因为用于 Appium 自动化,只需要勾选下面的 4 个包就行了,需要挂梯子

image-20220824104049673

image-20220824104117977

  • 创建ANDROID_HOME的环境变量:D:\app\android-sdk
  • SDK的platform-tools和tool的路径添加到Path 变量内 %ANDROID_HOME%\platform-tools%ANDROID_HOME%\tools%ANDROID_HOME%\build-tools\29.0.2
1
2
3
4
C:\Users\Administrator>adb version
Android Debug Bridge version 1.0.41
Version 29.0.6-6198805
Installed as D:\app\android-sdk\platform-tools\adb.exe
  • 下载并安装node.js
1
2
3
4
5
6
7
8
Administrator@WIN-5TF67LA12I4 MINGW64 /d/app/node-v14.20.0-win-x64
$ npm -v
6.14.17

Administrator@WIN-5TF67LA12I4 MINGW64 /d/app/node-v14.20.0-win-x64
$ node -v
v14.20.0

  • 安装appium,采用npm,尝试过了用cnpm经常报错,使用git的命令窗口
1
2
3
4
5
npm cache clear --force # 清空缓存,如果报错的话,可以
# 挂了代理,在git窗口中设置了代理
export https_proxy=http://127.0.0.1:33210 http_proxy=http://127.0.0.1:33210 all_proxy=socks5://127.0.0.1:33211

npm i -g appium
  • 装doctor,用来检查appium是否正常
1
2
3
4
5
6
7
$ npm install appium-doctor -g
npm WARN deprecated authorize-ios@1.2.1: Moved into appium
npm WARN deprecated debug@4.1.1: Debug versions >=3.2.0 <3.2.7 || >=4 <4.3.1 have a low-severity ReDos regression when used in a Node.js environment. It is recommended you upgrade to 3.2.7 or 4.3.1. (https://github.com/visionmedia/debug/issues/797)
D:\app\node-v14.20.0-win-x64\appium-doctor -> D:\app\node-v14.20.0-win-x64\node_modules\appium-doctor\appium-doctor.js
+ appium-doctor@1.16.0
added 240 packages from 297 contributors in 74.211s

  • 检查appium,执行./appium-doctor
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
Administrator@WIN-5TF67LA12I4 MINGW64 /d/app/node-v14.20.0-win-x64
$ ./appium-doctor
info AppiumDoctor Appium Doctor v.1.16.0
info AppiumDoctor ### Diagnostic for necessary dependencies starting ###
info AppiumDoctor ✔ The Node.js binary was found at: D:\app\node-v14.20.0-win-x64\node.EXE
info AppiumDoctor ✔ Node version is 14.20.0
info AppiumDoctor ✔ ANDROID_HOME is set to: D:\app\android-sdk
info AppiumDoctor ✔ JAVA_HOME is set to: D:\app\Java\jdk-17.0.2
info AppiumDoctor Checking adb, android, emulator
info AppiumDoctor 'adb' is in D:\app\android-sdk\platform-tools\adb.exe
info AppiumDoctor 'android' is in D:\app\android-sdk\tools\android.bat
info AppiumDoctor 'emulator' is in D:\app\android-sdk\tools\emulator.exe
info AppiumDoctor ✔ adb, android, emulator exist: D:\app\android-sdk
info AppiumDoctor ✔ 'bin' subfolder exists under 'D:\app\Java\jdk-17.0.2'
info AppiumDoctor ### Diagnostic for necessary dependencies completed, no fix needed. ###
info AppiumDoctor
info AppiumDoctor ### Diagnostic for optional dependencies starting ###
WARN AppiumDoctor ✖ opencv4nodejs cannot be found.
WARN AppiumDoctor ✖ ffmpeg cannot be found
WARN AppiumDoctor ✖ mjpeg-consumer cannot be found.
WARN AppiumDoctor ✖ bundletool.jar cannot be found
WARN AppiumDoctor ✖ gst-launch-1.0.exe and/or gst-inspect-1.0.exe cannot be found
info AppiumDoctor ### Diagnostic for optional dependencies completed, 5 fixes possible. ...

info AppiumDoctor Bye!

  • 直接输入appium 启动,没问题后就可以关闭了,不要手动去启动appium
1
2
3
4
5
Administrator@WIN-5TF67LA12I4 MINGW64 /d/app/node-v14.20.0-win-x64
$ appium
[Appium] Welcome to Appium v1.22.3
[Appium] Appium REST http interface listener started on 0.0.0.0:4723

  • 安装adbkit,用adb进行调试
1
npm i -g adbkit

sonic文件配置

  • 新增angent

image-20220824174006767

  • 打开官网下载sonic-agent-v1.5.0-beta-windows_x86_64解压

  • 打开配置文件,我的路径为:D:\sonic\sonic-agent-v1.5.0-beta-windows_x86_64\config\application-sonic-agent.yml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
sonic:
agent:
# 替换为部署Agent机器的ipv4
host: 172.31.104.166
# 替换为Agent服务的端口,可以自行更改
port: 7777
# 替换为前端新增Agent生成的key
key: XXXXXX
server:
# 改成server的SONIC_SERVER_HOST
host: 172.31.104.166
# 改成server的SONIC_SERVER_PORT
port: 3000

modules:
android:
# 是否开启安卓模块
enable: true
# 是否开启远程adb调试功能
use-adbkit: true
ios:
# 如果不需要连接iOS设备,请将true改为false。开启的windows用户请确保本机已安装iTunes
enable: false
# 替换为你自己使用的wda的bundleId,如果没有.xctrunner后缀会自动补全,建议使用公司的开发者证书
wda-bundle-id: com.sonic.WebDriverAgentRunner
appium:
# 是否开启Appium功能
enable: true
webview:
# 是否开启在线webView调试功能
enable: false
# 谷歌调试端口,一般不需要修改(默认0使用随机端口,如果需要开启防火墙给外部使用,请设置固定端口如7778)
chrome-driver-debug-port: 0
# 替换为Agent机器上的chrome浏览器的driver路径,可以去http://npm.taobao.org/mirrors/chromedriver/下载
chrome-driver-path: "/Applications/Google Chrome.app/Contents/MacOS/chromedriver"
sgm:
enable: true
  • 启动agent
1
D:\sonic\sonic-agent-v1.5.0-beta-windows_x86_64>java -jar sonic-agent-windows-x86_64.jar

image-20220824174158573

  • 手机打开开发者模式后,连接电脑(agent)后,打开web页面的设备

image-20220824175641741

开始使用

sonic的配置

  • 新建项目,点击项目

image-20220824175944576

  • 打开设备中心,选择空闲设备,选择使用

image-20220824180220297

image-20220824180249666

  • 点击控件元素,获取到元素后,直接添加控件

image-20220826085653821

image-20220826085812310

  • 用例的步骤关联-上面的控件元素

  • 套件关联多个用例

  • 查看测试报告

image-20220826094320795

  • 支持多个设备同时运行所有用例

总结

  • 优点:远程的手机管理,统一的用例维护,多机运行用例。

  • 不足

    • 不支持分布式用例
    • 用例查找不支持显示等待,这个问题需要等后续的版本脱离appium后,可以处理
  • 已经支持使用poco的控件的获取,这对游戏应用来说,是个很好的消息。

说明

这篇文章,后续的结论说是服务器配置不够观点是错误的,经过排查发现是服务器中毒了,经过top排查到了是有个叫kdevtmpfsi的后台进程太大造成,搜了下相关资料刚好有人和我相同的问题,把解决方案就转载了过来

中毒具体描述

  • 某一天,我写完本地程序,准备部署到云服务器,然而我的xshell始终是连接不上服务器,或者是连接上反应非常慢,我怀疑阿里云对我的服务器动了什么了(实在是对不住,毕竟我也不知道还有这种事情),然后通过阿里云控制台,发现出现了CPU占用非常高,基本上就100%了,并且内存的占用也是非常高。通过登录服务器,具体查看,然后使用命令查看,确实正如控制台的监控系统所展示的那样。

命令以及过程

  • 查看cpu占用

    1
    top
  • 查看异常进程 kdevtmpfsi

    1
    ps -ef | grep kdevtmpfsi
  • 处理异常定时任务

1
2
3
4
5
# 查看定时任务,找出异常任务
crontab -l

# 删除一次任务
crontab -e
  • 查看和其相关联的进程

    1
    2
    systemctl status pid
    # pid 为kdevtmpfsi的进程号
  • 杀掉进程(包括其守护进程,由上一步可以看出),先杀守护进程,后杀挖矿进程

1
kill -9 pid
  • 删除守护进程文件
1
2
3
4
5
6
# 查找守护进程相关文件
find / -name kins*

# 删除文件
rm -rf path

  • 删除挖矿程序文件
1
2
3
4
# 查找文件
find / -name kdevtmpfsi*
# 删除
rm -rf path

说明

  • chrome导入密码后,无反应
  • chrome保存密码,也没有任何提示

解决方案

  • 浏览器中输入chrome://flags/,找到Password import,把值重新禁止和打开。

此操作也使用无导入密码按钮的问题

image-20220822094625395

  • 运行中输入:%LOCALAPPDATA%\Google\Chrome\User Data\Default,打开Chrome保存用户信息的路径,删除已经保存的用户信息(Login开头的文件)

image-20220822095203160

  • 重启chrome浏览器后,无论是导入密码,还是提示密码保存功能,都已经可以了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
#!/usr/bin/env python
# -*- coding:utf-8 -*-

"""
------------------------------------
# @FileName :handle_log.py
# @Time :2020/8/31 19:59
# @Author :xieyuanzuo
# @description :
------------------------------------
"""

import logging
import os
import colorlog
from logging.handlers import RotatingFileHandler
from datetime import datetime

cur_path = os.path.dirname(os.path.realpath(__file__)) # 当前项目路径
log_path = os.path.join(os.path.dirname(cur_path), 'leetcode', 'logs') # log_path为存放日志的路径
if not os.path.exists(log_path): os.mkdir(log_path) # 若不存在logs文件夹,则自动创建

log_colors_config = {
# 终端输出日志颜色配置
'DEBUG': 'white',
'INFO': 'cyan',
'WARNING': 'yellow',
'ERROR': 'red',
'CRITICAL': 'bold_red',
}

default_formats = {
# 终端输出格式
'color_format': '%(log_color)s%(asctime)s-%(name)s-%(filename)s-[line:%(lineno)d]-%(levelname)s: %(message)s',
# 日志输出格式
'log_format': '%(asctime)s-%(name)s-%(filename)s-[line:%(lineno)d]-%(levelname)s: %(message)s'
}


class HandleLog:
"""
先创建日志记录器(logging.getLogger),然后再设置日志级别(logger.setLevel),
接着再创建日志文件,也就是日志保存的地方(logging.FileHandler),然后再设置日志格式(logging.Formatter),
最后再将日志处理程序记录到记录器(addHandler)
"""

def __init__(self):
self.__now_time = datetime.now().strftime('%Y-%m-%d') # 当前日期格式化
self.__all_log_path = os.path.join(log_path, self.__now_time + "-all" + ".log") # 收集所有日志信息文件
self.__error_log_path = os.path.join(log_path, self.__now_time + "-error" + ".log") # 收集错误日志信息文件
self.__logger = logging.getLogger() # 创建日志记录器
self.__logger.setLevel(logging.DEBUG) # 设置默认日志记录器记录级别

@staticmethod
def __init_logger_handler(log_path):
"""
创建日志记录器handler,用于收集日志
:param log_path: 日志文件路径
:return: 日志记录器
"""
# 写入文件,如果文件超过1M大小时,切割日志文件,仅保留3个文件
logger_handler = RotatingFileHandler(filename=log_path, maxBytes=1 * 1024 * 1024, backupCount=3, encoding='utf-8')
return logger_handler

@staticmethod
def __init_console_handle():
"""创建终端日志记录器handler,用于输出到控制台"""
console_handle = colorlog.StreamHandler()
return console_handle

def __set_log_handler(self, logger_handler, level=logging.DEBUG):
"""
设置handler级别并添加到logger收集器
:param logger_handler: 日志记录器
:param level: 日志记录器级别
"""
logger_handler.setLevel(level=level)
self.__logger.addHandler(logger_handler)

def __set_color_handle(self, console_handle):
"""
设置handler级别并添加到终端logger收集器
:param console_handle: 终端日志记录器
:param level: 日志记录器级别
"""
console_handle.setLevel(logging.DEBUG)
self.__logger.addHandler(console_handle)

@staticmethod
def __set_color_formatter(console_handle, color_config):
"""
设置输出格式-控制台
:param console_handle: 终端日志记录器
:param color_config: 控制台打印颜色配置信息
:return:
"""
formatter = colorlog.ColoredFormatter(default_formats["color_format"], log_colors=color_config)
console_handle.setFormatter(formatter)

@staticmethod
def __set_log_formatter(file_handler):
"""
设置日志输出格式-日志文件
:param file_handler: 日志记录器
"""
formatter = logging.Formatter(default_formats["log_format"], datefmt='%Y-%m-%d %H:%M:%S')
file_handler.setFormatter(formatter)

@staticmethod
def __close_handler(file_handler):
"""
关闭handler
:param file_handler: 日志记录器
"""
file_handler.close()

def __console(self, level, message):
"""构造日志收集器"""
all_logger_handler = self.__init_logger_handler(self.__all_log_path) # 创建日志文件
error_logger_handler = self.__init_logger_handler(self.__error_log_path)
console_handle = self.__init_console_handle()

self.__set_log_formatter(all_logger_handler) # 设置日志格式
self.__set_log_formatter(error_logger_handler)
self.__set_color_formatter(console_handle, log_colors_config)

self.__set_log_handler(all_logger_handler) # 设置handler级别并添加到logger收集器
self.__set_log_handler(error_logger_handler, level=logging.ERROR)
self.__set_color_handle(console_handle)

if level == 'info':
self.__logger.info(message)
elif level == 'debug':
self.__logger.debug(message)
elif level == 'warning':
self.__logger.warning(message)
elif level == 'error':
self.__logger.error(message)
elif level == 'critical':
self.__logger.critical(message)

self.__logger.removeHandler(all_logger_handler) # 避免日志输出重复问题
self.__logger.removeHandler(error_logger_handler)
self.__logger.removeHandler(console_handle)

self.__close_handler(all_logger_handler) # 关闭handler
self.__close_handler(error_logger_handler)

def debug(self, message):
self.__console('debug', message)

def info(self, message):
self.__console('info', message)

def warning(self, message):
self.__console('warning', message)

def error(self, message):
self.__console('error', message)

def critical(self, message):
self.__console('critical', message)


log = HandleLog()

if __name__ == '__main__':
for i in range(5):
log.info("这是日志信息")
log.debug("这是debug信息")
log.warning("这是警告信息")
log.error("这是错误日志信息")
log.critical("这是严重级别信息")

结果分析

  • 在Logs文件夹下生成了一个为全部日志文件,一个为错误日志文件

image-20220802192813131

  • 日志文件的生成核心代码如下,可根据需求修改
1
2
3
4
5
6
7
8
9
10
11
12
13
def __console(self, level, message):
"""构造日志收集器"""
all_logger_handler = self.__init_logger_handler(self.__all_log_path) # 创建日志文件
error_logger_handler = self.__init_logger_handler(self.__error_log_path)
console_handle = self.__init_console_handle()

self.__set_log_formatter(all_logger_handler) # 设置日志格式
self.__set_log_formatter(error_logger_handler)
self.__set_color_formatter(console_handle, log_colors_config)

self.__set_log_handler(all_logger_handler) # 设置handler级别并添加到logger收集器--all的Log文件
self.__set_log_handler(error_logger_handler, level=logging.ERROR) #--error的log文件
self.__set_color_handle(console_handle)
  • 注意这里设置了日志文件大小
1
2
3
4
5
6
7
8
9
10
@staticmethod
def __init_logger_handler(log_path):
"""
创建日志记录器handler,用于收集日志
:param log_path: 日志文件路径
:return: 日志记录器
"""
# 写入文件,如果文件超过1M大小时,切割日志文件,仅保留3个文件
logger_handler = RotatingFileHandler(filename=log_path, maxBytes=1 * 1024 * 1024, backupCount=3, encoding='utf-8')
return logger_handler

之前写过monkey方面的测试,这次刚好有项目用到,并且需要监控性能信息,所以重构了一次

monkey 压力测试android

  • python3
  • 统计性能信息cpu,men,fps,battery,flow
  • 支持wifi,gprs统计
  • 统计crash信息
  • 查看源码

monkey.ini 配置文件

1
2
3
4
5

cmd=adb shell monkey -p com.jianshu.haruki --throttle 500 --ignore-timeouts --ignore-crashes --monitor-native-crashes -v -v -v 200 >
package_name=com.jianshu.haruki
activity = com.baiji.jianshu.account.SplashScreenActivity
net = wifi
  • throttle 每次事件等待500毫秒
  • net 支持gprs和wifi

image-20220623165014473

image-20220623165039643

image-20220623165114249

代码分析

主要监控代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
def get_cpu(pkg_name):
cmd = "adb shell dumpsys cpuinfo | findstr " + pkg_name
print(cmd)
output = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE).stdout.readlines()
for info in output:
if info.split()[1].decode().split("/")[1][:-1] == pkg_name: # 只有包名相等
# print("cpu=" + info.split()[2].decode())
cpu.append(float(info.split()[2].decode().split("%")[0]))
print("----cpu-----")
print(cpu)
return cpu


def get_men(pkg_name):
cmd = "adb shell dumpsys meminfo %s" % (pkg_name)
print(cmd)
men_s = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE).stdout.readlines()
for info in men_s:
if len(info.split()) and info.split()[0].decode() == "TOTAL":
# print("men="+info.split()[1].decode())
men.append(int(info.split()[1].decode()))
print("----men----")
print(men)
return men


# 得到fps
'''
@author fenfenzhong
'''


def get_fps(pkg_name):
_adb = "adb shell dumpsys gfxinfo %s" % pkg_name
print(_adb)
results = os.popen(_adb).read().strip()
frames = [x for x in results.split('\n') if validator(x)]
frame_count = len(frames)
jank_count = 0
vsync_overtime = 0
render_time = 0
for frame in frames:
time_block = re.split(r'\s+', frame.strip())
if len(time_block) == 3:
try:
render_time = float(time_block[0]) + float(time_block[1]) + float(time_block[2])
except Exception as e:
render_time = 0


if render_time > 16.67:
jank_count += 1
if render_time % 16.67 == 0:
vsync_overtime += int(render_time / 16.67) - 1
else:
vsync_overtime += int(render_time / 16.67)

_fps = int(frame_count * 60 / (frame_count + vsync_overtime))
fps.append(_fps)
# return (frame_count, jank_count, fps)
print("-----fps------")
print(fps)
return fps


def get_battery():
_batter = subprocess.Popen("adb shell dumpsys battery", shell=True, stdout=subprocess.PIPE,
stderr=subprocess.PIPE).stdout.readlines()
for info in _batter:
if info.split()[0].decode() == "level:":
battery.append(int(info.split()[1].decode()))
print("-----battery------")
print(battery)
return int(info.split()[1].decode())


def get_pid(pkg_name):
pid = subprocess.Popen("adb shell ps | findstr " + pkg_name, shell=True, stdout=subprocess.PIPE,
stderr=subprocess.PIPE).stdout.readlines()
for item in pid:
if item.split()[8].decode() == pkg_name:
return item.split()[1].decode()


def get_flow(pkg_name, type):
pid = get_pid(pkg_name)
if pid is not None:
_flow = subprocess.Popen("adb shell cat /proc/" + pid + "/net/dev", shell=True, stdout=subprocess.PIPE,
stderr=subprocess.PIPE).stdout.readlines()
for item in _flow:
if type == "wifi" and item.split()[0].decode() == "wlan0:": # wifi
# 0 上传流量,1 下载流量
flow[0].append(int(item.split()[1].decode()))
flow[1].append(int(item.split()[9].decode()))
print("------flow---------")
print(flow)
return flow
if type == "gprs" and item.split()[0].decode() == "rmnet0:": # gprs
print("--------------")
flow[0].append(int(item.split()[1].decode()))
flow[1].append(int(item.split()[9].decode()))
return flow
else:
flow[0].append(0)
flow[1].append(0)
return flow

  • 代码入口:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
if ba.attached_devices():
mc = BaseMonkeyConfig.monkeyConfig(PATH("monkey.ini"))
# 打开想要的activity
ba.open_app(mc["package_name"], mc["activity"])
temp = ""
# monkey开始测试
start_monkey(mc["cmd"], mc["log"])
time.sleep(1)
starttime = datetime.datetime.now()
while True:
with open(mc["monkey_log"], encoding='utf-8') as monkeylog:
BaseMonitor.get_cpu(mc["package_name"])
BaseMonitor.get_men(mc["package_name"])
BaseMonitor.get_fps(mc["package_name"])
BaseMonitor.get_battery()
BaseMonitor.get_flow(mc["package_name"], mc["net"])
time.sleep(1) # 每1秒采集检查一次
if monkeylog.read().count('Monkey finished') > 0:
endtime = datetime.datetime.now()
print("测试完成咯")
app = {"beforeBattery": BaseMonitor.get_battery(), "net": mc["net"], "monkey_log": mc["monkey_log"]}
report(app, str((endtime - starttime).seconds) + "秒")
bo.close()