Spring Boot 4.0 + MyBatis-Plus 响应式编程实战
Spring Cloud全栈实战:手撸企业级项目,从入门到架构师!
目前 MyBatis-Plus 官方还不完全支持响应式编程,但我们可以结合 R2DBC 和 MyBatis-Plus 的部分特性来实现。这里提供两种方案:
方案一:使用 MyBatis-Plus 增强 R2DBC (推荐)
Spring Cloud全栈实战:手撸企业级项目,从入门到架构师!
1. 项目依赖
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-r2dbc</artifactId>
</dependency>
<dependency>
    <groupId>com.baomidou</groupId>
    <artifactId>mybatis-plus-boot-starter</artifactId>
    <version>3.5.3.1</version>
    <!-- Note: only its utility classes are used; SQL execution is not enabled -->
</dependency>
<!-- Database driver -->
<dependency>
    <groupId>io.asyncer</groupId>
    <artifactId>r2dbc-mysql</artifactId>
    <version>1.0.2</version>
</dependency>
<!-- Or PostgreSQL -->
<!--
<dependency>
    <groupId>org.postgresql</groupId>
    <artifactId>r2dbc-postgresql</artifactId>
    <scope>runtime</scope>
</dependency>
-->

2. 配置类
importcom.baomidou.mybatisplus.annotation.DbType;importcom.baomidou.mybatisplus.extension.plugins.MybatisPlusInterceptor;importcom.baomidou.mybatisplus.extension.plugins.inner.PaginationInnerInterceptor;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importorg.springframework.data.r2dbc.config.EnableR2dbcAuditing;@Configuration@EnableR2dbcAuditingpublicclassR2dbcMybatisConfig{/** * 只使用 MyBatis-Plus 的分页插件 */@BeanpublicMybatisPlusInterceptormybatisPlusInterceptor(){MybatisPlusInterceptorinterceptor=newMybatisPlusInterceptor();interceptor.addInnerInterceptor(newPaginationInnerInterceptor(DbType.MYSQL));returninterceptor;}}3. 实体类 (使用 MyBatis-Plus 注解)
Spring Cloud全栈实战:手撸企业级项目,从入门到架构师!
importcom.baomidou.mybatisplus.annotation.*;importlombok.Data;importorg.springframework.data.annotation.CreatedDate;importorg.springframework.data.annotation.LastModifiedDate;importorg.springframework.data.relational.core.mapping.Table;importjava.time.LocalDateTime;@Data@Table("users")publicclassUser{@TableId(type=IdType.AUTO)privateLongid;@TableField("username")privateStringusername;@TableField("email")privateStringemail;@TableField("password")privateStringpassword;@TableField("age")privateIntegerage;@TableLogic@TableField("deleted")privateIntegerdeleted=0;@TableField(value="version",fill=FieldFill.INSERT)@VersionprivateIntegerversion=1;@CreatedDate@TableField("create_time")privateLocalDateTimecreateTime;@LastModifiedDate@TableField("update_time")privateLocalDateTimeupdateTime;// 响应式编程友好的构造方法publicstaticMono<User>of(Stringusername,Stringemail){Useruser=newUser();user.setUsername(username);user.setEmail(email);returnMono.just(user);}}4. Repository 接口 (R2DBC)
importorg.springframework.data.r2dbc.repository.R2dbcRepository;importorg.springframework.data.r2dbc.repository.Query;importorg.springframework.stereotype.Repository;importreactor.core.publisher.Flux;importreactor.core.publisher.Mono;@RepositorypublicinterfaceUserR2dbcRepositoryextendsR2dbcRepository<User,Long>{Mono<User>findByUsername(Stringusername);Mono<User>findByEmail(Stringemail);Flux<User>findByAgeGreaterThan(Integerage);@Query("SELECT * FROM users WHERE username LIKE :keyword OR email LIKE :keyword")Flux<User>searchUsers(Stringkeyword);@Query("UPDATE users SET age = :age WHERE id = :id")Mono<Integer>updateAgeById(Longid,Integerage);}5. Service 层 (结合 MyBatis-Plus 工具)
importcom.baomidou.mybatisplus.core.conditions.query.QueryWrapper;importcom.baomidou.mybatisplus.extension.plugins.pagination.Page;importlombok.RequiredArgsConstructor;importorg.springframework.data.domain.Pageable;importorg.springframework.r2dbc.core.DatabaseClient;importorg.springframework.stereotype.Service;importorg.springframework.transaction.annotation.Transactional;importreactor.core.publisher.Flux;importreactor.core.publisher.Mono;importreactor.core.scheduler.Schedulers;importjava.time.Duration;importjava.util.Map;@Service@RequiredArgsConstructorpublicclassReactiveUserService{privatefinalUserR2dbcRepositoryuserRepository;privatefinalDatabaseClientdatabaseClient;publicMono<User>createUser(Useruser){returnuserRepository.save(user);}publicMono<User>getUserById(Longid){returnuserRepository.findById(id).switchIfEmpty(Mono.error(newRuntimeException("User not found")));}publicFlux<User>getAllUsers(){returnuserRepository.findAll().delayElements(Duration.ofMillis(100))// 模拟流处理.subscribeOn(Schedulers.boundedElastic());}publicMono<User>updateUser(Longid,Useruser){returnuserRepository.findById(id).flatMap(existing->{existing.setUsername(user.getUsername());existing.setEmail(user.getEmail());existing.setAge(user.getAge());returnuserRepository.save(existing);});}@TransactionalpublicMono<Void>deleteUser(Longid){returnuserRepository.deleteById(id).then(Mono.fromRunnable(()->System.out.println("User deleted: "+id)));}/** * 使用 MyBatis-Plus 的 QueryWrapper 构建查询条件 * 然后转换为 R2DBC 查询 */publicFlux<User>queryUsers(Map<String,Object>params){// 使用 MyBatis-Plus 的 QueryWrapper 
构建条件QueryWrapper<User>queryWrapper=newQueryWrapper<>();if(params.containsKey("username")){queryWrapper.like("username",params.get("username"));}if(params.containsKey("email")){queryWrapper.like("email",params.get("email"));}if(params.containsKey("minAge")){queryWrapper.ge("age",params.get("minAge"));}if(params.containsKey("maxAge")){queryWrapper.le("age",params.get("maxAge"));}queryWrapper.orderByDesc("create_time");// 将 QueryWrapper 转换为 SQLStringsql=buildQueryWrapperSql(queryWrapper);// 执行响应式查询returndatabaseClient.sql(sql).fetch().all().map(row->{Useruser=newUser();user.setId((Long)row.get("id"));user.setUsername((String)row.get("username"));user.setEmail((String)row.get("email"));user.setAge((Integer)row.get("age"));returnuser;});}/** * 响应式分页查询 */publicMono<Page<User>>getUsersPage(Pageablepageable){// 使用 MyBatis-Plus 的 Page 对象Page<User>mybatisPage=newPage<>(pageable.getPageNumber(),pageable.getPageSize());// 计算总数Mono<Long>countMono=databaseClient.sql("SELECT COUNT(*) FROM users").map(row->row.get(0,Long.class)).one();// 查询数据Flux<User>usersFlux=databaseClient.sql("SELECT * FROM users ORDER BY create_time DESC LIMIT :limit OFFSET :offset").bind("limit",pageable.getPageSize()).bind("offset",pageable.getOffset()).fetch().all().map(this::mapRowToUser);returnMono.zip(countMono,usersFlux.collectList()).map(tuple->{mybatisPage.setTotal(tuple.getT1());mybatisPage.setRecords(tuple.getT2());returnmybatisPage;});}privateStringbuildQueryWrapperSql(QueryWrapper<User>queryWrapper){// 简化示例,实际需要更复杂的转换return"SELECT * FROM users WHERE "+queryWrapper.getTargetSql();}privateUsermapRowToUser(Map<String,Object>row){Useruser=newUser();user.setId((Long)row.get("id"));user.setUsername((String)row.get("username"));user.setEmail((String)row.get("email"));user.setAge((Integer)row.get("age"));returnuser;}/** * 批量保存 */publicFlux<User>saveAll(Flux<User>users){returnuserRepository.saveAll(users).onErrorContinue((error,user)->System.err.println("Error saving user: "+error.getMessage()));}/** * 
流式查询 */publicFlux<User>streamUsers(){returndatabaseClient.sql("SELECT * FROM users").fetch().all().delayElements(Duration.ofMillis(50))// 控制流速度.map(this::mapRowToUser);}}6. Controller 层
Spring Cloud全栈实战:手撸企业级项目,从入门到架构师!
importcom.baomidou.mybatisplus.extension.plugins.pagination.Page;importlombok.RequiredArgsConstructor;importorg.springframework.data.domain.PageRequest;importorg.springframework.http.HttpStatus;importorg.springframework.http.MediaType;importorg.springframework.http.ResponseEntity;importorg.springframework.web.bind.annotation.*;importreactor.core.publisher.Flux;importreactor.core.publisher.Mono;importjavax.validation.Valid;importjava.time.Duration;importjava.util.Map;@RestController@RequestMapping("/api/reactive/users")@RequiredArgsConstructorpublicclassReactiveUserController{privatefinalReactiveUserServiceuserService;@PostMapping@ResponseStatus(HttpStatus.CREATED)publicMono<ResponseEntity<User>>create(@Valid@RequestBodyUseruser){returnuserService.createUser(user).map(saved->ResponseEntity.status(HttpStatus.CREATED).body(saved)).onErrorResume(e->Mono.just(ResponseEntity.badRequest().build()));}@GetMapping("/{id}")publicMono<ResponseEntity<User>>getById(@PathVariableLongid){returnuserService.getUserById(id).map(ResponseEntity::ok).defaultIfEmpty(ResponseEntity.notFound().build());}@GetMappingpublicFlux<User>getAll(){returnuserService.getAllUsers();}@GetMapping("/search")publicFlux<User>search(@RequestParamMap<String,Object>params){returnuserService.queryUsers(params);}@GetMapping("/page")publicMono<Page<User>>getPage(@RequestParam(defaultValue="0")intpage,@RequestParam(defaultValue="10")intsize){PageRequestpageRequest=PageRequest.of(page,size);returnuserService.getUsersPage(pageRequest);}@PutMapping("/{id}")publicMono<ResponseEntity<User>>update(@PathVariableLongid,@Valid@RequestBodyUseruser){returnuserService.updateUser(id,user).map(ResponseEntity::ok).defaultIfEmpty(ResponseEntity.notFound().build());}@DeleteMapping("/{id}")publicMono<ResponseEntity<Void>>delete(@PathVariableLongid){returnuserService.deleteUser(id).then(Mono.just(ResponseEntity.noContent().<Void>build())).defaultIfEmpty(ResponseEntity.notFound().build());}/** * Server-Sent Events (SSE) 流式接口 
*/@GetMapping(value="/stream",produces=MediaType.TEXT_EVENT_STREAM_VALUE)publicFlux<User>stream(){returnuserService.streamUsers();}/** * WebFlux WebSocket 支持 */@MessageMapping("users.chat")publicFlux<UserMessage>userChat(Flux<UserMessage>messages){returnmessages.doOnNext(message->System.out.println("Received: "+message.getContent())).map(message->newUserMessage("Server: "+message.getContent())).delayElements(Duration.ofSeconds(1));}/** * 批量操作 */@PostMapping("/batch")publicFlux<User>batchCreate(@RequestBodyFlux<User>users){returnuserService.saveAll(users);}}7. 自定义响应式 Repository
Spring Cloud全栈实战:手撸企业级项目,从入门到架构师!
importcom.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;importorg.springframework.data.r2dbc.repository.R2dbcRepository;importorg.springframework.data.repository.reactive.ReactiveCrudRepository;importorg.springframework.r2dbc.core.DatabaseClient;importorg.springframework.stereotype.Repository;importreactor.core.publisher.Flux;importreactor.core.publisher.Mono;@RepositorypublicinterfaceCustomReactiveRepository{/** * 使用 MyBatis-Plus 的 Lambda 查询 */Flux<User>findUsersByCondition(LambdaQueryWrapper<User>wrapper);/** * 响应式分页查询 */Mono<Page<User>>findPage(Page<User>page,LambdaQueryWrapper<User>wrapper);}8. 响应式事务配置
importorg.springframework.context.annotation.Configuration;importorg.springframework.transaction.ReactiveTransactionManager;importorg.springframework.transaction.reactive.TransactionalOperator;@ConfigurationpublicclassReactiveTransactionConfig{@BeanpublicTransactionalOperatortransactionalOperator(ReactiveTransactionManagertransactionManager){returnTransactionalOperator.create(transactionManager);}}方案二:使用 MyBatis-Plus 响应式扩展 (第三方)
有一些第三方项目正在尝试为 MyBatis-Plus 添加响应式支持:
Spring Cloud全栈实战:手撸企业级项目,从入门到架构师!
1. 添加依赖
<!-- Third-party extension.
     NOTE(review): mybatis-plus-join appears to be a join-query extension rather
     than a reactive one; confirm this is the intended artifact. -->
<dependency>
    <groupId>com.github.yulichang</groupId>
    <artifactId>mybatis-plus-join</artifactId>
    <version>1.4.6</version>
</dependency>

2. 自定义响应式 Mapper
importorg.apache.ibatis.annotations.SelectProvider;importorg.springframework.data.repository.reactive.ReactiveCrudRepository;importreactor.core.publisher.Flux;importreactor.core.publisher.Mono;publicinterfaceReactiveBaseMapper<T>{Mono<Integer>insertReactive(Tentity);Mono<Integer>updateByIdReactive(Tentity);Mono<T>selectByIdReactive(Serializableid);Flux<T>selectListReactive(Wrapper<T>queryWrapper);Mono<Integer>deleteByIdReactive(Serializableid);}重要提示
- MyBatis-Plus 官方还不完全支持响应式,上述方案是结合 R2DBC 和 MyBatis-Plus 的工具类
- 真正的响应式编程需要使用 R2DBC 或 MongoDB Reactive
- 如果需要复杂 SQL 查询,可以使用 DatabaseClient 或 R2DBC Entity Callbacks
- 生产环境建议使用成熟的响应式数据库驱动
完整配置 application.yml
Spring Cloud全栈实战:手撸企业级项目,从入门到架构师!
# NOTE(review): nesting reconstructed from a flattened listing - verify.
spring:
  r2dbc:
    url: r2dbc:mysql://localhost:3306/reactive_db
    username: root
    password: password
    pool:
      initial-size: 5
      max-size: 20
      max-idle-time: 30m
  webflux:
    # NOTE(review): the controller in this article is already mapped under
    # /api/reactive/users, so this prefix presumably yields /api/api/... - confirm.
    base-path: /api
    static-path-pattern: /static/**
  # NOTE(review): placed directly under `spring` (spring.codec.max-in-memory-size);
  # newer Spring Boot versions presumably use spring.http.codecs.* instead - confirm.
  codec:
    max-in-memory-size: 10MB
logging:
  level:
    org.springframework.r2dbc: DEBUG
    io.r2dbc: DEBUG

这种架构结合了 MyBatis-Plus 的便利性和 R2DBC 的响应式能力,适合需要复杂查询的场景。