news 2026/5/16 0:42:21

Python 并发编程:多线程与多进程

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Python 并发编程:多线程与多进程

Python 并发编程:多线程与多进程

1. 技术分析

1.1 并发编程概述

并发编程可以提升程序性能:

并发类型 多线程: 共享内存,适合IO密集型 多进程: 独立内存,适合CPU密集型 异步IO: 事件驱动,适合高IO场景

1.2 并发模型对比

模型适用场景优势劣势
多线程IO密集型低开销GIL限制
多进程CPU密集型真正并行高开销
异步IO高IO场景高吞吐量复杂度高

1.3 GIL影响

GIL (Global Interpreter Lock) 同一时刻只有一个线程执行Python字节码 多线程在CPU密集型任务上无法真正并行 IO密集型任务可以通过切换获得并发收益

2. 核心功能实现

2.1 多线程编程

import threading from concurrent.futures import ThreadPoolExecutor class ThreadPoolManager: def __init__(self, max_workers=None): self.executor = ThreadPoolExecutor(max_workers=max_workers) def submit_task(self, func, *args, **kwargs): return self.executor.submit(func, *args, **kwargs) def map_tasks(self, func, iterable): return self.executor.map(func, iterable) def shutdown(self, wait=True): self.executor.shutdown(wait=wait) class ThreadSafeCounter: def __init__(self): self.count = 0 self.lock = threading.Lock() def increment(self): with self.lock: self.count += 1 def get_count(self): with self.lock: return self.count def fetch_url(url): import requests response = requests.get(url) return response.text def fetch_urls_concurrent(urls): with ThreadPoolExecutor(max_workers=10) as executor: results = list(executor.map(fetch_url, urls)) return results

2.2 多进程编程

import multiprocessing from concurrent.futures import ProcessPoolExecutor class ProcessPoolManager: def __init__(self, max_workers=None): self.executor = ProcessPoolExecutor(max_workers=max_workers) def submit_task(self, func, *args, **kwargs): return self.executor.submit(func, *args, **kwargs) def map_tasks(self, func, iterable): return self.executor.map(func, iterable) def shutdown(self, wait=True): self.executor.shutdown(wait=wait) def compute_intensive_task(data): result = 0 for i in range(1, 10000000): result += i * data return result def compute_parallel(data_list): with ProcessPoolExecutor() as executor: results = list(executor.map(compute_intensive_task, data_list)) return results class SharedMemoryManager: def __init__(self): self.manager = multiprocessing.Manager() self.shared_dict = self.manager.dict() def set_value(self, key, value): self.shared_dict[key] = value def get_value(self, key): return self.shared_dict.get(key)

2.3 异步编程

import asyncio import aiohttp class AsyncIOManager: def __init__(self): self.loop = asyncio.get_event_loop() async def fetch_url(self, url): async with aiohttp.ClientSession() as session: async with session.get(url) as response: return await response.text() async def fetch_all_urls(self, urls): tasks = [self.fetch_url(url) for url in urls] return await asyncio.gather(*tasks) def run(self, coroutine): return self.loop.run_until_complete(coroutine) async def process_data_async(data): results = [] for item in data: result = await process_item(item) results.append(result) return results async def process_item(item): await asyncio.sleep(0.1) return item * 2

3. 性能对比

3.1 并发模型性能

任务类型单线程多线程多进程异步IO
IO密集型(100请求)100s10s15s5s
CPU密集型(4核)100s95s25s100s

3.2 线程/进程开销

操作线程进程
创建开销1ms100ms
内存开销1MB100MB
通信开销

3.3 并发框架对比

框架适用场景复杂度性能
threading简单并发
concurrent.futures任务池
asyncio高IO
multiprocessingCPU密集

4. 最佳实践

4.1 并发模式选择

def choose_concurrency_model(task_type, data_size): if task_type == 'io_bound': if data_size > 1000: return 'asyncio' return 'threading' elif task_type == 'cpu_bound': return 'multiprocessing' else: return 'sequential' class ConcurrencyStrategySelector: @staticmethod def select(strategy, func, data): strategies = { 'threading': lambda: run_with_threads(func, data), 'multiprocessing': lambda: run_with_processes(func, data), 'asyncio': lambda: run_with_async(func, data), 'sequential': lambda: [func(item) for item in data] } return strategies[strategy]()

4.2 并发编程模式

class ConcurrentDataProcessor: def __init__(self, mode='threading'): self.mode = mode def process(self, data, func): if self.mode == 'threading': return self._process_threading(data, func) elif self.mode == 'multiprocessing': return self._process_multiprocessing(data, func) elif self.mode == 'asyncio': return self._process_asyncio(data, func) def _process_threading(self, data, func): with ThreadPoolExecutor() as executor: return list(executor.map(func, data)) def _process_multiprocessing(self, data, func): with ProcessPoolExecutor() as executor: return list(executor.map(func, data)) def _process_asyncio(self, data, func): async def async_func(item): return func(item) async def process_all(): tasks = [async_func(item) for item in data] return await asyncio.gather(*tasks) return asyncio.run(process_all())

5. 总结

并发编程可以显著提升性能:

  1. 多线程:适合IO密集型任务
  2. 多进程:适合CPU密集型任务
  3. 异步IO:适合高IO吞吐量场景
  4. 选择原则:根据任务类型选择合适模型

对比数据如下:

  • IO密集型任务:异步IO > 多线程 > 多进程
  • CPU密集型任务:多进程 > 其他
  • 线程创建开销比进程小100倍
  • 推荐使用concurrent.futures简化并发编程
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/5/16 0:42:18

分布式内存技术:原理、应用与性能优化

1. 分布式内存技术概述在传统数据中心架构中,计算节点与内存资源采用紧耦合设计,每个计算节点配备固定容量的本地内存。这种架构存在明显的资源利用率问题——某些节点可能因内存不足而性能受限,而其他节点的内存却处于闲置状态。分布式内存技…

作者头像 李华
网站建设 2026/5/16 0:41:22

领域驱动设计(DDD)实战:构建清晰边界的企业级应用

领域驱动设计(DDD)实战:构建清晰边界的企业级应用 一、DDD概述 1.1 什么是DDD 领域驱动设计(Domain-Driven Design,DDD)是一种软件开发方法论,强调: 以业务领域为核心:将…

作者头像 李华
网站建设 2026/5/16 0:39:11

Python数据分析实战:线性回归与关联规则挖掘的完整工作流

1. 项目概述:当线性回归遇上关联规则挖掘最近在整理数据分析项目时,我经常遇到一个场景:既要预测未来的趋势,又要理解当下数据中隐藏的“共生”关系。比如,分析电商销售数据,我们既想知道下个季度的销售额&…

作者头像 李华
网站建设 2026/5/16 0:30:39

AI人工智能未来发展趋势

当ChatGPT实现自然语言的深度交互,当AI机器人走进工厂车间,当智能算法助力疫苗研发提速,人工智能已从实验室的前沿探索,成为渗透社会各领域的核心生产力。当前,AI技术正处于从“弱智能”向“强智能”跨越的关键节点&am…

作者头像 李华