news 2026/5/10 6:19:54

Spring Boot 4.0 + MyBatis-Plus 实战响应式编程的能力实战

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Spring Boot 4.0 + MyBatis-Plus 实战响应式编程的能力实战

Spring Boot 4.0 + MyBatis-Plus 实战响应式编程的能力实战

Spring Cloud全栈实战:手撸企业级项目,从入门到架构师!

目前 MyBatis-Plus 官方还不完全支持响应式编程,但我们可以结合 R2DBC 和 MyBatis-Plus 的部分特性来实现。这里提供两种方案:

方案一:使用 MyBatis-Plus 增强 R2DBC (推荐)

Spring Cloud全栈实战:手撸企业级项目,从入门到架构师!

1. 项目依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-webflux</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-r2dbc</artifactId></dependency><dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>3.5.3.1</version><!-- 注意:只使用其工具类,不启用SQL执行 --></dependency><!-- 数据库驱动 --><dependency><groupId>io.asyncer</groupId><artifactId>r2dbc-mysql</artifactId><version>1.0.2</version></dependency><!-- 或 PostgreSQL --><!-- <dependency> <groupId>org.postgresql</groupId> <artifactId>r2dbc-postgresql</artifactId> <scope>runtime</scope> </dependency> -->

2. 配置类

importcom.baomidou.mybatisplus.annotation.DbType;importcom.baomidou.mybatisplus.extension.plugins.MybatisPlusInterceptor;importcom.baomidou.mybatisplus.extension.plugins.inner.PaginationInnerInterceptor;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importorg.springframework.data.r2dbc.config.EnableR2dbcAuditing;@Configuration@EnableR2dbcAuditingpublicclassR2dbcMybatisConfig{/** * 只使用 MyBatis-Plus 的分页插件 */@BeanpublicMybatisPlusInterceptormybatisPlusInterceptor(){MybatisPlusInterceptorinterceptor=newMybatisPlusInterceptor();interceptor.addInnerInterceptor(newPaginationInnerInterceptor(DbType.MYSQL));returninterceptor;}}

3. 实体类 (使用 MyBatis-Plus 注解)

Spring Cloud全栈实战:手撸企业级项目,从入门到架构师!

