πŸ• Tempo di lettura: 4 minuti

Nel mondo della programmazione reattiva, uno dei concetti essenziali per una gestione efficiente dei dati Γ¨ il backpressure. Questo meccanismo consente di regolare il flusso tra produttore (publisher) e consumatore (subscriber) prevenendo sovraccarichi del sistema. In pratica, permette al consumatore di segnalare quando non riesce a gestire ulteriori dati, aspetto cruciale quando la velocitΓ  di generazione dei dati supera quella della loro elaborazione.

In WebFlux, due metodi fondamentali per gestire questa dinamica all'interno dei flussi reattivi sono hookOnSubscribe e hookOnNext. Entrambi appartengono alla classe BaseSubscriber e consentono di configurare la logica di sottoscrizione e di ricezione degli elementi del flusso.

πŸ“Œ hookOnSubscribe(Subscription subscription)

Invocato al momento della sottoscrizione del BaseSubscriber a un publisher, come un Flux o Mono, riceve come parametro un oggetto Subscription (rappresenta la relazione tra produttore e consumatore) e, consente di configurare la richiesta iniziale di elementi da ricevere tramite request(n) prevenendo così sovraccarichi fin dall'inizio.

πŸ“Œ hookOnNext(T value)

Viene invocato ogni volta che un nuovo elemento (value) viene emesso dal publisher, per gestire le logiche specifiche del consumatore: si occupa dell'elaborazione dei dati e include una nuova richiesta di elementi solo quando il consumatore Γ¨ pronto. Questo approccio garantisce un controllo granulare e permette di evitare overload, richiedendo nuovi elementi solo dopo aver completato l'elaborazione di quelli precedenti.

πŸ“‘ Esempio: richieste a blocchi di 5 elementi

Vediamo un'implementazione che dimostra come utilizzare il backpressure in un flusso reattivo con WebFlux. Il test genera un flusso di 40 elementi e, attraverso un BaseSubscriber personalizzato, il metodo hookOnSubscribe richiede inizialmente un blocco di 5 elementi, avviando il flusso in modo controllato. Il metodo hookOnNext viene chiamato per ciascun elemento ricevuto e, ogni volta che sono stati processati 5 elementi, richiede un nuovo blocco di dati, gestendo i picchi di carico senza incorrere in inefficienze o rallentamenti.

private static final int BUFFER_SIZE = 5; // number of elements requested in each batch
private static final int LIMIT = 40;
private int countRequests = 0;

@Test
void example() {
    Flux.range(1, LIMIT)
        .subscribe(new BaseSubscriber<>() {
            private int count = 0; //counter for processed elements

            @Override
            protected void hookOnSubscribe(final Subscription subscription) {
                System.out.println("--> hookOnSubscribe");
                System.out.println("Requested the first " + BUFFER_SIZE + " elements");
                request(BUFFER_SIZE);
                countRequests++;
            }

            @Override
            protected void hookOnNext(final Integer value) {
                //data processing β€” in our example, just printing the value
                System.out.println("Received: " + value);
                count++;
                //once BUFFER_SIZE elements have been processed, request another batch
                if (count % BUFFER_SIZE == 0 && count != LIMIT) {
                    System.out.println("--> hookOnNext");
                    System.out.println("Requested " + BUFFER_SIZE + " elements");
                    request(BUFFER_SIZE);
                    countRequests++;
                }
            }
        });

    assertEquals(LIMIT / BUFFER_SIZE, countRequests);
}

Alla prossima pillola! β˜•