一、昇腾异构架构
1.1 为什么需要异构计算
昇腾 NPU 采用的是 CPU+NPU 分离架构,CPU 负责控制流和预处理,NPU 负责高性能矩阵运算。这种设计与 NVIDIA GPU 有明显区别——在 GPU 中 CUDA 核既是计算核心也是控制核心,而在昇腾架构中,控制逻辑和数据准备由更灵活的 CPU 处理,NPU 专注计算。
分离设计的好处在于:CPU 更适合控制流(分支判断、循环控制)、内存管理灵活(CPU 内存比 HBM 大得多)、编译简单(无需为控制流生成专用 GPU 代码)。
┌──────────────────────────────────────────────────┐ │ 昇腾 CPU + NPU 异构架构 │ ├──────────────────────────────────────────────────┤ │ │ │ CPU │ │ ┌──────────────────────────────────────────┐ │ │ │ 数据准备 │ 控制流 │ 调度 │ 预处理 │ │ │ └──────────────────────────────────────────┘ │ │ ↓ 数据拷贝 ↑ 结果回传 │ │ ┌──────────────────────────────────────────┐ │ │ │ NPU │ │ │ │ Cube(矩阵)│ Vector(向量)│ Scalar │ │ │ │ 高性能矩阵运算 │ │ │ └──────────────────────────────────────────┘ │ │ │ └──────────────────────────────────────────────────┘1.2 Host 与 Device 的角色划分
| 组件 | 角色 | 职责 |
|---|---|---|
| Host(CPU) | 控制与调度 | 数据准备、模型构建、内存管理、异常处理 |
| Device(NPU) | 计算执行 | 神经网络算子执行、矩阵运算、梯度计算 |
CPU 适合的任务:
- 数据读取与预处理(IO 密集)
- Python 胶水代码(控制流)
- 动态 shape 处理
- 内存分配与回收
- 调试与日志
NPU 适合的任务:
- 矩阵乘加运算(MatMul、Conv2d)
- 批量向量运算(LayerNorm、Softmax)
- 梯度计算(反向传播)
- 部署推理(前向计算)
1.3 数据传输开销
CPU 与 NPU 之间的数据传输是异构编程的主要开销:
| 操作 | 延迟 | 带宽 | 优化方向 |
|---|---|---|---|
| HBM → Host | 100-200 ns | 25 GB/s | 减少传输次数/批量传输 |
| Host → HBM | 100-200 ns | 25 GB/s | 使用 pinned memory |
| NPU 内部 | < 10 ns | 256 GB/s | 尽量在 NPU 内部完成 |
优化原则:最小化 Host-Device 交互,尽量在 NPU 上完成计算,减少数据回传。
二、Host-Device 数据交互
2.1 基础内存分配
Host 内存分配:
#include"acl/acl.h"// 分配 Host 内存void*host_ptr;size_t host_size=1024*1024;// 1MBaclError ret=acl.rt.malloc_host(&host_ptr,host_size);if(ret!=ACL_SUCCESS){printf("Failed to allocate host memory\n");}// 释放ret=acl.rt.free_host(host_ptr);NPU 内存分配:
// 分配 Device 内存void*device_ptr;size_t device_size=1024*1024;// 1MBret=acl.rt.malloc(&device_ptr,device_size);if(ret!=ACL_SUCCESS){printf("Failed to allocate device memory\n");}// 释放ret=acl.rt.free(device_ptr);2.2 同步拷贝
同步拷贝会阻塞等待数据复制完成:
8.1 及之前(同步拷贝):
// 同步拷贝 Host → Deviceret=acl.rt.memcpy(device_ptr,// 目标device_size,// 目标大小host_ptr,// 源host_size,// 源大小ACL_MEMCPY_HOST_TO_DEVICE// 方向);// 此时 Host 线程被阻塞// 同步拷贝 Device → Hostret=acl.rt.memcpy(host_ptr,// 目标host_size,// 目标大小device_ptr,// 源device_size,// 源大小ACL_MEMCPY_DEVICE_TO_HOST);8.2 新增(异步拷贝,通过 Stream):
// 创建 StreamaclrtStream stream;ret=acl.rt.create_stream(&stream);// 异步拷贝 Host → Device// 不阻塞,提交后立即返回ret=acl.rt.memcpy_async(device_ptr,device_size,host_ptr,host_size,ACL_MEMCPY_HOST_TO_DEVICE,stream);// 在其他计算中使用异步拷贝// 计算和拷贝并行// 同步等待拷贝完成ret=acl.rt.stream_synchronize(stream);2.3 Pinned Memory 加速传输
Pinned Memory(锁页内存)可以显著加速 Host-Device 传输:
// 分配 Pinned Memoryvoid*pinned_ptr;size_t pinned_size=1024*1024;ret=acl.rt.malloc_host(&pinned_ptr,pinned_size,ACL_MEM_MALLOC_HUGE_FIRST);if(ret!=ACL_SUCCESS){// fallback to normal host memoryret=acl.rt.malloc_host(&pinned_ptr,pinned_size);}// Pinned Memory 拷贝更快ret=acl.rt.memcpy(device_ptr,device_size,pinned_ptr,pinned_size,ACL_MEMCPY_HOST_TO_DEVICE);// 释放ret=acl.rt.free_host(pinned_ptr);Pinned Memory vs 普通 Host 内存:
| 特性 | 普通 Host 内存 | Pinned Memory |
|---|---|---|
| 延迟 | 较高 | 较低 |
| 带宽 | 受限于 PCIe | 接近 PCIe 上限 |
| 内存占用 | 无限制 | 受系统限制 |
| 适用场景 | 一般数据 | 频繁传输数据 |
2.4 Python 接口的数据传输
importtorchimportnumpyasnp# PyTorch Tensor 创建和拷贝# Host → NPUdata_np=np.random.randn(1024,1024).astype(np.float32)data_tensor=torch.from_numpy(data_np)data_npu=data_tensor.npu()# 自动拷贝到 NPU# NPU → Hostresult_npu=model(data_npu)result_cpu=result_npu.cpu()# 拷贝回 Host# Python 下的异步传输withtorch.npu.stream(stream):data_npu=data_tensor.npu()# 异步提交# 后续计算可以和拷贝并行三、CPU-NPU 协同计算
3.1 算子放置策略
合理的算子放置是性能关键:
# 在 NPU 上执行计算密集算子classModel(nn.Module):defforward(self,x):# CPU 预处理x=x.cpu()# 移回 CPUx=self.preprocess_cpu(x)# CPU 处理x=x.npu()# 移回 NPU# NPU 主计算x=self.conv1.npu()(x)x=self.conv2.npu()(x)x=self.matmul.npu()(x)# CPU 后处理x=x.cpu()x=self.postprocess_cpu(x)returnx3.2 流水线并行
通过流水线让 CPU 和 NPU 同时工作:
8.1 及之前(串行处理):
# CPU 处理完才到 NPU,效率低forbatchindataloader:data=preprocess_cpu(batch)# CPU 处理result=model_npu(data)# NPU 计算postprocess_cpu(result)# CPU 处理8.2 新增(流水线并行):
importthreadingfromqueueimportQueueclassPipelineModel:def__init__(self,model,preprocess,postprocess):self.model=model.npu()self.preprocess=preprocess self.postprocess=postprocess self.input_queue=Queue(maxsize=4)self.output_queue=Queue(maxsize=4)# 启动流水线线程self.preprocess_thread=threading.Thread(target=self._preprocess_loop)self.postprocess_thread=threading.Thread(target=self._postprocess_loop)self.preprocess_thread.start()self.postprocess_thread.start()def_preprocess_loop(self):forbatchindataloader:data=self.preprocess(batch)self.input_queue.put(data)def_postprocess_loop(self):whileTrue:result=self.output_queue.get()self.postprocess(result)defpredict(self,batch):# 提交到 NPU 处理data=self.input_queue.get()result=self.model(data)self.output_queue.put(result)returnresult3.3 CPU 预处理优化
预处理放在 CPU 执行可以减轻 NPU 负担:
defcpu_preprocess(image):"""CPU 预处理(resize, normalize, transpose)"""# 这些操作 CPU 更高效image=cv2.resize(image,(224,224))image=image.astype(np.float32)/255.0# 归一化mean=np.array([0.485,0.456,0.406])std=np.array([0.229,0.224,0.225])image=(image-mean)/std# 通道转换 HWC → CHWimage=np.transpose(image,(2,0,1))returnimageclassHybridModel(nn.Module):def__init__(self):self.model=MyModel().npu()defforward(self,images):# 批量预处理(CPU 多核并行)processed=[cpu_preprocess(img)forimginimages]batch=np.stack(processed)# 拷贝到 NPUbatch_npu=torch.from_numpy(batch).npu()# NPU 推理output=self.model(batch_npu)returnoutput3.4 结果回传优化
减少不必要的 CPU 回传:
# 8.1 及之前(频繁回传)definference_old(images):results=[]forimginimages:# 每次都回传 CPUresult=model(img.npu()).cpu()results.append(result)returnresults# 8.2 新增(批量回传)definference_new(images):# 批量拷贝到 NPUbatch=torch.stack([img.npu()forimginimages])# 批量推理(一次 NPU 调用)results=model(batch)# 一次性回传 CPUresults_cpu=results.cpu()returnresults_cpu四、异构场景实践
4.1 视频处理流水线
视频处理是典型的异构场景——解码用 CPU,推理用 NPU:
importcv2importthreadingfromqueueimportQueueclassVideoInferencePipeline:def__init__(self,model,max_queue_size=8):self.model=model.npu()self.frame_queue=Queue(maxsize=max_queue_size)self.result_queue=Queue()self.running=True# 启动处理线程self.decode_thread=threading.Thread(target=self._decode_loop)self.inference_thread=threading.Thread(target=self._inference_loop)self.decode_thread.start()self.inference_thread.start()def_decode_loop(self):"""CPU 解码线程"""cap=cv2.VideoCapture("video.mp4")whileself.running:ret,frame=cap.read()ifnotret:break# CPU 预处理frame_processed=self.preprocess_cpu(frame)# 加入推理队列(阻塞直到队列有空位)self.frame_queue.put(frame_processed,block=True)cap.release()self.frame_queue.put(None)# 结束标记def_inference_loop(self):"""NPU 推理线程"""whileTrue:frame=self.frame_queue.get()ifframeisNone:break# NPU 推理frame_npu=torch.from_numpy(frame).npu()result=self.model(frame_npu.unsqueeze(0))self.result_queue.put(result)defget_result(self):"""获取结果(非阻塞)"""try:returnself.result_queue.get_nowait()except:returnNonedefstop(self):self.running=Falseself.decode_thread.join()self.inference_thread.join()4.2 大批量数据分块处理
内存受限时需要分块处理:
classChunkedInference:def__init__(self,model,chunk_size=32):self.model=model.npu()self.chunk_size=chunk_sizedefpredict(self,data,labels=None):"""大批量数据分块推理"""n_samples=len(data)results=[]foriinrange(0,n_samples,self.chunk_size):chunk=data[i:i+self.chunk_size]# 分块拷贝到 NPUchunk_npu=chunk.npu()# 分块推理chunk_result=self.model(chunk_npu)# 分块回传results.append(chunk_result.cpu())# 释放中间张量delchunk_npu,chunk_result torch.npu.empty_cache()returntorch.cat(results,dim=0)defpredict_with_labels(self,data,labels,metric_fn):"""分块推理并计算指标(内存友好)"""n_samples=len(data)metrics=[]foriinrange(0,n_samples,self.chunk_size):chunk=data[i:i+self.chunk_size]chunk_labels=labels[i:i+self.chunk_size]chunk_npu=chunk.npu()chunk_labels_npu=chunk_labels.npu()output=self.model(chunk_npu)metric=metric_fn(output,chunk_labels_npu)metrics.append(metric.item())delchunk_npu,chunk_labels_npu,output torch.npu.empty_cache()returnnp.mean(metrics)4.3 动态 shape 处理
动态 shape 场景下 CPU 处理更灵活:
classDynamicShapeModel:def__init__(self,model):self.model=model.npu()defpredict(self,input_dict):"""处理不同 shape 的输入"""# CPU 端解析不同格式的输入batch_data=self._cpu_preprocess(input_dict)# 动态 shape 处理max_len=max(len(x)forxinbatch_data)batch_padded=self._pad_sequence(batch_data,max_len)# NPU 推理batch_npu=batch_padded.npu()output=self.model(batch_npu)# CPU 端还原原始 shaperesult=self._cpu_postprocess(output,batch_data)returnresultdef_cpu_preprocess(self,input_dict):"""CPU 端预处理"""ifisinstance(input_dict,dict):returninput_dict['texts']returninput_dictdef_pad_sequence(self,sequences,max_len):"""CPU 端填充"""padded=[]forseqinsequences:iflen(seq)<max_len:seq=seq+[0]*(max_len-len(seq))padded.append(seq)returntorch.LongTensor(padded)def_cpu_postprocess(self,output,original_data):"""CPU 端后处理"""returnoutput.cpu().numpy()五、性能调优
5.1 数据传输瓶颈诊断
importtimeclassDataTransferProfiler:defprofile(self,data):"""分析数据传输开销"""# CPU → NPUcopy_start=time.time()data_npu=data.npu()copy_time=time.time()-copy_start# NPU 计算compute_start=time.time()result=self.model(data_npu)compute_time=time.time()-compute_start# NPU → CPUresult_cpu=result.cpu()result_time=time.time()-compute_start-copy_start total=copy_time+compute_time+result_timeprint(f""" 传输性能分析: CPU→NPU 拷贝:{copy_time*1000:.2f}ms ({copy_time/total*100:.1f}%) NPU 计算:{compute_time*1000:.2f}ms ({compute_time/total*100:.1f}%) NPU→CPU 回传:{result_time*1000:.2f}ms ({result_time/total*100:.1f}%) 总计:{total*1000:.2f}ms """)return{'copy':copy_time,'compute':compute_time,'result':result_time,'total':total}5.2 优化策略总结
| 策略 | 说明 | 收益 |
|---|---|---|
| 批量处理 | 一次拷贝多个样本 | 减少固定开销 |
| 流水线 | CPU/NPU 并行工作 | 隐藏延迟 |
| pinned memory | 使用锁页内存传输 | 提升带宽 |
| 异步拷贝 | 计算和传输 overlap | 隐藏传输开销 |
| 减少回传 | 只在必要时回传 CPU | 减少传输次数 |
六、常见问题
| 问题 | 原因 | 解决方案 |
|---|---|---|
| 拷贝性能差 | 使用普通 Host 内存 | 使用 pinned memory |
| CPU 成为瓶颈 | 预处理太慢 | 使用流水线并行 |
| 显存不足 | batch size 过大 | 分块处理 |
| NPU 空闲 | 数据供给不及时 | 使用异步预取 |
| 结果不对齐 | shape 不匹配 | 检查预处理流程 |
| 多线程不安全 | ACL 非线程安全 | 使用线程锁或进程隔离 |
相关仓库
- ascend-cl- ACL 异构编程接口 https://gitee.com/ascend/ascend-cl
- torch_npu- PyTorch NPU 协同接口 https://gitee.com/ascend/torch_npu
- ascend-toolkit- Profiling 工具 https://gitee.com/ascend/ascend-toolkit