concurrency_models

常见的并发模型(concurrency_models)

1. 生产者-消费者模型【Producer-Consumer Model】

在生产者-消费者模型中,生产者生成数据并放入缓冲区,而消费者从缓冲区取出数据进行处理。缓冲区通常通过队列来实现,生产者和消费者可以并发地运行,通过队列协调生产和消费的速率。

  • 优点:解耦了生产和消费的速度,可以通过队列容量控制并发量。
  • 适用场景:数据流处理、日志记录、任务队列等。
  • 实现方式:可以使用线程/进程配合队列实现。

简单实例

2. 消息发布-订阅模型【Publish-Subscribe Model】

在发布-订阅模型中,消息发布者(Publisher)将消息发布到一个主题(Topic),多个订阅者(Subscriber)可以订阅该主题,并在有新消息发布时收到通知。发布者和订阅者之间通过主题解耦。

  • 优点:支持一对多的消息通信,发布者和订阅者之间解耦。
  • 适用场景:通知系统、实时数据流、多用户消息广播。
  • 实现方式:在 Python 中可以使用 asyncio.Queue 模拟该模式,分布式系统中常使用 RabbitMQ、Redis 等消息中间件。

简单实例

3. 任务分发-执行模型(Task Distribution Model)

任务分发-执行模型将任务分配给多个工作者(Worker),每个工作者独立完成任务并返回结果。通常使用调度器将任务分发到不同的工作者,以均衡负载。

  • 优点:支持负载均衡,适合分布式计算。
  • 适用场景:任务队列、分布式系统的负载均衡、数据处理流水线。
  • 实现方式:在 Python 中可以使用 concurrent.futures.ThreadPoolExecutor 或 Celery 这样的分布式任务队列框架。

简单实例

import concurrent.futures
import time
import random

# 定义一个模拟任务的函数
def task(task_id):
    print(f"任务 {task_id} 开始执行")
    time_to_sleep = random.uniform(1, 3)  # 随机等待时间,模拟任务处理
    time.sleep(time_to_sleep)
    print(f"任务 {task_id} 完成,耗时 {time_to_sleep:.2f} 秒")
    return task_id, time_to_sleep

# 任务调度器,分发多个任务
def main():
    # 创建任务列表
    tasks = [i for i in range(10)]  # 假设有10个任务

    # 使用 ThreadPoolExecutor 创建一个线程池
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        # 将任务分发给线程池
        future_to_task = {executor.submit(task, task_id): task_id for task_id in tasks}

        # 监控每个任务的完成情况
        for future in concurrent.futures.as_completed(future_to_task):
            task_id = future_to_task[future]
            try:
                result = future.result()
                print(f"任务 {result[0]} 执行完成,耗时 {result[1]:.2f} 秒")
            except Exception as e:
                print(f"任务 {task_id} 生成异常:{e}")

if __name__ == "__main__":
    main()
github