响应式编程基础

概述

2022-12-12 11-30-52屏幕截图

函数编程接口与lambda

函数编程接口是一种只有一个方法的接口,2个条件:interface、有且只有一个抽象的方法(可以有多个非抽象的方法)。

常见的5种函数编程接口:Supplier、Consumer、Function、UnaryOperator、BiFunction,下边简单来看看。

  • Supplier

    定义:

    @FunctionalInterface
    public interface Supplier<T> {
        T get();
    }
    

    特点:没有输入,只有输出。

  • Consumer

    定义:

    @FunctionalInterface
    public interface Consumer<T> {
        void accept(T t);
        ...
    }
    

    特点:只有输入,没有输出。

  • Function

    定义:

    @FunctionalInterface
    public interface Function<T, R> {
        R apply(T t);
        ...
    }
    

    特点:一个输入,一个输出,输入输出类型可以不同。

  • UnaryOperator

    定义:

    @FunctionalInterface
    public interface UnaryOperator<T> extends Function<T, T> {
        ...
    }
    

    特点:一个输入,一个输出,输入输出类型相同。

  • BiFunction

    定义:

    @FunctionalInterface
    public interface BiFunction<T, U, R> {
        R apply(T t, U u);
        ...
    }
    

    特点:两个输入,一个输出。

  • lambda

    lambda表达式是一种函数编程接口实例化的语法糖。它实例化了一个函数编程接口,实例化的接口的输入、输出需要跟函数编程接口相一致,如下:

    Supplier<String> supplier = () -> "gaga";
    Consumer<String> consumer = str -> System.out.println(str);
    Function<Integer, Integer> function = integer -> integer*integer;
    

Stream

有了lambda之后,Stream就比较好理解了。这里包括stream创建、中间操作、终止操作3部分。

stream创建

包括:

  • Arrays.stream(),数组到流

    String[] arr = {"Apple", "Banana", "Cherry", "Pair"};
    Arrays.stream(arr).forEach(System.out::println);
    
  • Collection.stream(),列表等容器到流

    Arrays.asList(1,2,3,4).stream().forEach(System.out::println);
    
  • Stream.of(),直接构建流

    Stream.of(1,2,3,4).forEach(System.out::println);
    
  • Stream.iterate(),利用迭代器

    Stream.iterate(1,i->i+1).limit(10).forEach(System.out::println);
    
  • Stream.generate(),利用生成器

    Stream.generate(()->new Random().nextInt(10)).limit(10).forEach(System.out::println);
    

中间操作

所谓中间操作,就是经过这个操作后,返回结果仍然是stream的操作。包括:filter、map、flatMap、peek、distinct、sort、limit等操作

 String[] arr = {"react", "", "react", "spring", "bo_le"};
Stream.of(arr)
                .filter( i-> !i.isEmpty())
                .distinct()
                .sorted()
                .limit(1)
                .map(s->s.replace("_", ""))
                .flatMap( s -> Stream.of(s.split("")))
                .sorted()
                .forEach(System.out::print);

终止操作

所谓终止操作,就是经过这个操作后,返回结果不再是stream的操作。包括:forEach、collect、reduce、min/max、count、findFirst/findAny、anyMatch/allMatch/noneMath

这里只摆一个reduce的

 Optional<String> str = Stream.of(arr)
                .filter( i-> !i.isEmpty())
                .distinct()
                .sorted()
                .limit(1)
                .map(s->s.replace("_", ""))
                .flatMap( s -> Stream.of(s.split("")))
                .sorted()
                .reduce(String::concat);
System.out.println(str.orElse("fail to get"));

Stream这一套与MapReduce的算子很像。

ReactiveStream

ReactiveStream的结构图如下:

2022-12-12 11-34-07屏幕截图

基本概念包括:

  • Publisher
  • Subscriber
  • Subscription
  • Backpressure
  • Processor

一句话:Publisher与Subscriber达成订阅协议Subscription,Processor是Publisher与Subscriber的中间人,负责对发布内容的处理。

Backpressure是Subscriber根据自己需要,对订阅的流量进行约束的概念。

ReactiveStream的概念是在Java9的Flow中提出的,Spring也对此进行了实现,下边看看几个接口。

接口定义

  • Publisher

     @FunctionalInterface
        public static interface Publisher<T> {
            // 签订协议
            public void subscribe(Subscriber<? super T> subscriber);
        }
    
  • Subscriber

    public static interface Subscriber<T> {
        // 协议签订成功回调
            public void onSubscribe(Subscription subscription);
    
        // 接收到发布的内容回调
            public void onNext(T item);
    
        // 发生错误回调
            public void onError(Throwable throwable);
    
        // 结束回调
            public void onComplete();
        }
    
    • Subscription

          public static interface Subscription {
              // 订阅者允许消息的个数
              public void request(long n);
              public void cancel();
          }
      
    • Processor

         public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> {}
      

      既是Subscriber又是Publisher

