🕐 Tempo di lettura: 3 minuti

Nel progetto di AML su cui siamo attualmente allocati, il buon Mario ha dovuto migliorare le performance di alcuni servizi, evitando il fetching di dati che non mutavano frequentemente. Esplorando la documentazione di WebFlux, ha trovato i metodi .cache() e .share(), che si prestano bene ad un nuovo articolo!

📌 Flux cold e perché serve .cache()

In Reactor, i Flux sono cold: ogni volta che un subscriber si sottoscrive, il flusso riparte da capo, rieseguendo la logica alla sorgente. Questo è il comportamento standard, ma non sempre desiderabile. Immagina, ad esempio, più subscriber che, a pochi minuti l'uno dall'altro, interrogano la stessa sorgente costosa (es. una tabella di configurazione che si aggiorna ogni 10 minuti). In questi casi, rieseguire tutto ogni volta è inefficiente. Il metodo .cache() permette di memorizzare i dati emessi dal flusso alla prima sottoscrizione, e riutilizzarli per quelle successive, evitando così ricalcoli.

📑 L'esperimento – con e senza cache

class SampleTest017 {

    @Test
    void test_buildFlux_noCache() {
        final AtomicInteger subscriptionCount = new AtomicInteger();
        final Flux<String> sharedFlux = buildFlux(subscriptionCount);
        final List<String> result1 = invokeAndCollect(sharedFlux);
        final List<String> result2 = invokeAndCollect(sharedFlux);
        checkResults(result1, result2);
        //expect <2> because the source is cold and not cached
        assertEquals(2, subscriptionCount.get());
    }

    @Test
    void test_buildFlux_withCache() {
        final AtomicInteger subscriptionCount = new AtomicInteger();
        final Flux<String> sharedFlux = buildFlux(subscriptionCount)
                .cache(); //cache the emitted items for all subsequent subscribers
        final List<String> result1 = invokeAndCollect(sharedFlux); //1st subscription triggers the source
        final List<String> result2 = invokeAndCollect(sharedFlux); //2nd subscription uses the cached data
        checkResults(result1, result2);
        //expect <1> because the result is cached after the first subscription
        assertEquals(1, subscriptionCount.get());
    }

    //build a Flux that emits "A", "B", "C" and increments the count each time the Flux is subscribed to
    private Flux<String> buildFlux(final AtomicInteger count) {
        return Flux.just("A", "B", "C").doOnSubscribe(subscription -> count.incrementAndGet());
    }

    private List<String> invokeAndCollect(final Flux<String> sharedFlux) {
        return sharedFlux.collectList().block();
    }

    @SafeVarargs
    private void checkResults(final List<String>... results) {
        Arrays.stream(results).forEach(result -> assertEquals(List.of("A", "B", "C"), result));
    }
}

📑 Analisi del 1° test – test_buildFlux_noCache()

Il comportamento di default è ben visibile:

📑 Analisi del 2° test – test_buildFlux_withCache()

Qui usiamo .cache():

📌 Quando conviene usare cache?

⚠️ Limiti e attenzioni

L'articolo sta diventando un pippone troppo lungo… e dopotutto, la rubrica si chiama "Pills"! Mi fermo qui e rimando il metodo .share() alla parte 2 – stay tuned!

Alla prossima pillola! ☕