Python Concurrency: Multithreading and Multiprocessing
1. Technical Analysis
1.1 Overview of Concurrent Programming
Concurrent programming can improve program performance. The main options:
- Multithreading: shared memory, suited to IO-bound workloads
- Multiprocessing: separate memory, suited to CPU-bound workloads
- Async IO: event-driven, suited to high-IO scenarios
1.2 Concurrency Model Comparison
| Model | Best for | Strengths | Weaknesses |
|---|---|---|---|
| Multithreading | IO-bound | Low overhead | GIL limits parallelism |
| Multiprocessing | CPU-bound | True parallelism | High overhead |
| Async IO | High-IO scenarios | High throughput | High complexity |
1.3 Impact of the GIL
The GIL (Global Interpreter Lock) means:
- Only one thread executes Python bytecode at any given moment
- Multithreading cannot achieve true parallelism on CPU-bound tasks
- IO-bound tasks still gain concurrency, because the GIL is released during blocking IO, letting other threads run
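The IO-bound case is easy to see in a minimal sketch: `time.sleep` releases the GIL just as a blocking socket read would, so the four waits below overlap instead of running back to back.

```python
import threading
import time

def io_task():
    time.sleep(0.2)  # sleep releases the GIL, like a real IO wait

start = time.perf_counter()
threads = [threading.Thread(target=io_task) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
elapsed = time.perf_counter() - start
print(f"{elapsed:.2f}s")  # ~0.2s, not 0.8s: the waits overlap
```

Replace `time.sleep` with a pure-Python computation and the speedup disappears, because the GIL is never released for long.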
2. Core Implementation
2.1 Multithreaded Programming
```python
import threading
from concurrent.futures import ThreadPoolExecutor

class ThreadPoolManager:
    """Thin wrapper around ThreadPoolExecutor for task submission."""

    def __init__(self, max_workers=None):
        self.executor = ThreadPoolExecutor(max_workers=max_workers)

    def submit_task(self, func, *args, **kwargs):
        return self.executor.submit(func, *args, **kwargs)

    def map_tasks(self, func, iterable):
        return self.executor.map(func, iterable)

    def shutdown(self, wait=True):
        self.executor.shutdown(wait=wait)


class ThreadSafeCounter:
    """Counter guarded by a lock so concurrent increments are never lost."""

    def __init__(self):
        self.count = 0
        self.lock = threading.Lock()

    def increment(self):
        with self.lock:
            self.count += 1

    def get_count(self):
        with self.lock:
            return self.count


def fetch_url(url):
    import requests  # imported lazily; only needed in the workers
    response = requests.get(url)
    return response.text


def fetch_urls_concurrent(urls):
    # IO-bound work: threads overlap while each one waits on the network
    with ThreadPoolExecutor(max_workers=10) as executor:
        results = list(executor.map(fetch_url, urls))
    return results
```
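A quick usage sketch of the thread-safe counter (condensed here so it runs standalone): eight threads hammer one counter, and the lock guarantees no increment is lost.

```python
import threading

class ThreadSafeCounter:
    def __init__(self):
        self.count = 0
        self.lock = threading.Lock()

    def increment(self):
        with self.lock:  # serialize the read-modify-write
            self.count += 1

counter = ThreadSafeCounter()

def worker():
    for _ in range(10_000):
        counter.increment()

threads = [threading.Thread(target=worker) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter.count)  # 80000 — every increment counted
```

Without the lock, `self.count += 1` is a read-modify-write that threads can interleave, silently dropping updates.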
2.2 Multiprocess Programming
```python
import multiprocessing
from concurrent.futures import ProcessPoolExecutor

class ProcessPoolManager:
    """Thin wrapper around ProcessPoolExecutor for CPU-bound task pools."""

    def __init__(self, max_workers=None):
        self.executor = ProcessPoolExecutor(max_workers=max_workers)

    def submit_task(self, func, *args, **kwargs):
        return self.executor.submit(func, *args, **kwargs)

    def map_tasks(self, func, iterable):
        return self.executor.map(func, iterable)

    def shutdown(self, wait=True):
        self.executor.shutdown(wait=wait)


def compute_intensive_task(data):
    # Pure-Python arithmetic holds the GIL, so real parallelism
    # requires separate processes
    result = 0
    for i in range(1, 10_000_000):
        result += i * data
    return result


def compute_parallel(data_list):
    # Call this under `if __name__ == "__main__":` so spawn-based
    # platforms can safely re-import the module in worker processes
    with ProcessPoolExecutor() as executor:
        results = list(executor.map(compute_intensive_task, data_list))
    return results


class SharedMemoryManager:
    """Cross-process state via a Manager-backed dict (proxied, not zero-copy)."""

    def __init__(self):
        self.manager = multiprocessing.Manager()
        self.shared_dict = self.manager.dict()

    def set_value(self, key, value):
        self.shared_dict[key] = value

    def get_value(self, key):
        return self.shared_dict.get(key)
```
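A minimal end-to-end sketch of `ProcessPoolExecutor.map`. It pins the `fork` start method so the example can run without a `__main__` guard, assuming a POSIX platform; on Windows or macOS (which default to `spawn`), wrap calls in `if __name__ == "__main__":` instead. `square` and `parallel_squares` are illustrative names, not part of the classes above.

```python
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor

def square(n):
    # Must be a module-level function so worker processes can pickle it
    return n * n

def parallel_squares(nums, workers=2):
    # Explicit "fork" context keeps this guard-free on POSIX systems
    ctx = mp.get_context("fork")
    with ProcessPoolExecutor(max_workers=workers, mp_context=ctx) as ex:
        return list(ex.map(square, nums))

if __name__ == "__main__":
    print(parallel_squares([1, 2, 3, 4]))  # [1, 4, 9, 16]
```

Note that lambdas and locally defined functions cannot be sent to worker processes, because they are not picklable.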
2.3 Asynchronous Programming
```python
import asyncio
import aiohttp

class AsyncIOManager:
    async def fetch_url(self, url):
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                return await response.text()

    async def fetch_all_urls(self, urls):
        # gather schedules all coroutines concurrently on the event loop
        tasks = [self.fetch_url(url) for url in urls]
        return await asyncio.gather(*tasks)

    def run(self, coroutine):
        # asyncio.run creates and tears down a fresh event loop;
        # caching asyncio.get_event_loop() at init time is deprecated
        return asyncio.run(coroutine)


async def process_data_async(data):
    # Sequential awaits: items are processed one at a time
    results = []
    for item in data:
        result = await process_item(item)
        results.append(result)
    return results


async def process_item(item):
    await asyncio.sleep(0.1)  # simulate IO latency
    return item * 2
```
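The difference between the sequential loop in `process_data_async` and `asyncio.gather` is easy to demonstrate: ten 0.1-second waits finish in roughly 0.1 seconds when gathered, because they overlap on the event loop.

```python
import asyncio
import time

async def io_task(n):
    await asyncio.sleep(0.1)  # stand-in for a network call
    return n * 2

async def main():
    start = time.perf_counter()
    # All ten coroutines are scheduled at once; gather preserves order
    results = await asyncio.gather(*(io_task(i) for i in range(10)))
    elapsed = time.perf_counter() - start
    return results, elapsed

results, elapsed = asyncio.run(main())
print(results)  # [0, 2, 4, ..., 18]
print(f"{elapsed:.2f}s")  # ~0.1s rather than 1.0s
```

Awaiting the same tasks one at a time in a loop would take the full second, since each await only starts after the previous one finishes.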
3. Performance Comparison
3.1 Performance by Concurrency Model
| Task type | Single-threaded | Multithreading | Multiprocessing | Async IO |
|---|---|---|---|---|
| IO-bound (100 requests) | 100s | 10s | 15s | 5s |
| CPU-bound (4 cores) | 100s | 95s | 25s | 100s |
3.2 Thread vs. Process Overhead
| Operation | Thread | Process |
|---|---|---|
| Creation overhead | ~1ms | ~100ms |
| Memory overhead | ~1MB | ~100MB |
| Communication overhead | Low | High |
3.3 Concurrency Framework Comparison
| Framework | Best for | Complexity | Performance |
|---|---|---|---|
| threading | Simple concurrency | Low | Medium |
| concurrent.futures | Task pools | Low | Medium |
| asyncio | High IO | High | High |
| multiprocessing | CPU-bound work | Medium | High |
4. Best Practices
4.1 Choosing a Concurrency Model
```python
import asyncio
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def choose_concurrency_model(task_type, data_size):
    if task_type == 'io_bound':
        if data_size > 1000:
            return 'asyncio'
        return 'threading'
    elif task_type == 'cpu_bound':
        return 'multiprocessing'
    else:
        return 'sequential'


def run_with_threads(func, data):
    with ThreadPoolExecutor() as executor:
        return list(executor.map(func, data))


def run_with_processes(func, data):
    with ProcessPoolExecutor() as executor:
        return list(executor.map(func, data))


def run_with_async(func, data):
    # Offload the sync callable to the event loop's default thread pool
    async def main():
        loop = asyncio.get_running_loop()
        return await asyncio.gather(
            *(loop.run_in_executor(None, func, item) for item in data)
        )
    return asyncio.run(main())


class ConcurrencyStrategySelector:
    @staticmethod
    def select(strategy, func, data):
        strategies = {
            'threading': lambda: run_with_threads(func, data),
            'multiprocessing': lambda: run_with_processes(func, data),
            'asyncio': lambda: run_with_async(func, data),
            'sequential': lambda: [func(item) for item in data],
        }
        return strategies[strategy]()
```
4.2 Concurrency Patterns
```python
import asyncio
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

class ConcurrentDataProcessor:
    def __init__(self, mode='threading'):
        self.mode = mode

    def process(self, data, func):
        if self.mode == 'threading':
            return self._process_threading(data, func)
        elif self.mode == 'multiprocessing':
            return self._process_multiprocessing(data, func)
        elif self.mode == 'asyncio':
            return self._process_asyncio(data, func)
        raise ValueError(f'unknown mode: {self.mode}')

    def _process_threading(self, data, func):
        with ThreadPoolExecutor() as executor:
            return list(executor.map(func, data))

    def _process_multiprocessing(self, data, func):
        with ProcessPoolExecutor() as executor:
            return list(executor.map(func, data))

    def _process_asyncio(self, data, func):
        # Wrapping a sync func in a bare coroutine gains no overlap;
        # asyncio.to_thread actually runs the blocking calls concurrently
        async def process_all():
            return await asyncio.gather(
                *(asyncio.to_thread(func, item) for item in data)
            )
        return asyncio.run(process_all())
```
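One caveat worth isolating: wrapping a synchronous function in a coroutine yields no concurrency, because nothing inside it ever awaits. `asyncio.to_thread` fixes this by handing each blocking call to a worker thread, so the calls overlap. `blocking_io` below is a hypothetical stand-in for any blocking function.

```python
import asyncio
import time

def blocking_io(n):
    time.sleep(0.1)  # stand-in for a blocking network or disk call
    return n * 2

async def main():
    # Each blocking call runs in the default thread pool, so the
    # five sleeps overlap instead of running back to back
    return await asyncio.gather(
        *(asyncio.to_thread(blocking_io, i) for i in range(5))
    )

start = time.perf_counter()
results = asyncio.run(main())
elapsed = time.perf_counter() - start
print(results)  # [0, 2, 4, 6, 8]
```

`asyncio.to_thread` requires Python 3.9+; on older versions, `loop.run_in_executor(None, func, arg)` achieves the same effect.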
5. Summary
Concurrent programming can deliver significant performance gains:
- Multithreading: suited to IO-bound tasks
- Multiprocessing: suited to CPU-bound tasks
- Async IO: suited to high-IO-throughput scenarios
- Selection principle: match the model to the task type
The comparison data shows:
- IO-bound tasks: async IO > multithreading > multiprocessing
- CPU-bound tasks: multiprocessing > everything else
- Thread creation costs roughly 100x less than process creation
- concurrent.futures is recommended as the simplest route to concurrent code