0. 前言
12月第2个周学习了响应式编程,后边经过新冠,做了everylittle的实践工作,下边对这块工作进行一个总结。
本文主要使用了SpringBoot 3.0.0,几个重要的依赖如下:
implementation 'org.springframework.boot:spring-boot-starter-webflux'
implementation 'org.springframework.boot:spring-boot-starter-validation'
// r2dbc
implementation 'org.springframework.boot:spring-boot-starter-data-r2dbc'
implementation 'io.r2dbc:r2dbc-pool:1.0.0.RELEASE'
implementation 'dev.miku:r2dbc-mysql:0.8.2.RELEASE'
implementation 'mysql:mysql-connector-java:8.0.28'
// redis
implementation 'org.springframework.boot:spring-boot-starter-data-redis-reactive'
// aop
implementation 'org.springframework.boot:spring-boot-starter-aop'
1. Auditing
参考
Auditing指的是对表数据创建、修改的审计,包括4个属性:creator、createTime、updator、updateTime。这里,由于是个人系统,我只使用了createTime与updateTime2个属性。
用法
对于这两个属性,所做的配置只有2点:
-
写一个AuditConfig的Configuration,并启动@EnableR2dbcAuditing
@EnableR2dbcAuditing
@Configuration
public class AuditConfig {
}
-
在AuditEntity上增加@CreatedData
与@LastModifiedDate
@Data
@AllArgsConstructor
@NoArgsConstructor
public abstract class AuditBaseEntity {
@Column("create_time")
@CreatedDate
protected LocalDateTime createTime;
@Column("update_time")
@LastModifiedDate
protected LocalDateTime updateTime;
}
经验
-
关于Creator与Updator的使用
因为要去拿创建者或者修改者的Id,所以一定会需要一个回调来获取User。
需要实现ReactiveAuditorAware<T>
接口的Mono<T> getCurrentAuditor()
方法即可
ps: 这个因为没有实践,只是从官方文档上看到
注意:官网上明确说明,不需要这个两个属性时,不需要实现该接口。
Applications that only track creation and modification dates are not required do make their entities implement AuditorAware.
-
关于JPA与listerner
网上有些资料,是JPA的Auditing的实践,在Entity上增加Listener注解,并指明修改时执行的方法,这些资料不适合Webflux,不要用,也不要引入JPA。
-
Audit的实现方式
之前使用都是使用基类的方式来实现,这里也沿用了这个思路,除此之外还有2种思路。
-
直接在需要的Entity中增加相应属性
-
采用组合的方式,如官网给出的示例:
class Customer {
private AuditMetadata auditingMetadata;
// … further properties omitted
}
class AuditMetadata {
@CreatedBy
private User user;
@CreatedDate
private Instant createdDate;
}
我觉得,这种方式在之后的使用上可能没那么方便,也就没有用。
基类的方式也存在着缺点,因为继承只能是单继承。
2. exception处理
在SpringMVC中,我们有着优雅对异常的统一处理,在Webflux中,我也引入了它们。与之前不同,这里需要我们自己进行转换,将构建DataBuffer写入的Response中。
整个过程:
- 通过exchange获取到response以及Exception
- 构造自己MyResponseBody
- 通过ObjectMapper(jackson)将MyResponseBody序列化成byte[]字节数组
- 通过response.bufferFactory().wrap(bytes)将byte[]转换成DataBuffer
- 最后将DataBuffer装入Mono,并通过**writeWith()**将数据写入response
代码如下:
@Slf4j
@Component
@AllArgsConstructor
@Order(-99)
public class GlobalExceptionHandler implements WebExceptionHandler {
private final ObjectMapper objectMapper;
@Override
public Mono<Void> handle(ServerWebExchange exchange, Throwable ex) {
ServerHttpResponse response = exchange.getResponse();
// 对自定义异常的处理
if(ex instanceof MyException e){
// 构造MyResponseBody
MyResponseBody body = MyResponseBody.fail(e.getCode(), e.getMessage());
try {
// 序列化成byte[]
byte[] bits = objectMapper.writeValueAsBytes(body);
response.setStatusCode(HttpStatus.OK);
response.getHeaders().setContentType(MediaType.APPLICATION_JSON);
// 构造DataBuffer并写入response
return response.writeWith(createData(response, bits));
} catch (JsonProcessingException exc) {
log.debug("failed to process json", exc);
response.setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR);
return response.writeWith(createData(response, ex.getMessage().getBytes()));
}
}
// 其他异常的处理
ex.printStackTrace();
response.setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR);
return response.writeWith(createData(response, ex.getMessage().getBytes()));
}
private Mono<DataBuffer> createData(ServerHttpResponse response, byte [] bytes){
DataBuffer dataBuffer = response.bufferFactory().wrap(bytes);
return Mono.just(dataBuffer);
}
}
3. filter应用:userId的获取
tag:filter,UserId,Mono的Context
介绍
认证指的是对用户是否登录进行判断,若没有登录则不允许访问。这里包括登录还包括对redis缓存的使用,以及Mono的Context概念。
认证的逻辑逻辑比较清晰:
- 从cookie中获取token
- 从redis中获取token对应的user
- 将user放到Mono的Context中以备使用
@Component
@AllArgsConstructor
public class AuthenticationFilter implements WebFilter {
private final ReactiveRedisTemplate<String, Object> redisTemplate;
@Override
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
ServerHttpRequest request = exchange.getRequest();
// 忽略不需要认证的路由
RequestPath path = request.getPath();
if(path.toString().contains("login")){
return chain.filter(exchange);
}
// 获取token
HttpCookie tokenCookie = request.getCookies().getFirst("token");
return Mono.justOrEmpty(tokenCookie)
.switchIfEmpty(Mono.error(new BadRequestException("请登录")))
.map(HttpCookie::getValue)
.flatMap(token -> redisTemplate.opsForValue().get(token)) // 从缓存中获取User
.switchIfEmpty(Mono.error(new BadRequestException("请登录")))
.map(user -> ((User)user).getId())
.flatMap(userId -> exchange.getSession()
.doOnNext(webSession -> webSession.getAttributes().putIfAbsent(UserHolder.CONTEXT_KEY, userId)) // 将userId放到WebSession中
.then(chain.filter(exchange).contextWrite(context -> context.put(UserHolder.CONTEXT_KEY, userId))) // 将userId放到Context中
);
}
}
起初只将userId通过contextWrite放到了Context中,便于通过Util去获取userId,后来由于缓存的缘故,又增加了放到WebSession中的步骤。
讲到Util,这里来看看如何从Context中去获取:
public class UserHolder {
public static final String CONTEXT_KEY = "USER_ID";
public static Mono<Long> getUser(){
return Mono.deferContextual(ctx -> Mono.just(ctx.get(CONTEXT_KEY)));
}
}
通过Mono.deferContextual来获取Context中的内容,这块内容放到后边再讲。
经验
-
流的概念
写完之后,先用写过程式的方式去获取userId,当然最开始返回的是Long,而不是Mono
Long userId = UserHoler.getUser()
然后再需要的时候再去使用userId,这样总是调不通,原因是它们不在一个流上,Context是不同的。后来将UserHolder放到流上,问题才解决。
Mono<Post> createPost(@Valid @RequestBody PostCreateDto dto){
return UserHolder.getUser()
.map(userId -> Post.builder()
.title(dto.getTitle())
.userId(userId)
.state(Post.PostState.DRAFTS)
.scope(Post.PostScope.PRIVATE)
.projectId(dto.getProjectId())
.build())
.flatMap(postRepository::save);
}
参考:Reactor上下文Context,Reactor3上下文Context处理
4. filter应用:Response的装饰
tag:filter,JsonNode,DataBuffer,ServerHttpResponseDecorator
介绍
在SpringMVC中,我们定义统一的返回码与返回格式,这里也将它们引入了进来。如果每个路由中都去进行装饰,则过于繁琐,不利于一眼看清返回的实体,于是这里通过filter的方式,对返回进行装饰,将返回的结构放到规定的data中去。
这里的逻辑也很清洗:
- 就是将原来的数据从response中取出
- 对然后对数据进行封装
- 在将数据写回到response中
实现上是将Response做了装饰,看看实现:
public class BodyWrapperResponse extends ServerHttpResponseDecorator {
ObjectMapper objectMapper = new ObjectMapper();
public BodyWrapperResponse(ServerHttpResponse delegate) {
super(delegate);
}
@Override
public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
Flux<DataBuffer> newBuffer = Flux.from(body)
.map(dataBuffer -> {
// 将DataBuffer转成字符串
String originStr = StandardCharsets.UTF_8.decode(dataBuffer.toByteBuffer()).toString();
MyResponseBody responseBody = new MyResponseBody(MyResponseCode.OK);
// 如果是可以json化的,写入jsonNode。否则写入原值
if(!originStr.isEmpty()){
try {
// 将字符串解析成JsonNode对象
JsonNode jsonNode = objectMapper.readTree(originStr);
// 将JsonNode封装进MyResponseBody
responseBody.setData(jsonNode);
} catch (JsonProcessingException e) {
responseBody.setData(originStr);
}
}
try {
// 将MyResponseBody转换成byte[]
byte[] bits = objectMapper.writeValueAsBytes(responseBody);
HttpHeaders httpHeaders = super.getHeaders();
httpHeaders.setContentLength(bits.length);
// 将byte[]转换成DataBuffer
return super.bufferFactory().wrap(bits);
} catch (JsonProcessingException e) {
e.printStackTrace();
return dataBuffer;
}
});
// 将DataBuffer写到Response中
return super.writeWith(newBuffer);
}
}
filter部分的代码如下:
@Component
public class ResponseWebFilter implements WebFilter {
@Override
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
ServerHttpResponse originalResponse = exchange.getResponse();
// 对response进行封装
BodyWrapperResponse bodyWrapperResponse = new BodyWrapperResponse(originalResponse);
// 将新的response到exchange中
ServerWebExchange webExchange = exchange.mutate().response(bodyWrapperResponse).build();
return chain.filter(webExchange);
}
}
经验
5. R2dbc分页
分页的好处是每次都取一定量的数据,提高性能。
R2dbc对分页的支持还是很强的,普通的查询只需要增加Pageable参数即可。
介绍
Repository的写法:
public interface PostRepository extends ReactiveCrudRepository<Post, Long> {
// 注意这里的Pageable
Flux<Post> findPostsByUserIdAndStateNot(Long userId, Post.PostState state, Pageable pageable);
// 查询总量
Mono<Long> countByUserIdAndStateNot(Long userId, Post.PostState state);
}
Controller的写法:
@GetMapping("/list")
@ResponseBody
Mono<MyPage<PostSummaryVo>> getPostList(@RequestParam("page") int page, @RequestParam("size") int size){
return UserHolder.getUser()
// 获取数据
.flatMapMany(userId -> postRepository.findPostsByUserIdAndStateNot(userId,Post.PostState.DELETED,PageRequest.of(page, size)))
.map(PostSummaryVo::new)
// 注意这里的collectionList,将Flux转换成了Mono<List>
.collectList()
// 获取total,这里的zipWhen也值得注意
.zipWhen(l -> UserHolder.getUser().flatMap(userId -> postRepository.countByUserIdAndStateNot(userId, Post.PostState.DELETED)))
.map(t -> new MyPage<PostSummaryVo>(t.getT1(), t.getT2(), size));
}
经验
6. R2dbc存取json:Converter的使用
在Mybatis中,json的存取通过typeHandler对数据进行转换,主要是读取时的转换。R2dbc采用的JPA的方式,通过Converter来实现。
介绍
R2dbc不用在Entity的注解标明,需要写好Converter,然后通过配置类进入到R2dbc中。
下边先来看看Configuration
@Configuration
public class R2dbcConfiguration extends AbstractR2dbcConfiguration{
@Value("${spring.r2dbc.url}")
private String url;
private final ObjectMapper objectMapper;
public R2dbcConfiguration(ObjectMapper objectMapper){
this.objectMapper = objectMapper;
}
@Override
public ConnectionFactory connectionFactory() {
return ConnectionFactories.get(url);
}
@Override
protected List<Object> getCustomConverters(){
return List.of(new ListReadingConverter(objectMapper),
new ListWriterConverter(objectMapper),
new UserReadingConverter(objectMapper),
new UserWriterConverter(objectMapper)
);
}
}
Converter的实现,有2个,一个是将String转换成Object,另外一个将Object转换成String,这里我采用了泛型的方式。
// 读的Converter,从String转成相应的Object
@Slf4j
@AllArgsConstructor
public abstract class BaseJsonReadingConverter<T> implements Converter<String, T> {
private ObjectMapper objectMapper;
// 获取泛型的Class
private Class<T> getTClass() {
return (Class<T>)((ParameterizedType)getClass().getGenericSuperclass()).getActualTypeArguments()[0];
}
@Override
public T convert(String source) {
if(!(StringUtils.hasLength(source) && StringUtils.hasText(source))){
return null;
}
try {
return (T)objectMapper.readValue(source, getTClass());
} catch (Exception ex) {
log.error("Error converting json column data from database", ex);
}
return null;
}
}
// 写的Converter,将Object转成String
@Slf4j
@AllArgsConstructor
public abstract class BaseJsonWriterConverter<T> implements Converter<T, String> {
private ObjectMapper objectMapper;
@Override
public String convert(T source) {
String response = null;
try {
response = objectMapper.writeValueAsString(source);
} catch (JsonProcessingException e) {
log.error("Error parsing Object to database column", e);
}
return response;
}
// List的ReadingConverter
@Slf4j
@ReadingConverter
public class ListReadingConverter extends BaseJsonReadingConverter<List>{
public ListReadingConverter(ObjectMapper objectMapper) {
super(objectMapper);
}
}
// List的WriteConverter
@Slf4j
@WritingConverter
public class ListWriterConverter extends BaseJsonWriterConverter<List>{
public ListWriterConverter(ObjectMapper objectMapper) {
super(objectMapper);
}
}
经验
7. Reidis使用
Reids的使用参考官网 以及补充,与SpringMVC使用Redis基本一致。
-
引入依赖
implementation 'org.springframework.boot:spring-boot-starter-data-redis-reactive:3.0.0'
-
yml配置
spring:
# ...
data:
redis:
host: localhost
port: 6379
database: 2
timeout: 5000
repositories:
enabled: false
注:这里的配置中一定要将repositories:
的enabled: false
,否则,它会尝试使用非响应式的RedisTemplate,而导致失败,参考。
-
配置类中,生成ReactiveRedisTemplate,并注入到Spring
@Configuration
@Slf4j
public class ReactiveRedisConfig {
/**
* 默认日期时间格式
*/
public static final String DEFAULT_DATE_TIME_FORMAT = "yyyy-MM-dd HH:mm:ss";
/**
* 默认日期格式
*/
public static final String DEFAULT_DATE_FORMAT = "yyyy-MM-dd";
/**
* 默认时间格式
*/
public static final String DEFAULT_TIME_FORMAT = "HH:mm:ss";
@Bean
public ReactiveRedisTemplate<String, Object> redisTemplate(ReactiveRedisConnectionFactory factory) {
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
objectMapper.activateDefaultTyping(objectMapper.getPolymorphicTypeValidator(), ObjectMapper.DefaultTyping.NON_FINAL, JsonTypeInfo.As.WRAPPER_ARRAY);
//LocalDateTime系列序列化和反序列化模块,继承自jsr310,我们在这里修改了日期格式
JavaTimeModule javaTimeModule = new JavaTimeModule();
//序列化
javaTimeModule.addSerializer(LocalDateTime.class, new LocalDateTimeSerializer(
DateTimeFormatter.ofPattern(DEFAULT_DATE_TIME_FORMAT)));
javaTimeModule.addSerializer(LocalDate.class,
new LocalDateSerializer(DateTimeFormatter.ofPattern(DEFAULT_DATE_FORMAT)));
javaTimeModule.addSerializer(LocalTime.class,
new LocalTimeSerializer(DateTimeFormatter.ofPattern(DEFAULT_TIME_FORMAT)));
//反序列化
javaTimeModule.addDeserializer(LocalDateTime.class, new LocalDateTimeDeserializer(
DateTimeFormatter.ofPattern(DEFAULT_DATE_TIME_FORMAT)));
javaTimeModule.addDeserializer(LocalDate.class,
new LocalDateDeserializer(DateTimeFormatter.ofPattern(DEFAULT_DATE_FORMAT)));
javaTimeModule.addDeserializer(LocalTime.class,
new LocalTimeDeserializer(DateTimeFormatter.ofPattern(DEFAULT_TIME_FORMAT)));
//注册模块
objectMapper.registerModule(javaTimeModule);
//json的序列化
Jackson2JsonRedisSerializer<Object> valueSerializer = new Jackson2JsonRedisSerializer<>(objectMapper, Object.class);
StringRedisSerializer keySerializer = new StringRedisSerializer();
RedisSerializationContext.RedisSerializationContextBuilder<String, Object> builder = RedisSerializationContext.newSerializationContext(keySerializer);
RedisSerializationContext<String, Object> context = builder.value(valueSerializer).build();
return new ReactiveRedisTemplate<>(factory, context);
}
}
在Json序列化过程中,指定了Date的格式,这个不用也成,其他的与官网就类似了
-
通过ReactiveRedisTemplate操作Redis
这里简单举个例子:redisTemplate.opsForValue().get(token)
,这是最简单的key-value类型,也可以List、Set、Hash、ZSet等结构。
8. AOP的使用:@Cacheable的实现
介绍
在SpringMVC中可以通过注解(@Cacheable)的方式对数据进行缓存,这里并没有直接提供这项功能,于是通过AOP的方式做了实现,下边来看看。
-
引入依赖
implementation 'org.springframework.boot:spring-boot-starter-aop:3.0.0'
-
启动Aop
...
@EnableAspectJAutoProxy
public class EverylittleApplication {
...
}
-
定义注解
redis原来的有4个注解接口:@Cacheable、@CacheEvict、@CachePut、@Caching(以上3种的组合),我这里只实现了2种:@ReactiveCacheable,@ReactiveCacheEvict。
// 增加缓存
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface ReactiveCacheable {
/**
* 缓存key,key为cacheName+"_"+key
* 支持EL表达式
*/
String key();
/**
* 缓存key分组,会做为缓存key的前缀+"_"
* 支持EL表达式
*/
String cacheName();
/**
* 缓存过期时间,单位秒,默认24小时
*/
long timeout() default 24 * 3600L;
}
// 删除缓存注解
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface ReactiveCacheEvict {
/**
* 缓存key,如果cacheName不为空,则key为cacheName+"_"+key
* 支持EL表达式
*/
String key() default "";
/**
* 缓存key分组,会做为缓存key的前缀+"_"
* 支持EL表达式
*/
String cacheName();
/**
* 是否删除cacheName下全部缓存数据,
* true时cacheName不能为空,此时即便指定了key值,也会删除cacheName下全部缓存
* false时key值不能为空
*/
boolean allEntries() default false;
/**
* 调用清除缓存的时机,true:执行方法前,false:执行方法后
* 如果是false,则方法执行过程中发生异常,则不会清除缓存
*/
boolean beforeInvocation() default false;
}
-
通过Aop实现缓存逻辑
先定义切点(PoinCut),再定义切入的方式@Around、@Before或@AfterReturning,最后定义点处的逻辑。
@Component
@Aspect
@Slf4j
public class ReactiveCacheAspect {
private final ReactiveRedisTemplate<String, Object> redisTemplate;
public ReactiveCacheAspect(ReactiveRedisTemplate<String, Object> redisTemplate){
this.redisTemplate = redisTemplate;
}
// 定义切点
@Pointcut("@annotation(com.sun.everylittle.aop.ReactiveCacheable)")
public void cacheablePointCut() {
}
@Pointcut("@annotation(com.sun.everylittle.aop.ReactiveCacheEvict)")
public void cacheEvictPointCut() {
}
//环绕通知,一般不建议使用,可以通过@Before和@AfterReturning实现
//但是响应式方法只能通过环绕通知实现aop,因为其它通知会导致不再同一个线程执行
@Around("cacheablePointCut()")
public Mono<Object> cacheableAround(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
log.debug("ReactiveRedisCacheAspect cacheableAround....");
MethodSignature methodSignature = (MethodSignature) proceedingJoinPoint.getSignature();
Method method = methodSignature.getMethod();
ReactiveCacheable annotation = method.getAnnotation(ReactiveCacheable.class);
String cacheName = annotation.cacheName();
String key = annotation.key();
long timeout = annotation.timeout();
//转换EL表达式
cacheName = (String) AspectSupportUtils.getKeyValue(proceedingJoinPoint, cacheName);
key = ""+AspectSupportUtils.getKeyValue(proceedingJoinPoint, key);
String redis_key = cacheName + "_" + key;
return redisTemplate.hasKey(redis_key)
.flatMap(hasKey-> {
if(hasKey){
return redisTemplate.opsForValue().get(redis_key);
}else {
try {
return ((Mono<Object>)proceedingJoinPoint.proceed())
.zipWhen(o -> redisTemplate.opsForValue().set(redis_key, o, Duration.ofSeconds(timeout)))
.map(Tuple2::getT1);
} catch (Throwable e){
return Mono.error(new Exception(e.getMessage()));
}
}
});
}
// 逻辑实现
@Around("cacheEvictPointCut()")
public Mono<Object> cacheEvictAround(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
log.debug("ReactiveRedisCacheAspect cacheEvictAround....");
MethodSignature methodSignature = (MethodSignature) proceedingJoinPoint.getSignature();
Method method = methodSignature.getMethod();
String returnTypeName = method.getReturnType().getSimpleName();
ReactiveCacheEvict annotation = method.getAnnotation(ReactiveCacheEvict.class);
String cacheName = annotation.cacheName();
String annoKey = annotation.key();
boolean allEntries = annotation.allEntries();
boolean beforeInvocation = annotation.beforeInvocation();
//转换EL表达式
cacheName = (String) AspectSupportUtils.getKeyValue(proceedingJoinPoint, cacheName);
String key = ""+AspectSupportUtils.getKeyValue(proceedingJoinPoint, annoKey);
//执行方法前清除缓存
if (beforeInvocation) {
//清除全部缓存
return deleteRedisCache(cacheName, key, allEntries)
.flatMap(c -> {
try {
//实际执行的方法
return (Mono<Object>)proceedingJoinPoint.proceed();
} catch (Throwable e) {
return Mono.error(new RuntimeException(e));
}
});
} else {//成功执行方法后清除缓存
//实际执行的方法
final String cacheNameTemp = cacheName;
final String keyTemp = key;
return ((Mono<Object>)proceedingJoinPoint.proceed())
.zipWhen(obj->deleteRedisCache(cacheNameTemp, keyTemp, allEntries))
.map(Tuple2::getT1);
}
}
private Mono<Long> deleteRedisCache(String cacheName, String key, boolean clearAll) {
String redis_key = clearAll ? cacheName+"_*" : cacheName+"_"+key;
return redisTemplate.delete(redisTemplate.keys(redis_key));
}
}
注:这块代码基本出自 reactive-redis-cache-annotation-spring-boot-starter、文章,还有EL表达式的处理没有贴出,可以翻阅原地址查看。
-
使用
@ReactiveCacheable(cacheName="tag", key="#uId")
@GetMapping("/list/{type}")
@ResponseBody
public Mono<List<Tag>> getTagList(@PathVariable String type, @SessionAttribute(UserHolder.CONTEXT_KEY) Long uId){
System.out.println(uId);
return UserHolder.getUser()
.flatMapMany(userId -> tagRepository.findTagsByUserIdAndTypeOrderById(userId, Tag.TagType.valueOf(type)))
.collectList();
}
@ReactiveCacheEvict(cacheName = "tag", key="#uId")
@DeleteMapping("/i/{tagId}")
@ResponseBody
public Mono<String> deleteTag(@PathVariable Long tagId, @SessionAttribute(UserHolder.CONTEXT_KEY) Long uId){
return tagRepository.deleteById(tagId)
.thenReturn("");
}
经验
-
缓存的实现方式
缓存的实现方式除了用注解的方式,还可以用函数编程的方式在需要的地方直接缓存,但这样并不优雅,故而这里没有采用这种方式。参考
-
注解的类型
这里没有实现@CachePut,看了它的逻辑,先将缓存删除,然后再修改数据库,再存入缓存。我觉得这样做太麻烦,还存在一致性的风险,也就没有引入。再Get时使用@ReactiveCacheable,在Put与Delete时使用@ReactiveCacheEvit即可。
-
缓存key的选择
前边时候说过,一开始只实现了UserHoler来获取UserId的,这里由于缓存时候,需要用userId进行区别,也就用实现了用SessionAttribute来记录UserId的方式。参考
9. 测试
以前写单元测试,大多写的都是对Service的测试,这次写时没有直接没有写service,直接在Controller中实现的逻辑,这很不MVC,但却有点Webflux的Endpoint的样子。这导致我这里的测试也是对Controller的测试,其实与Apifox中的测试有点重合,故而这里只做简单的介绍。
介绍
-
引入依赖
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'io.projectreactor:reactor-test'
-
StepVerifier
在使用前要介绍一个基本的概念:StepVerifier,参考。它是对Flux的每个元素进行验证的。示例如下:
@Test
void testStepVerifier(){
Flux<String> source = Flux.just("John", "Monica", "Mark", "Cloe", "Frank", "Casper", "Olivia", "Emily", "Cate")
.filter(name -> name.length() == 4)
.doOnNext(System.out::println)
.map(String::toUpperCase);
StepVerifier
.create(source)
.expectNext("JOHN")
.expectNextMatches(name -> name.startsWith("MA"))
.expectNext("CLOE", "CATE")
.expectComplete()
.verify();
}
通过expectNext,对每一个元素进行验证。还可以通过expectErrorMatches对异常进行验证。示例如下:
@Test
void testException(){
int i = 1;
Flux<String> source = Flux.just("John", "Monica", "Mark", "Cloe", "Frank", "Casper", "Olivia", "Emily", "Cate")
.filter(name -> name.length() == 4)
.doOnNext(t->System.out.println(i))
.map(String::toUpperCase);
Flux<String> error = source.concatWith(
Mono.error(new IllegalArgumentException("Our message")));
StepVerifier
.create(error)
.expectNextCount(4)
.expectErrorMatches(throwable -> throwable instanceof IllegalArgumentException &&
throwable.getMessage().equals("Our message")
).verify();
}
-
结合StepVerifier与WebClient对Controller进行验证
WebClient是Webflux的客户端,通过客户端访问服务,参考
@Test
void testTaskGet(){
private final WebClient webClient = WebClient.create("http://localhost:8080");
Mono<MyResponseBody> bodyMono = webClient
.get()
.uri("/task/list")
.cookie("token", token)
.retrieve()
.bodyToMono(MyResponseBody.class);
StepVerifier
.create(bodyMono)
.consumeNextWith(entity ->{
System.out.println(entity);
assert(entity.getCode().equals(MyResponseCode.OK.getCode()));
assert(((List<Task>)entity.getData()).size()==5);
})
.expectComplete()
.verify();
}
经验
10. 总结
本文对webflux实践中的要点进行了整理,大约包括web与存取2大部分,逻辑上都比较清晰,主要是工程上的调试。
web部分包括:Exception的处理、filter的使用、Response的装饰、AOP的使用;
存取部分包括:R2dbc的分页、Auditing的实现、Redis的使用等;