Skip to content

6 Consumir el resultado de manera asíncrona

Cuando se complete un CompletableFuture

El método thenAcceptAsync(consumer) permite pasar el resultado de la tarea de un CompletableFuture a un consumidor para que haga uso de él.

El método thenAcceptAsync() retorna un CompletableFuture<Void>, ya que la función consumidora no retorna nada, por lo que no se podrá encadenar otra operación después de él. Corresponde a una tarea terminal, por lo que siempre se utiliza al final de una cadena de operaciones.

La ventaja que tiene el método thenAcceptAsync(consumer) es que la ejecución encadenada de ambas operaciones se realiza de manera asíncrona sin que sea necesaria la intervención del hilo principal, es decir sin que éste tenga que hacer get().

Este método se encuentra sobrecargado, thenAcceptAsync(consumer, executor), de manera que recibe un segundo parámetro en el que indicaremos el ejecutor en el que debe consumirse el valor.

Método thenAccept

También existe el método thenAccept(consumer), pero debemos tener en cuenta que bajo ciertas circunstancias es posible que el consumer se ejecute en el hilo principal, como vimos anteriormente en el caso de thenRun(runnable).

Veamos un ejemplo:

private void thenAcceptAsyncExample() {
    CompletableFuture<Void> cf = 
        CompletableFuture.supplyAsync(this::generateNumber)
        .thenAcceptAsync(this::printNumber);
    System.out.printf("%s - Main\n", Thread.currentThread().getName());
    cf.join();
}

private int generateNumber() {
    System.out.printf("%s - Supplier\n", Thread.currentThread().getName());
    return 2;
}

private void printNumber(Integer value) {
    System.out.printf("%s - Consumer - %d\n",
                    Thread.currentThread().getName(), value);
}

Cuando se complete un CompletableFuture retornando el valor con el que se ha completado

Como hemos comentado en el apartado anterior, el método thenAcceptAsync() retorna un CompletableFuture<Void>, ya que la función consumidora no retorna nada, por lo que no se podrá encadenar otra operación después de él, al ser una operación terminal.

Sin embargo hay ocasiones en las que queremos consumir el valor simplemente para realizar alguna tarea auxiliar colateral, pero que podamos continuar la cadena de operaciones. Para ello tenemos disponible el método whenCompleteAsync(biConsumer), al que pasamos un biConsumer que recibirá el valor con el que se ha completado el CompletableFuture y un Throwable con la excepción que se ha producido. Uno de los dos argumentos será null dependiendo de si el CompletableFuture ha sido completado con una excepción o completado correctamente produciendo un valor.

El método whenCompleteAsync(biConsumer) retorna un CompletableFuture<T>, donde T es el tipo del CompletableFuture sobre el que se ejecuta, ya que retorna el valor con el que éste es completado. Por tanto, podemos encadenar más operaciones después de esta operación. En este sentido es similar a la operación peek() de los Streams.

Este método se encuentra sobrecargado, whenCompleteAsync(biConsumer, executor), de manera que recibe un segundo parámetro en el que indicaremos el ejecutor en el que debe ejecutarse el biConsumer.

Método whenComplete

También existe el método whenComplete(biConsumer), pero debemos tener en cuenta que bajo ciertas circunstancias es posible que el biConsumer se ejecute en el hilo principal, como vimos anteriormente en el caso de thenRun(runnable).

private void whenCompleteAsyncExample() {
    CompletableFuture<Void> cf =
        CompletableFuture.supplyAsync(this::generateNumber)
        .whenCompleteAsync(this::log)
        .thenApplyAsync(this::duplicate)
        .thenAcceptAsync(this::printNumber);
    System.out.printf("%s - Main\n", Thread.currentThread().getName());
    cf.join();
}

private int generateNumber() {
    System.out.printf("%s - Supplier\n", Thread.currentThread().getName());
    return 2;
}

private void log(Integer value, Throwable throwable) {
    if (value != null) {
        System.out.printf("%s - Before Duplication - %d\n",
                        Thread.currentThread().getName(), value);
    }
}

private Integer duplicate(Integer value) {
    int duplicated = value * 2;
    System.out.printf("%s - Function - Duplicated: %d\n",
                    Thread.currentThread().getName(), duplicated);
    return duplicated;
}

private void printNumber(Integer value) {
    System.out.printf("%s - Consumer - %d\n",
                    Thread.currentThread().getName(), value);
}

Cuando se complete el primero de dos CompletableFuture

