1.并行集合
1.并行集合
a.在C#中,并行集合(ConcurrentCollections)是.NET 为多线程/并行编程设计的线程安全集合 b.位于System.Collections.Concurrent命名空间,专门解决普通集合(如List<T>、Dictionary<TKey,TValue>)在多线程 读写时的线程安全问题(普通集合非线程安全,多线程操作会导致数据错乱)
1).ConcurrentDictionary<TKey,TValue>(最常用)a.线程安全的字典,是多线程场景下Dictionary的直接替代,核心优势是原子操作方法 b.避免手动加锁实现"检查 - 添加","更新"等复合操作)
usingSystem;usingSystem.Collections.Concurrent;usingSystem.Threading.Tasks;classConcurrentDictDemo{staticvoidMain(){// 初始化并发字典varconcurrentDict=newConcurrentDictionary<int,string>();// 并行循环(多线程)添加元素Parallel.For(0,100,i=>{// 原子操作:不存在则添加,存在则返回已有值concurrentDict.GetOrAdd(i,$"Value_{i}");// 原子操作:尝试更新(只有当前值匹配时才更新)concurrentDict.TryUpdate(i,$"Updated_{i}",$"Value_{i}");});// 多线程查询Parallel.For(0,100,i=>{if(concurrentDict.TryGetValue(i,outvarvalue)){Console.WriteLine($"Key:{i}, Value:{value}");}});}}
2).ConcurrentQueue<T>线程安全的FIFO队列,适合"生产者线程添加任务,消费者线程处理任务"的场景(如后台任务池)
usingSystem;usingSystem.Collections.Concurrent;usingSystem.Threading;usingSystem.Threading.Tasks;classConcurrentQueueDemo{staticvoidMain(){varqueue=newConcurrentQueue<int>();varcts=newCancellationTokenSource();// 生产者线程:持续添加数据Taskproducer=Task.Run(()=>{inti=0;while(!cts.Token.IsCancellationRequested){queue.Enqueue(i++);Console.WriteLine($"生产:{i-1}");Thread.Sleep(100);}});// 消费者线程:持续消费数据Taskconsumer=Task.Run(()=>{while(!cts.Token.IsCancellationRequested){if(queue.TryDequeue(outintvalue)){Console.WriteLine($"消费:{value}");}Thread.Sleep(150);}});// 运行5秒后停止Thread.Sleep(5000);cts.Cancel();Task.WaitAll(producer,consumer);}}
3).BlockingCollection<T>(增强版生产者-消费者)封装了ConcurrentQueue/ConcurrentBag等底层集合,提供阻塞操作("无数据时消费者阻塞, 队列满时生产者阻塞")和边界 限制(限制集合最大容量),是生产者-消费者场景的一站式解决方案
usingSystem;usingSystem.Collections.Concurrent;usingSystem.Threading;usingSystem.Threading.Tasks;classBlockingCollectionDemo{staticvoidMain(){// 初始化:底层用ConcurrentQueue,最大容量10varblockingCollection=newBlockingCollection<int>(newConcurrentQueue<int>(),10);varcts=newCancellationTokenSource();// 生产者(2个线程)Parallel.For(0,2,producerId=>{inti=0;while(!cts.Token.IsCancellationRequested){intvalue=producerId*1000+i++;// 队列满时会阻塞,直到有空间blockingCollection.Add(value,cts.Token);Console.WriteLine($"生产者{producerId}添加:{value}");Thread.Sleep(200);}});// 消费者(3个线程)Parallel.For(0,3,consumerId=>{try{// 无数据时阻塞,直到有数据或完成添加foreach(varvalueinblockingCollection.GetConsumingEnumerable(cts.Token)){Console.WriteLine($"消费者{consumerId}处理:{value}");Thread.Sleep(300);}}catch(OperationCanceledException){Console.WriteLine($"消费者{consumerId}停止");}});// 运行10秒后停止Thread.Sleep(10000);cts.Cancel();blockingCollection.CompleteAdding();// 标记“添加完成”,消费者遍历结束}}