Webflux实践总结

0. 前言

12月第2个周学习了响应式编程,后边经过新冠,做了everylittle的实践工作,下边对这块工作进行一个总结。

本文主要使用了SpringBoot 3.0.0,几个重要的依赖如下:

implementation 'org.springframework.boot:spring-boot-starter-webflux'
implementation 'org.springframework.boot:spring-boot-starter-validation'
// r2dbc
implementation 'org.springframework.boot:spring-boot-starter-data-r2dbc'
implementation 'io.r2dbc:r2dbc-pool:1.0.0.RELEASE'
implementation 'dev.miku:r2dbc-mysql:0.8.2.RELEASE'
implementation 'mysql:mysql-connector-java:8.0.28'
// redis
implementation 'org.springframework.boot:spring-boot-starter-data-redis-reactive'
// aop
implementation 'org.springframework.boot:spring-boot-starter-aop'

1. Auditing

参考

Auditing指的是对表数据创建、修改的审计,包括4个属性:creator、createTime、updator、updateTime。这里,由于是个人系统,我只使用了createTime与updateTime2个属性。

用法

对于这两个属性,所做的配置只有2点:

  1. 写一个AuditConfig的Configuration,并启动@EnableR2dbcAuditing

    @EnableR2dbcAuditing
    @Configuration
    public class AuditConfig {
    }
    
  2. 在AuditEntity上增加@CreatedData@LastModifiedDate

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public abstract class AuditBaseEntity {
        @Column("create_time")
        @CreatedDate
        protected LocalDateTime createTime;
    
        @Column("update_time")
        @LastModifiedDate
        protected LocalDateTime updateTime;
    
    }
    

经验

  1. 关于Creator与Updator的使用

    因为要去拿创建者或者修改者的Id,所以一定会需要一个回调来获取User。

    需要实现ReactiveAuditorAware<T>接口的Mono<T> getCurrentAuditor()方法即可

    ps: 这个因为没有实践,只是从官方文档上看到

    注意:官网上明确说明,不需要这个两个属性时,不需要实现该接口。

    Applications that only track creation and modification dates are not required do make their entities implement AuditorAware.

  2. 关于JPA与listerner

    网上有些资料,是JPA的Auditing的实践,在Entity上增加Listener注解,并指明修改时执行的方法,这些资料不适合Webflux,不要用,也不要引入JPA。

  3. Audit的实现方式

    之前使用都是使用基类的方式来实现,这里也沿用了这个思路,除此之外还有2种思路。

    • 直接在需要的Entity中增加相应属性

    • 采用组合的方式,如官网给出的示例:

      class Customer {
        private AuditMetadata auditingMetadata;
        // … further properties omitted
      }
      class AuditMetadata {
        @CreatedBy
        private User user;
        @CreatedDate
        private Instant createdDate;
      }
      

      我觉得,这种方式在之后的使用上可能没那么方便,也就没有用。

      基类的方式也存在着缺点,因为继承只能是单继承。

2. exception处理

在SpringMVC中,我们有着优雅对异常的统一处理,在Webflux中,我也引入了它们。与之前不同,这里需要我们自己进行转换,将构建DataBuffer写入的Response中。

整个过程:

  1. 通过exchange获取到response以及Exception
  2. 构造自己MyResponseBody
  3. 通过ObjectMapper(jackson)将MyResponseBody序列化成byte[]字节数组
  4. 通过response.bufferFactory().wrap(bytes)将byte[]转换成DataBuffer
  5. 最后将DataBuffer装入Mono,并通过**writeWith()**将数据写入response

代码如下:

