概述
函数编程接口与lambda
函数编程接口是一种只有一个方法的接口,2个条件:interface、有且只有一个抽象的方法(可以有多个非抽象的方法)。
常见的5种函数编程接口:Supplier、Consumer、Function、UnaryOperator、BiFunction,下边简单来看看。
-
Supplier
定义:
@FunctionalInterface
public interface Supplier<T> {
T get();
}
特点:没有输入,只有输出。
-
Consumer
定义:
@FunctionalInterface
public interface Consumer<T> {
void accept(T t);
...
}
特点:只有输入,没有输出。
-
Function
定义:
@FunctionalInterface
public interface Function<T, R> {
R apply(T t);
...
}
特点:一个输入,一个输出,输入输出类型可以不同。
-
UnaryOperator
定义:
@FunctionalInterface
public interface UnaryOperator<T> extends Function<T, T> {
...
}
特点:一个输入,一个输出,输入输出类型相同。
-
BiFunction
定义:
@FunctionalInterface
public interface BiFunction<T, U, R> {
R apply(T t, U u);
...
}
特点:两个输入,一个输出。
-
lambda
lambda表达式是一种函数编程接口实例化的语法糖。它实例化了一个函数编程接口,实例化的接口的输入、输出需要跟函数编程接口相一致,如下:
Supplier<String> supplier = () -> "gaga";
Consumer<String> consumer = str -> System.out.println(str);
Function<Integer, Integer> function = integer -> integer*integer;
Stream
有了lambda之后,Stream就比较好理解了。这里包括stream创建、中间操作、终止操作3部分。
stream创建
包括:
-
Arrays.stream(),数组到流
String[] arr = {"Apple", "Banana", "Cherry", "Pair"};
Arrays.stream(arr).forEach(System.out::println);
-
Collection.stream(),列表等容器到流
Arrays.asList(1,2,3,4).stream().forEach(System.out::println);
-
Stream.of(),直接构建流
Stream.of(1,2,3,4).forEach(System.out::println);
-
Stream.iterate(),利用迭代器
Stream.iterate(1,i->i+1).limit(10).forEach(System.out::println);
-
Stream.generate(),利用生成器
Stream.generate(()->new Random().nextInt(10)).limit(10).forEach(System.out::println);
中间操作
所谓中间操作,就是经过这个操作后,返回结果仍然是stream的操作。包括:filter、map、flatMap、peek、distinct、sort、limit等操作
String[] arr = {"react", "", "react", "spring", "bo_le"};
Stream.of(arr)
.filter( i-> !i.isEmpty())
.distinct()
.sorted()
.limit(1)
.map(s->s.replace("_", ""))
.flatMap( s -> Stream.of(s.split("")))
.sorted()
.forEach(System.out::print);
终止操作
所谓终止操作,就是经过这个操作后,返回结果不再是stream的操作。包括:forEach、collect、reduce、min/max、count、findFirst/findAny、anyMatch/allMatch/noneMath
这里只摆一个reduce的
Optional<String> str = Stream.of(arr)
.filter( i-> !i.isEmpty())
.distinct()
.sorted()
.limit(1)
.map(s->s.replace("_", ""))
.flatMap( s -> Stream.of(s.split("")))
.sorted()
.reduce(String::concat);
System.out.println(str.orElse("fail to get"));
Stream这一套与MapReduce的算子很像。
ReactiveStream
ReactiveStream的结构图如下:
基本概念包括:
- Publisher
- Subscriber
- Subscription
- Backpressure
- Processor
一句话:Publisher与Subscriber达成订阅协议Subscription,Processor是Publisher与Subscriber的中间人,负责对发布内容的处理。
Backpressure是Subscriber根据自己需要,对订阅的流量进行约束的概念。
ReactiveStream的概念是在Java9的Flow中提出的,Spring也对此进行了实现,下边看看几个接口。
接口定义
-
Publisher
@FunctionalInterface
public static interface Publisher<T> {
// 签订协议
public void subscribe(Subscriber<? super T> subscriber);
}
-
Subscriber
public static interface Subscriber<T> {
// 协议签订成功回调
public void onSubscribe(Subscription subscription);
// 接收到发布的内容回调
public void onNext(T item);
// 发生错误回调
public void onError(Throwable throwable);
// 结束回调
public void onComplete();
}
-
Subscription
public static interface Subscription {
// 订阅者允许消息的个数
public void request(long n);
public void cancel();
}
-
Processor
public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> {}
既是Subscriber又是Publisher
示例
public static void main(String[] args) {
// 1. 创建一个发布者
SubmissionPublisher publisher = new SubmissionPublisher();
// 2. 订阅者
Flow.Subscriber subscriber = new Flow.Subscriber() {
// 协议
Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
System.out.println("建立订阅关系");
this.subscription = subscription;
subscription.request(1); // 背压:允许1次消息
}
@Override
public void onNext(Object item) {
// 业务处理
System.out.println("接收数据:"+ item);
// 背压控制
subscription.request(10); // 背压:允许10次消息,如果这里不加,则不会再接收到消息。
}
@Override
public void onError(Throwable throwable) {
System.out.println("错误");
}
@Override
public void onComplete() {
System.out.println("完成");
}
};
// 3. 订阅关系
publisher.subscribe(subscriber);
// 4. 发送数据
for (int i = 0; i < 20; i++) {
publisher.submit("hello reactive stream"+i);
}
// 5. 关闭
publisher.close();
try {
Thread.currentThread().join(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
Subscriber 中对消息的处理回调onNext,与主线程不是一个线程。
Reactor
Flux与Mono
Reactor是响应式编程的核心库,这里主要来介绍它的两个基本概念(Publisher):Flux与Mono,它俩是包含数据的publisher。
Flux是一个有0个或N个元素的响应式的序列,如下图所示:

Mono是一个0个或1个的响应式对象,如下图所示:

注:Reactive Stream是响应与流两个概念的结合,这在Flux与Mono中体现的很清楚,它们的操作跟Stream很像,却又是发布订阅的方式存在的。
示例
// 创建方式
public void create(){
// mono创建方式 0 - 1
Mono.just("hello Mono").subscribe(System.out::println);
// Flux创建方式 0 - N
Flux.just(1,2,3,4).subscribe(System.out::println);
Flux.fromIterable(Arrays.asList(1,2,3,4)).subscribe(System.out::println);
Flux.fromArray(new String[]{"a", "b", "c", "d"}).subscribe(System.out::println);
Flux.fromStream(Stream.of(1,2,3,4)).subscribe(System.out::println);
Flux.range(1, 5).subscribe(System.out::println);
}
// 操作方式,与Stream很像
public void operate(){
String src = "In order to dig deeper into the core features of Reactor, head to Reactor Core Features to learn";
Flux.fromArray(src.split(" "))
.flatMap(s -> Flux.fromArray(s.split("")))
.distinct()
.sort()
.subscribe(System.out::print);
}
源码分析
从源码上看, 流的各种操作Operator(flatMap、distinct、sort)都等价于前边讲的Processor,即是订阅者又是发布者。
与前边将的通过publisher的submit方法触发不同,Mono/Flux是当subscriber的subscribe方法调用时进行出发,它是一个回路,只要接通整个回路就能运转,哪里接通并不重要。
Flux的subscribe方法
public final void subscribe(Subscriber<? super T> actual) {
CorePublisher publisher = Operators.onLastAssembly(this);
CoreSubscriber subscriber = Operators.toCoreSubscriber(actual);
if (subscriber instanceof Fuseable.QueueSubscription && this != publisher && this instanceof Fuseable && !(publisher instanceof Fuseable)) {
subscriber = new FluxHide.SuppressFuseableSubscriber<>(subscriber);
}
try {
if (publisher instanceof OptimizableOperator) {
OptimizableOperator operator = (OptimizableOperator) publisher;
while (true) {
// opertor订阅subscriber的同时,封装一个新的subscriber,以此成链
subscriber = operator.subscribeOrReturn(subscriber);
if (subscriber == null) {
return;
}
// 下一个operator
OptimizableOperator newSource = operator.nextOptimizableSource();
if (newSource == null) {
publisher = operator.source();
break;
}
operator = newSource;
}
}
publisher.subscribe(subscriber);
}
catch (Throwable e) {
Operators.reportThrowInSubscribe(subscriber, e);
return;
}
}
为什么是等价于呢,因为它不是直接继承接口,而是用了内部类来做Subscriber的事情。以FluxDistinct
为例
final class FluxDistinct<T, K, C> extends InternalFluxOperator<T, T> {
....
static final class DistinctSubscriber<T, K, C>
implements ConditionalSubscriber<T>, InnerOperator<T, T> {
....
}
...
}
Webflux
WebFlux是一套建立在Reactor基础的响应式web框架,这里简单看看它的2种编程模式:注解型与函数型。
通过Spring Initializr创建一个Spring Reactive Web的工程
@RestController
@SpringBootApplication
public class WebfluxDemoApplication {
public static void main(String[] args) {
SpringApplication.run(WebfluxDemoApplication.class, args);
}
// 注解型方式
@GetMapping("/annotate")
public Mono<String> getByAnnotate(){
return Mono.just("hello world by annotate");
}
// 函数型方式
@Bean
public RouterFunction<ServerResponse> route(){
return RouterFunctions.route()
.GET("/func", new HandlerFunction<ServerResponse>() {
@Override
public Mono<ServerResponse> handle(ServerRequest request) {
return ServerResponse.ok().bodyValue("hello world by function");
}
}).build();
}
}
函数型中RouterFunction对应注解型的GetMapping,HandlerFunction对应具体的函数。