常见的并发模型(concurrency_models)
1. 生产者-消费者模型【Producer-Consumer Model】
在生产者-消费者模型中,生产者生成数据并放入缓冲区,而消费者从缓冲区取出数据进行处理。缓冲区通常通过队列来实现,生产者和消费者可以并发地运行,通过队列协调生产和消费的速率。
- 优点:解耦了生产和消费的速度,可以通过队列容量控制并发量。
- 适用场景:数据流处理、日志记录、任务队列等。
- 实现方式:可以使用线程/进程配合队列实现。
简单实例
2. 消息发布-订阅模型【Publish-Subscribe Model】
在发布-订阅模型中,消息发布者(Publisher)将消息发布到一个主题(Topic),多个订阅者(Subscriber)可以订阅该主题,并在有新消息发布时收到通知。发布者和订阅者之间通过主题解耦。
- 优点:支持一对多的消息通信,发布者和订阅者之间解耦。
- 适用场景:通知系统、实时数据流、多用户消息广播。
- 实现方式:在 Python 中可以使用
asyncio.Queue
模拟该模式,分布式系统中常使用 RabbitMQ、Redis 等消息中间件。
简单实例
3. 任务分发-执行模型(Task Distribution Model)
任务分发-执行模型将任务分配给多个工作者(Worker),每个工作者独立完成任务并返回结果。通常使用调度器将任务分发到不同的工作者,以均衡负载。
- 优点:支持负载均衡,适合分布式计算。
- 适用场景:任务队列、分布式系统的负载均衡、数据处理流水线。
- 实现方式:在 Python 中可以使用
concurrent.futures.ThreadPoolExecutor
或 Celery 这样的分布式任务队列框架。
简单实例
import concurrent.futures
import time
import random
# 定义一个模拟任务的函数
def task(task_id):
print(f"任务 {task_id} 开始执行")
time_to_sleep = random.uniform(1, 3) # 随机等待时间,模拟任务处理
time.sleep(time_to_sleep)
print(f"任务 {task_id} 完成,耗时 {time_to_sleep:.2f} 秒")
return task_id, time_to_sleep
# 任务调度器,分发多个任务
def main():
# 创建任务列表
tasks = [i for i in range(10)] # 假设有10个任务
# 使用 ThreadPoolExecutor 创建一个线程池
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
# 将任务分发给线程池
future_to_task = {executor.submit(task, task_id): task_id for task_id in tasks}
# 监控每个任务的完成情况
for future in concurrent.futures.as_completed(future_to_task):
task_id = future_to_task[future]
try:
result = future.result()
print(f"任务 {result[0]} 执行完成,耗时 {result[1]:.2f} 秒")
except Exception as e:
print(f"任务 {task_id} 生成异常:{e}")
if __name__ == "__main__":
main()