@Slf4j
@Component
@AllArgsConstructor
@Order(-99)
public class GlobalExceptionHandler implements WebExceptionHandler {
    private final ObjectMapper objectMapper;
    @Override
    public Mono<Void> handle(ServerWebExchange exchange, Throwable ex) {
        ServerHttpResponse response = exchange.getResponse();
        // 对自定义异常的处理
        if(ex instanceof MyException e){
            // 构造MyResponseBody
            MyResponseBody body = MyResponseBody.fail(e.getCode(), e.getMessage());
            try {
                // 序列化成byte[]
                byte[] bits = objectMapper.writeValueAsBytes(body);
                 response.setStatusCode(HttpStatus.OK);
            	response.getHeaders().setContentType(MediaType.APPLICATION_JSON);
                // 构造DataBuffer并写入response
                return response.writeWith(createData(response, bits));
            } catch (JsonProcessingException exc) {
                log.debug("failed to process json", exc);
                response.setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR);
                return response.writeWith(createData(response, ex.getMessage().getBytes()));
            }
        }
		// 其他异常的处理
        ex.printStackTrace();
        response.setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR);
        return response.writeWith(createData(response, ex.getMessage().getBytes()));
    }
    private Mono<DataBuffer> createData(ServerHttpResponse response, byte [] bytes){
        DataBuffer dataBuffer = response.bufferFactory().wrap(bytes);
        return Mono.just(dataBuffer);
    }
}

3. filter应用:userId的获取

tag:filter,UserId,Mono的Context

介绍

认证指的是对用户是否登录进行判断,若没有登录则不允许访问。这里包括登录还包括对redis缓存的使用,以及Mono的Context概念。

认证的逻辑逻辑比较清晰:

  1. 从cookie中获取token
  2. 从redis中获取token对应的user
  3. 将user放到Mono的Context中以备使用
@Component
@AllArgsConstructor
public class AuthenticationFilter  implements WebFilter {

    private final ReactiveRedisTemplate<String, Object> redisTemplate;
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
        
        ServerHttpRequest request = exchange.getRequest();
        // 忽略不需要认证的路由
        RequestPath path = request.getPath();
        if(path.toString().contains("login")){
            return chain.filter(exchange);
        }
        
        // 获取token
        HttpCookie tokenCookie = request.getCookies().getFirst("token");
        return Mono.justOrEmpty(tokenCookie)
                .switchIfEmpty(Mono.error(new BadRequestException("请登录")))
                .map(HttpCookie::getValue)
                .flatMap(token -> redisTemplate.opsForValue().get(token))				// 从缓存中获取User
                .switchIfEmpty(Mono.error(new BadRequestException("请登录")))
                .map(user -> ((User)user).getId())
                .flatMap(userId -> exchange.getSession()
                        .doOnNext(webSession -> webSession.getAttributes().putIfAbsent(UserHolder.CONTEXT_KEY, userId))		// 将userId放到WebSession中
                        .then(chain.filter(exchange).contextWrite(context -> context.put(UserHolder.CONTEXT_KEY, userId)))	    //  将userId放到Context中
                );
    }
}

起初只将userId通过contextWrite放到了Context中,便于通过Util去获取userId,后来由于缓存的缘故,又增加了放到WebSession中的步骤。

讲到Util,这里来看看如何从Context中去获取:

public class UserHolder {
    public static final String CONTEXT_KEY = "USER_ID";
    public static Mono<Long> getUser(){
        return  Mono.deferContextual(ctx -> Mono.just(ctx.get(CONTEXT_KEY)));
    }
}

通过Mono.deferContextual来获取Context中的内容,这块内容放到后边再讲。

经验

  • 流的概念

    写完之后,先用写过程式的方式去获取userId,当然最开始返回的是Long,而不是Mono

    Long userId = UserHoler.getUser()
    

    然后再需要的时候再去使用userId,这样总是调不通,原因是它们不在一个流上,Context是不同的。后来将UserHolder放到流上,问题才解决。

    Mono<Post> createPost(@Valid @RequestBody PostCreateDto dto){
            return UserHolder.getUser()
                    .map(userId -> Post.builder()
                            .title(dto.getTitle())
                            .userId(userId)
                            .state(Post.PostState.DRAFTS)
                            .scope(Post.PostScope.PRIVATE)
                            .projectId(dto.getProjectId())
                            .build())
                    .flatMap(postRepository::save);
        }
    

    参考:Reactor上下文ContextReactor3上下文Context处理

