Skip to content

7 Transformar el resultado de manera asíncrona

Cuando se complete un CompletableFuture

El método thenApplyAsync(function) retornará un nuevo CompletableFuture<T> donde T corresponderá al tipo de retorno de la función de transformación pasada como argumento, que se aplicará sobre el valor del CompletableFuture original sobre el que se ejecuta.

Método thenApply

También existe el método thenApply(function), pero debemos tener en cuenta que bajo ciertas circunstancias es posible que la function se ejecute en el hilo principal, como vimos anteriormente en el caso de thenRun(runnable).

private void thenApplyAsyncExample() {
    CompletableFuture<Void> cf =
        CompletableFuture.supplyAsync(this::generateNumber)
                        .thenApplyAsync(this::duplicate)
                        .thenAcceptAsync(this::printNumber);
    System.out.printf("%s - Main\n", Thread.currentThread().getName());
    cf.join();
}

private int generateNumber() {
    System.out.printf("%s - Supplier\n", Thread.currentThread().getName());
    return 2;
}

private Integer duplicate(Integer value) {
    int duplicated = value * 2;
    System.out.printf("%s - Function - Duplicated: %d\n",
                    Thread.currentThread().getName(), duplicated);
    return duplicated;
}

private void printNumber(Integer value) {
    System.out.printf("%s - Consumer - %d\n",
                    Thread.currentThread().getName(), value);
}

El tipo T del valor de retorno de la función de transformación NO tiene por qué ser el mismo del tipo del valor del CompletableFuture original. Por ejemplo:

private void thenApplyAsyncExample() {
    CompletableFuture<Void> cf =
        CompletableFuture.supplyAsync(this::generateNumber)
                        .thenApplyAsync(this::duplicate)
                        .thenAcceptAsync(this::printNumber);
    System.out.printf("%s - Main\n", Thread.currentThread().getName());
    cf.join();
}

private int generateNumber() {
    System.out.printf("%s - Supplier\n", Thread.currentThread().getName());
    return 2;
}

private String duplicate(Integer value) {
    String duplicated = "value " + (value * 2);
    System.out.printf("%s - Function - Duplicated: %s\n",
                    Thread.currentThread().getName(), duplicated);
    return duplicated;
}

private void printNumber(String value) {
    System.out.printf("%s - Consumer - %s\n",
                    Thread.currentThread().getName(), value);
}

Funciona por tanto como el map clásico de la programación funcional.

Cuando se complete un CompletableFuture y la function retorne un CompletableFuture (compose)

Supongamos que encadenamos una tarea mediante thenApplyAsync(function). Como ya sabemos, dicho método retornará un CompletableFuture<T>, donde T corresponde al tipo de valor de retorno de la función suministrada a thenApplyAsync(). Pero ¿qué ocurre si dicha función no puede calcular el valor inmediatamente sino que retorna un valor futuro en forma de CompletableFuture<Integer>? Como consecuencia, el tipo de retorno del método thenApplyAsync será CompletableFuture<CompletableFuture<Integer>>. Si encadenamos varias operaciones de este tipo el tipo de retorno cada vez se hace más problemático CompletableFuture<CompletableFuture<CompletableFuture<..., lo que hace que no sea práctico.

Para solucionar este problema, cuando la función vaya a retornar un CompletableFuture, en vez de usar el método thenApplyAsync(function), deberemos usar el método thenComposeAsync(functionReturnsCompletableFuture), que retorna directamente el valor de retorno de la función suministrada, que deberá ser un CompletableFuture, evitando así encapsular el CompletableFuture retornado dentro dentro de otro CompletableFuture.

El método está sobrecargado thenComposeAsync(functionReturnsCompletableFuture, executor) para que podamos indicar en qué ejecutor queremos que se ejecute la función.

Su comportamiento corresponde a la operación flatMap de la programación funcional.

Método thenCompose

También existe el método thenCompose(functionReturnsCompletableFuture), pero debemos tener en cuenta que bajo ciertas circunstancias es posible que la function se ejecute en el hilo principal, como vimos anteriormente en el caso de thenRun(runnable).

private void thenComposeAsyncExample() {
    CompletableFuture<Void> cf = 
        CompletableFuture.supplyAsync(this::generateNumber)
                        .thenComposeAsync(this::duplicate)
                        .thenAcceptAsync(this::printNumber(value));
    System.out.printf("%s - Main\n", Thread.currentThread().getName());
    cf.join();
}

private int generateNumber() {
    System.out.printf("%s - Supplier\n", Thread.currentThread().getName());
    return 2;
}

private CompletableFuture<Integer> duplicate(Integer value) {
    return CompletableFuture.supplyAsync(() -> {
        int duplicated = value * 2;
        System.out.printf("%s - Function - Duplicated: %d\n",
                        Thread.currentThread().getName(), duplicated);
        return duplicated;
    });
}

private void printNumber(Integer value) {
    System.out.printf("%s - Consumer - %d\n",
                    Thread.currentThread().getName(), value);
}

Cuando se complete el primero de dos CompletableFuture

