SpringCloud之三:熔断限流

基本概念

设想一个集群环境,某一个节点出现了故障或者压力过大的情况,如何保证整个集群不会受这个节点的影响?

  1. 断路(circuit-breaker)

    针对故障情况,给出解决方案是断路,指的是故障节点的上游通过一定方式,使其自身不受下游故障的拖累。

  2. 限流

    限流是针对压力过大的情况,bulkhead通过一定策略限定调用下游的并发,使自身的压力不要传递到下游;ratelimiter通过一定策略限定一段时间内被上游调用的次数,来控制不要被上游的压力影响到。

    bulkhead是一种空间限制方式,ratelimiter是一种时间限制方式。

在组件的选择上有Netflix Hystrix,但它已经不再继续维护的;还有Resilience4j,这里采用后者。

注:这一部分的测试,可以用actual去查看metrics监控。

实现原理

  • 基本原理

    基本是基于AOP的,是对原来的调用做了一层装饰。

  • 断路

    断路器根据调用的结果(或者一些状态)来判断下游是否发生了故障,若符合触发条件,则进行断路;

    断路之后,需要将调用转向自定义的降级处理上;

    还需要有一定的条件进行嗅探,若下游故障恢复,则重新调用;

    基本流程如下图:

    深度截图_选择区域_20210726174200

  • 限流

    根据空间或者时间的限制条件,对调用进行约束与限制。

    对于上游提供了bulkhead的方式,对于下游提供了ratelimiter的方式。

    bulkhead是限定上游对下游调用的并发度,高于某并发,就排队等待,等待超时,则直接返回。

    ratelimite是限定下游被上游调用的频率,在一段时间内高于某个阈值,则排队等待,等到超时,则直接返回。

resilience4j组件介绍

resilience4j将各个功能封装在不同组件中,如下:

组件名称功能
resilience4j-circuitbreaker断路器
resilience4j-ratelimiter频率控制
resilience4j-bulkhead负载保护
resilience4j-retry自动重试
resilience4j-cache应答缓存
resilience4j-timelimiter超时控制
resilience4j-feignfeign适配器
resilience4j-PromethuesPromethues Metrics输出
resilience4j-micrometermicrometer metrics输出
resilience4j-spring-boot2spring boot 2 starter

