🕐 Tempo di lettura: 3 minuti
Nel progetto di AML su cui siamo attualmente allocati, il buon Mario ha dovuto migliorare le performance di alcuni servizi, evitando il fetching di dati che non mutavano frequentemente. Esplorando la documentazione di WebFlux, ha trovato i metodi .cache() e .share(), che si prestano bene ad un nuovo articolo!
📌 Flux cold e perché serve .cache()
In Reactor, i Flux sono cold: ogni volta che un subscriber si sottoscrive, il flusso riparte da capo, rieseguendo la logica alla sorgente. Questo è il comportamento standard, ma non sempre desiderabile. Immagina, ad esempio, più subscriber che, a pochi minuti l'uno dall'altro, interrogano la stessa sorgente costosa (es. una tabella di configurazione che si aggiorna ogni 10 minuti). In questi casi, rieseguire tutto ogni volta è inefficiente. Il metodo .cache() permette di memorizzare i dati emessi dal flusso alla prima sottoscrizione, e riutilizzarli per quelle successive, evitando così ricalcoli.
📑 L'esperimento – con e senza cache
class SampleTest017 {
@Test
void test_buildFlux_noCache() {
final AtomicInteger subscriptionCount = new AtomicInteger();
final Flux<String> sharedFlux = buildFlux(subscriptionCount);
final List<String> result1 = invokeAndCollect(sharedFlux);
final List<String> result2 = invokeAndCollect(sharedFlux);
checkResults(result1, result2);
//expect <2> because the source is cold and not cached
assertEquals(2, subscriptionCount.get());
}
@Test
void test_buildFlux_withCache() {
final AtomicInteger subscriptionCount = new AtomicInteger();
final Flux<String> sharedFlux = buildFlux(subscriptionCount)
.cache(); //cache the emitted items for all subsequent subscribers
final List<String> result1 = invokeAndCollect(sharedFlux); //1st subscription triggers the source
final List<String> result2 = invokeAndCollect(sharedFlux); //2nd subscription uses the cached data
checkResults(result1, result2);
//expect <1> because the result is cached after the first subscription
assertEquals(1, subscriptionCount.get());
}
//build a Flux that emits "A", "B", "C" and increments the count each time the Flux is subscribed to
private Flux<String> buildFlux(final AtomicInteger count) {
return Flux.just("A", "B", "C").doOnSubscribe(subscription -> count.incrementAndGet());
}
private List<String> invokeAndCollect(final Flux<String> sharedFlux) {
return sharedFlux.collectList().block();
}
@SafeVarargs
private void checkResults(final List<String>... results) {
Arrays.stream(results).forEach(result -> assertEquals(List.of("A", "B", "C"), result));
}
}
📑 Analisi del 1° test – test_buildFlux_noCache()
Il comportamento di default è ben visibile:
- il
Fluxemette i valori"A","B","C"e incrementa un contatore ogni volta che parte una nuova sottoscrizione - due chiamate a
invokeAndCollect(...)equivalgono a due esecuzioni distinte
📑 Analisi del 2° test – test_buildFlux_withCache()
Qui usiamo .cache():
- la prima sottoscrizione attiva il flusso e ne memorizza i valori
- le sottoscrizioni successive ottengono i dati già emessi, senza riesecuzione (e
subscriptionCountnon viene incrementato)
📌 Quando conviene usare cache?
- quando i dati non cambiano spesso
- se i subscriber arrivano vicini nel tempo
- se vogliamo evitare latenza, carico inutile o spreco di risorse
⚠️ Limiti e attenzioni
- la cache è in memoria e locale al
Flux: se ricrei ilFlux, perdi la cache - più thread che accedono allo stesso oggetto
Fluxcon.cache()condividono i dati, ma non è una cache condivisa a livello globale - di default, la cache non ha scadenza: i dati rimangono finché il
Fluxnon viene ricreato o il garbage collector non ripulisce. Puoi però specificare un timeout con.cache(Duration). cachenon va confuso con l'annotazione@Cacheabledi Spring:cacheè un metodo reattivo, locale e temporaneo; mentre@Cacheableè centralizzata e pensata per cache distribuite o persistenti, basate su chiavicachesi adatta meglio a flussi finiti: se usato con flussi infiniti, rischia di occupare sempre più memoria, a meno di limitazioni esplicite come.take(n)
L'articolo sta diventando un pippone troppo lungo… e dopotutto, la rubrica si chiama "Pills"! Mi fermo qui e rimando il metodo .share() alla parte 2 – stay tuned!
Alla prossima pillola! ☕