0%

直接上代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
from email.header import Header
from email.mime.text import MIMEText
from email.utils import parseaddr, formataddr
from email.mime.multipart import MIMEMultipart
from email.mime.application import MIMEApplication
import smtplib
# 第三方 SMTP 服务
mail_host="smtp.qq.com" #设置服务器
mail_user="111@qq.com" #用户名
mail_pass="adfafda" #口令

def _format_addr(s):
name, addr = parseaddr(s)
return formataddr((Header(name, 'utf-8').encode(), addr))


def send_mail(f, to_addr):
'''

:param f: 附件路径
:param to_addr:发给的人 []
:return:
'''
from_addr = mail_user
password = mail_pass
# to_addr = "ashikun@126.com"
smtp_server = mail_host

msg = MIMEMultipart()

# msg = MIMEText('hello, send by Python...', 'plain', 'utf-8')
msg['From'] = _format_addr('坤少发的邮件<%s>' % from_addr)
msg['To'] = _format_addr('大人 <%s>' % to_addr)
msg['Subject'] = Header('接口测试报告……', 'utf-8').encode()

msg.attach(MIMEText('接口测试报告.', 'plain', 'utf-8'))
part = MIMEApplication(open(f, 'rb').read())
part.add_header('Content-Disposition', 'attachment', filename=f)
msg.attach(part)

server = smtplib.SMTP_SSL(smtp_server, 465)
server.set_debuglevel(1)
server.login(from_addr, password)
server.sendmail(from_addr, to_addr, msg.as_string())
server.quit()
  • 被这里密码坑了好长时间,切记这里的密码不是登陆密码,而是授权码
  • QQ发邮件用25端口发送不成功,用搞定ssl的465

运行下面的代码,可以自动给指定的微信好友发消息

1
2
3
4
5
6
7
8
9
10
11
import itchat

#产生二维码
itchat.auto_login(hotReload=True)
#定义用户的昵称
send_userid='亲爱的'
#查找用户的userid
itcaht_user_name = itchat.search_friends(name=send_userid)[0]['UserName']
#利用send_msg发送消息
itchat.send_msg('这是一个测试',toUserName=itcaht_user_name)

运行下面的代码,好友发消息给你后自动回复

  • 自动回复的内容用的是图灵机器人
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import requests
import itchat
# 去图灵机器人官网注册后会生成一个apikey,可在个人中心查看
KEY = '8edce3ce905a4c1dbb96**************'
def get_response(msg):
apiUrl = 'http://www.tuling123.com/openapi/api'
data = {
'key' : KEY,
'info' : msg, # 这是要发送出去的信息
'userid' : 'wechat-rebot', #这里随意写点什么都行
}
try:
# 发送一个post请求
r = requests.post(apiUrl, data =data).json()
# 获取文本信息,若没有‘Text’ 值,将返回Nonoe
return r.get('text')
except:
return
# 通过定义装饰器加强函数 tuling_reply(msg) 功能,获取注册文本信息
@itchat.msg_register(itchat.content.TEXT)
def tuling_reply(msg):
# 设置一个默认回复,在出现问题仍能正常回复信息
defaultReply = 'I received: ' +msg['Text']
reply = get_response(msg['Text'])
# a or b 表示,如有a有内容,那么返回a,否则返回b
return reply or defaultReply
# 使用热启动,不需要多次扫码
itchat.auto_login(hotReload=True)
itchat.run()

其他

说明

  • 主要用途是想用多机压测服务器端
  • 可以组合任何平台,任意语言,任意压测方法,任意电脑等对服务器进行压测

设计思路

  • 开启一个webserver记录客户端压测情况
  • 然后可以运行多个电脑启动多个客户端,压测电脑在同一个内网,也可配置成外网apache。

代码

  • 配置信息
1
2
3
4
5
__author__ = "shikun"
class Const(object):
log = "D:/app/Apache2.2/htdocs/client.log" # apache 的log路径
APAHEC_IP = "192.168.1.38" # 本机ip
PORT_NUMBER = 8088 # 端口号
  • web server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from http.server import BaseHTTPRequestHandler,HTTPServer
import urllib.parse
from common import operateFile
from common.customConst import Const
class myHandler(BaseHTTPRequestHandler):
# Handler for the GET requests
def do_GET(self):
print('Get request received')
req = urllib.parse.unquote(self.path)
result = urllib.parse.parse_qs(req[2:]) # 得到请求参数
self.send_response(200)
self.send_header('Content-type','text/html')
self.end_headers()
# Send the html message
self.wfile.write(b"ok!") #发送信息给客户端
operateFile.write_txt(line=result["msg"][0], f_path=Const.log) # 记录各个客户端发来的信息
try:
server = HTTPServer((Const.APAHEC_IP, Const.PORT_NUMBER), myHandler)
print ('Started httpserver on port ' , Const.PORT_NUMBER)
server.serve_forever()
except KeyboardInterrupt:
print ('^C received, shutting down the web server')
server.socket.close()
  • client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