Algunas veces lanzamos dos cadenas de operaciones asíncronas, pero sólo estamos interesados en el primer resultado obtenido de los dos posibles. Si queremos transformar dicho resultado podemos usar el método applyToEitherAsync(completableFuture, function). Evidentemente el tipo del resultado de ambos CompletableFuture (sobre el que se ejecuta el método y el pasado como argumento) tiene que ser el mismo.

Este método retorna un CompletableFuture<T>, donde T es el tipo de retorno de la function. Podemos encadenar más operaciones detrás de la llamada a este método.

El método se encuentra sobrecargado applyToEitherAsync(completableFuture, function, executor) para recibir el executor en el que queremos ejecutar la function.

Un aspecto muy importante es que este método no cancela la ejecución de la segunda operación una vez que es completada la primera en ser completada.

Método applyToEither

También existe el método applyToEither(completableFuture, function), pero debemos tener en cuenta que bajo ciertas circunstancias es posible que la function se ejecute en el hilo principal, como vimos anteriormente en el caso de thenRun(runnable).

private void applyToEitherAsyncExample() {
    CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(this::generateNumber1);
    CompletableFuture<Integer> cf2 = CompletableFuture.supplyAsync(this::generateNumber2);
    CompletableFuture<Void> cfFirstTransformedAndConsumed =
        cf1.applyToEitherAsync(cf2, this::duplicate)
        .thenAccept(this::printNumber);
    System.out.printf("%s - Main\n", Thread.currentThread().getName());
    cfFirstTransformedAndConsumed.join();
    cf1.join();
    cf2.join();
}

private int generateNumber1() {
    int value = ThreadLocalRandom.current().nextInt(5) + 1;
    sleep(value * 1000);
    System.out.printf("%s - Supplier1 - %d\n", Thread.currentThread().getName(), value);
    return value;
}

private int generateNumber2() {
    int value = ThreadLocalRandom.current().nextInt(5) + 1;
    sleep(value * 1000);
    System.out.printf("%s - Supplier2 - %d\n", Thread.currentThread().getName(), value);
    return value;
}

private Integer duplicate(Integer value) {
    int duplicated = value * 2;
    System.out.printf("%s - Function - Duplicated: %d\n",
                    Thread.currentThread().getName(), duplicated);
    return duplicated;
}

private void printNumber(Integer value) {
    System.out.printf("%s - Consumer - %d\n",
                    Thread.currentThread().getName(), value);
}

private boolean sleep(long timeInMilis) {
    try {
        Thread.sleep(timeInMilis);
        return true;
    } catch (InterruptedException e) {
        return false;
    }
}

Cuando se completen dos CompletableFuture (combine)

Algunas veces lanzamos dos cadenas de operaciones asíncronas y estamos interesados en combinar sus resultados cuando ambas han completado, mediante una función de combinación que produzca un resultado combinado con el que seguir la cadena de operaciones.

Para ello tenemos disponible el método thenCombineAsync(completableFuture, biFunction), que retorna un CompletableFuture<U> donde U corresponde al tipo de retorno de la biFunction. La biFunction recibirá dos parámetros, el primero de un tipo T, correspondiente al tipo del valor al que se ha completado el CompletableFuture sobre el que estamos ejecutando el método thenCombineAsync(), y el segundo parámetro de un tipo R, correspondiente al tipo del valor al que se ha completado el CompletableFuture pasado como argumento a thenCombineAsync().

La biFunction sólo será ejecutada cuando ambos CompletableFuture (sobre el que se está ejecutando y el pasado como argumento) han completado su ejecución.

El método está sobrecargado thenCombineAsync(completableFuture, biFunction, executor) para recibir un tercer argumento con el ejecutor en el que queremos que se ejecute la biFunction.

Método thenCombine

También existe el método thenCombine(completableFuture, biFunction), pero debemos tener en cuenta que bajo ciertas circunstancias es posible que la biFunction se ejecute en el hilo principal, como vimos anteriormente en el caso de thenRun(runnable).

private void thenCombineAsyncExample() {
    CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(this::generateNumber);
    CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(this::generateName);
    CompletableFuture<Void> cfCombined = 
        cf1.thenCombineAsync(cf2, this::combineNumberAndName)
        .thenAcceptAsync(this::printCombination);
    System.out.printf("%s - Main\n", Thread.currentThread().getName());
    cfCombined.join();
}

private int generateNumber() {
    System.out.printf("%s - Number Supplier\n", Thread.currentThread().getName());
    return 2;
}

private String generateName() {
    System.out.printf("%s - Name Supplier\n", Thread.currentThread().getName());
    return "Baldomero";
}

private String combineNumberAndName(Integer number, String name) {
    String combination = name + " " + number;
    System.out.printf("%s - Bifunction: %s\n", 
                    Thread.currentThread().getName(), combination);
    return combination;
}

private void printCombination(String combination) {
    System.out.printf("%s - Consumer - %s\n",
                    Thread.currentThread().getName(), combination);
}

Digamos que corresponde a una reducción simple de dos CompletableFutures en uno solo.