4. filter应用:Response的装饰

tag:filter,JsonNode,DataBuffer,ServerHttpResponseDecorator

介绍

在SpringMVC中,我们定义统一的返回码与返回格式,这里也将它们引入了进来。如果每个路由中都去进行装饰,则过于繁琐,不利于一眼看清返回的实体,于是这里通过filter的方式,对返回进行装饰,将返回的结构放到规定的data中去。

这里的逻辑也很清洗:

  • 就是将原来的数据从response中取出
  • 对然后对数据进行封装
  • 在将数据写回到response中

实现上是将Response做了装饰,看看实现:

public class BodyWrapperResponse extends ServerHttpResponseDecorator {
    ObjectMapper objectMapper = new ObjectMapper();
    public BodyWrapperResponse(ServerHttpResponse delegate) {
        super(delegate);
    }
    @Override
    public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
        Flux<DataBuffer> newBuffer = Flux.from(body)
                .map(dataBuffer -> {
                    // 将DataBuffer转成字符串
                    String originStr = StandardCharsets.UTF_8.decode(dataBuffer.toByteBuffer()).toString();
                    MyResponseBody responseBody = new MyResponseBody(MyResponseCode.OK);
                    // 如果是可以json化的,写入jsonNode。否则写入原值
                    if(!originStr.isEmpty()){	  
                        try {
                            // 将字符串解析成JsonNode对象
                            JsonNode jsonNode = objectMapper.readTree(originStr);		
                            
                            // 将JsonNode封装进MyResponseBody
                            responseBody.setData(jsonNode);														
                        } catch (JsonProcessingException e) {
                            responseBody.setData(originStr);
                        }
                    }
                    try {
                        // 将MyResponseBody转换成byte[]
                        byte[] bits = objectMapper.writeValueAsBytes(responseBody);
                        HttpHeaders httpHeaders = super.getHeaders();
                        httpHeaders.setContentLength(bits.length);
                            // 将byte[]转换成DataBuffer
                        return super.bufferFactory().wrap(bits);
                    } catch (JsonProcessingException e) {
                        e.printStackTrace();
                        return  dataBuffer;
                    }
                });
        // 将DataBuffer写到Response中
        return super.writeWith(newBuffer);
    }
}

filter部分的代码如下:

@Component
public class ResponseWebFilter implements WebFilter {
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
        ServerHttpResponse originalResponse =  exchange.getResponse();
        //  对response进行封装
        BodyWrapperResponse bodyWrapperResponse = new BodyWrapperResponse(originalResponse);
        // 将新的response到exchange中
        ServerWebExchange webExchange = exchange.mutate().response(bodyWrapperResponse).build();
        return chain.filter(webExchange);
    }
}

经验

  • Flux与Mono的差异

    写完之后,简单的CRUD都能正常包上MyResponseBody,但有一种例外:Flux。在获取Flux时,总是提示错误,后来发现数据总是被整齐的切成一条一条的,应该是R2dbc这种响应式的读取方式,从数据库出来的数据并不是等待读取完成再返回,而是边读取边返回。

    后来发现可以采用Flux::collectList(),将Flux转变成Mono进行返回。

5. R2dbc分页

分页的好处是每次都取一定量的数据,提高性能。

R2dbc对分页的支持还是很强的,普通的查询只需要增加Pageable参数即可。

介绍

Repository的写法:

public interface PostRepository extends ReactiveCrudRepository<Post, Long> {
    // 注意这里的Pageable
	Flux<Post> findPostsByUserIdAndStateNot(Long userId, Post.PostState state, Pageable pageable);
    // 查询总量
	Mono<Long> countByUserIdAndStateNot(Long userId, Post.PostState state);
}

Controller的写法:

