如何在Java 9中使用Flow API实现反应式流?
自Java9以来,FlowAPI是对反应流规范的官方支持。它是Iterator 和Observer 模式的组合。的流API是一个互操作规范,而不是最终用户API等RxJava。
FlowAPI包含四个基本接口:
订阅服务器:订阅服务器订阅发布服务器以进行回调。
发布者:发布者将数据项流发布给注册的订阅者。
订阅:发布者和订阅者之间的链接。
处理器:处理器位于发布者和订阅者之间,并将一个流转换为另一流。
在下面的示例中,我们创建了一个基本订阅服务器,该订阅服务器请求一个数据对象,将其打印并再请求一个。我们可以使用Java(SubmissionPublisher)提供的发布者实现来完成我们的会话。
示例
import java.util.concurrent.Flow;
import java.util.List;
import java.util.concurrent.SubmissionPublisher;
class MySubscriber<T>implements Flow.Subscriber<T> {
private Flow.Subscription subscription;
@Override public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
this.subscription.request(1);
}
@Override public void onNext(T item) {
System.out.println(item);
subscription.request(1);
}
@Override public void onError(Throwable throwable) {
throwable.printStackTrace();
}
@Override public void onComplete() {
System.out.println("Done");
}
}// main classpublic class FlowTest {
public static void main(String args[]) {
List<String> items = List.of("1", "2", "3", "4", "5", "6", "7", "8", "9", "10");
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
publisher.subscribe(new MySubscriber<>());
items.forEach(s -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
publisher.submit(s);
});
publisher.close();
}
}输出结果
1 2 3 4 5 6 7 8 910 Done