断路circuit breaker

  1. 依赖

    <dependency>
    			<groupId>io.github.resilience4j</groupId>
    			<artifactId>resilience4j-spring-boot2</artifactId>
    			<version>0.14.1</version>
    		</dependency>
    
    		<dependency>
    			<groupId>org.springframework.cloud</groupId>
    			<artifactId>spring-cloud-starter-consul-discovery</artifactId>
    		</dependency>
    		<dependency>
    			<groupId>org.springframework.cloud</groupId>
    			<artifactId>spring-cloud-starter-openfeign</artifactId>
    		</dependency>
    		<dependency>
    			<groupId>io.github.openfeign</groupId>
    			<artifactId>feign-httpclient</artifactId>
    		</dependency>
    

    依赖consul做服务发现,所以需要启动consul,依赖feign去做rest调用

    resilience4j-spring-boot2会将circuit breaker、bulkhead、ratelimiter、retry组件都引入进来

  2. 配置

    这里分别配置了Metrics输出、feign、consul以及circuit breaker。

    management.endpoints.web.exposure.include=*
    management.endpoint.health.show-details=always
    
    feign.client.config.default.connect-timeout=500
    feign.client.config.default.read-timeout=500
    
    spring.cloud.consul.host=localhost
    spring.cloud.consul.port=8500
    spring.cloud.consul.discovery.prefer-ip-address=true
    
    resilience4j.circuitbreaker.backends.断路器名xxx.failure-rate-threshold=50
    resilience4j.circuitbreaker.backends.断路器名xxx.wait-duration-in-open-state=5000
    resilience4j.circuitbreaker.backends.断路器名xxx.ring-buffer-size-in-closed-state=5	
    resilience4j.circuitbreaker.backends.断路器名xxx.ring-buffer-size-in-half-open-state=3
    resilience4j.circuitbreaker.backends.断路器名xxx.event-consumer-buffer-size=10
    
    

    这里的断路器名xxx为配置的断路器名称。

  3. 启用服务注册与feign

    @SpringBootApplication
    @Slf4j
    @EnableDiscoveryClient
    @EnableFeignClients
    @EnableAspectJAutoProxy
    public class CustomerServiceApplication {
        ...
    }
    
  4. feign的客户端service

    @FeignClient(name = "waiter-service", contextId = "coffee", path = "/coffee")
    public interface CoffeeService {
        @GetMapping(path = "/", params = "!name")
        List<Coffee> getAll();
    
        @GetMapping("/{id}")
        Coffee getById(@PathVariable Long id);
    
        @GetMapping(path = "/", params = "name")
        Coffee getByName(@RequestParam String name);
    }
    
    
  5. 编程使用方式

    有两种使用方式,一种是注解方式,另外一种是注入breaker的编程方式。

    @RestController
    @RequestMapping("/customer")
    @Slf4j
    public class CustomerController {
        @Autowired
        private CoffeeService coffeeService;
    
        private CircuitBreaker circuitBreaker;
    
        public CustomerController(CircuitBreakerRegistry registry) {
            circuitBreaker = registry.circuitBreaker("menu");
        }
    
        @GetMapping("/menu")
        public List<Coffee> readMenu() {
            return Try.ofSupplier(
                    CircuitBreaker.decorateSupplier(circuitBreaker,
                            () -> coffeeService.getAll()))
                    .recover(CircuitBreakerOpenException.class, Collections.emptyList())
                    .get();
        }
    }
    
  6. 注解使用方式

    @RestController
    @RequestMapping("/customer")
    @Slf4j
    public class CustomerController {
        @Autowired
        private CoffeeOrderService coffeeOrderService;
    
        @PostMapping("/order")
        @io.github.resilience4j.circuitbreaker.annotation.CircuitBreaker(name = "order")
        public CoffeeOrder createOrder() {
            NewOrderRequest orderRequest = NewOrderRequest.builder()
                    .customer("Li Lei")
                    .items(Arrays.asList("capuccino"))
                    .build();
            CoffeeOrder order = coffeeOrderService.create(orderRequest);
            log.info("Order ID: {}", order != null ? order.getId() : "-");
            return order;
        }
    }
    

    注:这里的注解方式并没有给出降级的处理。

限流bulkhead

bulkhead是一种隔舱的设计,防止发生连环故障,方式主要是控制访问下游的并发,高于某个阈值之后就做排队或者直接返回失败。

使用方式与circuit breaker类似,也是有注入编程方式与注解方式2种。

  1. 依赖

    依赖与circuit breaker相同,引入resilience4j-spring-boot2接口

  2. 配置

    resilience4j.bulkhead.backends.order.max-concurrent-call=1
    resilience4j.bulkhead.backends.order.max-wait-time=5
    
    resilience4j.bulkhead.backends.menu.max-concurrent-call=5
    resilience4j.bulkhead.backends.menu.max-wait-time=5
    

    配置最大并发数、最大等待时间。

  3. 2种使用方式

    通过注入bulkhead来使用,以及通过Bulkhead的annotation来使用2种

    @RestController
    @RequestMapping("/customer")
    @Slf4j
    public class CustomerController {
        @Autowired
        private CoffeeService coffeeService;
        @Autowired
        private CoffeeOrderService coffeeOrderService;
        private CircuitBreaker circuitBreaker;
        private Bulkhead bulkhead;
    
        public CustomerController(CircuitBreakerRegistry circuitBreakerRegistry,
                                  BulkheadRegistry bulkheadRegistry) {
            circuitBreaker = circuitBreakerRegistry.circuitBreaker("menu");
            bulkhead = bulkheadRegistry.bulkhead("menu");
        }
    
        @GetMapping("/menu")
        public List<Coffee> readMenu() {
            return Try.ofSupplier(
                    Bulkhead.decorateSupplier(bulkhead,
                            CircuitBreaker.decorateSupplier(circuitBreaker,
                                    () -> coffeeService.getAll())))
                    .recover(CircuitBreakerOpenException.class, Collections.emptyList())
                    .recover(BulkheadFullException.class, Collections.emptyList())
                    .get();
        }
    
        @PostMapping("/order")
        @io.github.resilience4j.circuitbreaker.annotation.CircuitBreaker(name = "order")
        @io.github.resilience4j.bulkhead.annotation.Bulkhead(name = "order")
        public CoffeeOrder createOrder() {
            NewOrderRequest orderRequest = NewOrderRequest.builder()
                    .customer("Li Lei")
                    .items(Arrays.asList("capuccino"))
                    .build();
            CoffeeOrder order = coffeeOrderService.create(orderRequest);
            log.info("Order ID: {}", order != null ? order.getId() : "-");
            return order;
        }
    }
    

    可以用ab去模拟并发,判断是否已经限流。

    源码:resilience4j-spring-boot/ BulkheadAutoConfiguration => BulkheadConfiguration => BulkheadConfigurationProperties => BulkheadAspeck。