Algunas veces lanzamos dos cadenas de operaciones asíncronas, pero sólo estamos interesados en el primer resultado obtenido de los dos posibles. Si queremos consumir dicho resultado podemos usar el método acceptEitherAsync(completableFuture, consumer). Evidentemente el tipo del resultado de ambos CompletableFuture (sobre el que se ejecuta el método y el pasado como argumento) tiene que ser el mismo.

Este método retorna un CompletableFuture<Void>, ya que se usa para consumir un valor, por lo que no se podrá encadenar más operaciones tras él. Por este motivo solo se puede usar este método al final de una cadena de operaciones.

El método se encuentra sobrecargado acceptEitherAsync(completableFuture, consumer, executor) para recibir el executor en el que queremos ejecutar el consumer.

Un aspecto muy importante es que este método no cancela la ejecución de la segunda operación una vez que es completada la primera en ser completada.

Método acceptEither

También existe el método acceptEither(completableFuture, consumer), pero debemos tener en cuenta que bajo ciertas circunstancias es posible que el consumer se ejecute en el hilo principal, como vimos anteriormente en el caso de thenRun(runnable).

private void acceptEitherAsyncExample() {
    CompletableFuture<Integer> cf1 = 
        CompletableFuture.supplyAsync(this::generateNumber1);
    CompletableFuture<Integer> cf2 = 
        CompletableFuture.supplyAsync(this::generateNumber2);
    CompletableFuture<Void> cfFirstConsumed = 
        cf1.acceptEitherAsync(cf2, this::printNumber);
    System.out.printf("%s - Main\n", Thread.currentThread().getName());
    cfFirstConsumed.join();
    cf1.join();
    cf2.join();
}

private int generateNumber1() {
    int value = ThreadLocalRandom.current().nextInt(5) + 1;
    sleep(value * 1000);
    System.out.printf("%s - Supplier1 - %d\n", Thread.currentThread().getName(), value);
    return value;
}

private int generateNumber2() {
    int value = ThreadLocalRandom.current().nextInt(5) + 1;
    sleep(value * 1000);
    System.out.printf("%s - Supplier2 - %d\n", Thread.currentThread().getName(), value);
    return value;
}

private void printNumber(Integer value) {
    System.out.printf("%s - Consumer - %s\n",
                    Thread.currentThread().getName(), value);
}

private boolean sleep(long timeInMilis) {
    try {
        Thread.sleep(timeInMilis);
        return true;
    } catch (InterruptedException e) {
        return false;
    }
}

Cuando se completen dos CompletableFuture

Algunas veces lanzamos dos cadenas de operaciones asíncronas y estamos interesados en consumir sus resultados de forma conjunta cuando ambas han completado. Para ello podemos usar el método thenAcceptBothAsync(completableFuture, biConsumer).

Este método retorna un CompletableFuture<Void>, ya que se usa para consumir dos valores, por lo que no se podrá encadenar más operaciones tras él. Por este motivo solo se puede usar este método al final de una cadena de operaciones.

El método se encuentra sobrecargado thenAcceptBothAsync(completableFuture, biConsumer, executor) para recibir el executor en el que queremos ejecutar el consumer.

Método thenAcceptBoth

También existe el método thenAcceptBoth(completableFuture, biConsumer), pero debemos tener en cuenta que bajo ciertas circunstancias es posible que el consumer se ejecute en el hilo principal, como vimos anteriormente en el caso de thenRun(runnable).

private void thenAcceptBothAsyncExample() {
    CompletableFuture<Integer> cf1 = 
        CompletableFuture.supplyAsync(this::generateNumber1);
    CompletableFuture<Integer> cf2 = 
        CompletableFuture.supplyAsync(this::generateNumber2);
    CompletableFuture<Void> cfBothConsumed = cf1.thenAcceptBothAsync(cf2, this::add);
    System.out.printf("%s - Main\n", Thread.currentThread().getName());
    cfBothConsumed.join();
}

private int generateNumber1() {
    int value = ThreadLocalRandom.current().nextInt(5) + 1;
    sleep(value * 1000);
    System.out.printf("%s - Supplier1 - %d\n", Thread.currentThread().getName(), value);
    return value;
}

private int generateNumber2() {
    int value = ThreadLocalRandom.current().nextInt(5) + 1;
    sleep(value * 1000);
    System.out.printf("%s - Supplier2 - %d\n", Thread.currentThread().getName(), value);
    return value;
}

private void add(Integer value1, Integer value2) {
    System.out.printf("%s - BiConsumer - %d + %d = %d\n",
                    Thread.currentThread().getName(), value1, value2, value1 + value2);
}

private boolean sleep(long timeInMillis) {
    try {
        Thread.sleep(timeInMillis);
        return true;
    } catch (InterruptedException e) {
        return false;
    }
}