🕐 Tempo di lettura: 2 minuti

È stata una prima stagione di articoli che mi ha divertito e insegnato parecchio, dopo l'appuntamento di oggi Java Pills si prenderà una pausa estiva e tornerà a settembre!

Chiudiamo la stagione con la seconda parte della pillola #17, dedicandoci a .share(), per gestire più subscriber concorrenti in modo efficace.

📌 Cold stream vs hot shared

Quando più subscriber si sottoscrivono quasi nello stesso momento a uno stesso Flux, il comportamento di default – essendo cold – è quello di rieseguire la logica per ciascuno. Con .share(), invece, il Flux diventa hot:

📑 Il test (sì, è un po' contorto)

@Test
void test_buildFlux_withShare() throws InterruptedException {
    final AtomicInteger subscriptionCount = new AtomicInteger();
    //create a cold Flux that emits "A", "B", "C" and increments the counter on each new subscription
    final Flux<String> sharedFlux = buildFlux(subscriptionCount)
            .delayElements(Duration.ofMillis(100)) //simulate delay to keep the Flux active
            .share();                              //share the source among concurrent subscribers

    final List<String> result1 = new ArrayList<>();
    final List<String> result2 = new ArrayList<>();
    final CountDownLatch latch = new CountDownLatch(2); //to wait for both subscriptions to finish

    subscribeAsync(sharedFlux, result1, latch);
    subscribeAsync(sharedFlux, result2, latch);

    assertTrue(latch.await(2, TimeUnit.SECONDS)); //wait for both subscriptions to complete
    checkResults(result1, result2);
    //expect <1> because the subscription is shared among concurrent subscribers
    assertEquals(1, subscriptionCount.get());
}

//start both subscriber threads (they will subscribe almost simultaneously) in separate threads
private void subscribeAsync(final Flux<String> sharedFlux, final List<String> result, final CountDownLatch latch) {
    new Thread(() -> sharedFlux.collectList()
            .doOnNext(result::addAll)               //collect the emitted values
            .doFinally(signal -> latch.countDown()) //signal when done
            .block())                               //block until the Flux completes
        .start();
}

Cosa succede nel test:

📌 Quando conviene usare share?

⚠️ Limiti e attenzioni

Ci rivediamo a settembre con la prossima pillola! ☕