@GetMapping("/list")
@ResponseBody
Mono<MyPage<PostSummaryVo>> getPostList(@RequestParam("page") int page, @RequestParam("size") int size){
    return UserHolder.getUser()
        // 获取数据
        .flatMapMany(userId -> postRepository.findPostsByUserIdAndStateNot(userId,Post.PostState.DELETED,PageRequest.of(page, size)))
        .map(PostSummaryVo::new)
        // 注意这里的collectionList,将Flux转换成了Mono<List>
        .collectList()
        // 获取total,这里的zipWhen也值得注意
        .zipWhen(l -> UserHolder.getUser().flatMap(userId -> postRepository.countByUserIdAndStateNot(userId, Post.PostState.DELETED)))
        .map(t -> new MyPage<PostSummaryVo>(t.getT1(), t.getT2(), size));
}

经验

  • 对总量的处理

    手机端下滑与上滑不需要总量这个参数,这样对性能更友好一些。

    web端方面,组件上一般都有总量这个参数,需要获取。这就有2种方案:

    1. 总量获取一次,如果有增、删操作,触发再次获取。
    2. 后端做缓存,获取之后加入缓存,有增、删操作删除缓存

6. R2dbc存取json:Converter的使用

在Mybatis中,json的存取通过typeHandler对数据进行转换,主要是读取时的转换。R2dbc采用的JPA的方式,通过Converter来实现。

介绍

R2dbc不用在Entity的注解标明,需要写好Converter,然后通过配置类进入到R2dbc中。

下边先来看看Configuration

@Configuration
public class R2dbcConfiguration extends AbstractR2dbcConfiguration{

    @Value("${spring.r2dbc.url}")
    private String url;
    private final ObjectMapper objectMapper;
    public R2dbcConfiguration(ObjectMapper objectMapper){
        this.objectMapper = objectMapper;
    }
    @Override
    public ConnectionFactory connectionFactory() {
        return ConnectionFactories.get(url);
    }

    @Override
    protected List<Object> getCustomConverters(){
        return List.of(new ListReadingConverter(objectMapper),
                new ListWriterConverter(objectMapper),
                new UserReadingConverter(objectMapper),
                new UserWriterConverter(objectMapper)
        );
    }
}

Converter的实现,有2个,一个是将String转换成Object,另外一个将Object转换成String,这里我采用了泛型的方式。

// 读的Converter,从String转成相应的Object
@Slf4j
@AllArgsConstructor
public abstract class BaseJsonReadingConverter<T> implements Converter<String, T> {
    private ObjectMapper objectMapper;
    // 获取泛型的Class
    private Class<T> getTClass() {
        return (Class<T>)((ParameterizedType)getClass().getGenericSuperclass()).getActualTypeArguments()[0];
    }
    @Override
    public T convert(String source) {
        if(!(StringUtils.hasLength(source) && StringUtils.hasText(source))){
            return null;
        }
        try {
            return (T)objectMapper.readValue(source, getTClass());
        } catch (Exception ex) {
            log.error("Error converting json column data from database", ex);
        }
        return null;
    }
}

// 写的Converter,将Object转成String
@Slf4j
@AllArgsConstructor
public abstract class BaseJsonWriterConverter<T> implements Converter<T, String> {
    private ObjectMapper objectMapper;
    @Override
    public String convert(T source) {
        String response = null;
        try {
            response = objectMapper.writeValueAsString(source);
        } catch (JsonProcessingException e) {
            log.error("Error parsing Object to database column", e);
        }
        return response;
    }
    
    // List的ReadingConverter
    @Slf4j
@ReadingConverter
public class ListReadingConverter extends BaseJsonReadingConverter<List>{
    public ListReadingConverter(ObjectMapper objectMapper) {
        super(objectMapper);
    }
}
    
// List的WriteConverter
@Slf4j
@WritingConverter
public class ListWriterConverter extends BaseJsonWriterConverter<List>{
    public ListWriterConverter(ObjectMapper objectMapper) {
        super(objectMapper);
    }
}

经验

  • Class类的获取方式

    不同通过T.class来获取,采用了2种方案:

    1. 通过子类实现父类接口返回子类的Class

      public abstract class Foo<T> {  
          public abstract Class getEntityClass();  
      }  
      public class Child extends Foo<String> {  
          public Class getEntityClass() {  
              return String.class;  
          }  
      }  
      

