0%

python-多线程知识全面解析

非阻塞启动线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import threading
import time
def one_thread(name,id):
print("start....")
print(name)
print(id)
time.sleep(5)
print("end...")

print("start thread")
threading.Thread(target=one_thread, args=(), kwargs={"name": 111, "id": 222}).start()
# args是一个list
# kwargs是一个字典,需要对应函数的key
print("end thread")
  • 得到值如下,线程启动函数后,非阻塞执行
    1
    2
    3
    4
    5
    6
    start thread
    start....
    111
    222
    end thread
    end...

多线程并发处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import threading
import time


class myThread(threading.Thread):
def __init__(self, threadID, name):
threading.Thread.__init__(self)
self.threadID = threadID
self.name = name

def run(self):
print_time(self.threadID, self.name)

num = 0
def print_time(threadID, name):
global num
# 每一个线程循环10次,最终总循环次数为30次
for i in range(10):
print("start run")
time.sleep(2)
print(i)
num += 1
print("thread_id=%s:name=%s" % (threadID, name))


if __name__ == '__main__':
threads = []
# 新增三个线程
for i in range(3):
name = "Thread-%d" % i
t = myThread(i, name)
t.start()
threads.append(t)
for t in threads:
t.join()
print("所有线程执行完毕")
print("总循环次数为:%s" % num)
  • 打印结果:每次运行三个线程,每个线程循环打印10次
    1
    2
    3
    4
    5
    6
    7
    8
    9
    start run
    start run
    start run
    0
    0
    ...
    thread_id=1:name=Thread-1
    所有线程执行完毕
    总循环次数为:30
  • 多线程共享资源,可以使用全局变量global

多线程加锁

  • 对于那些需要每次只允许一个线程操作的数据,可以将其操作放到 acquire 和 release 方法之间
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# -*- coding: utf-8 -*-
import time
import threading
# 创建锁对象
lock = threading.Lock()
num = 0

def run(n):
global num
for i in range(10):
# 加锁 为了确保下面代码只能由一个线程从头到尾的执行
# 会阻止多线程的并发执行,所以效率会大大降低
"""
lock.acquire()
try:
num = num - n
num = num + n
finally:
# 解锁
lock.release()
"""
with lock:
time.sleep(2)
print("start")
num = num + 1
print("==============")


if __name__ == '__main__':
t1 = threading.Thread(target=run,args=(6,))
t2 = threading.Thread(target=run,args=(9,))
t1.start()
t2.start()
t1.join()
t2.join()
print("num = %s"%(num))
  • 打印结果是每次只能运行一个线程
1
2
3
4
start
==============
...
num = 20

多线程与队列

  • 我们经常会遇到这样的一个问题,这里有成千上万条数据,每次需要取出其中的一条数据进行处理,那么引入多线程该怎么进行任务分配?
  • 我们可以将数据进行分割然后交给多个线程去跑,可是这并不是一个明智的做法。在这里我们可以使用队列与线程相结合的方式进行任务分配。
  • 队列线程的思想: 首先创建一个全局共享的队列,队列中只存在有限个元素,并将所有的数据逐条加入到队列中,并调用队列的join函数进行等待。之后便可以开启若干线程,线程的任务就是不断的从队列中取数据进行处理就可以了。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
import threading
import time
import queue

q = queue.Queue(10)

threadLock = threading.Lock()


class myThread(threading.Thread):
def __init__(self, threadID, name):
threading.Thread.__init__(self)
self.threadID = threadID
self.name = name
self.exitFlag = 0

def run(self):
while not self.exitFlag:
threadLock.acquire()
if not q.empty():
id = q.get()
print_time(self.name, id)
threadLock.release()
else:
threadLock.release()


def print_time(threadName, id):
print ("%s:%s:%s"%(threadName,time.ctime(time.time()),id))
# pass


# 创建3个线程
threads = []
for i in range(3):
name = "Thread-%d" % i
t = myThread(i, name)
t.start()
threads.append(t)
print(threads)

# 新增队列数据
for i in range(10000):
q_name = "Queue:%d" % i
q.put(q_name)

# 等待队列清空
while not q.empty():
pass

# 也可以join方法,与上同效
# q.join()

# 通知线程,处理完之后关闭
for t in threads:
t.exitFlag = 1

# 等待所有线程结束之后才退出
for t in threads:
t.join()

print("Exiting Main Thread")
  • 这里必须要在判断q.empty()前加上线程锁,因为可能会出现这样的一种情况。
  • 某一时刻,队列中还有一个元素,该元素正在被线程A取出,而与此同时线程B正在判断队列q是否为空,而此时线程B中队列q不为空进入后面的操作,但是待B去取元素时,最后一个元素已经被A取出,造成线程等待,显示出被挂起的状态。
  • 我们也可以通过加入q.get(timeout=10)超时操作来弥补这一问题。
  • 打印的结果
    1
    2
    3
    4
    5
    6
    7
    [<myThread(Thread-0, started 6568)>, <myThread(Thread-1, started 7724)>, <myThread(Thread-2, started 7796)>]
    Thread-1:Sat Aug 22 11:36:29 2020:Queue:0
    Thread-1:Sat Aug 22 11:36:29 2020:Queue:1
    ...
    Thread-1:Sat Aug 22 11:36:30 2020:Queue:9998
    Thread-1:Sat Aug 22 11:36:30 2020:Queue:9999
    Exiting Main Thread

ThreadPoolExecutor线程池的使用

  • 锁依然可以运用到线程池
  • map的使用,接受一个List的数据,会循环调用
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    from concurrent.futures.thread import ThreadPoolExecutor
    import time
    num = 0
    def print_time(data):
    global num
    num += 1
    time.sleep(2)
    print("start_%s" % data)
    print("============")
    data = []
    for i in range(50):
    data.append(i)
    with ThreadPoolExecutor(10) as pool:
    result = pool.map(print_time, data)
    # 等待所有线程执行完毕
    for i in result:
    pass
    print("循环次数=%s" % num)

  • 打印结果为:每次启动10个线程,启动了5次
    1
    2
    3
    4
    5
    6
    ============
    start_46
    start_49
    ============
    ============
    循环次数=50
  • submit接受list的数据,也可以接受字典
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    rom concurrent.futures.thread import ThreadPoolExecutor
    from concurrent.futures import as_completed

    import time
    def print_time(data):
    time.sleep(2)
    print("start_%s" % data)
    print("============")
    data = []
    for i in range(50):
    data.append(i)
    with ThreadPoolExecutor(10) as executor:
    future_list = []
    for i in range(10):
    # future = executor.submit(print_time,data)
    future = executor.submit(print_time, {"name": 111, "id": 222})
    future_list.append(future)
    for res in as_completed(future_list): # 这个futrure_list是你future对象的列表
    print(res.result())

参考