🕐 Tempo di lettura: 2 minuti
È stata una prima stagione di articoli che mi ha divertito e insegnato parecchio, dopo l'appuntamento di oggi Java Pills si prenderà una pausa estiva e tornerà a settembre!
Chiudiamo la stagione con la seconda parte della pillola #17, dedicandoci a .share(), per gestire più subscriber concorrenti in modo efficace.
📌 Cold stream vs hot shared
Quando più subscriber si sottoscrivono quasi nello stesso momento a uno stesso Flux, il comportamento di default – essendo cold – è quello di rieseguire la logica per ciascuno. Con .share(), invece, il Flux diventa hot:
- la prima sottoscrizione attiva il flusso
- le successive si agganciano allo stesso stream condiviso
- la logica sorgente viene eseguita una sola volta (fintanto che c'è almeno un subscriber attivo)
📑 Il test (sì, è un po' contorto)
@Test
void test_buildFlux_withShare() throws InterruptedException {
final AtomicInteger subscriptionCount = new AtomicInteger();
//create a cold Flux that emits "A", "B", "C" and increments the counter on each new subscription
final Flux<String> sharedFlux = buildFlux(subscriptionCount)
.delayElements(Duration.ofMillis(100)) //simulate delay to keep the Flux active
.share(); //share the source among concurrent subscribers
final List<String> result1 = new ArrayList<>();
final List<String> result2 = new ArrayList<>();
final CountDownLatch latch = new CountDownLatch(2); //to wait for both subscriptions to finish
subscribeAsync(sharedFlux, result1, latch);
subscribeAsync(sharedFlux, result2, latch);
assertTrue(latch.await(2, TimeUnit.SECONDS)); //wait for both subscriptions to complete
checkResults(result1, result2);
//expect <1> because the subscription is shared among concurrent subscribers
assertEquals(1, subscriptionCount.get());
}
//start both subscriber threads (they will subscribe almost simultaneously) in separate threads
private void subscribeAsync(final Flux<String> sharedFlux, final List<String> result, final CountDownLatch latch) {
new Thread(() -> sharedFlux.collectList()
.doOnNext(result::addAll) //collect the emitted values
.doFinally(signal -> latch.countDown()) //signal when done
.block()) //block until the Flux completes
.start();
}
Cosa succede nel test:
- il
Fluxemette i valori"A","B","C"e (dovrebbe) incrementare un contatore ogni volta che parte una nuova sottoscrizione - i due thread però si sottoscrivono quasi in parallelo
- senza
.share(),subscriptionCountsarebbe2 - con
.share(),subscriptionCountè1: la sorgente è eseguita una sola volta, e i dati (a partire dalla sottoscrizione) vengono inviati ad entrambi i subscriber
📌 Quando conviene usare share?
- subscriber simultanei: quando più thread o client si sottoscrivono in parallelo (o quasi) allo stesso flusso
- elaborazioni condivise: per evitare calcoli duplicati su dati che cambiano spesso (es. accessi a microservizi, query pesanti, stream Kafka)
⚠️ Limiti e attenzioni
- il punto chiave:
.share()non è una cache, e non è un replay. È un alias di.publish().refCount(1) - ogni elemento emesso viene trasmesso solo ai subscriber attivi in quel momento
- se un subscriber si iscrive dopo che il flusso è iniziato, perde gli elementi già emessi
- se tutti i subscriber si disconnettono, il flusso si interrompe: alla prossima sottoscrizione ripartirà da capo
- quindi:
.share()funziona solo se i subscriber sono "live" nello stesso momento
Ci rivediamo a settembre con la prossima pillola! ☕