      这种方式缺点是每个子类都得写一遍

    2. 反射的方式

        // 获取泛型的Class
          private Class<T> getTClass() {
              return (Class<T>)((ParameterizedType)getClass().getGenericSuperclass()).getActualTypeArguments()[0];
          }
      

      getClass().getGenericSuperclass()返回表示此 Class所表示的实体的Type;

      将这个Type转换成ParameterizedType;

      getActualTypeArguments()返回表示此Type实际类型参数的数组,也就是泛型参数的实际类型。

      参考1参考2参考3

  • 是否可以不写子类getCustomConverters

    这里进行进行了尝试,在R2dbcConfiguration的getCustomConverters中,直接new BaseJsonReadingConverter,结果并没有成功,推断不出所用的类型。目前先采用这种方式。

7. Reidis使用

Reids的使用参考官网 以及补充,与SpringMVC使用Redis基本一致。

  • 引入依赖

    implementation 'org.springframework.boot:spring-boot-starter-data-redis-reactive:3.0.0'
    
  • yml配置

    spring:
    # ...
      data:
        redis:
          host: localhost
          port: 6379
          database: 2
          timeout: 5000
          repositories:
            enabled: false
    

    注:这里的配置中一定要将repositories:enabled: false,否则,它会尝试使用非响应式的RedisTemplate,而导致失败,参考

  • 配置类中,生成ReactiveRedisTemplate,并注入到Spring

    @Configuration
    @Slf4j
    public class ReactiveRedisConfig {
        /**
         * 默认日期时间格式
         */
        public static final String DEFAULT_DATE_TIME_FORMAT = "yyyy-MM-dd HH:mm:ss";
        /**
         * 默认日期格式
         */
        public static final String DEFAULT_DATE_FORMAT = "yyyy-MM-dd";
        /**
         * 默认时间格式
         */
        public static final String DEFAULT_TIME_FORMAT = "HH:mm:ss";
        @Bean
        public ReactiveRedisTemplate<String, Object> redisTemplate(ReactiveRedisConnectionFactory factory) {
            ObjectMapper objectMapper = new ObjectMapper();
            objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
            objectMapper.activateDefaultTyping(objectMapper.getPolymorphicTypeValidator(), ObjectMapper.DefaultTyping.NON_FINAL, JsonTypeInfo.As.WRAPPER_ARRAY);
    
            //LocalDateTime系列序列化和反序列化模块,继承自jsr310,我们在这里修改了日期格式
            JavaTimeModule javaTimeModule = new JavaTimeModule();
            //序列化
            javaTimeModule.addSerializer(LocalDateTime.class, new LocalDateTimeSerializer(
                    DateTimeFormatter.ofPattern(DEFAULT_DATE_TIME_FORMAT)));
            javaTimeModule.addSerializer(LocalDate.class,
                    new LocalDateSerializer(DateTimeFormatter.ofPattern(DEFAULT_DATE_FORMAT)));
            javaTimeModule.addSerializer(LocalTime.class,
                    new LocalTimeSerializer(DateTimeFormatter.ofPattern(DEFAULT_TIME_FORMAT)));
            //反序列化
            javaTimeModule.addDeserializer(LocalDateTime.class, new LocalDateTimeDeserializer(
                    DateTimeFormatter.ofPattern(DEFAULT_DATE_TIME_FORMAT)));
            javaTimeModule.addDeserializer(LocalDate.class,
                    new LocalDateDeserializer(DateTimeFormatter.ofPattern(DEFAULT_DATE_FORMAT)));
            javaTimeModule.addDeserializer(LocalTime.class,
                    new LocalTimeDeserializer(DateTimeFormatter.ofPattern(DEFAULT_TIME_FORMAT)));
            //注册模块
            objectMapper.registerModule(javaTimeModule);
            //json的序列化
            Jackson2JsonRedisSerializer<Object> valueSerializer = new Jackson2JsonRedisSerializer<>(objectMapper, Object.class);
            StringRedisSerializer keySerializer = new StringRedisSerializer();
            RedisSerializationContext.RedisSerializationContextBuilder<String, Object> builder = RedisSerializationContext.newSerializationContext(keySerializer);
            RedisSerializationContext<String, Object> context = builder.value(valueSerializer).build();
            return new ReactiveRedisTemplate<>(factory, context);
        }
    }
    
    

