说明:最近在开发一个功能,大概是一个区域下有多个电站,而一个电站下又有多个摄像头,一个摄像头上又有几百个预置位(固有的转动位置),需要转动预置位,等待10s,然后获取预置位上的PTZ(坐标值)数据;单线程的话,算了一笔时间账,需要80多个小时;多线程的话,5个就只需要16小时左右,线程池设置更大,则用时更少,就可以大幅度的减少时间提高效率。
publicbooleansetPresetPtzTask(){if(!enabled){log.info("未开启获取预置位PTZ的定时任务!");returntrue;}// 重置停止标志this.stopFlag=false;// 记录执行进度(使用原子变量确保线程安全)inttotalStations=0;AtomicIntegerexecutedStations=newAtomicInteger(0);inttotalCameras=0;AtomicIntegerexecutedCameras=newAtomicInteger(0);inttotalPresets=0;AtomicIntegerexecutedPresets=newAtomicInteger(0);try{// 查询符合条件的预置位List<CameraPresetVo>presets=baseMapper.getEffectivePresets();if(CollectionUtil.isEmpty(presets)){log.info("没有PTZ值为零的预置位数据!");returntrue;}// 按电站分组Map<String,List<CameraPresetVo>>stationPresetMap=presets.stream().collect(Collectors.groupingBy(CameraPresetVo::getOrgId));totalStations=stationPresetMap.size();// 计算总摄像头数和总预置位数for(List<CameraPresetVo>stationPresetList:stationPresetMap.values()){Map<String,List<CameraPresetVo>>cameraMap=stationPresetList.stream().collect(Collectors.groupingBy(CameraPresetVo::getCameraId));totalCameras+=cameraMap.size();// 计算每个电站的预置位数for(List<CameraPresetVo>cameraPresetList:cameraMap.values()){totalPresets+=cameraPresetList.size();}}log.info("任务开始执行,总共 {} 个电站,{} 个摄像头,{} 个预置位",totalStations,totalCameras,totalPresets);// 创建线程池// int corePoolSize = Runtime.getRuntime().availableProcessors();ExecutorServiceexecutorService=Executors.newFixedThreadPool(corePoolSize);List<Future<?>>futures=newArrayList<>();// 遍历每个电站for(Map.Entry<String,List<CameraPresetVo>>stationEntry:stationPresetMap.entrySet()){// 检查是否需要停止if(stopFlag){log.info("任务停止执行,收到停止指令");log.info("执行进度: 总共 {} 个电站,已执行 {} 个电站;总共 {} 个摄像头,已执行 {} 个摄像头;总共 {} 个预置位,已执行 {} 个预置位",totalStations,executedStations.get(),totalCameras,executedCameras.get(),totalPresets,executedPresets.get());executorService.shutdownNow();returnfalse;}StringstationId=stationEntry.getKey();List<CameraPresetVo>stationPresets=stationEntry.getValue();log.info("开始处理电站: {} ({} / {})",stationId,executedStations.get()+1,totalStations);// 按摄像头ID分组Map<String,List<CameraPresetVo>>cameraPresetMap=stationPresets.stream().collect(Collectors.groupingBy(CameraPresetVo::getCameraId));// 为每个摄像头创建任务for(Map.Entry<String,List<CameraPresetVo>>cameraEntry:cameraPresetMap.entrySet()){StringcameraId=cameraEntry.getKey();//根据预置位序号升序List<CameraPresetVo>cameraPresets=cameraEntry.getValue().stream().sorted(Comparator.comparing(CameraPresetVo::getPresetIndex)).collect(Collectors.toList());finalStringcurrentCameraId=cameraId;finalList<CameraPresetVo>currentPresets=cameraPresets;finalintcameraIndex=executedCameras.get()+1;// 提交摄像头处理任务intfinalTotalCameras=totalCameras;Future<?>future=executorService.submit(()->{log.info("开始处理摄像头: {} ({} / {})",currentCameraId,cameraIndex,finalTotalCameras);try{// 遍历每个预置位for(CameraPresetVopreset:currentPresets){// 检查是否需要停止if(stopFlag){log.info("摄像头 {} 任务停止执行,收到停止指令",currentCameraId);return;}try{// 转动到预置位CameraActionRequestDtorequestDto=newCameraActionRequestDto();requestDto.setId(currentCameraId);requestDto.setPresetIndex(preset.getPresetIndex());requestDto.setThirdApp(false);//无锁,直接调用转动预置位booleangotoResult=gotoPresetImpl(requestDto);if(!gotoResult){log.error("转动预置位失败,摄像头: {},id: {}, 预置位: {}",preset.getDevName(),currentCameraId,preset.getPresetIndex());// 更新未获取成功预置的位状态this.updatePreset(currentCameraId,preset);continue;}// 等待10秒TimeUnit.SECONDS.sleep(10);// 检查是否需要停止if(stopFlag){log.info("摄像头 {} 任务停止执行,收到停止指令",currentCameraId);return;}// 获取PTZ值PtzVoptzVo=getPtz(currentCameraId);if(ptzVo==null){log.error("获取PTZ值失败,摄像头: {},id: {}",preset.getDevName(),currentCameraId);this.updatePreset(currentCameraId,preset);continue;}// 保存PTZ值,更新状态值this.savePreset(currentCameraId,preset,ptzVo);// 预置位处理完成executedPresets.incrementAndGet();}catch(Exceptione){log.error("处理预置位失败,摄像头: {},id: {} 预置位: {}, 错误: {}",preset.getDevName(),currentCameraId,preset.getPresetIndex(),e.getMessage());// 继续处理下一个预置位}}// 摄像头处理完成intcompletedCameraCount=executedCameras.incrementAndGet();log.info("摄像头处理完成,已执行 {} / {} 个摄像头",completedCameraCount,finalTotalCameras);}catch(Exceptione){log.error("处理摄像头 {} 失败,错误: {}",currentCameraId,e.getMessage());}});futures.add(future);}// 等待当前电站的所有摄像头任务完成for(Future<?>future:futures){try{future.get();}catch(Exceptione){log.error("等待摄像头任务完成失败,错误: {}",e.getMessage());}}futures.clear();// 电站处理完成intcompletedStationCount=executedStations.incrementAndGet();log.info("电站处理完成,已执行 {} / {} 个电站",completedStationCount,totalStations);}// 关闭线程池executorService.shutdown();try{if(!executorService.awaitTermination(60,TimeUnit.SECONDS)){executorService.shutdownNow();}}catch(InterruptedExceptione){executorService.shutdownNow();Thread.currentThread().interrupt();}// 统一输出最终执行结果log.info("任务执行完成,执行结果如下:");log.info("总共 {} 个电站,已执行 {} 个电站",totalStations,executedStations.get());log.info("总共 {} 个摄像头,已执行 {} 个摄像头",totalCameras,executedCameras.get());log.info("总共 {} 个预置位,已执行 {} 个预置位",totalPresets,executedPresets.get());returntrue;}catch(Exceptione){log.error("任务执行失败,错误: {}",e.getMessage());// 统一输出执行进度log.info("执行进度如下:");log.info("总共 {} 个电站,已执行 {} 个电站",totalStations,executedStations.get());log.info("总共 {} 个摄像头,已执行 {} 个摄像头",totalCameras,executedCameras.get());log.info("总共 {} 个预置位,已执行 {} 个预置位",totalPresets,executedPresets.get());returnfalse;}}停止任务:
publicvoidstopSetPresetPtz(){this.stopFlag=true;log.info("手动停止 setPresetPtz() 任务");}全局变量:
// 停止标志,用于手动停止任务执行privatevolatilebooleanstopFlag=false;Volatile 是 Java 中一个非常有用的关键字,用于保证多线程环境下共享变量的可见性和一致性。
可见性:Volatile 可以保证所有线程都能看到共享变量的最新值。当一个线程修改了共享变量的值时,JVM 会将该值刷新到主内存中。其他线程读取共享变量的值时,会从主内存中读取最新的值。
一致性:Volatile 可以保证共享变量的写操作对所有线程都具有原子性。这意味着对共享变量的写操作要么全部成功,要么全部失败,不会出现部分成功的情况。
实现原理:
内存屏障: Volatile 会在指令序列中插入内存屏障,禁止处理器对指令进行重排序。
缓存一致性协议: Volatile 会使用缓存一致性协议来保证所有线程都能看到共享变量的最新值。
多线程的使用逻辑:
1.创建线程池,设置默认大小,或者根据运行容器获取的数量调节;
2.线程池提交任务,然后添加到Futrue进行任务管理;
3.循环等待,等待结果获取;
4.清理管理器Future;
5.关闭线程池;
6.等待未完成任务结束,异常则中断线程;