示例

 public static void main(String[] args)  {
        // 1. 创建一个发布者
        SubmissionPublisher publisher = new SubmissionPublisher();

        // 2. 订阅者
        Flow.Subscriber subscriber = new Flow.Subscriber() {
            // 协议
            Flow.Subscription subscription;
            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                System.out.println("建立订阅关系");
                this.subscription = subscription;
                subscription.request(1);        // 背压:允许1次消息
            }

            @Override
            public void onNext(Object item) {
                // 业务处理
                System.out.println("接收数据:"+ item);
                // 背压控制
                subscription.request(10);   // 背压:允许10次消息,如果这里不加,则不会再接收到消息。
            }

            @Override
            public void onError(Throwable throwable) {
                System.out.println("错误");
            }
            @Override
            public void onComplete() {
                System.out.println("完成");
            }
        };

        // 3. 订阅关系
        publisher.subscribe(subscriber);

        // 4. 发送数据
        for (int i = 0; i < 20; i++) {
            publisher.submit("hello reactive stream"+i);
        }
     // 5. 关闭
        publisher.close();

        try {
            Thread.currentThread().join(1000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

Subscriber 中对消息的处理回调onNext,与主线程不是一个线程。

Reactor

Flux与Mono

Reactor是响应式编程的核心库,这里主要来介绍它的两个基本概念(Publisher):Flux与Mono,它俩是包含数据的publisher。

Flux是一个有0个或N个元素的响应式的序列,如下图所示:

image-20221212170958577

Mono是一个0个或1个的响应式对象,如下图所示:

image-20221212171347880

注:Reactive Stream是响应与流两个概念的结合,这在Flux与Mono中体现的很清楚,它们的操作跟Stream很像,却又是发布订阅的方式存在的。

示例

// 创建方式
public void create(){
        // mono创建方式 0 - 1
        Mono.just("hello Mono").subscribe(System.out::println);

        // Flux创建方式 0 - N
        Flux.just(1,2,3,4).subscribe(System.out::println);
        Flux.fromIterable(Arrays.asList(1,2,3,4)).subscribe(System.out::println);
        Flux.fromArray(new String[]{"a", "b", "c", "d"}).subscribe(System.out::println);
        Flux.fromStream(Stream.of(1,2,3,4)).subscribe(System.out::println);
        Flux.range(1, 5).subscribe(System.out::println);
    }

// 操作方式,与Stream很像
    public void operate(){
        String src = "In order to dig deeper into the core features of Reactor, head to Reactor Core Features to learn";
        Flux.fromArray(src.split(" "))
                .flatMap(s -> Flux.fromArray(s.split("")))
                .distinct()
                .sort()
                .subscribe(System.out::print);
    }

源码分析

从源码上看, 流的各种操作Operator(flatMap、distinct、sort)都等价于前边讲的Processor,即是订阅者又是发布者。

与前边将的通过publisher的submit方法触发不同,Mono/Flux是当subscriber的subscribe方法调用时进行出发,它是一个回路,只要接通整个回路就能运转,哪里接通并不重要。

Flux的subscribe方法

public final void subscribe(Subscriber<? super T> actual) {
		CorePublisher publisher = Operators.onLastAssembly(this);
		CoreSubscriber subscriber = Operators.toCoreSubscriber(actual);

		if (subscriber instanceof Fuseable.QueueSubscription && this != publisher && this instanceof Fuseable && !(publisher instanceof Fuseable)) {
			subscriber = new FluxHide.SuppressFuseableSubscriber<>(subscriber);
		}

		try {
			if (publisher instanceof OptimizableOperator) {
				OptimizableOperator operator = (OptimizableOperator) publisher;
				while (true) {
                    // opertor订阅subscriber的同时,封装一个新的subscriber,以此成链
					subscriber = operator.subscribeOrReturn(subscriber);
					if (subscriber == null) {
						return;
					}
                    // 下一个operator
					OptimizableOperator newSource = operator.nextOptimizableSource();
					if (newSource == null) {
						publisher = operator.source();
						break;
					}
					operator = newSource;
				}
			}

			publisher.subscribe(subscriber);
		}
		catch (Throwable e) {
			Operators.reportThrowInSubscribe(subscriber, e);
			return;
		}
	}

为什么是等价于呢,因为它不是直接继承接口,而是用了内部类来做Subscriber的事情。以FluxDistinct为例

final class FluxDistinct<T, K, C> extends InternalFluxOperator<T, T> {
    ....
    	static final class DistinctSubscriber<T, K, C>
			implements ConditionalSubscriber<T>, InnerOperator<T, T> {
           ....
        }
    ...
}

Webflux

WebFlux是一套建立在Reactor基础的响应式web框架,这里简单看看它的2种编程模式:注解型与函数型。

通过Spring Initializr创建一个Spring Reactive Web的工程

@RestController
@SpringBootApplication
public class WebfluxDemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(WebfluxDemoApplication.class, args);
    }

// 注解型方式
    @GetMapping("/annotate")
    public Mono<String> getByAnnotate(){
        return Mono.just("hello world by annotate");
    }

// 函数型方式
    @Bean
    public RouterFunction<ServerResponse> route(){
        return RouterFunctions.route()
                .GET("/func", new HandlerFunction<ServerResponse>() {
                    @Override
                    public Mono<ServerResponse> handle(ServerRequest request) {
                        return ServerResponse.ok().bodyValue("hello world by function");
                    }
                }).build();
    }

}

函数型中RouterFunction对应注解型的GetMapping,HandlerFunction对应具体的函数。

# 语言 

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×