    在Json序列化过程中,指定了Date的格式,这个不用也成,其他的与官网就类似了

  • 通过ReactiveRedisTemplate操作Redis

    这里简单举个例子:redisTemplate.opsForValue().get(token),这是最简单的key-value类型,也可以List、Set、Hash、ZSet等结构。

8. AOP的使用:@Cacheable的实现

介绍

在SpringMVC中可以通过注解(@Cacheable)的方式对数据进行缓存,这里并没有直接提供这项功能,于是通过AOP的方式做了实现,下边来看看。

  • 引入依赖

    implementation 'org.springframework.boot:spring-boot-starter-aop:3.0.0'
    
  • 启动Aop

    ...
    @EnableAspectJAutoProxy
    public class EverylittleApplication {
        ...
    }
    
  • 定义注解

    redis原来的有4个注解接口:@Cacheable、@CacheEvict、@CachePut、@Caching(以上3种的组合),我这里只实现了2种:@ReactiveCacheable,@ReactiveCacheEvict。

    // 增加缓存
    @Target({ElementType.METHOD})
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    public @interface ReactiveCacheable {
        /**
         * 缓存key,key为cacheName+"_"+key
         * 支持EL表达式
        */
        String key();
    
        /**
         * 缓存key分组,会做为缓存key的前缀+"_"
         * 支持EL表达式
        */
        String cacheName();
    
        /**
         * 缓存过期时间,单位秒,默认24小时
        */
        long timeout() default 24 * 3600L;
    }
    
    // 删除缓存注解
    @Target({ElementType.METHOD})
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    public @interface ReactiveCacheEvict {
        /**
         * 缓存key,如果cacheName不为空,则key为cacheName+"_"+key
         * 支持EL表达式
         */
        String key() default "";
    
        /**
         * 缓存key分组,会做为缓存key的前缀+"_"
         * 支持EL表达式
         */
        String cacheName();
    
        /**
         * 是否删除cacheName下全部缓存数据,
         * true时cacheName不能为空,此时即便指定了key值,也会删除cacheName下全部缓存
         * false时key值不能为空
         */
        boolean allEntries() default false;
    
        /**
         * 调用清除缓存的时机,true:执行方法前,false:执行方法后
         * 如果是false,则方法执行过程中发生异常,则不会清除缓存
        */
        boolean beforeInvocation() default false;
    }
    
  • 通过Aop实现缓存逻辑

    先定义切点(PoinCut),再定义切入的方式@Around、@Before或@AfterReturning,最后定义点处的逻辑。

    @Component
    @Aspect
    @Slf4j
    public class ReactiveCacheAspect {
        private final ReactiveRedisTemplate<String, Object> redisTemplate;
        public ReactiveCacheAspect(ReactiveRedisTemplate<String, Object> redisTemplate){
            this.redisTemplate = redisTemplate;
        }
        // 定义切点
        @Pointcut("@annotation(com.sun.everylittle.aop.ReactiveCacheable)")
        public void cacheablePointCut() {
        }
        @Pointcut("@annotation(com.sun.everylittle.aop.ReactiveCacheEvict)")
        public void cacheEvictPointCut() {
        }
    
