前言
前兩天面試的時候,面試官問我:一個ip發請求過來,是一個ip對應一個線程嗎?我突然愣住了,對於SpringBoot如何處理請求好像從來沒仔細思考過,所以面試結束後就仔細研究了一番,現在就來探討一下這個問題。
正文
我們都知道,SpringBoot默認的內嵌容器是tomcat,也就是我們的程序實際上是運行在Tomcat里的。所以與其說SpringBoot可以處理多少請求,到不如說Tomcat可以處理多少請求。
關於Tomcat的默認配置,都在spring-configuration-metadata.json文件中,對應的配置類則是org.springframework.boot.autoconfigure.web.ServerProperties。
和處理請求數量相關的參數有四個:
- server.tomcat.threads.min-spare:最少的工作線程數,默認大小是10。該參數相當於長期工,如果並發請求的數量達不到10,就會依次使用這幾個線程去處理請求。
- server.tomcat.threads.max:最多的工作線程數,默認大小是200。該參數相當於臨時工,如果並發請求的數量在10到200之間,就會使用這些臨時工線程進行處理。
- server.tomcat.max-connections:最大連接數,默認大小是8192。表示Tomcat可以處理的最大請求數量,超過8192的請求就會被放入到等待隊列。
- server.tomcat.accept-count:等待隊列的長度,默認大小是100。
舉個例子說明一下這幾個參數之間的關係:
如果把Tomcat比作一家飯店的話,那麼一個請求其實就相當於一位客人。min-spare就是廚師(長期工);max是廚師總數(長期工+臨時工);max-connections就是飯店裡的座位數量;accept-count是門口小板凳的數量。來的客人優先坐到飯店裡面,然後廚師開始忙活,如果長期工可以乾的完,就讓長期工干,如果長期工干不完,就再讓臨時工干。圖中畫的廚師一共15人,飯店裡有30個座位,也就是說,如果現在來了20個客人,那麼就會有5個人先在飯店裡等着。如果現在來了35個人,飯店裡坐不下,就會讓5個人先到門口坐一下。如果來了50個人,那麼飯店座位+門口小板凳一共40個,所以就會有10人離開。
也就是說,SpringBoot同時所能處理的最大請求數量是max-connections+accept-count,超過該數量的請求直接就會被丟掉。
紙上得來終覺淺,絕知此事要躬行。
上面只是理論結果,現在通過一個實際的小例子來演示一下到底是不是這樣:
創建一個SpringBoot的項目,在application.yml里配置一下這幾個參數,因為默認的數量太大,不好測試,所以配小一點:
server:
tomcat:
threads:
# 最少線程數
min-spare: 10
# 最多線程數
max: 15
# 最大連接數
max-connections: 30
# 最大等待數
accept-count: 10
複製代碼
再來寫一個簡單的接口:
@GetMapping("/test")
public Response test1(HttpServletRequest request) throws Exception {
log.info("ip:{},線程:{}", request.getRemoteAddr(), Thread.currentThread().getName());
Thread.sleep(500);
return Response.buildSuccess();
}
複製代碼
代碼很簡單,只是打印了一下線程名,然後休眠0.5秒,這樣肯定會導致部分請求處理一次性處理不了而進入到等待隊列。
然後我用Apifox創建了一個測試用例,去模擬100個請求:
觀察一下測試結果:
從結果中可以看出,由於設置的 max-connections+accept-count 的和是40,所以有60個請求會被丟棄,這和我們的預期是相符的。由於最大線程是15,也就是有25個請求會先等待,等前15個處理完了再處理15個,最後在處理10個,也就是將40個請求分成了15,15,10這樣三批進行處理。
再從控制台的打印日誌可以看到,線程的最大編號是15,這也印證了前面的想法。
總結一下:如果並發請求數量低於server.tomcat.threads.max,則會被立即處理,超過的部分會先進行等待,如果數量超過max-connections與accept-count之和,則多餘的部分則會被直接丟棄。
延伸:並發問題是如何產生的
到目前為止,就已經搞明白了SpringBoot可以同時處理多少請求的問題。但是在這裡我還想基於上面的例子再延伸一下,就是為什麼並發場景下會出現一些值和我們預期的不一樣?
設想有以下場景:廚師們用一個賬本記錄一共做了多少道菜,每個廚師做完菜都記錄一下,每次記錄都是將賬本上的數字先抄到草稿紙上,計算x+1等於多少,然後將計算的結果寫回到賬本上。
Spring容器中的Bean默認是單例的,也就是說,處理請求的Controller、Service實例就只有一份。在並發場景下,將cookSum定義為全局變量,是所有線程共享的,當一個線程讀到了cookSum=20,然後計算,寫回前另一個線程也讀到是20,兩個線程都加1後寫回,最終cookSum就變成了21,但是實際上應該是22,因為加了兩次。
private int cookSum = 0;
@GetMapping("/test")
public Response test1(HttpServletRequest request) throws Exception {
// 做菜。。。。。。
cookSum += 1;
log.info("做了{}道菜", cookSum);
Thread.sleep(500);
return Response.buildSuccess();
}
複製代碼
如果要避免這樣的情況發生,就涉及到加鎖的問題了,就不在這裡討論了。
若你想更進一步提升並發性能不妨看看下面的反應式編程!
反應式編程
1.1. 阻塞可能造成浪費
現代應用程序可以覆蓋大量並發用戶,儘管現代硬件的功能不斷提高,但現代軟件的性能仍然是一個關鍵問題。
總的來說,有兩種方法可以提高程序的性能:
- 並行化以使用更多線程和更多硬件資源。
- 在如何使用當前資源方面尋求更高的效率。
通常,Java 開發人員使用阻塞代碼編寫程序。這種做法很好,直到出現性能瓶頸。然後是時候引入額外的線程,運行類似的阻塞代碼。但是這種資源利用率的擴展會很快引入爭用和並發問題。
更糟糕的是,阻塞會浪費資源。如果您仔細觀察,一旦程序涉及一些延遲(特別是 I/O,例如數據庫請求或網絡調用),資源就會被浪費,因為線程(可能很多線程)現在處於空閑狀態,等待數據。所以並行化方法不是靈丹妙藥。有必要訪問硬件的全部功能,但推理也很複雜,並且容易浪費資源。
1.2. 使用異步(Asynchronicity)
前面提到的第二種方法,尋求更高的效率,可以解決資源浪費問題。通過編寫異步、非阻塞代碼,您可以讓執行切換到另一個使用相同底層資源的活動任務,並在異步處理完成後返回到當前進程。
但是如何在 JVM 上生成異步代碼呢?java 提供了兩種異步編程模型:
- Callbacks:異步方法沒有返回值,但是需要一個額外的回調參數(lambda或者匿名函數),當結果可用時調用這個參數。一個著名的例子是Swing’s的EventListener層次結構。
- Futures:異步方法立即返回一個Future。異步進程計算T值,通過Future對象包裝對T值的訪問。該值不是立即可用的,可以輪詢該對象,直到該值可用為止。例如,ExecutorService使用Future對象,運行**Callable**任務。
但是這兩種技術都有他們的局限性,回調難以組合在一起,很快就會導致代碼難以閱讀和維護(這種情況稱為"回調地獄(Callback Hell)")
Future對象要比callbacks好一些,但是Future在組合(composition)上任然比較困難。儘管Java 8通過CompletableFuture進行了改進。編排多個Future對象是可行的,但是這並不容易。並且Future還有其他問題:
- 調用get()方法很容易導致Future對象出現另一種阻塞情況。
- 不支持惰性計算。
- 缺乏對多值和高級錯誤處理的支持。
1.3. 從命令式編程到反應式編程
反應式-函數式編程解決的問題就是並發和並行。更通俗地說,它解決了回調地獄問題。回調地獄是以命令式的方式來處理反應式和異步用例帶來的問題。反應式編程,比如RxJava實現,受到了函數式編程的影響,並且會使用聲明式的方式來避免反應式-命令式代碼常見的問題。
響應式庫,如Reactor,Rxjava旨在解決JVM上”經典”異步方法的這些缺點,同時關注一些額外的方面:
- 可組合性(Composability )和可讀性(readability)
- 數據作為一個流(flow ),使用豐富的操作符(Operators)進行操作
- 在你訂閱(subscribe)之前什麼都不會發生,延遲發布。
- 背壓能力(Backpressure )或消費者向生產者發出排放速度過高信號的能力
- 與並發無關的高級但高價值的抽象(High level but high value abstraction that is concurrency-agnostic)
2. Reactor Project 如何運行
Reactor的核心是Flux /Mono類型,它代表了數據或事件的流。它的目的是實現推送(反應式),但是也可以用於拉取(交互式)。它是延遲執行的(lazy),不是立即執行的(eager)。它可以同步使用,也可以異步使用。它能夠代表隨着時間推移產生的0個、1個、多個或者無窮個值或事件。
2.1. Flux 原理
當執行subscribe方法時,發布者會回調訂閱者的onSubscribe方法,這個方法中,通常訂閱者會藉助傳入的Subscription向發布者請求n個數據。然後發布者通過不斷調用訂閱者的onNext方法向訂閱者發出最多n個數據。如果數據全部發完,則會調用onComplete告知訂閱者流已經發完;如果有錯誤發生,則通過onError發出錯誤數據,同樣也會終止流。
- 首先,使用類似Flux .just的方法創建發布者後,會創建一個具體的發布者(Publisher),如Flux Array。
- 當使用.subscribe訂閱這個發布者時,首先會new一個具有相應邏輯的****Subscription(如ArraySubscription,這個Subscription定義了如何處理下游的request,以及如何“發出數據”);每種同類型的Subscriber都會對應一種類型的Subscription。
- 然後發布者將這個Subscription通過訂閱者的.onSubscribe方法傳給訂閱者;
- 在訂閱者的.onSubscribe方法中,需要通過Subscription發起第一次的請求.request
- Subscription收到請求,就可以通過回調訂閱者的onNext方法發出元素了,有多少發多少,但不能超過請求的個數;
- 訂閱者在onNext中通常定義對元素的處理邏輯,處理完成之後,可以繼續發起請求;
- 發布者根據繼續滿足訂閱者的請求;
- 直至發布者的序列結束,通過訂閱者的onComplete予以告知;當然序列發送過程中如果有錯誤,則通過訂閱者的onError予以告知並傳遞錯誤信息;這兩種情況都會導致序列終止,訂閱過程結束。
2.2. 操作符原理
操作符 :只對數據做搬運和加工,對下游是作為發布者(Publisher),傳遞上游的數據到下游;對上游是作為訂閱者(Subscriber),傳遞下游的請求到上游。
3. 創建 Flux
3.1. 以簡單方法創建 Flux
3.1.1 empty()
訂閱後立即完成,不發布任何的值。
@Test
public void emptyTest() {
PrintUtil.println("Before");
Flux.empty()
.subscribe(PrintUtil::println, PrintUtil::println, () -> PrintUtil.println("complete"));
PrintUtil.println("After");
}
2021-08-31 18:04:016 [Thread-Name-main], Before
2021-08-31 18:04:016 [Thread-Name-main], complete
2021-08-31 18:04:016 [Thread-Name-main], After
複製代碼
3.1.2 never()
不發布任何的通知,無論是值,還是完成或失敗。這個流適用於測試。
@Test
public void errorTest() {
PrintUtil.println("Before");
Flux.error(new RuntimeException("emitter an error"))
.subscribe(PrintUtil::println, PrintUtil::println, () -> PrintUtil.println("complete"));
PrintUtil.println("After");
}
2021-08-31 18:09:023 [Thread-Name-main], Before
2021-08-31 18:09:024 [Thread-Name-main], After
複製代碼
3.1.3 error()
立即給每個訂閱者發送一個onError()通知。不發布任何的值,按照契約,也不會發送onCompleted()通知
@Test
public void neverTest() {
PrintUtil.println("Before");
Flux.never()
.subscribe(PrintUtil::println, PrintUtil::println, () -> PrintUtil.println("complete"));
PrintUtil.println("After");
}
2021-08-31 18:08:024 [Thread-Name-main], Before
2021-08-31 18:08:024 [Thread-Name-main], java.lang.RuntimeException: emitter an error
2021-08-31 18:08:024 [Thread-Name-main], After
複製代碼
3.1.4 range
從 start 開始生成n個整型數字。例如,range(3, 3) 將會發布3、4和5,然後正常完成。每個訂閱者會接收到一組相同的數字。
@Test
public void rangeTest() {
PrintUtil.println("Before");
Flux.range(3, 3)
.subscribe(PrintUtil::println);
PrintUtil.println("After");
}
2021-08-31 17:58:051 [Thread-Name-main], Before
2021-08-31 17:58:052 [Thread-Name-main], 3
2021-08-31 17:58:052 [Thread-Name-main], 4
2021-08-31 17:58:052 [Thread-Name-main], 5
2021-08-31 17:58:052 [Thread-Name-main], After
複製代碼
print語句的順序也是值得關注的。Before 和After 消息是由main客戶端線程打印出來的,這一點倒不令人驚訝。但是,請注意訂閱也是發生在客戶端線程中的,subscribe() 實際上會阻塞客戶端線程,直到所有的事件都被接收。除非某些操作符需要,否則RxJava不會隱式地在線程池中運行代碼。
3.1.5 interval
interval() 會生成一個long類型數字的序列,從零開始,每個數字之間有固定的時間間隔。某種程度上,interval()類似於 ScheduledExecutorService 中的 scheduleAtFixedRate()。你可以想象一下 interval() 的多種使用場景,比如定期輪詢數據、刷新用戶界面或者建模時間的推移。
@Test
public void intervalTest() throws InterruptedException {
Flux.interval(Duration.ofSeconds(1))
.map(input -> {
if (input < 3) return "tick " + input;
throw new RuntimeException("boom");
})
.onErrorReturn("Uh oh")
.subscribe(log::info);
TimeUnit.SECONDS.sleep(5);
}
18:17:18.548 [parallel-1] INFO wangxw.operator.OperatorTest - tick 0
18:17:19.549 [parallel-1] INFO wangxw.operator.OperatorTest - tick 1
18:17:20.548 [parallel-1] INFO wangxw.operator.OperatorTest - tick 2
18:17:21.564 [parallel-1] INFO wangxw.operator.OperatorTest - Uh oh
複製代碼
3.2. 以編程方式創建 Flux
3.2.1 Synchronous :generate on-by-one
public void testGenerate() {
Flux<String> flux = Flux.generate(
() -> 0, // 初始state值
(state, sink) -> {
sink.next("3 x " + state + " = " + 3 * state); // 產生數據是同步的,每次產生一個數據
if (state == 10) {
sink.complete();
}
return state + 1; // 改變狀態
},
(state) -> System.out.println("state: " + state)); // 最後狀態值
// 訂閱時觸發requset->sink.next順序產生數據
// 生產一個數據消費一個
flux.subscribe(System.out::println);
}
複製代碼
3.2.2 Asynchronous and Multi-threaded: create
Flux .cretate() 操作符可以用於橋接監聽器模型等現有API轉為響應式流模型,支持推拉結合的模式。cretate操作符創建的BufferAsyncSink中維護了一個SpscQueue。
- 推模式:當監聽觸發時,調用sink.next(o),將元素放入SpscQueue,隨後立即取出drain(排水)給消費者。
- **拉模式:**發生在消費者定閱時,當生產者有數據可用時將通過拉模式,拉取數據。
3.2.2.1. 示意圖
- 首先,使用Flux .create的方法創建發布者後,會創建一個具體的發布者(Flux Create)。
- 當使用.subscribe訂閱這個發布者時,首先會new一個具有相應邏輯的****BufferAsyncSink。
- 然後發布者將這個BufferAsyncSink通過訂閱者的.onSubscribe方法傳給訂閱者;並回調Flux Create中的Consumer,執行 sink.onRequest(requestConsumer) 將 requestConsumer 賦值給Sink。
- 在訂閱者的.onSubscribe方法中,需要通過BufferAsyncSink發起request,執行requsetConsumer,此時是拉數據
- 通過異步調用sink.next(o),將數據推送到sqscQueue中,再 poll 出來是 推數據;
3.2.2.2. 代碼示例
@Test
public void testCreate() throws InterruptedException {
MyEventProcessor<String> myEventProcesser = new MyEventProcessor<>();
Flux.create(emitter -> {
myEventProcesser.register(new MyEventListener<String>() {
@Override
public void onDataChunk(MyEvent<String> event) {
emitter.next(event);
}
@Override
public void processComplete() {
emitter.complete();
}
});
emitter.onRequest(n -> { // n subscribe.requset時調用
List<String> messages = getHistory(n);
messages.forEach(PrintUtil::println);
});
}).subscribe(PrintUtil::println, PrintUtil::println); // 這時候還沒有任何事件產生;
for (int i = 0; i < 20; i++) { // 6
TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextInt(1000));
myEventProcesser.newEvent(new MyEvent<>(new Date(), "Event" + i));
}
myEventProcesser.processComplete();
}
複製代碼
3.2.3. Asynchronous but single-threaded: push
push is a middle ground between generate and create which is suitable for processing events from a single producer. It is similar to create in the sense that it can also be asynchronous and can manage backpressure using any of the overflow strategies supported by create. However, only one producing thread may invoke next, complete or error at a time.
根據官方的描述,push介於create和 generate 之間。異步生成序列,支持回壓。但是同一時刻只能有一個線程調用next、compete、error。
3.2.3.2. 示意圖
push 和 create 的唯一不同在於Flux Create.CreateMode不同。 Flux .push使用的是CreateMode.PUSH_ONLY,而Flux .create使用的是Flux Create.CreateMode.PUSH_PULL。
public void subscribe(CoreSubscriber<? super T> actual) {
BaseSink<T> sink = createSink(actual, backpressure);
actual.onSubscribe(sink);
try {
source.accept(
createMode == CreateMode.PUSH_PULL ? new SerializedFluxSink<>(sink) :
sink);
}
catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
sink.error(Operators.onOperatorError(ex, actual.currentContext()));
}
}
複製代碼
BufferAsyncSink 內部維護了一個SpscLinkedArrayQueue(單生產者單消費者)隊列,只支持單線程的sink源,sink先將元素髮射到SpscLinkedArrayQueue中,然後觸發drain,其實就是從SpscLinkedArrayQueue中poll元素並向消費者傳遞,中間增加SpscLinkedArrayQueue作為中轉。
SerializedSink 內部除了維繫一個BufferAsyncSink 作委託外,還維護一個MpscLinkedQueue(多生產者單消費者)隊列,顯然支持多線程的源生產。並發生產元素的時候先將元素push到MpscLinkedQueue,再從MpscLinkedQueue彈出到SpscLinkedArrayQueue中,最終由一個線程傳遞給消費者。
public FluxSink<T> next(T t) {
Objects.requireNonNull(t, "t is null in sink.next(t)");
if (sink.isTerminated() || done) {
Operators.onNextDropped(t, sink.currentContext());
return this;
}
//通過WIP保證僅有一個線程將next委託給sink(BufferAsyncSink)來處理
if (WIP.get(this) == 0 && WIP.compareAndSet(this, 0, 1)) {
try {
sink.next(t);
}
catch (Throwable ex) {
Operators.onOperatorError(sink, ex, t, sink.currentContext());
}
if (WIP.decrementAndGet(this) == 0) { //
return this;
}
}
else {
//多線程並發生產元素時,並發的其他線程直接將元素髮射到mpscQueue
this.mpscQueue.offer(t);
if (WIP.getAndIncrement(this) != 0) {
return this;
}
}
//這裡其實又把mpsc隊列中的元素取出放到spsc中
//簡單點可以這麼理解,多源生產模式下,元素先發射到mpsc中,單消費者取出元素從放到spsc中,中間多加了一個過渡
drainLoop();
return this;
}
複製代碼
目前沒有GET到兩者的區別,這樣做的好處到底是什麼?
3.2.4. Cleaning up after push() or create()
兩個回調函數,onDispose和onCancel,在取消或終止時執行任何清理。onDispose可用於在Flux 完成、出現錯誤或被取消時執行清理。onCancel可用於在使用onDispose進行清理之前執行任何特定於取消的操作。
Flux<String> bridge = Flux.create(sink -> {
sink.onRequest(n -> channel.poll(n))
.onCancel(() -> channel.cancel())
.onDispose(() -> channel.close())
});
複製代碼
- onCancel 首先調用,僅用於取消信號。
- onDispose 為完成、錯誤或取消信號調用。
4. 從回調 API 到 Flux 流
關於流,我最喜歡的一個例子就是Twitter的狀態更新,也就是所謂的推文(tweet)。每秒都會有數千個用戶更新,很多更新會伴隨着地理位置、語言和其他元數據。為了完成這個練習,將會使用開源的Twitter4J庫,它使用基於回調的API將新推文的子集推送過來。實時讀取推文的最簡單的可運行樣例如下所示。
import lombok.extern.slf4j.Slf4j;
import org.junit.Before;
import org.junit.Test;
import reactor.core.publisher.Flux;
import twitter4j.StallWarning;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
import java.util.concurrent.TimeUnit;
/**
* @Author: wangxw
* @Date: 2021/08/31
* @Description:
*/
@Slf4j
public class Callback2FluxTest {
public void add() throws InterruptedException {
TwitterStream twitterStream = TwitterStreamFactory.getSingleton();
twitterStream.addListener(new StatusListener() {
@Override
public void onStatus(Status status) {
log.info("Status: {} ", status);
}
@Override
public void onException(Exception e) {
log.error("Error callback ", e);
}
// 其他回調
});
twitterStream.sample();
TimeUnit.SECONDS.sleep(10);
twitterStream.shutdown();
}
複製代碼
調用twitterStream.sample()將會啟動一個後台線程,該線程會登錄Twitter並等待新的消息。每次有推文出現,onStatus回調就會執行。執行過程可能會跨線程,所以不能依賴異常拋出的機制,而是使用onException()通知。在休眠10秒之後,通過shutdown()關閉流並清理底層的資源,比如HTTP連接或線程。
整體而言,它看上去並沒有那麼糟糕,這個程序的問題在於什麼都不做。在現實生活中,你可能會以某種方式處理每條Status消息(推文),比如保存到數據庫中或者提供給一個機器學習算法。從技術上來講,你可以將這些邏輯放到回調中,但是這樣就將基礎設施調用和業務邏輯耦合在一起了。簡單地將功能委託給一個單獨的類會更好一些,但很遺憾的是無法重用。我們真正想要的是技術領域(消費HTTP連接中的數據)和業務領域(解釋輸入的數據)的清晰分離。所以,我們構建了第二層回調。
void consume(Consumer<Status> onStatus, Consumer<Exception> onException) throws InterruptedException {
TwitterStream twitterStream = TwitterStreamFactory.getSingleton();
twitterStream.addListener(new StatusListener() {
@Override
public void onStatus(Status status) {
onStatus.accept(status);
}
@Override
public void onException(Exception e) {
onException.accept(e);
}
// 其他回調
});
twitterStream.sample();
TimeUnit.SECONDS.sleep(10);
twitterStream.shutdown();
}
複製代碼
通過添加這個額外的抽象層,現在能夠以各種方式重用consume()方法。假設不再是進行日誌記錄,而是要進行持久化、分析或欺詐檢測。
但是這隻將問題在層級結構中進行了提升。如果想要記錄每秒推文的數量該怎麼辦?或者只想消費前5條數據,又該怎樣實現?如果想要有多個監聽器,又會發生什麼情況?前述每種情況都會打開一個新的HTTP連接。最後不得不提的是,API不允許完成後再取消訂閱,以免帶來資源泄漏的風險。我們正在努力朝着基於Rx的API的方向努力。此時,不再傳遞迴調到可能要執行的地方,而是返回一個Flux 並允許每人按需對其進行訂閱。但是,需要記住的一點是,如下的實現還是會為每個Subscriber打開一個新的網絡連接。
public void flux() {
Flux<Status> flux = Flux.<Status>create(emmiter -> {
TwitterStream twitterStream = TwitterStreamFactory.getSingleton();
twitterStream.addListener(new StatusListener() {
@Override
public void onStatus(Status status) {
emmiter.next(status);
}
@Override
public void onException(Exception e) {
emmiter.error(e);
}
// 其他回調
});
emmiter.onDispose(() -> twitterStream.shutdown()); // 關閉twitterStream
}).doOnSubscribe(s->log.info("doOnSubscribe"));
flux.subscribe(status -> log.info("Status: {} ", status),
ex -> log.error("Error callback ", ex));
}
複製代碼
以上代碼與consume(...)的巨大差別在於,不必將回調作為參數傳遞給observe()。相反,樣例可以返回Flux ,將它到處傳遞,然後在某個地方進行存儲,並且只要需要就可以隨時隨地使用。還可以將這個Flux 與其他的Flux 進行組合。還未討論的一個方面是資源清理,有人取消訂閱時,應當關閉TwitterStream,以避免資源泄漏。
public class LazyTwitterFlux {
private final Set<FluxSink<? super Status>> fluxSinks = new CopyOnWriteArraySet<>();
private final Flux<Status> flux = Flux.create(emmiter -> {
registrer(emmiter);
emmiter.onDispose(() -> unregistrer(emmiter));
});
private final TwitterStream twitterStream;
public LazyTwitterFlux() {
this.twitterStream = TwitterStreamFactory.getSingleton();
twitterStream.addListener(new StatusListener() {
@Override
public void onStatus(Status status) {
fluxSinks.forEach(sink -> sink.next(status));
}
@Override
public void onException(Exception e) {
fluxSinks.forEach(fluxSink -> fluxSink.error(e));
}
// 其他回調
});
}
public Flux<Status> flux() {
return flux;
}
private synchronized void registrer(FluxSink<? super Status> fluxSink) {
fluxSinks.add(fluxSink);
if (fluxSinks.isEmpty()) {
twitterStream.sample();
}
}
private synchronized void unregistrer(FluxSink<? super Status> fluxSink) {
fluxSinks.remove(fluxSink);
if (fluxSinks.isEmpty()) {
twitterStream.shutdown();
}
}
}
@Test
public void lazy() {
LazyTwitterFlux lazyTwitterFlux = new LazyTwitterFlux();
Flux<Status> flux1 = lazyTwitterFlux.flux()
.doOnSubscribe(s -> log.info("doOnSubscribe"));
Flux<Status> flux2 = lazyTwitterFlux.flux()
.doOnSubscribe(s -> log.info("doOnSubscribe"));
flux1.subscribe(status -> log.info("Status: {} ", status),
ex -> log.error("Error callback ", ex));
flux2.subscribe(status -> log.info("Status: {} ", status),
ex -> log.error("Error callback ", ex));
}
複製代碼
fluxSinks 的集線程安全地存儲當前已訂閱的Subscriber集合。每次新的Subscriber出現,將其添加到一個集中,並連接到底層的事件源上。相反,最後的Subscriber消失時,就關閉上游的源。這裡的關鍵在於始終只有一個到上游系統的連接,而不是為每個訂閱者都建立連接。這個實現能夠正常運行而且比較健壯,但看起來過於低層級並且易於出錯。對subscribers的訪問必須使用synchronized同步,而且集合本身必須支持安全地迭代。對register()的調用必須發生在通過 reregister() 註銷回調之前,否則,後者可能會在註冊之前調用。將一個上游源多路復用到多個Flux 的通用場景,肯定有更好的方式來實現。幸而,至少有兩種這樣的機制。Rx致力於減少這樣危險的樣板代碼並抽象出並發性。6.4 節將使用 ConnectableFlux 的refCount() 實現單次訂閱。
5. Hot 和 Cold 類型的 Flux
在得到一個 Flux 實例之後,很重要的一點是要理解它是 hot 類型的還是 cold 類型的。它們的API和語義是相同的,但是使用Flux 的方法取決於它的類型。
5.1. Cold 類型
cold類型的Flux 完全是延遲(lazy)執行的,並且在有人對其感興趣時才會開始發布事件。如果沒有觀察者,那麼Flux 只是一個靜態的數據結構。這也意味着,每個訂閱者都會接收到屬於自己的流的副本,因為事件是延遲生成的,並且一般不會採取任何形式的緩存。cold類型的Flux 通常來源於Flux .create(),按照語義,它不會啟用任何邏輯,而是推遲到有人實際對其監聽才會執行。從某種程度上來說,cold 類型的 Flux 依賴Subscriber。cold類型的 Flux 的樣例除了create()之外,還包括Flux .just()、from()和range()。訂閱一個cold類型的Flux 通常還涉及create()中的副作用,比如查詢數據庫或打開連接。
5.2. Hot 類型
hot類型的Flux 則與之不同。在得到這種類型的Flux 的時候,不管是否有Subscriber,它都可能已經開始發布事件了。即便沒有人監聽,事件可能會丟失,Flux 依然會往下游推送事件。通常情況下,可以完全控制cold類型的Flux ,但是 hot類型的Flux 是獨立於消費者的。Subscriber出現時,hot類型的Flux 的行為類似於電話竊聽(wire tap),透明地發布流經它的事件。Subscriber的出現和消失並不會改變Flux 的行為,它是完全解耦和獨立的。
hot類型的Flux 通常發生在完全無法控制事件源的場景下。這種Flux 的樣例包括鼠標移動、鍵盤輸入或按鈕點擊。
依賴事件傳遞時,hot類型和cold類型Flux 的差異就變得非常重要了。不管立即訂閱還是幾個小時之後訂閱cold類型的Flux ,你都會獲得完整且一致的事件集。但如果Flux 是hot類型的,那麼你就無法確保能接收到所有事件。稍後將介紹一些技術,它們能夠確保每個訂閱者都能接收到所有事件,例如cache()操作符在技術上來講,可以緩衝來自hot類型Flux 的所有事件,讓後續的訂閱者都能接收到相同的事件序列。但是,在理論上,它消耗的內存量是沒有限制的,所以在緩存hot類型的Flux 時要非常小心。
6. ConnectableFlux
有時,我們可能不希望僅將某些處理延遲到某一個訂閱者的訂閱時間,而是希望其中幾個訂閱者會合,然後再觸發訂閱和數據生成(這有點類似於CountDownLantch)。
ConnectableFlux 就是為此而生,Flux API 中有兩種常用的返回ConnectableFlux 的方式:publish 和replay。
- publish會嘗試滿足各個不同訂閱者的需求(也就是回壓),並綜合這些請求反饋給源。假設有某個訂閱者的需求為 0,發布者會暫停向所有訂閱者發出元素。
- replay將對第一個訂閱後產生的數據進行緩存,最多緩存數量取決於配置(時間/緩存大小)。 它會對後續接入的訂閱者重新發送數據。
ConnectableFlux 提供了多種對訂閱的管理方式。包括:
- connect() 當有足夠的訂閱接入後,可以對 Flux 手動執行一次。它會觸發對上游源的訂閱。
- autoConnect(n) 與 connect() 類似,不過是在有 n 個訂閱的時候自動觸發。
- refCount(n) 不僅能夠在訂閱者接入的時候自動觸發,還會檢測訂閱者的取消動作。如果訂閱者全部取消訂閱,則會將源“斷開連接”,再有新的訂閱者接入的時候才會繼續“連上”發布者。
- refCount(int, Duration) 增加了一個倒計時:一旦訂閱者數量太低了,它會等待 Duration參數指定的時間,如果沒有新的訂閱者接入才會與源斷開連接。
6.1. connect 例子
@Test
public void connectTest() throws InterruptedException {
Flux<String> source = Flux.range(1, 3)
.map(Object::toString)
.doOnSubscribe(s -> log.info("subscribed to source"));
ConnectableFlux<String> co = source.publish();
co.subscribe(log::info);
co.subscribe(log::info);
log.info("done subscribing");
TimeUnit.SECONDS.sleep(1);
log.info("will now connect");
co.connect();
}
16:06:03.494 [main] INFO wangxw.flux.FluxTest - done subscribing
16:06:04.496 [main] INFO wangxw.flux.FluxTest - will now connect
16:06:04.498 [main] INFO wangxw.flux.FluxTest - subscribed to source
16:06:04.500 [main] INFO wangxw.flux.FluxTest - 1
16:06:04.500 [main] INFO wangxw.flux.FluxTest - 1
16:06:04.500 [main] INFO wangxw.flux.FluxTest - 2
16:06:04.500 [main] INFO wangxw.flux.FluxTest - 2
16:06:04.500 [main] INFO wangxw.flux.FluxTest - 3
16:06:04.500 [main] INFO wangxw.flux.FluxTest - 3
複製代碼
只有當connect() 之後上游才會發出數據。
6.2. autoConnect(n) 例子
@Test
public void autoConnect() throws InterruptedException {
Flux<String> source = Flux.range(1, 3)
.map(Object::toString)
.doOnSubscribe(s -> log.info("subscribed to source"));
Flux<String> co = source.publish().autoConnect(2);
log.info("subscribed first");
co.subscribe(log::info);
TimeUnit.SECONDS.sleep(1);
log.info("subscribing second");
co.subscribe(log::info);
}
17:18:09.468 [main] INFO wangxw.flux.FluxTest - subscribed first
17:18:10.475 [main] INFO wangxw.flux.FluxTest - subscribing second
17:18:10.486 [main] INFO wangxw.flux.FluxTest - subscribed to source
17:18:10.492 [main] INFO wangxw.flux.FluxTest - 1
17:18:10.492 [main] INFO wangxw.flux.FluxTest - 1
17:18:10.492 [main] INFO wangxw.flux.FluxTest - 2
17:18:10.492 [main] INFO wangxw.flux.FluxTest - 2
17:18:10.493 [main] INFO wangxw.flux.FluxTest - 3
17:18:10.493 [main] INFO wangxw.flux.FluxTest - 3
複製代碼
當兩個訂閱者都完成訂閱之後,上游才收到訂閱請求,並開始發出數據。
6.3. refCount() 例子
@Test
public void refCountTest() throws InterruptedException {
Flux<String> source = Flux.interval(Duration.ofMillis(500))
.map(Object::toString)
.doOnSubscribe(s -> log.info("doOnSubscribe"))
.doOnCancel(() -> log.info("doOnCancel"));
Flux<String> flux = source.publish().refCount(2, Duration.ofSeconds(2));
log.info("subscribed first");
Disposable s1 = flux.subscribe(x -> log.info("s1:" + x));
TimeUnit.SECONDS.sleep(1);
log.info("subscribed second");
Disposable s2 = flux.subscribe(x -> log.info("s2:" + x));
TimeUnit.SECONDS.sleep(1);
log.info("subscribed first disposable");
s1.dispose();
TimeUnit.SECONDS.sleep(1);
log.info("subscribed second disposable"); // 所有的訂閱者都取消了
s2.dispose();
TimeUnit.SECONDS.sleep(1); // 在2s內 s3進行了訂閱
log.info("subscribed third");
Disposable s3 = flux.subscribe(x -> log.info("s3:" + x));
TimeUnit.SECONDS.sleep(1);
log.info("subscribed third disposable");
s3.dispose(); // 所有訂閱者都取消了 disconnect
TimeUnit.SECONDS.sleep(3);
log.info("subscribed fourth"); // 3s 後(超過了2s)s4、s5訂閱,觸發connect
Disposable sub4 = flux.subscribe(l -> log.info("s4: " + l));
TimeUnit.SECONDS.sleep(1);
log.info("subscribed fifth");
Disposable sub5 = flux.subscribe(l -> log.info("s5: " + l));
TimeUnit.SECONDS.sleep(2);
}
17:29:23.044 [main] INFO wangxw.flux.ConnectableFluxTest - subscribed first
17:29:24.052 [main] INFO wangxw.flux.ConnectableFluxTest - subscribed second
17:29:24.067 [main] INFO wangxw.flux.ConnectableFluxTest - doOnSubscribe
17:29:24.576 [parallel-1] INFO wangxw.flux.ConnectableFluxTest - s1:0
17:29:24.576 [parallel-1] INFO wangxw.flux.ConnectableFluxTest - s2:0
17:29:25.076 [parallel-1] INFO wangxw.flux.ConnectableFluxTest - s1:1
17:29:25.076 [parallel-1] INFO wangxw.flux.ConnectableFluxTest - s2:1
17:29:25.076 [main] INFO wangxw.flux.ConnectableFluxTest - subscribed first disposable
17:29:25.576 [parallel-1] INFO wangxw.flux.ConnectableFluxTest - s2:2
17:29:26.094 [parallel-1] INFO wangxw.flux.ConnectableFluxTest - s2:3
17:29:26.094 [main] INFO wangxw.flux.ConnectableFluxTest - subscribed second disposable
17:29:27.101 [main] INFO wangxw.flux.ConnectableFluxTest - subscribed third
17:29:27.101 [main] INFO wangxw.flux.ConnectableFluxTest - s3:4
17:29:27.101 [main] INFO wangxw.flux.ConnectableFluxTest - s3:5
17:29:27.576 [parallel-1] INFO wangxw.flux.ConnectableFluxTest - s3:6
17:29:28.075 [parallel-1] INFO wangxw.flux.ConnectableFluxTest - s3:7
17:29:28.102 [main] INFO wangxw.flux.ConnectableFluxTest - subscribed third disposable
17:29:30.103 [parallel-3] INFO wangxw.flux.ConnectableFluxTest - doOnCancel // 注意時間 2s 後執行了 doOnCancel
17:29:31.103 [main] INFO wangxw.flux.ConnectableFluxTest - subscribed fourth
17:29:32.104 [main] INFO wangxw.flux.ConnectableFluxTest - subscribed fifth
17:29:32.104 [main] INFO wangxw.flux.ConnectableFluxTest - doOnSubscribe
17:29:32.606 [parallel-4] INFO wangxw.flux.ConnectableFluxTest - s4: 0
17:29:32.606 [parallel-4] INFO wangxw.flux.ConnectableFluxTest - s5: 0
17:29:33.107 [parallel-4] INFO wangxw.flux.ConnectableFluxTest - s4: 1
17:29:33.107 [parallel-4] INFO wangxw.flux.ConnectableFluxTest - s5: 1
17:29:33.605 [parallel-4] INFO wangxw.flux.ConnectableFluxTest - s4: 2
17:29:33.605 [parallel-4] INFO wangxw.flux.ConnectableFluxTest - s5: 2
17:29:34.105 [parallel-4] INFO wangxw.flux.ConnectableFluxTest - s4: 3
17:29:34.105 [parallel-4] INFO wangxw.flux.ConnectableFluxTest - s5: 3
複製代碼
本例中,refCount() 設置為最少兩個訂閱者接入是才開始發出數據,當所有訂閱者都取消時,如果不能在兩秒內接入新的訂閱者,則上游會斷開連接。
上邊的例子中,隨着前兩個訂閱者相繼取消訂閱,第三個訂閱者及時(在2秒內)開始訂閱,所以上游會繼續發出數據,而且根據輸出可以看出是“hot flux”。
當第三個訂閱者取消後,第四個訂閱者沒能及時開始訂閱,所以上游發布者斷開連接。當第五個訂閱者訂閱之後,第四和第五個訂閱者相當於開始了新一輪的訂閱。
6.4. 使用refCount()實現單次訂閱
ConnectableFlux 以一種有意思的方式來協調多個Subscriber,並共享一個底層的訂閱。還記得最初藉助LazyTwitterFlux 創建單個延遲執行的對底層資源的連接嗎?必須要手動跟蹤所有的Subscriber,如果第一個訂閱者出現或最後一個訂閱者離開,就建立連接或斷開連接。ConnectableFlux 是Flux 的一個特殊類型,能夠確保底層始終最多只有一個Subscriber,但實際上它又允許多個Subscriber共享相同的底層資源。
Subject是創建Flux的必要方式,而ConnectableFlux會保護原始的上游Flux,並確保最多只能有一個Subscriber可以接觸到它。不管有多少Subscriber連接到ConnectableFlux,系統只會打開一個Flux的訂閱,這個訂閱是基於該Flux創建的。
@Test
public void refCounted() {
Flux<Status> flux = Flux.<Status>create(emmiter -> {
log.info("Establishing connection");
twitterStream.addListener(new StatusListener() {
@Override
public void onStatus(Status status) {
emmiter.next(status);
}
@Override
public void onException(Exception e) {
emmiter.error(e);
}
// 其他回調
});
emmiter.onDispose(() -> {
log.info("Disconnecting");
twitterStream.shutdown();
});
}).doOnSubscribe(s -> log.info("doOnSubscribe"))
.doOnComplete(() -> log.info("doOnComplete"));
Flux<Status> refCounted = flux.publish().refCount();
Disposable s1 = refCounted.subscribe(status -> log.info("Status: {} ", status),
ex -> log.error("Error callback ", ex));
Disposable s2 = refCounted.subscribe(status -> log.info("Status: {} ", status),
ex -> log.error("Error callback ", ex));
s1.dispose();
s2.dispose();
}
17:51:21.610 [main] INFO wangxw.flux.Callback2FluxTest - doOnSubscribe
17:51:21.614 [main] INFO wangxw.flux.Callback2FluxTest - Establishing connection
17:51:21.616 [main] INFO wangxw.flux.Callback2FluxTest - Disconnecting
複製代碼
直到真正有第一個Subscriber的時候,連接才會建立。但是,更重要的在於,第二個Subscriber不會初始化新的連接,它甚至不會接觸到原始的Flux。publish(). refCount()會將底層的Flux 串聯包裝起來,並攔截所有的訂閱。
操作符
defer
延遲創建
@Test
public void deferTest() throws InterruptedException {
Flux<String> flux1 = Flux.just(PrintUtil.println(new Date()));
Flux<String> flux2 = Flux.defer(() -> Flux.just(PrintUtil.println(new Date())));
flux1.subscribe(x -> log.info("s1: " + x));
flux2.subscribe(x -> log.info("s2: " + x));
TimeUnit.SECONDS.sleep(3);
flux1.subscribe(x -> log.info("s3: " + x));
flux2.subscribe(x -> log.info("s4: " + x));
}
15:25:24.629 [main] INFO wangxw.operator.OperatorTest - s1: 2021-09-06 15:25:024
15:25:24.630 [main] INFO wangxw.operator.OperatorTest - s2: 2021-09-06 15:25:024
15:25:27.633 [main] INFO wangxw.operator.OperatorTest - s3: 2021-09-06 15:25:024
15:25:27.634 [main] INFO wangxw.operator.OperatorTest - s4: 2021-09-06 15:25:027
複製代碼
delayElements
@Test
public void delayTest() throws InterruptedException {
Flux.just("1", "2").delayElements(Duration.ofSeconds(2))
.subscribe(log::info);
TimeUnit.SECONDS.sleep(5);
}
16:02:40.482 [parallel-1] INFO wangxw.operator.OperatorTest - 1
16:02:42.483 [parallel-2] INFO wangxw.operator.OperatorTest - 2
複製代碼
延遲發布,在運行這個程序的時候,即便進行了訂閱,應用程序也會立即終止而不展現任何的結果,這是因為事件的發布是在後台異步運行的,所以需要在最後添加任意一個sleep()。
map
@Test
public void mapTest() throws InterruptedException {
Flux.just(1, 2, 3, 4)
.map(i -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return i * 2 + "";
})
.log()
.subscribe(log::info);
}
16:07:24.005 [main] INFO reactor.Flux.MapFuseable.1 - | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber)
16:07:24.007 [main] INFO reactor.Flux.MapFuseable.1 - | request(unbounded)
16:07:25.007 [main] INFO reactor.Flux.MapFuseable.1 - | onNext(2)
16:07:25.008 [main] INFO wangxw.operator.OperatorTest - 2
16:07:26.009 [main] INFO reactor.Flux.MapFuseable.1 - | onNext(4)
16:07:26.009 [main] INFO wangxw.operator.OperatorTest - 4
16:07:27.009 [main] INFO reactor.Flux.MapFuseable.1 - | onNext(6)
16:07:27.009 [main] INFO wangxw.operator.OperatorTest - 6
16:07:28.009 [main] INFO reactor.Flux.MapFuseable.1 - | onNext(8)
16:07:28.009 [main] INFO wangxw.operator.OperatorTest - 8
16:07:28.012 [main] INFO reactor.Flux.MapFuseable.1 - | onComplete()
複製代碼
map 是同步執行的,會阻塞客戶端線程。
flatMap
flatMap()是Rx中最重要的操作符之一。乍看上去,它類似於map(),但是它對每個元素的轉換都會返回另外一個(內嵌的)Flux 。鑒於Flux 可以代表另外一個異步操作,我們很快就意識到flatMap()可以為上游的每個事件執行異步計算(fork執行)並將結果加入進來。
flatMap 本身並不是異步的,但是內嵌的Flux可以執行異步操作。
@Test
public void flatMapTest() throws InterruptedException {
Function<Integer, Publisher<String>> mapper = i -> Flux.just(i * 2 + "").delayElements(Duration.ofSeconds(1));
Flux.just(1, 2, 3, 4)
.flatMap(mapper)
.subscribe(log::info);
TimeUnit.SECONDS.sleep(10);
}
17:23:42.566 [parallel-2] INFO wangxw.operator.OperatorTest - 4
17:23:42.566 [parallel-1] INFO wangxw.operator.OperatorTest - 2
17:23:42.568 [parallel-3] INFO wangxw.operator.OperatorTest - 6
17:23:42.572 [parallel-4] INFO wangxw.operator.OperatorTest - 8
複製代碼
從本質上來講,flatMap()接收一個隨時間(事件)出現的值的主(master)序列(Flux ),然後將每個事件分別替換為獨立的子序列。這些子序列彼此之間是不相關的,並且與生成它們的主序列中的事件也是不相關的。更確切地說,此時擁有的不再是單個主序列,而是一組Flux ,其中每個都是獨立運行的,並且隨着時間的推移出現和消失。因此,flatMap()並不能對子事件抵達下游操作符/訂閱者的順序給出任何的保證。
@Test
public void flatMap3Test() throws InterruptedException {
Flux.just(DayOfWeek.SUNDAY, DayOfWeek.MONDAY)
.flatMap(this::loadRecordFor)
.subscribe(log::info);
TimeUnit.SECONDS.sleep(5);
}
private Flux<String> loadRecordFor(DayOfWeek dow) {
switch (dow) {
case SUNDAY:
return Flux.interval(Duration.ofMillis(90))
.take(5)
.map(i -> "Sun" + i);
case MONDAY:
return Flux.interval(Duration.ofMillis(65))
.take(5)
.map(i -> "Mon" + i);
default:
return Flux.empty();
}
}
17:31:38.893 [parallel-2] INFO wangxw.operator.OperatorTest - Mon0
17:31:38.918 [parallel-1] INFO wangxw.operator.OperatorTest - Sun0
17:31:38.957 [parallel-2] INFO wangxw.operator.OperatorTest - Mon1
17:31:39.007 [parallel-1] INFO wangxw.operator.OperatorTest - Sun1
17:31:39.023 [parallel-2] INFO wangxw.operator.OperatorTest - Mon2
17:31:39.087 [parallel-2] INFO wangxw.operator.OperatorTest - Mon3
17:31:39.096 [parallel-1] INFO wangxw.operator.OperatorTest - Sun2
17:31:39.152 [parallel-2] INFO wangxw.operator.OperatorTest - Mon4
17:31:39.188 [parallel-1] INFO wangxw.operator.OperatorTest - Sun3
17:31:39.277 [parallel-1] INFO wangxw.operator.OperatorTest - Sun4
複製代碼
控制flatMap 的並發性
假設你有大量用戶的一個列表,它們被包裝在Flux 中。每個User有一個loadProfile()方法,該方法會通過HTTP請求返回一個Flux 實例。我們的目標是儘快獲取所有用戶概況(profile), flatMap()就是為了實現該目標而設計的,可以對上游的值進行並發計算,如下所示:
@Test
public void flatMap4Test() {
List<User> users = new ArrayList<>();
Flux.fromIterable(users)
.flatMap(User::loadProfile);
}
static class User {
public Flux<Profile> loadProfile() {
// 發送HTTP請求 異步執行
return Flux.empty();
}
}
static class Profile {
}
複製代碼
乍看上去這種方式非常不錯。Flux是從一個使用from()操作符的固定List生成的。因此,訂閱它的時候,會將所有的用戶立即釋放出來。對於每個新User,flatMap()都會調用loadProfile()並返回Flux 。然後,flatMap()透明地訂閱每個新的Flux,將所有的Profile事件轉發至下游。訂閱內部Flux 就相當於發起新的HTTP連接。因此,假設我們有10000個用戶,那就會突然發起10000個並發的HTTP請求。如果所有的這些請求都訪問相同的服務器,預計得到的情況無外乎如下幾種:
- 拒絕連接。
- 長時間等待和超時。
- 服務器停機。
- 遇到限速或者被加入黑名單。
- 整體的延遲增加。
- 客戶端的問題,包括太多打開(open)狀態的Socket、線程,以及過多的內存消耗。
增加並發會在一定的程度上得到回報,但如果你嘗試運行太多並發操作,最終將會導致大量的上下文切換、過高的內存和CPU佔用,以及整體性能的下降。
Flux.fromIterable(users)
.flatMap(User::loadProfile, 10);
複製代碼
flatMap()有一個非常簡單的重載形式,能夠限制內部流的並發訂閱總數。參數concurrency 限制了內部Flux 的訂閱數量。在實踐中,flatMap()接收前10個User時,它會為每個User調用loadProfile(),但是來自上游的第11個User出現時,flatMap()不會再調用loadProfile()。相反,它會等正在運行的內部流完成。因此,concurrency 參數限制了flatMap()生成的後台任務的數量。
concatMap(f)在語義上是與flatMap(f, 1)(也就是concurrency 值為1的flatMap())等價的。
利用flatMap計算笛卡爾積
根據兩個流中的所有值生成笛卡兒積。例如,可能有兩個Flux ,一個代表棋盤的行(rank,從1到8),另一個代表棋盤的列(file,從a到h)。應該能夠找到棋盤上所有64個可能的方格。Flux 將會精確地發布64個事件:針對a它會生成a1、a2...a8,然後是b1、b2等,直到最後達到h7和h8。這是flatMap()另一個非常有意思的樣例,每列(file)都會生成該列對應的所有可能的方格。
@Test
public void cartesianTest() {
Flux<Integer> oneToEight = Flux.range(1, 8);
Flux<String> ranks = oneToEight.map(Objects::toString);
Flux<String> files = oneToEight.map(x -> 'a' + x - 1)
.map(ascii -> (char) ascii.intValue())
.map(ch -> Character.toString(ch));
Flux<String> squares = files
.flatMap(file -> ranks.map(rank -> file + rank));
squares.subscribe(log::info);
}
複製代碼
concatMap
concatMap() 可以保持下游事件的順序,使其與上游事件的順序完全契合。
@Test
public void flatMap3Test() throws InterruptedException {
Flux.just(DayOfWeek.SUNDAY, DayOfWeek.MONDAY)
.concatMap(this::loadRecordFor)
.subscribe(log::info);
TimeUnit.SECONDS.sleep(5);
}
private Flux<String> loadRecordFor(DayOfWeek dow) {
switch (dow) {
case SUNDAY:
return Flux.interval(Duration.ofMillis(90))
.take(5)
.map(i -> "Sun" + i);
case MONDAY:
return Flux.interval(Duration.ofMillis(65))
.take(5)
.map(i -> "Mon" + i);
default:
return Flux.empty();
}
}
17:27:15.161 [parallel-1] INFO wangxw.operator.OperatorTest - Sun0
17:27:15.250 [parallel-1] INFO wangxw.operator.OperatorTest - Sun1
17:27:15.340 [parallel-1] INFO wangxw.operator.OperatorTest - Sun2
17:27:15.432 [parallel-1] INFO wangxw.operator.OperatorTest - Sun3
17:27:15.520 [parallel-1] INFO wangxw.operator.OperatorTest - Sun4
17:27:15.587 [parallel-2] INFO wangxw.operator.OperatorTest - Mon0
17:27:15.652 [parallel-2] INFO wangxw.operator.OperatorTest - Mon1
17:27:15.716 [parallel-2] INFO wangxw.operator.OperatorTest - Mon2
17:27:15.783 [parallel-2] INFO wangxw.operator.OperatorTest - Mon3
17:27:15.848 [parallel-2] INFO wangxw.operator.OperatorTest - Mon4
複製代碼
第一個事件(Sunday)從上游出現的時候,concatMap()會訂閱loadRecordsFor()產生的Flux ,並將產生的所有事件傳遞到下游。這個內部流完成時,concatMap()會等待下一個上游事件(Monday)並重複以上過程。concatMap()不會涉及任何的並發性,但是它保證了上游事件的順序,避免出現重疊。
flatMap()內部使用了merge()操作符,同時訂閱所有的子Flux ,對它們不做任何的區分。這也是下游事件互相交叉的原因。但是,concatMap()可以在技術上使用concat()操作符。concat()只會先訂閱第一個底層的Flux ,只有第一個完成之後,才會訂閱第二個。
merge
按照順序合併流
@Test
public void mergeTest() throws InterruptedException {
Flux<String> flux1 = Flux.interval(Duration.ofMillis(300)).map(x -> "p1: " + x);
Flux<String> flux2 = Flux.interval(Duration.ofMillis(500)).map(x -> "p2: " + x);
Flux<String> mergeFlux = Flux.merge(flux1, flux2);
mergeFlux.subscribe(log::info);
TimeUnit.SECONDS.sleep(2);
}
19:03:12.339 [parallel-1] INFO wangxw.operator.OperatorTest - p1: 0
19:03:12.541 [parallel-2] INFO wangxw.operator.OperatorTest - p2: 0
19:03:12.641 [parallel-1] INFO wangxw.operator.OperatorTest - p1: 1
19:03:12.939 [parallel-1] INFO wangxw.operator.OperatorTest - p1: 2
19:03:13.040 [parallel-2] INFO wangxw.operator.OperatorTest - p2: 1
19:03:13.240 [parallel-1] INFO wangxw.operator.OperatorTest - p1: 3
19:03:13.540 [parallel-1] INFO wangxw.operator.OperatorTest - p1: 4
19:03:13.542 [parallel-2] INFO wangxw.operator.OperatorTest - p2: 2
19:03:13.841 [parallel-1] INFO wangxw.operator.OperatorTest - p1: 5
19:03:14.040 [parallel-2] INFO wangxw.operator.OperatorTest - p2: 3
複製代碼
需要注意,任何底層Flux 出現的錯誤都會立即傳遞到Observer。可以使用merge()的mergeDelayError()變種形式來推遲錯誤,這樣直到其他的流都完成,錯誤通知才會發布。mergeDelayError()甚至能夠保證收集所有的異常,而不僅僅是第一個,並將它們封裝到rx.exceptions.CompositeException。
zip
壓縮(zipping)指的是將兩個(或更多)流組合起來的操作,在這個過程中,某個流中的每個事件必須要與其他流對應的事件進行成對組合。下游事件是通過組合每個流中的第一個事件,然後再組合第二個事件,以此類推生成的。
concat
按順序訂閱,等待第一個流完成,再依次訂閱下一個。
@Test
public void concatTest () throws InterruptedException {
Flux<String> flux1 = Flux.just("1","2","3").delayElements(Duration.ofSeconds(1));
Flux<String> flux2 = Flux.just("4","5","6");
Flux<String> flux = Flux.concat(flux1, flux2);
flux.subscribe(log::info);
TimeUnit.SECONDS.sleep(3);
}
16:43:21.315 [parallel-1] INFO wangxw.operator.OperatorTest - 1
16:43:22.317 [parallel-2] INFO wangxw.operator.OperatorTest - 2
16:43:23.318 [parallel-3] INFO wangxw.operator.OperatorTest - 3
16:43:23.318 [parallel-3] INFO wangxw.operator.OperatorTest - 4
16:43:23.318 [parallel-3] INFO wangxw.operator.OperatorTest - 5
16:43:23.319 [parallel-3] INFO wangxw.operator.OperatorTest - 6
複製代碼
錯誤處理操作符
在 try-catch 塊中處理異常最常用的幾種方法:
- 捕獲並返回一個靜態默認值。
- 捕獲並動態計算回退值。
- 捕獲並執行一個回退方法。
- 捕獲,包裝為一個BusinessException,然後重新拋出。
- 捕獲,記錄一個特定的錯誤日誌,然後重新拋出。
- 使用 finally 塊清理資源或 Java 7 “try-with-resource” 構造。
靜態回退值(Static Fallback Value)
在 Reactor 中與“捕獲並返回靜態默認值”的等價的是onErrorReturn. 以下示例顯示了如何使用它:
try {
return doSomethingDangerous(10);
}
catch (Throwable error) {
return "RECOVERED";
}
public String doSomethingDangerous(int i) {
if (i == 10) {
throw new BusinessException();
}
return i + "";
}
複製代碼
在 Reactor 中等價的是:
Flux<String> flux = Flux.just(10)
.map(this::doSomethingDangerous)
.onErrorReturn("RECOVERED");
flux.subscribe(log::info, e -> log.error("error", e));
00:07:57.556 [main] INFO wangxw.operator.ErrorHandleOperatorTest - RECOVERED
複製代碼
還可以執行選擇執對異常執行一個Predicate來決定是否恢復,如下面的示例:
Flux.just(10)
.map(this::doSomethingDangerous)
.onErrorReturn(e -> e.getMessage().equals("boom10"), "recovered10");
複製代碼
僅當異常消息為“boom10”時,執行回退。
回退方法(Fallback Method)
在Reactor中與“捕獲並執行一個回退方法”等價的是 onErrorResume ,例如從外部但不可靠的服務中獲取數據,當外部服務異常時則從緩存中獲取數據作為回退值,示例如下:
String v1;
try {
v1 = callExternalService("key1");
}
catch (Throwable error) {
v1 = getFromCache("key1");
}
String v2;
try {
v2 = callExternalService("key2");
}
catch (Throwable error) {
v2 = getFromCache("key2");
}
複製代碼
在 Reactor 中等價的是:
Flux.just("key1", "key2")
.flatMap(k -> callExternalService(k) // 每個鍵都調用外部服務
.onErrorResume(e -> getFromCache(k)) // 外部服務執行失敗執行回退
);
複製代碼
和 onErrorReturn 一樣,onErrorResume 也有一些變體,可以讓你根據異常的類型或Predicate來篩選返回的異常。它接受一個Function的事實並且允許根據遇到的錯誤選擇切換到不同的回退方法。下面的例子展示了如何做到這一點:
Flux.just("timeout1", "unknown", "key2")
.flatMap(k -> callExternalService(k)
.onErrorResume(error -> { // 發生錯誤是,動態選擇如何繼續
if (error instanceof TimeoutException)
return getFromCache(k); // 如果超時,則從訪問本地緩存
else if (error instanceof UnknownKeyException)
return registerNewEntry(k, "DEFAULT"); // 如果 key 不存在則創建一個新的條目
else
return Flux.error(error); // 在其他情況下重新拋
})
);
複製代碼
動態回退值(Dynamic Fallback Value)
命令式示例如下:
try {
Value v = erroringMethod();
return MyWrapper.fromValue(v);
}
catch (Throwable error) {
return MyWrapper.fromError(error);
}
複製代碼
Reactor 代碼如下:
erroringFlux.onErrorResume(error -> Mono.just(
MyWrapper.fromError(error)
));
複製代碼
捕獲並重新拋出(Catch and Rethrow)
命令式示例如下:
try {
return callExternalService(k);
}
catch (Throwable error) {
throw new BusinessException("oops, SLA exceeded", error);
}
複製代碼
在“fallback method”示例中,flatMap中的最後一行提示我們如何以反應方式實現相同的目標,如下所示:
Flux.just("timeout1")
.flatMap(k -> callExternalService(k))
.onErrorResume(original -> Flux.error(
new BusinessException("oops, SLA exceeded", original))
);
複製代碼
但是,有一種更直接的方法,即可以通過onErrorMap實現相同的效果:
Flux.just("timeout1")
.flatMap(k -> callExternalService(k))
.onErrorMap(original -> new BusinessException("oops, SLA exceeded", original));
複製代碼
記錄日誌(Log or React on the Side)
對於希望錯誤繼續傳播但在不修改序列的情況下對其作出反應(例如,記錄錯誤)的情況,可以使用doError操作符。這相當於“捕獲,記錄一個特定的錯誤日誌,然後重新拋出”模式,如下例所示:
try {
return callExternalService(k);
}
catch (RuntimeException error) {
//make a record of the error
log("uh oh, falling back, service failed for key " + k);
throw error;
}
複製代碼
doOnError運算符以及所有以doOn為前綴的操作符具有一定的“副作用(side-effect)”。它們允許在不修改序列的情況下查看序列的事件。
與前面顯示的命令式示例一樣,以下示例仍然傳播錯誤,但確保我們至少記錄了外部服務發生故障:
LongAdder failureStat = new LongAdder();
Flux<String> flux =
Flux.just("unknown")
.flatMap(k -> callExternalService(k)
.doOnError(e -> {
failureStat.increment();
log("uh oh, falling back, service failed for key " + k); // 記錄日誌
})
);
複製代碼
使用 Resources 和 Finally 塊 (Using Resources and the Finally Block)
- 使用 finally 塊清理資源
Stats stats = new Stats();
stats.startTimer();
try {
doSomethingDangerous();
}
finally {
stats.stopTimerAndRecordTiming();
}
複製代碼
- 使用 try-with-resource 語法
try (SomeAutoCloseable disposableInstance = new SomeAutoCloseable()) {
return disposableInstance.toString();
}
複製代碼
兩者都有各自的 Reactor 等價語法:doFinally 和 using。
doFinally 是當序列終止(使用 onComplete 或 onError )或取消時希望執行的副作用。它給你一個提示,什麼樣的終止觸發了副作用。以下示例顯示了如何使用 doFinally:
Stats stats = new Stats();
LongAdder statsCancel = new LongAdder();
Flux<String> flux =
Flux.just("foo", "bar")
.doOnSubscribe(s -> stats.startTimer())
.doFinally(type -> {
stats.stopTimerAndRecordTiming();
if (type == SignalType.CANCEL)
statsCancel.increment();
})
.take(1);
複製代碼
重試 (Retrying)
Flux.interval(Duration.ofMillis(250))
.map(input -> {
if (input < 3) return "tick " + input;
throw new RuntimeException("boom");
})
.retry(1)
.elapsed()
.subscribe(System.out::println, System.err::println);
[259,tick 0]
[251,tick 1]
[250,tick 2]
[504,tick 0]
[250,tick 1]
[250,tick 2]
java.lang.RuntimeException: boom
複製代碼
從前面的例子中可以看出,retry(1)只是重新訂閱了interval 一次,從 0 重新開始計時。第二次,由於異常仍然發生,它放棄重試並向下游傳播錯誤。
還有一個更高級的版本的重試 retryWhen,它使用一個“伴生(companion) ”的Flux來判斷某個特定的失敗是否應該重試。這個伴生的Flux是由操作符創建的,但由用戶進行裝飾,以便自定義重試條件。
伴生的 Flux 是一個 Flux,它傳遞了重試的策略/函數,是retryWhen唯一的參數。用戶定義該函數並使其返回一個新的Publisher<?>。Retry類是一個抽象類,我們可以使用**Retry.from(Function)**來生成伴生的Flux。
retryWhen重試周期
- 每次發生錯誤(有可能會重試)時,一個 RetrySignal 就會被被發布到伴生的Flux中,該Flux已經由您的函數修飾。
- 如果伴生的Flux 發出一個值,就會發生重試。
- 如果伴生的Flux 完成(completes)那麼錯誤會被吞掉,重試周期停止,結果序列也將完成。
- 如果伴生的Flux 產生錯誤(error),重試周期停止,結果序列也將產生錯誤(error)。
使用 retryWhen 模擬 retry(3) 的方法,伴生的Flux將吞掉錯誤,這兩種情況之間的區別很重要。
Flux.<String>error( new RuntimeException("boom"))
.doOnError(e -> System.err.println("on error"))
.retryWhen(Retry.from(companion ->
companion.take(3)))
.subscribe(System.out::println, System.err::println);
on error
on error
on error
on error
複製代碼
實際上,上面的這個示例將會產生一個空的Flux,但是它成功的完成了。而retry(3)將會以一個錯誤終止,所以他們的結果並不完全相同。
Flux.<String>error( new RuntimeException("boom"))
.doOnError(e -> System.err.println("on error"))
.retry(3)
.subscribe(System.out::println, System.err::println);
on error
on error
on error
on error
java.lang.RuntimeException: boom
複製代碼
指數退避重試
將反應式編程應用於已有的程序
從阻塞式到反應式
public class PersonDao {
/**
* 阻塞式的
* @return
*/
public List<Person> listPeople() {
return query("select * from people");
}
/**
* 反應式的
* @return
*/
public Flux<Person> rxListPeople() {
return Flux.fromIterable(query("select * from people"));
}
private List<Person> query(String sql) {
List<Person> people = new ArrayList<>();
for (int i = 0; i < 3; i++) {
Person person = new Person();
person.setId(i);
people.add(person);
}
return people;
}
}
複製代碼
我們對已有的阻塞式 API 變更到了 反應式 API 。根據系統規模的不同,和原有系統的兼容性可能是一個大問題。接下來我們通過 buffer 和 **blockLast ** 將反應式代碼和阻塞式代碼組合起來。
@Test
public void listPeopleTest() {
// 沒有任何副作用
Flux<Person> peopleFlux = personDao.rxListPeople();
Flux<List<Person>> listFlux = peopleFlux.buffer()
.log();
List<Person> people = listFlux.blockLast(Duration.ofSeconds(3));
assert people != null;
people.forEach(person -> log.info(person.toString()));
}
11:21:50.598 [main] INFO reactor.Flux.Buffer.1 - onSubscribe(FluxBuffer.BufferExactSubscriber)
11:21:50.601 [main] INFO reactor.Flux.Buffer.1 - request(unbounded)
11:21:50.601 [main] INFO reactor.Flux.Buffer.1 - onNext([Person(id=0), Person(id=1), Person(id=2)])
11:21:50.603 [main] INFO reactor.Flux.Buffer.1 - cancel()
11:21:50.608 [main] INFO reactor.Flux.Buffer.1 - onComplete()
11:21:50.616 [main] INFO wangxw.flux.Block2FluxTest - Person(id=0)
11:21:50.616 [main] INFO wangxw.flux.Block2FluxTest - Person(id=1)
11:21:50.616 [main] INFO wangxw.flux.Block2FluxTest - Person(id=2)
複製代碼
blockLast 阻塞等待onComplete 回調完成,你可能認為以上代碼只是封裝和拆封 Flux,而沒有特別明確的目的。但是,這只是第一步。下一個轉換將會引入一些延遲執行的功能。僅僅存在 Flux 並不意味着會有後台任務或副作用,這與Future不同,Future幾乎總是意味着有某些並發操作在執行(只有任務提交了才能返回一個Future)。
擁抱延遲執行
public Flux<Person> rxListPeople() {
return Flux.defer(() ->
Flux.fromIterable(query("select * from people")));
}
複製代碼
在訂閱之前什麼都不會發生。
從命令式並發到聲明式並發
在企業級應用程序中,顯式的並發並不常見。大多數情況下,每個請求都會由單個線程處理。同一個線程要完成如下工作。
- 接收 TCP/IP 連接
- 解析 HTTP 請求
- 調用 Controller 或 Servlet
- 阻塞對數據庫的調用
- 處理結果
- 編碼響應(如 JSON 格式)
- 將響應發送至客戶端
如果後端要發起多個獨立的請求,比如訪問數據庫,那麼這種分層的模型會影響用戶的延遲,因為它們是序列化執行的(當然可以很容易的並發執行)。除此之外,擴展性也會受到影響。例如,在Tomcat的執行器(executor)中,默認有200個負責處理請求的線程,這意味着處理的並發連接不能超過200個。如果流量突然暴增,傳入的連接將會排隊,服務器就會出現更高的延遲。但是,這種情況不會持續下去,Tomcat最終會開始拒絕傳入的流量。
**傳統的架構,在一個線程中執行請求處理的各個步驟也有一些益處,比如能夠提升緩存的本地化以及減少同步的損耗。**令人遺憾的是,**在典型的應用程序中,因為整體的延遲是每層延遲的總和,所以一個有故障的組件可能會對整體的延遲產生負面影響。**此外,有時許多步驟是相互獨立的,可以並發執行。例如,調用多個外部API或執行多個獨立的SQL查詢。例如,以下是一個沒有任何並發功能的程序。
@Slf4j
public class TicketService {
/**
* 查詢航班
*
* @param flightNo
* @return
*/
public Flight lookupFlight(String flightNo) {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return new Flight(flightNo);
}
/**
* 查詢乘客
*
* @param id
* @return
*/
public Passenger findPassenger(Long id) {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return new Passenger(id);
}
/**
* 根據航班和乘客訂票
*
* @param flight
* @param passenger
* @return
*/
public Ticket bookTicket(Flight flight, Passenger passenger) {
return new Ticket(flight, passenger);
}
/**
* 發送郵件
*
* @param ticket
* @return
*/
public boolean sendEmail(Ticket ticket) throws IOException {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("send email {}", ticket);
return true;
}
}
複製代碼
客戶端代碼如下所示。
@Test
public void blockBookTicket() throws IOException {
Flight flight = ticketService.lookupFlight("LOT 783");
Passenger passenger = ticketService.findPassenger(1L);
Ticket ticket = ticketService.bookTicket(flight, passenger);
ticketService.sendEmail(ticket);
}
複製代碼
這是非常典型的阻塞式代碼,與眾多應用程序中的代碼都很類似。但是,如果從延遲的角度來看,上述的代碼片段可以分為4個步驟。前兩個步驟相互獨立,只有第三個步驟(bookTicket())需要 lookupFlight() 和findPassenger() 的返回值。這裡顯然有機會利用並發的優勢,但是,開發人員很少採用這種方式,因為這需要比較複雜的線程池、Future以及回調。但是,如果API已經兼容Rx了,你可以簡單地將遺留的阻塞式代碼包裝到 Flux 中, 如下所示。
public Mono<Flight> rxLookupFlight(String flightNo) {
return Mono.defer(() ->
Mono.just(lookupFlight(flightNo)));
}
public Mono<Passenger> rxFindPassenger(Long id) {
return Mono.defer(() ->
Mono.just(findPassenger(id)));
}
複製代碼
從語義上講,rx-方法其實以相同的方式完成了相同的任務,換言之,它們默認都是阻塞的。從客戶端的角度看,除了API更加冗長之外,我們其實沒有得到任何好處。
@Test
public void rxBookTicket() throws IOException {
// 只是佔位,不產生任何的副作用
Mono<Flight> flight = ticketService.rxLookupFlight("LOT 783");
Mono<Passenger> passenger = ticketService.rxFindPassenger(1L);
Mono<Ticket> ticket = flight.zipWith(passenger,
(f, p) -> ticketService.bookTicket(f, p));
ticket.subscribe(ticketService::sendEmail);
}
複製代碼
無論是傳統的阻塞程序,還是使用 Rx 的程序,它們的運行方式完全相同。不過上述代碼的執行方式是延遲執行的,我們得到了兩個 Flight 和 Passenger 類型的兩個佔位符,但是還沒有產生任何的副作用。此時,並沒有執行任何的數據庫查詢或Web服務調用。
在上述代碼中你需要關注 subscribe() 位於何處。不過在通常情況下,你的業務邏輯只是一直組合Flux,並將它們返回給某個框架或者腳手架層。實際的訂閱是由Web框架或某些膠水代碼在幕後完成的。自行調用 subscribe() 也算不上糟糕的實踐,但是將訂閱推遲得越遠越好。
為了理解執行的流程,從下往上觀察是一種非常有幫助的方法。我們訂閱了ticket,因此Rx必須透明地訂閱flight和passenger。此時,真正的業務邏輯才會執行。因為兩個Flux 都是cold類型的,並且沒有涉及並發,所以對flight的訂閱會在調用線程中觸發lookupFlight()阻塞方法。當lookupFlight()完成的時候,RxJava就可以訂閱passenger了。此時,它已經通過同步的flight接收到Flight實例。rxFindPassenger()會以阻塞的方式調用findPassenger()並接收一個Passenger。經過這個連接點之後,數據會往下游流動。Flight和Passenger實例通過提供的lambda表達式(bookTicket)被結合起來,傳遞給ticket.subscribe()。
這裡看上去有不少工作要做,並且運行方式本質上和開始的阻塞式代碼並沒有區別。但是,現在我們不需要修改任何邏輯就能聲明式地應用並發了。
如果業務方法返回Future(或者CompletableFuture,沒有本質區別),其實系統已經為我們做出了兩個決策。
- 底層對 lookupFlight() 的調用已經開始,這裡沒有任何延遲執行的空間。我們不會在這個方法上阻塞,但是工作已經啟動。
- 我們對並發沒有任何控制權。方法的具體實現決定了Future任務是在線程池調用,還是為每個請求建立一個新的線程。
Reactor 給了用戶更多的控制權。實際上Flux 一般都已經是異步的了,但是在個別情況下,還是需要為已有的Flux 添加並發功能。在遇到同步Flux 時,可以自由決定使用何種線程機制的是API的消費者,而不是API的實現者。上述功能都是通過 subscribeOn() 操作符實現的,如下所示。
Mono<Flight> flight = ticketService.rxLookupFlight("LOT 783")
.subscribeOn(Schedulers.boundedElastic());
Mono<Passenger> passenger = ticketService.rxFindPassenger(1L)
.subscribeOn(Schedulers.boundedElastic())
.timeout(Duration.ofSeconds(3)); // 還可以聲明一個超時
複製代碼
如果 API 是Reactor驅動的,我們可以在訂閱之前的任何地方插入 subscribeOn() 操作符,並提供一個所謂的Scheduler實例,不用花費太大的力氣就能讓兩個方法並發執行。
但是bookTicket( )依然有點美中不足,它返回的是Ticket,這毫無疑問是阻塞式的。儘管訂票的執行過程可能會非常迅速,但是將其按照 Reactor 的方式進行聲明也是值得的,這樣會讓API更易於演化。
public Mono<Ticket> rxBookTicket(Flight flight, Passenger passenger) {
return Mono.defer(() ->
Mono.just(bookTicket(flight, passenger)));
}
複製代碼
但是,現在zipWith()返回的是一個看上去很詭異的Mono<Mono>,根據經驗,每當你看到雙重包裝的類型(比如Optional<Optional<...>>),就意味着在某些地方缺失了對flatMap()的調用。
Mono<Mono<Ticket>> ticket = flight
.zipWith(passenger, (f, p) -> ticketService.rxBookTicket(f, p));
複製代碼
我們可以使用flatMap,並為其傳遞一個恆等式函數,如下:
Mono<Ticket> ticket = flight
.zipWith(passenger, (f, p) -> ticketService.rxBookTicket(f, p))
.flatMap(abs -> abs);
複製代碼
我們難免會認為subscribeOn()是在 Reactor 中實現並發的恰當工具。這個操作符的確能夠實現這一點,但盡量還是不要使用它(以及後文描述的publishOn() )。在現實中,Flux 來源於異步源,所以根本就沒有必要進行自定義的調度。這裡使用 subscribeOn() 只是為了展現如何升級已有的應用程序,從而有選擇性地使用反應式原則。但是,在實踐中,Scheduler和subscribeOn()應該是最後的“武器”。
使用 flatMap()
在上述樣例中,我們必須通過電子郵件發送一個 Ticket 列表,在這裡我們需要注意以下三點:
- 這個列表可能很長。
- 發送一封郵件需要幾毫秒,甚至幾秒。
- 發送失敗的時候,應用程序需要平穩運行,但是最後要報告哪些 ticket 沒有投遞成功。
最後一項需求迅速排除了簡單的tickets.forEach(this::sendEmail)方式,因為這種方式會立即拋出異常,並且不會繼續投遞ticket。所以你只能使用迭代器,大致代碼如下:
List<Ticket> faitures = new ArrayList<>();
for (Ticket ticket : tickets) {
try {
ticketService.sendEmail(ticket);
} catch (Exception e) {
log.warn("Failed to send {}", ticket, e);
faitures.add(ticket);
}
}
複製代碼
但是,前面兩個需求並沒有得到解決。我們不需要在一個線程中串行的發送郵件,按照傳統的方式我們可以使用ExecutorService 將每個電子郵件提交為一個獨立的任務,代碼如下:
List<Pair<Ticket, Future<Boolean>>> tasks = tickets.stream()
.map(ticket -> Pair.of(ticket, ticketService.sendEmailAsync(ticket)))
.collect(Collectors.toList());
List<Ticket> failures = tasks.stream().flatMap(pair -> {
try {
Future<Boolean> future = pair.getRight();
future.get(1, TimeUnit.SECONDS); // 1s 的發送時間
return Stream.empty();
} catch (Exception e) {
Ticket ticket = pair.getLeft();
log.warn("Failed to send {}", ticket, e);
return Stream.of(ticket);
}
}).collect(Collectors.toList());
//-----------------------------------------------------------
ExecutorService pool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
public Future<Boolean> sendEmailAsync(Ticket ticket) {
return pool.submit(() -> sendEmail(ticket));
}
複製代碼
首先,我們遍歷tickets,並將它們提交到一個線程池中。準確地說,我們調用sendEmailAsync()輔助方法,將對sendEmail()的調用提交到一個線程池中,執行過程會包裝到一個Callable中。更準確地說,Callable的實例被先放到線程池前面的一個無界(默認情況下)隊列中。如果任務提交的速度太快,它們將無法及時得到處理,進而在線程池的無界隊列中積壓。這裡缺乏一種減緩提交速度的機制,這也是反應式流和回壓致力於解決的問題。
為了在發送郵件失敗時能夠重試,所以必須跟蹤哪個Future負責哪個Ticket,這裡以Pair來表示。第二個循環遍歷所有的Future,並試圖通過阻塞(get())和等待完成的方式來解除對它們的引用。如果get()成功返回,將會跳過這個Ticket。但是,如果出現異常,將會返回與該任務關聯的Ticket實例,這樣就能知道它失敗了,稍後再報告它。Stream.flatMap()允許返回零個或一個元素(實際上可以是任意數量),而Stream.map()通常需要一個元素。
你可能想問,為何需要兩個循環而不是如下的一個循環呢?
tickets.stream()
.map(ticket -> Pair.of(ticket, ticketService.sendEmailAsync(ticket)))
.flatMap(pair -> {
// ...
}).collect(Collectors.toList());
複製代碼
如果你不理解Java 8的Stream是如何運行的,就很難發現這裡有一個非常有意思的bug。因為流與 Flux 類似,它們都是延遲執行的,所以只有在請求終端操作(terminal operation)的時候(例如collect(toList())),才會針對底層集合中的每個元素依次執行操作。這意味着啟動後台任務的map()操作並沒有針對所有ticket立即執行,而是每次只執行一個元素,交替使用flatMap()。除此之外,我們實際上啟動了一個Future,阻塞等待,然後啟動第二個Future,阻塞等待,以此類推。
上述代碼涉及很多工作,並且出現錯誤的可能性非常高,更不要提什麼可讀性和可維護性了。那麼 Reactor 能在這個場景中發揮什麼作用那? 首先,我們將發送郵件的 API 更新為使用 Reactor,如下所示:
public Mono<Boolean> rxSendEmail(Ticket ticket){
// 將阻塞的源包裝為反應式
return Mono.fromCallable(() -> sendEmail(ticket));
}
複製代碼
這裡還沒有涉及並發,它只是將sendEmail()包裝進了一個 Mono 中。然後可以像前面一樣遍歷所有的tickets,它並不涉及任何並發,如下所示:
List<Ticket> failures = Flux.fromIterable(this.tickets)
.flatMap(ticket ->
ticketService.rxSendEmail(ticket)
// .flatMap(response -> Mono.<Ticket>empty())
.ignoreElement()
.ofType(Ticket.class)
.doOnError(e -> log.warn("Failed to send {}", ticket, e))
.onErrorReturn(ticket)
.buffer()
.blockLast(); // 收集發送失敗的tickets
複製代碼
在以上的樣例中,很容易看到內層的flatMap()忽略了response並返回了一個空的流。在這樣的場景中,flatMap()就有點大材小用了,更有效的方式是ignoreElements()。ignoreElements()會忽略上游發布的值,只轉發onCompleted()或onError()通知。因為我們忽略實際的響應,只處理錯誤,所以這裡的ignoreElements()能夠運行得非常好。
我們感興趣的內容都在外層flatMap()中。如果只是使用flatMap(this::rxSendEmail),代碼也可以運行,只不過rxSendEmail引發的任何故障都會終結整個流。但是,我們想要“捕獲”所有發布出來的錯誤,將其收集起來供後續使用。我們使用了與Stream.flatMap()類似的技巧:如果response能夠成功發布,就將其轉換為一個空的 Flux。它的基本含義就是丟棄成功的ticket。但是,如果遇到故障,樣例會返回引發故障的ticket。額外的doOnError()回調允許將異常以日誌的形式記錄下來。當然,也可以將日誌記錄添加到onErrorReturn()操作符中,但是我們發現這種關注點分離的方式更符合函數式的風格。
需要注意,如果將外層的flatMap()替換為concatMap(),我們將會遇到與前文提及的JDK中的Stream類似的bug。flatMap(或merge)會立即訂閱所有的內部流。與之相反,concatMap(或concat)則會依次訂閱每個內部Flux。並且只要沒有人真正訂閱Flux,它就不會開展任何工作。
到目前為止,一個帶有try-catch的for循環被替換成了更難閱讀、更複雜的 Flux。但是,為了將序列化代碼轉換為多線程計算,我們只需要再加一個操作符,**通過將外層 flatMap 內嵌的 Flux 使用 subscribeOn() 這個操作符使 Mono 採用異步運行的方式。**如下所示。
List<Ticket> failures = Flux.fromIterable(this.tickets)
.flatMap(ticket ->
ticketService.rxSendEmail(ticket)
// .flatMap(response -> Mono.<Ticket>empty())
.ignoreElement()
.ofType(Ticket.class)
.doOnError(e -> log.warn("Failed to send {}", ticket, e))
.onErrorReturn(ticket)
.subscribeOn(Schedulers.boundedElastic())) // 內嵌的flux執行異步操作
.buffer()
.blockLast(); // 收集發送失敗的tickets
複製代碼
它沒有太多的侵入性,你甚至可能很難發現它的存在。額外的 subscribeOn() 操作符會讓每個單獨的rxSendMail()都在一個特定的Scheduler中執行,這是 Reactor 的優勢之一。
在線程方面,Flux 默認同步執行,但是它能夠實現無縫甚至透明的多線程功能。當然,這並不意味着我們可以在任意位置安全地注入Scheduler。只不過,它的API更加簡潔,抽象層級也更高。我們只需要記住 Flux 默認是同步的。但是,我們可以很輕鬆地改變這種行為,將並發功能用到往常我們認為不可能出現的地方。這對於現存的遺留應用程序很有價值,藉助這種功能,可以輕鬆地對其進行優化。
subscribeOn() 放在外部還是內部也是值得討論的,此外應該儘可能推遲對Flux的訂閱,一般這會發生在外部世界的Web框架附近。這會大大改變你的思維方式,因為整個業務邏輯都是延遲執行的,直到有人真正想要看到結果的時候才會運行。