importcom.baomidou.mybatisplus.annotation.*;importlombok.Data;importorg.springframework.data.annotation.CreatedDate;importorg.springframework.data.annotation.LastModifiedDate;importorg.springframework.data.relational.core.mapping.Table;importjava.time.LocalDateTime;@Data@Table("users")publicclassUser{@TableId(type=IdType.AUTO)privateLongid;@TableField("username")privateStringusername;@TableField("email")privateStringemail;@TableField("password")privateStringpassword;@TableField("age")privateIntegerage;@TableLogic@TableField("deleted")privateIntegerdeleted=0;@TableField(value="version",fill=FieldFill.INSERT)@VersionprivateIntegerversion=1;@CreatedDate@TableField("create_time")privateLocalDateTimecreateTime;@LastModifiedDate@TableField("update_time")privateLocalDateTimeupdateTime;// 响应式编程友好的构造方法publicstaticMono<User>of(Stringusername,Stringemail){Useruser=newUser();user.setUsername(username);user.setEmail(email);returnMono.just(user);}}

4. Repository 接口 (R2DBC)

importorg.springframework.data.r2dbc.repository.R2dbcRepository;importorg.springframework.data.r2dbc.repository.Query;importorg.springframework.stereotype.Repository;importreactor.core.publisher.Flux;importreactor.core.publisher.Mono;@RepositorypublicinterfaceUserR2dbcRepositoryextendsR2dbcRepository<User,Long>{Mono<User>findByUsername(Stringusername);Mono<User>findByEmail(Stringemail);Flux<User>findByAgeGreaterThan(Integerage);@Query("SELECT * FROM users WHERE username LIKE :keyword OR email LIKE :keyword")Flux<User>searchUsers(Stringkeyword);@Query("UPDATE users SET age = :age WHERE id = :id")Mono<Integer>updateAgeById(Longid,Integerage);}

5. Service 层 (结合 MyBatis-Plus 工具)

importcom.baomidou.mybatisplus.core.conditions.query.QueryWrapper;importcom.baomidou.mybatisplus.extension.plugins.pagination.Page;importlombok.RequiredArgsConstructor;importorg.springframework.data.domain.Pageable;importorg.springframework.r2dbc.core.DatabaseClient;importorg.springframework.stereotype.Service;importorg.springframework.transaction.annotation.Transactional;importreactor.core.publisher.Flux;importreactor.core.publisher.Mono;importreactor.core.scheduler.Schedulers;importjava.time.Duration;importjava.util.Map;@Service@RequiredArgsConstructorpublicclassReactiveUserService{privatefinalUserR2dbcRepositoryuserRepository;privatefinalDatabaseClientdatabaseClient;publicMono<User>createUser(Useruser){returnuserRepository.save(user);}publicMono<User>getUserById(Longid){returnuserRepository.findById(id).switchIfEmpty(Mono.error(newRuntimeException("User not found")));}publicFlux<User>getAllUsers(){returnuserRepository.findAll().delayElements(Duration.ofMillis(100))// 模拟流处理.subscribeOn(Schedulers.boundedElastic());}publicMono<User>updateUser(Longid,Useruser){returnuserRepository.findById(id).flatMap(existing->{existing.setUsername(user.getUsername());existing.setEmail(user.getEmail());existing.setAge(user.getAge());returnuserRepository.save(existing);});}@TransactionalpublicMono<Void>deleteUser(Longid){returnuserRepository.deleteById(id).then(Mono.fromRunnable(()->System.out.println("User deleted: "+id)));}/** * 使用 MyBatis-Plus 的 QueryWrapper 构建查询条件 * 然后转换为 R2DBC 查询 */publicFlux<User>queryUsers(Map<String,Object>params){// 使用 MyBatis-Plus 的 QueryWrapper 构建条件QueryWrapper<User>queryWrapper=newQueryWrapper<>();if(params.containsKey("username")){queryWrapper.like("username",params.get("username"));}if(params.containsKey("email")){queryWrapper.like("email",params.get("email"));}if(params.containsKey("minAge")){queryWrapper.ge("age",params.get("minAge"));}if(params.containsKey("maxAge")){queryWrapper.le("age",params.get("maxAge"));}queryWrapper.orderByDesc("create_time");// 将 QueryWrapper 转换为 SQLStringsql=buildQueryWrapperSql(queryWrapper);// 执行响应式查询returndatabaseClient.sql(sql).fetch().all().map(row->{Useruser=newUser();user.setId((Long)row.get("id"));user.setUsername((String)row.get("username"));user.setEmail((String)row.get("email"));user.setAge((Integer)row.get("age"));returnuser;});}/** * 响应式分页查询 */publicMono<Page<User>>getUsersPage(Pageablepageable){// 使用 MyBatis-Plus 的 Page 对象Page<User>mybatisPage=newPage<>(pageable.getPageNumber(),pageable.getPageSize());// 计算总数Mono<Long>countMono=databaseClient.sql("SELECT COUNT(*) FROM users").map(row->row.get(0,Long.class)).one();// 查询数据Flux<User>usersFlux=databaseClient.sql("SELECT * FROM users ORDER BY create_time DESC LIMIT :limit OFFSET :offset").bind("limit",pageable.getPageSize()).bind("offset",pageable.getOffset()).fetch().all().map(this::mapRowToUser);returnMono.zip(countMono,usersFlux.collectList()).map(tuple->{mybatisPage.setTotal(tuple.getT1());mybatisPage.setRecords(tuple.getT2());returnmybatisPage;});}privateStringbuildQueryWrapperSql(QueryWrapper<User>queryWrapper){// 简化示例,实际需要更复杂的转换return"SELECT * FROM users WHERE "+queryWrapper.getTargetSql();}privateUsermapRowToUser(Map<String,Object>row){Useruser=newUser();user.setId((Long)row.get("id"));user.setUsername((String)row.get("username"));user.setEmail((String)row.get("email"));user.setAge((Integer)row.get("age"));returnuser;}/** * 批量保存 */publicFlux<User>saveAll(Flux<User>users){returnuserRepository.saveAll(users).onErrorContinue((error,user)->System.err.println("Error saving user: "+error.getMessage()));}/** * 流式查询 */publicFlux<User>streamUsers(){returndatabaseClient.sql("SELECT * FROM users").fetch().all().delayElements(Duration.ofMillis(50))// 控制流速度.map(this::mapRowToUser);}}

6. Controller 层

Spring Cloud全栈实战:手撸企业级项目,从入门到架构师!

importcom.baomidou.mybatisplus.extension.plugins.pagination.Page;importlombok.RequiredArgsConstructor;importorg.springframework.data.domain.PageRequest;importorg.springframework.http.HttpStatus;importorg.springframework.http.MediaType;importorg.springframework.http.ResponseEntity;importorg.springframework.web.bind.annotation.*;importreactor.core.publisher.Flux;importreactor.core.publisher.Mono;importjavax.validation.Valid;importjava.time.Duration;importjava.util.Map;@RestController@RequestMapping("/api/reactive/users")@RequiredArgsConstructorpublicclassReactiveUserController{privatefinalReactiveUserServiceuserService;@PostMapping@ResponseStatus(HttpStatus.CREATED)publicMono<ResponseEntity<User>>create(@Valid@RequestBodyUseruser){returnuserService.createUser(user).map(saved->ResponseEntity.status(HttpStatus.CREATED).body(saved)).onErrorResume(e->Mono.just(ResponseEntity.badRequest().build()));}@GetMapping("/{id}")publicMono<ResponseEntity<User>>getById(@PathVariableLongid){returnuserService.getUserById(id).map(ResponseEntity::ok).defaultIfEmpty(ResponseEntity.notFound().build());}@GetMappingpublicFlux<User>getAll(){returnuserService.getAllUsers();}@GetMapping("/search")publicFlux<User>search(@RequestParamMap<String,Object>params){returnuserService.queryUsers(params);}@GetMapping("/page")publicMono<Page<User>>getPage(@RequestParam(defaultValue="0")intpage,@RequestParam(defaultValue="10")intsize){PageRequestpageRequest=PageRequest.of(page,size);returnuserService.getUsersPage(pageRequest);}@PutMapping("/{id}")publicMono<ResponseEntity<User>>update(@PathVariableLongid,@Valid@RequestBodyUseruser){returnuserService.updateUser(id,user).map(ResponseEntity::ok).defaultIfEmpty(ResponseEntity.notFound().build());}@DeleteMapping("/{id}")publicMono<ResponseEntity<Void>>delete(@PathVariableLongid){returnuserService.deleteUser(id).then(Mono.just(ResponseEntity.noContent().<Void>build())).defaultIfEmpty(ResponseEntity.notFound().build());}/** * Server-Sent Events (SSE) 流式接口 */@GetMapping(value="/stream",produces=MediaType.TEXT_EVENT_STREAM_VALUE)publicFlux<User>stream(){returnuserService.streamUsers();}/** * WebFlux WebSocket 支持 */@MessageMapping("users.chat")publicFlux<UserMessage>userChat(Flux<UserMessage>messages){returnmessages.doOnNext(message->System.out.println("Received: "+message.getContent())).map(message->newUserMessage("Server: "+message.getContent())).delayElements(Duration.ofSeconds(1));}/** * 批量操作 */@PostMapping("/batch")publicFlux<User>batchCreate(@RequestBodyFlux<User>users){returnuserService.saveAll(users);}}

7. 自定义响应式 Repository

Spring Cloud全栈实战:手撸企业级项目,从入门到架构师!

importcom.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;importorg.springframework.data.r2dbc.repository.R2dbcRepository;importorg.springframework.data.repository.reactive.ReactiveCrudRepository;importorg.springframework.r2dbc.core.DatabaseClient;importorg.springframework.stereotype.Repository;importreactor.core.publisher.Flux;importreactor.core.publisher.Mono;@RepositorypublicinterfaceCustomReactiveRepository{/** * 使用 MyBatis-Plus 的 Lambda 查询 */Flux<User>findUsersByCondition(LambdaQueryWrapper<User>wrapper);/** * 响应式分页查询 */Mono<Page<User>>findPage(Page<User>page,LambdaQueryWrapper<User>wrapper);}

8. 响应式事务配置

importorg.springframework.context.annotation.Configuration;importorg.springframework.transaction.ReactiveTransactionManager;importorg.springframework.transaction.reactive.TransactionalOperator;@ConfigurationpublicclassReactiveTransactionConfig{@BeanpublicTransactionalOperatortransactionalOperator(ReactiveTransactionManagertransactionManager){returnTransactionalOperator.create(transactionManager);}}

方案二:使用 MyBatis-Plus 响应式扩展 (第三方)

有一些第三方项目正在尝试为 MyBatis-Plus 添加响应式支持:
Spring Cloud全栈实战:手撸企业级项目,从入门到架构师!

1. 添加依赖

<!-- 第三方响应式扩展 --><dependency><groupId>com.github.yulichang</groupId><artifactId>mybatis-plus-join</artifactId><version>1.4.6</version></dependency>

2. 自定义响应式 Mapper

importorg.apache.ibatis.annotations.SelectProvider;importorg.springframework.data.repository.reactive.ReactiveCrudRepository;importreactor.core.publisher.Flux;importreactor.core.publisher.Mono;publicinterfaceReactiveBaseMapper<T>{Mono<Integer>insertReactive(Tentity);Mono<Integer>updateByIdReactive(Tentity);Mono<T>selectByIdReactive(Serializableid);Flux<T>selectListReactive(Wrapper<T>queryWrapper);Mono<Integer>deleteByIdReactive(Serializableid);}

重要提示

  1. MyBatis-Plus 官方还不完全支持响应式,上述方案是结合 R2DBC 和 MyBatis-Plus 的工具类
  2. 真正的响应式编程需要使用 R2DBC 或 MongoDB Reactive
  3. 如果需要复杂 SQL 查询,可以使用 DatabaseClient 或 R2DBC Entity Callbacks
  4. 生产环境建议使用成熟的响应式数据库驱动

完整配置 application.yml

Spring Cloud全栈实战:手撸企业级项目,从入门到架构师!

spring:r2dbc:url:r2dbc:mysql://localhost:3306/reactive_dbusername:rootpassword:passwordpool:initial-size:5max-size:20max-idle-time:30mwebflux:base-path:/apistatic-path-pattern:/static/**codec:max-in-memory-size:10MBlogging:level:org.springframework.r2dbc:DEBUGio.r2dbc:DEBUG

这种架构结合了 MyBatis-Plus 的便利性和 R2DBC 的响应式能力,适合需要复杂查询的场景。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/5/8 4:46:43

IT运维托管真的能省心又省钱吗?

办公室突然断网&#xff0c;会议正在进行中&#xff0c;PPT传不上大屏;员工集体打不开邮箱&#xff0c;客服电话开始堆积——这种场景你是不是太熟悉了?很多企业主都问过我一个问题&#xff1a;“我们到底要不要做IT运维托管?”这不像买台打印机那么简单&#xff0c;它牵扯到…

作者头像 李华
网站建设 2026/5/2 18:54:28

55、网络配置与邮件管理全解析

网络配置与邮件管理全解析 1. 远程执行命令与邮件访问概述 在网络操作中,有时需要在远程计算机上执行命令并将结果显示在本地终端。例如,要在远程服务器 remoteserver 上运行 who 命令并将结果显示到本地终端,可以使用以下命令: [root@server1 root]# rsh remotese…

作者头像 李华
网站建设 2026/5/2 22:28:06

【拯救HMI】我们为什么要重新定义工业交互?

别让一台百万级的设备&#xff0c;毁在了一块「丑」屏幕上 在工业4.0的浪潮下&#xff0c;我们的设备越来越精密&#xff0c;算法越来越智能。但请回头看看我们设备上的那块屏幕——它是充满了“年代感”的按钮堆砌&#xff1f;还是操作逻辑混乱的参数迷宫&#xff1f; 在装备制…

作者头像 李华
网站建设 2026/4/30 9:11:49

57、Linux常见问题排查与解决指南

Linux常见问题排查与解决指南 一、PAM与登录问题 1.1 PAM配置检查 若在查看 /etc/pam.d/system - auth 时无任何消息,可查看 auth 部分的第二行。建议登录系统并对 /etc/pam.d/login 进行修改以查看结果。操作时,记得保留一个虚拟终端处于登录状态,避免因修改文件出…

作者头像 李华
网站建设 2026/5/4 14:32:37

【轨物交流】兰溪商会企业家代表团携手杭电科技园领导共访轨物科技

2025年12月11日&#xff0c;杭州兰溪商会及多家兰溪企业代表在杭州电子科技大学大学科技园相关领导的陪同下&#xff0c;走进杭州轨物科技有限公司&#xff08;以下简称“轨物科技”&#xff09;&#xff0c;就“赋能产业升级共探创新路径”等议题展开深入交流。轨物科技总经理…

作者头像 李华