        //环绕通知,一般不建议使用,可以通过@Before和@AfterReturning实现
        //但是响应式方法只能通过环绕通知实现aop,因为其它通知会导致不再同一个线程执行
        @Around("cacheablePointCut()")
        public Mono<Object> cacheableAround(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
            log.debug("ReactiveRedisCacheAspect cacheableAround....");
    
            MethodSignature methodSignature = (MethodSignature) proceedingJoinPoint.getSignature();
            Method method = methodSignature.getMethod();
            ReactiveCacheable annotation = method.getAnnotation(ReactiveCacheable.class);
            String cacheName = annotation.cacheName();
            String key = annotation.key();
            long timeout = annotation.timeout();
            //转换EL表达式
            cacheName = (String) AspectSupportUtils.getKeyValue(proceedingJoinPoint, cacheName);
            key = ""+AspectSupportUtils.getKeyValue(proceedingJoinPoint, key);
            String redis_key = cacheName + "_" + key;
            return redisTemplate.hasKey(redis_key)
                    .flatMap(hasKey-> {
                        if(hasKey){
                            return redisTemplate.opsForValue().get(redis_key);
                        }else {
                            try {
                                return ((Mono<Object>)proceedingJoinPoint.proceed())
                                        .zipWhen(o -> redisTemplate.opsForValue().set(redis_key, o, Duration.ofSeconds(timeout)))
                                        .map(Tuple2::getT1);
                            } catch (Throwable e){
                                return Mono.error(new Exception(e.getMessage()));
                            }
                        }
                    });
        }
    
    	// 逻辑实现
        @Around("cacheEvictPointCut()")
        public Mono<Object> cacheEvictAround(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
            log.debug("ReactiveRedisCacheAspect cacheEvictAround....");
    
            MethodSignature methodSignature = (MethodSignature) proceedingJoinPoint.getSignature();
            Method method = methodSignature.getMethod();
            String returnTypeName = method.getReturnType().getSimpleName();
    
            ReactiveCacheEvict annotation = method.getAnnotation(ReactiveCacheEvict.class);
            String cacheName = annotation.cacheName();
            String annoKey = annotation.key();
            boolean allEntries = annotation.allEntries();
            boolean beforeInvocation = annotation.beforeInvocation();
            //转换EL表达式
            cacheName = (String) AspectSupportUtils.getKeyValue(proceedingJoinPoint, cacheName);
            String key = ""+AspectSupportUtils.getKeyValue(proceedingJoinPoint, annoKey);
    
            //执行方法前清除缓存
            if (beforeInvocation) {
                //清除全部缓存
                return deleteRedisCache(cacheName, key, allEntries)
                        .flatMap(c -> {
                            try {
                                //实际执行的方法
                                return (Mono<Object>)proceedingJoinPoint.proceed();
                            } catch (Throwable e) {
                                return Mono.error(new RuntimeException(e));
                            }
                        });
            } else {//成功执行方法后清除缓存
                //实际执行的方法
                final String cacheNameTemp = cacheName;
                final String keyTemp = key;
                return ((Mono<Object>)proceedingJoinPoint.proceed())
                        .zipWhen(obj->deleteRedisCache(cacheNameTemp, keyTemp, allEntries))
                        .map(Tuple2::getT1);
            }
        }
    
        private Mono<Long> deleteRedisCache(String cacheName, String key, boolean clearAll) {
            String redis_key = clearAll ? cacheName+"_*" : cacheName+"_"+key;
            return redisTemplate.delete(redisTemplate.keys(redis_key));
        }
    
    }
    

    注:这块代码基本出自 reactive-redis-cache-annotation-spring-boot-starter文章,还有EL表达式的处理没有贴出,可以翻阅原地址查看。

  • 使用

    @ReactiveCacheable(cacheName="tag", key="#uId")
    @GetMapping("/list/{type}")
    @ResponseBody
    public Mono<List<Tag>> getTagList(@PathVariable String type, @SessionAttribute(UserHolder.CONTEXT_KEY) Long uId){
        System.out.println(uId);
        return UserHolder.getUser()
            .flatMapMany(userId -> tagRepository.findTagsByUserIdAndTypeOrderById(userId, Tag.TagType.valueOf(type)))
            .collectList();
    }
    
    @ReactiveCacheEvict(cacheName = "tag", key="#uId")
    @DeleteMapping("/i/{tagId}")
    @ResponseBody
    public Mono<String> deleteTag(@PathVariable Long tagId, @SessionAttribute(UserHolder.CONTEXT_KEY) Long uId){
        return tagRepository.deleteById(tagId)
            .thenReturn("");
    }
    

经验

