006Python-threading__3

以下是一个使用 生产者-消费者模型 的线程间通信示例,通过 queue.Queue 实现安全的数据传递,并包含详细的注释和关键机制说明:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
import threading
import time
from queue import Queue

# 创建线程安全的队列,最大容量为5
task_queue = Queue(maxsize=5)

# 终止信号(用于通知消费者停止)
STOP_SIGNAL = None

def producer():
"""生产者线程:生成任务并放入队列"""
for i in range(10):
# 如果队列已满,阻塞等待消费者取出数据
task_queue.put(f"Task-{i}")
print(f"[生产者] 已生产: Task-{i}")
time.sleep(0.1) # 模拟生产耗时

# 发送终止信号
task_queue.put(STOP_SIGNAL)
print("[生产者] 已发送终止信号")

def consumer():
"""消费者线程:从队列中取出任务并处理"""
while True:
# 如果队列为空,阻塞等待生产者放入数据
item = task_queue.get()

# 检查终止信号
if item is STOP_SIGNAL:
task_queue.task_done() # 必须标记终止信号的处理完成
print("[消费者] 收到终止信号,停止运行")
break

# 处理任务
print(f"[消费者] 正在处理: {item}")
time.sleep(0.2) # 模拟处理耗时

# 标记任务完成(必须在处理完成后调用)
task_queue.task_done()

# 创建并启动线程
producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)
producer_thread.start()
consumer_thread.start()

# 主线程等待队列清空
task_queue.join() # 阻塞直到所有任务被标记为完成(包括终止信号)
print("[主线程] 所有任务已完成")

# 等待线程结束
producer_thread.join()
consumer_thread.join()

关键机制说明

1. 队列的线程安全性

  • queue.Queue 内部已经实现了锁机制,无需手动加锁即可保证多线程安全。
  • put()get() 方法在队列满/空时会自动阻塞,避免忙等待。

2. 终止信号的处理

  • 生产者通过向队列发送 STOP_SIGNAL(示例中使用 None)通知消费者停止。
  • 消费者检查到终止信号后,跳出循环并标记任务完成(task_done() 必须调用)。

3. task_done()join() 的配合

  • task_queue.task_done()
    每处理完一个任务必须调用一次,否则 task_queue.join() 会无限阻塞。
  • task_queue.join()
    主线程通过该方法等待队列中所有任务(包括终止信号)被标记为完成。

4. 输出示例

1
2
3
4
5
6
7
8
9
10
[生产者] 已生产: Task-0
[消费者] 正在处理: Task-0
[生产者] 已生产: Task-1
[生产者] 已生产: Task-2
[消费者] 正在处理: Task-1
[生产者] 已生产: Task-3
...
[生产者] 已发送终止信号
[消费者] 收到终止信号,停止运行
[主线程] 所有任务已完成

适用场景

  • I/O密集型任务:如批量下载文件时,生产者生成下载链接,消费者执行下载。
  • 任务调度:主线程分配任务,多个工作线程并行处理。
  • 实时数据处理:生产者接收传感器数据,消费者实时分析。

扩展:多消费者模式

若需多个消费者并行处理任务,只需启动多个消费者线程:

1
2
3
# 启动3个消费者
for _ in range(3):
threading.Thread(target=consumer).start()

注意事项

  1. 终止信号的广播
    如果有多个消费者,需为每个消费者发送一个终止信号(例如放入 NSTOP_SIGNAL)。
  2. 资源释放
    消费者退出前应释放资源(如关闭文件、数据库连接)。
  3. 异常处理
    task_done() 前添加 try...finally 块确保任务标记:
    1
    2
    3
    4
    try:
    # 处理任务
    finally:
    task_queue.task_done()

006Python-threading__3
https://jackiedai.github.io/2025/03/19/011Python/008Python-threading3/
Author
lingXiao
Posted on
March 19, 2025
Licensed under