from multiprocessing import Process
from gevent import monkey; monkey.patch_all()
import gevent
import requests
import json
from common.customConst import Const
url = "http://rap.taobao.org/mockjsdata/10296/getUserInfo?id=2"
num = 10
result = {"success": 0, "computer": "压测客户端1", "cpu": 4, "men": 4, sum: 10} # 客户端信息

class Producer(object):
'''
协程发请求
'''
def __init__(self):
self._rungevent()
self.h = 0
def _rungevent(self):
jobs = []
for i in range(num): #windows下有端口限制
jobs.append(gevent.spawn(self.produce))
gevent.joinall(jobs)
requests.get("http://"+Const.APAHEC_IP+":"+ str(Const.PORT_NUMBER)+"/?msg="+str(result)) # 发送客户端的请求情况
def produce(self):
r = requests.get(url)
if r.status_code == 200:
r.encoding = 'UTF-8'
if json.loads(r.text)["code"] == 0:
result["success"] += 1
else:
print("失败咯")
def main():
p = Process(target=Producer, args=()) # 一个进程启动协程
p.start()

# p1 = Process(target = Producer, args=())
# p1.start()

if __name__ == '__main__':
main()
  • 压测端可以用其他语言,其他方法如,多进程+线程,异步等

结果

Paste_Image.png

结束语

  • 这样算不算分布式压力测试,如果哪里有错误,欢迎指正。
  • 服务器监控,现在做的比较简单.直接用命名即可。也可以看此篇的压测统计思路

线程基类

1
2
3
4
5
6
7
8
import threading
class base_thread(threading.Thread):
def __init__(self, func):
threading.Thread.__init__(self)
self.func = func
#print(type(self.func))
def run(self):
self.func

爬虫百度贴吧中某帖子图片

  • 先要安装BeautifulSoup
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import urllib.request as request
from bs4 import BeautifulSoup
from Base.Threads import base_thread
path = 'e:/apps/pic/'
def CrawlerFunc(url):
list_img = getUrl(url)
multi_thread(len(list_img), downloadImg(list_img))
def getUrl(url):
response = request.urlopen(url)
html = response.read()
data = html.decode('utf-8')
soup = BeautifulSoup(data)
list_img = []
for list in soup.find_all("img", {"class", "BDE_Image"}):
list_img.append(list.attrs["src"])
return list_img

def downloadImg(list_img):
count = 1
for i in list_img:
filepath = path + str(count)+".jpg"
with open(filepath, 'wb') as file:
print(filepath)
image_data = request.urlopen(i).read()
file.write(image_data)
count += 1
def multi_thread(count, func):
threads = []
for i in range(0, count):
threads.append(base_thread(func))
for j in range(0, count):
threads[j].start()
for k in range(0, count):
threads[k].join()
#CrawlerFunc("http://tieba.baidu.com/p/3764230390")

参考

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
import paramiko
import paramiko
server_ip = '192.168.1.1'
server_user = 'root'
server_passwd = ''
server_port = 22
ssh = paramiko.SSHClient()
def ssh_connect():
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.load_system_host_keys()
ssh.connect(server_ip, server_port,server_user, server_passwd)
return ssh
def client_connect():
client = paramiko.Transport((server_ip, server_port))
client.connect(username = server_user, password = server_passwd)
return client
def ssh_disconnect(client):
client.close()

def exec_cmd(command, ssh):
'''
windows客户端远程执行linux服务器上命令
'''
stdin, stdout, stderr = ssh.exec_command(command)
err = stderr.readline()
out = stdout.readline()
print(stdout.read())

def win_to_linux(localpath, remotepath,client):
'''
windows向linux服务器上传文件.
localpath 为本地文件的绝对路径。如:D: est.py
remotepath 为服务器端存放上传文件的绝对路径,而不是一个目录。如:/tmp/my_file.txt
'''

sftp = paramiko.SFTPClient.from_transport(client)
sftp.put(localpath,remotepath)
client.close()

def linux_to_win(localpath, remotepath,client):
'''
从linux服务器下载文件到本地
localpath 为本地文件的绝对路径。如:D: est.py
remotepath 为服务器端存放上传文件的绝对路径,而不是一个目录。如:/tmp/my_file.txt
'''
sftp = paramiko.SFTPClient.from_transport(client)
sftp.get(remotepath, localpath)
client.close()

class AllowAllKeys(paramiko.MissingHostKeyPolicy):
def missing_host_key(self, client, hostname, key):
return

def muit_exec_cmd(ssh,cmd):
'''
ssh ssh连接
cmd 多命名
'''
ssh.set_missing_host_key_policy(AllowAllKeys())
channel = ssh.invoke_shell()
stdin = channel.makefile('wb')
stdout = channel.makefile('rb')

stdin.write(cmd)
print(stdout.read())

stdout.close()
stdin.close()

cl = client_connect()
sh = ssh_connect()
muit_exec_cmd(sh,'''
cd ~
ls
sar
exit
''')
win_to_linux("t.txt","/data/t1.txt",cl)
cl.close()
sh.close()