  • 缓存的实现方式

    缓存的实现方式除了用注解的方式,还可以用函数编程的方式在需要的地方直接缓存,但这样并不优雅,故而这里没有采用这种方式。参考

  • 注解的类型

    这里没有实现@CachePut,看了它的逻辑,先将缓存删除,然后再修改数据库,再存入缓存。我觉得这样做太麻烦,还存在一致性的风险,也就没有引入。再Get时使用@ReactiveCacheable,在Put与Delete时使用@ReactiveCacheEvit即可。

  • 缓存key的选择

    前边时候说过,一开始只实现了UserHoler来获取UserId的,这里由于缓存时候,需要用userId进行区别,也就用实现了用SessionAttribute来记录UserId的方式。参考

9. 测试

以前写单元测试,大多写的都是对Service的测试,这次写时没有直接没有写service,直接在Controller中实现的逻辑,这很不MVC,但却有点Webflux的Endpoint的样子。这导致我这里的测试也是对Controller的测试,其实与Apifox中的测试有点重合,故而这里只做简单的介绍。

介绍

  • 引入依赖

    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testImplementation 'io.projectreactor:reactor-test'
    
  • StepVerifier

    在使用前要介绍一个基本的概念:StepVerifier,参考。它是对Flux的每个元素进行验证的。示例如下:

     @Test
        void testStepVerifier(){
            Flux<String> source = Flux.just("John", "Monica", "Mark", "Cloe", "Frank", "Casper", "Olivia", "Emily", "Cate")
                    .filter(name -> name.length() == 4)
                    .doOnNext(System.out::println)
                    .map(String::toUpperCase);
    
            StepVerifier
                    .create(source)
                    .expectNext("JOHN")
                    .expectNextMatches(name -> name.startsWith("MA"))
                    .expectNext("CLOE", "CATE")
                    .expectComplete()
                    .verify();
        }
    

    通过expectNext,对每一个元素进行验证。还可以通过expectErrorMatches对异常进行验证。示例如下:

     @Test
        void testException(){
            int i = 1;
            Flux<String> source = Flux.just("John", "Monica", "Mark", "Cloe", "Frank", "Casper", "Olivia", "Emily", "Cate")
                    .filter(name -> name.length() == 4)
                    .doOnNext(t->System.out.println(i))
                    .map(String::toUpperCase);
            Flux<String> error = source.concatWith(
                    Mono.error(new IllegalArgumentException("Our message")));
            StepVerifier
                    .create(error)
                    .expectNextCount(4)
                    .expectErrorMatches(throwable -> throwable instanceof IllegalArgumentException &&
                            throwable.getMessage().equals("Our message")
                    ).verify();
        }
    
  • 结合StepVerifier与WebClient对Controller进行验证

    WebClient是Webflux的客户端,通过客户端访问服务,参考

    
    @Test
        void testTaskGet(){
            private final WebClient webClient = WebClient.create("http://localhost:8080");
           Mono<MyResponseBody> bodyMono = webClient
                   .get()
                   .uri("/task/list")
                   .cookie("token", token)
                   .retrieve()
                   .bodyToMono(MyResponseBody.class);
    
           StepVerifier
                   .create(bodyMono)
                   .consumeNextWith(entity ->{
                       System.out.println(entity);
                       assert(entity.getCode().equals(MyResponseCode.OK.getCode()));
                       assert(((List<Task>)entity.getData()).size()==5);
                   })
                   .expectComplete()
                   .verify();
    
        }
    
    

经验

  • 先服务,再运行测试

    这里没有引入Junit,没法自动启动服务,只能先启动application,然后再执行Test。这种方式很轻量,可以将@Test很容易的引入代码中。

10. 总结

本文对webflux实践中的要点进行了整理,大约包括web与存取2大部分,逻辑上都比较清晰,主要是工程上的调试。

web部分包括:Exception的处理、filter的使用、Response的装饰、AOP的使用;

存取部分包括:R2dbc的分页、Auditing的实现、Redis的使用等;

# 语言 

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×