Skip to content

Python并发编程

程序提速的方法

  • threading
  • multiprocessing
  • Hadoop/hive/spark

Python对并发编程的支持

  • 多进程: multiprocessing(Process-based), 利用多核CPU的能力
  • 多线程: threading(Thread-based),利用CPU和IO可以同时执行的原理
  • 异步IO:asyncio(Coroutine-based),在单线程利用CPU和IO同时执行的原理实现函数的异步执行

Python-Concurrency-API-Decision-Tree

  • 使用Lock对资源加锁,防止访问冲突
  • 使用Queue实现不同线程/进程之间的数据通信, 实现生产者-消费者模式
  • 使用线程池Pool/进程池Pool,简化线程/进程的任务提交、等待结束、获取结果
  • 使用subprocess启动外部程序的进程,并进行输入输出交互

多进程Process(multiprocessing)

  • 优点:利用多核CPU并行运算
  • 缺点:占用资源最多、可启动数目比线程少
  • 适用:CPU密集型运算

多线程Thread(threading)

  • 优点:相比进程,更轻量级、占用资源少
  • 缺点:
  • 相比进程:多线程只能并发执行,不能用于多CPU(GIL)
  • 相比协程:启动数目有限制,占用内存资源,有线程切换开销
  • 适用:IO密集型计算、同时运行的任务数目要求不高

多协程Coroutine(asyncio)

  • 优点:内存开销最小、启动协程数量最多
  • 缺点:支持的库有限制(例:aiohttp VS requests)、代码实现复杂
  • 适用:IO密集型计算、需要超多任务运行、有现成库支持

Python创建多线程的方法

  1. 准备函数
def my_func(a, b):
    do_something(a, b)
  1. 创建线程
import threading

t = threading.Thread(target=my_func, args=(100, 200))
  1. 启动线程
t.start()
  1. 等待结束
t.join()

多线程数据通信-queue.Queue

queue.Queue可以用于多线程之间的、线程安全的数据通信

# 1.导入类库
import queue

# 2.创建Queue
q = queue.Queue()

# 3.添加元素
q.put(item)

# 4.获取元素
item = q.get()

# 5.查询状态
q.qsize() # 查看元素的多少
q.empty() # 判断是否为空
q.full()  # 判断是否已满

Lock用于解决线程安全问题

  • try-finally模式
import threading

lock = threading.Lock()

lock.acquire()

try:
    # do something
finally:
    lock.release()
  • with模式
import threading

lock = threading.Lock()

with lock:
    # do something

线程池

好处:

  1. 提升性能:减去了大量新建、终止线程的开销, 重用了线程资源
  2. 适用场景:适用处理突发性大量请求或需要大量线程完成任务、但实际处理任务时间较短
  3. 防御功能:能有效避免系统因为创建线程过多而导致负荷过大变慢等问题
  4. 代码优势:使用线程池的语法比自己新建线程执行线程更加简洁

ThreadPoolExecutor使用语法:

  • map函数,注意map的结果和入参是顺序对应的
from concurrent.futures import ThreadPoolExecutor, as_completed

with ThreadPoolExecutor() as pool:
    results = pool.map(craw, urls)

    for result in results:
        print(result)
  • future模式,更强大
from concurrent.futures import ThreadPoolExecutor, as_completed

with ThreadPoolExecutor() as pool:
    futures = [pool.submit(craw, url) for urls in urls]

    for future in futures:
        print(future.result())

    for future in as_completed(futures):
        print(future.result())

Python异步IO库(asyncio)

要用在异步IO编程中,依赖的库必须支持异步IO的特性

import asyncio

# 获取事件循环
loop = asyncio.get_event_loop()

# 定义协程
async def my_func(url):
    await get_url(url)

# 创建task列表
tasks = [loop.create_task(my_func(url)) for url in urls]

# 执行事件列表
loop.run_until_complete(asyncio.wait(tasks))