限流ratelimiter

限定特定时间段内执行的次数,超过次数就等待,等待超时就返回。

使用方法与circuit breaker、bulkhead相同,一般情况下与bulkhead连用,一起来进行限流。

  1. 依赖

    与上面相同

  2. 配置

    resilience4j.ratelimiter.limiters.coffee.limit-for-period=5
    resilience4j.ratelimiter.limiters.coffee.limit-refresh-period-in-millis=30000
    resilience4j.ratelimiter.limiters.coffee.timeout-in-millis=5000
    resilience4j.ratelimiter.limiters.coffee.subscribe-for-events=true
    resilience4j.ratelimiter.limiters.coffee.register-health-indicator=true
    

    limit-for-period:调用的限定次数

    limit-refresh-period-in-millis:多少毫秒内

    timeout-in-millis:等到超时时间

  3. 2种使用方式

    与上边相同,也有注入编程使用方式与注解使用方式2种。

    注:RateLimiter等注解可以直接加到类上,对整个Contoller进行限流。

    @RestController
    @RequestMapping("/order")
    @Slf4j
    public class CoffeeOrderController {
        @Autowired
        private CoffeeOrderService orderService;
        @Autowired
        private CoffeeService coffeeService;
        private RateLimiter rateLimiter;
    
        public CoffeeOrderController(RateLimiterRegistry rateLimiterRegistry) {
            rateLimiter = rateLimiterRegistry.rateLimiter("order");
        }
    
        @GetMapping("/{id}")
        public CoffeeOrder getOrder(@PathVariable("id") Long id) {
            CoffeeOrder order = null;
            try {
                order = rateLimiter.executeSupplier(() -> orderService.get(id));
                log.info("Get Order: {}", order);
            } catch(RequestNotPermitted e) {
                log.warn("Request Not Permitted! {}", e.getMessage());
            }
            return order;
        }
    
        @PostMapping(path = "/", consumes = MediaType.APPLICATION_JSON_VALUE,
                produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
        @ResponseStatus(HttpStatus.CREATED)
        @io.github.resilience4j.ratelimiter.annotation.RateLimiter(name = "order")
        public CoffeeOrder create(@RequestBody NewOrderRequest newOrder) {
            log.info("Receive new Order {}", newOrder);
            Coffee[] coffeeList = coffeeService.getCoffeeByName(newOrder.getItems())
                    .toArray(new Coffee[] {});
            return orderService.createOrder(newOrder.getCustomer(), coffeeList);
        }
    }
    

    源码:resilience4j-spring-boot/ RateLimiterAutoConfiguration => RateLimiterConfiguration => RateLimiterConfigurationProperties => LimiterProperties => RateLimiterAspect。

思考

感知层面与基本的保护层面

与K8S相比,流量过量之后可以自动调度还是有差距的

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×