非阻塞启动线程
1 | import threading |
- 得到值如下,线程启动函数后,非阻塞执行
1
2
3
4
5
6start thread
start....
111
222
end thread
end...
多线程并发处理
1 | import threading |
- 打印结果:每次运行三个线程,每个线程循环打印10次
1
2
3
4
5
6
7
8
9start run
start run
start run
0
0
...
thread_id=1:name=Thread-1
所有线程执行完毕
总循环次数为:30 - 多线程共享资源,可以使用全局变量
global
多线程加锁
- 对于那些需要每次只允许一个线程操作的数据,可以将其操作放到 acquire 和 release 方法之间
1 | # -*- coding: utf-8 -*- |
- 打印结果是每次只能运行一个线程
1 | start |
多线程与队列
- 我们经常会遇到这样的一个问题,这里有成千上万条数据,每次需要取出其中的一条数据进行处理,那么引入多线程该怎么进行任务分配?
- 我们可以将数据进行分割然后交给多个线程去跑,可是这并不是一个明智的做法。在这里我们可以使用队列与线程相结合的方式进行任务分配。
- 队列线程的思想: 首先创建一个全局共享的队列,队列中只存在有限个元素,并将所有的数据逐条加入到队列中,并调用队列的join函数进行等待。之后便可以开启若干线程,线程的任务就是不断的从队列中取数据进行处理就可以了。
1 | import threading |
- 这里必须要在判断
q.empty()
前加上线程锁,因为可能会出现这样的一种情况。 - 某一时刻,队列中还有一个元素,该元素正在被线程A取出,而与此同时线程B正在判断队列q是否为空,而此时线程B中队列q不为空进入后面的操作,但是待B去取元素时,最后一个元素已经被A取出,造成线程等待,显示出被挂起的状态。
- 我们也可以通过加入
q.get(timeout=10)
超时操作来弥补这一问题。 - 打印的结果
1
2
3
4
5
6
7[<myThread(Thread-0, started 6568)>, <myThread(Thread-1, started 7724)>, <myThread(Thread-2, started 7796)>]
Thread-1:Sat Aug 22 11:36:29 2020:Queue:0
Thread-1:Sat Aug 22 11:36:29 2020:Queue:1
...
Thread-1:Sat Aug 22 11:36:30 2020:Queue:9998
Thread-1:Sat Aug 22 11:36:30 2020:Queue:9999
Exiting Main Thread
ThreadPoolExecutor线程池的使用
- 锁依然可以运用到线程池
map
的使用,接受一个List的数据,会循环调用1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19from concurrent.futures.thread import ThreadPoolExecutor
import time
num = 0
def print_time(data):
global num
num += 1
time.sleep(2)
print("start_%s" % data)
print("============")
data = []
for i in range(50):
data.append(i)
with ThreadPoolExecutor(10) as pool:
result = pool.map(print_time, data)
# 等待所有线程执行完毕
for i in result:
pass
print("循环次数=%s" % num)- 打印结果为:每次启动10个线程,启动了5次
1
2
3
4
5
6============
start_46
start_49
============
============
循环次数=50 submit
接受list的数据,也可以接受字典1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19rom concurrent.futures.thread import ThreadPoolExecutor
from concurrent.futures import as_completed
import time
def print_time(data):
time.sleep(2)
print("start_%s" % data)
print("============")
data = []
for i in range(50):
data.append(i)
with ThreadPoolExecutor(10) as executor:
future_list = []
for i in range(10):
# future = executor.submit(print_time,data)
future = executor.submit(print_time, {"name": 111, "id": 222})
future_list.append(future)
for res in as_completed(future_list): # 这个futrure_list是你future对象的列表
print(res.result())