Skip to content

10 InvokeAll()

invokeAll()

Como hemos visto en el ejemplo anterior, cuando queremos enviar una colección de tareas callable al ejecutor para que sean ejecutadas debemos enviarlas una a una e ir almacenando en una List<Future<V>> los objetos Future<V> retornados por las repetidas llamadas al método submit(callableV) del ejecutor. Posteriormente recorreremos dicha lista de objetos Future<V> obteniendo uno a uno el resultado retornado por cada tarea callable, esperando en orden a que el resultado de cada tarea esté disponible.

Sin embargo, la interfaz ExecutorService define un método adicional que nos permite en un solo realizar lo descrito en el apartado anterior. Así el método invokeAll(callableVCollection) recibe una colección de objetos Callable<V> que queremos que sean ejecutadas por el ejecutor, y retorna una List<Future<V>>, es decir una lista que contendrá un objeto Future por cada tarea en la lista recibida como parámetro, y en el mismo orden, cada uno de los cuales contendrá más adelante el valor retornado por la tarea Callable<V> correspondiente.

Si le pasamos null como callableVCollection el método lanzará la excepción NullPointerException. Si cualquiera de las tareas enviadas es rechazada por el ejecutor, por ejemplo porque éste ya haya sido terminado, el método lanzará la excepción RejectedExecutionException.

Lo más interesante de este método es que es bloqueante, en el sentido de que el hilo desde el que se llama queda bloqueado hasta que todas las tareas de la lista hayan finalizado su ejecución, ya sea correctamente o lanzando una excepción. Si alguna de la tareas callable lanzó una excepción, cuando accedamos al resultado de dicha tarea a través del método get() del correspondiente objeto Future se lanzará la excepción ExecutionException.

Si mientras está bloqueado esperando en el método invokeAll(callableVCollection) es hilo desde el que se llamó es interrumpido, se lanzará la excepción InterruptedException y se cancelarán las tareas de la lista que no hayan comenzado su ejecución y aquellas que no la hayan completado, marcando como interrumpidos los correspondientes hilos del threadpool del ejecutor.

Un aspecto muy importante es que debemos asegurarnos de no agregar o quitar elementos de la lista de callables pasada como argumento mientras se están ejecutando las tareas en el ejector, o de lo contrario se podrán producir errores y el resultado no está definido.

Este método se encuentra sobrecargado invokeAll(callableList, timeout, timeUnit), para que podamos especificar el tiempo máximo que queremos que el hilo desde el que se llama esté bloqueado esperando a que finalicen todas las tareas. Si todas las tareas no han finalizado su ejecución antes de dicho tiempo máximo, cuando éste se alcance todas las tareas de la lista que estén en ejecución o pendientes de ejecución serán canceladas, marcando como interrumpidos los correspondientes hilos de threadpool del ejecutor.

Proyecto InvokeAll

En este proyecto desarrollaremos una aplicación similar a la del proyecto anterior, en la que se calcula el factorial de 10 números distintos. Crearemos un tarea Callable que calcule el factorial de un número y un ejecutor con un threadpool fijo de 2 hilos que lance las diez tareas y obtenga el resultado de cada una de ellas, usando objetos Future, y lo almacene en una lista, para finalmente mostrar todos los resultados.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

class Main {

    public static void main(String[] args) {
        ThreadPoolExecutor fixedThreadPool =
                (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
        List<Task> tasks = new ArrayList<>();
        int[] numbers = new int[10];
        for (int i = 0; i < 10; i++) {
            numbers[i] = ThreadLocalRandom.current().nextInt(15) - 5;
            tasks.add(new Task(numbers[i]));
        }
        try {
            List<Future<Integer>> futureList = 
                fixedThreadPool.invokeAll(tasks, 2, TimeUnit.SECONDS);
            System.out.print("Results:\n");
            for (int i = 0; i < futureList.size(); i++) {
                // This get returns inmediately.
                Future<Integer> future = futureList.get(i);
                try {
                    Integer factorial = future.get();
                    System.out.printf("Task %d -> factorial(%d) = %d\n", i + 1,
                            numbers[i], factorial);
                } catch (ExecutionException e) {
                    System.out.printf("Task %d -> factorial(%d) threw an exception\n",
                            i + 1, numbers[i]);
                } catch (CancellationException e) {
                    System.out.printf("Task %d -> factorial(%d) was cancelled\n",
                            i + 1, numbers[i]);
                }
            }
        } catch (InterruptedException ignored) {
        } finally {
            fixedThreadPool.shutdown();
        }
    }

}
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

class Task implements Callable<Integer> {

    private final int number;

    Task(int number) {
        this.number = number;
    }

    @Override
    public Integer call() throws InterruptedException {
        return factorial(number);
    }

    private Integer factorial(int number) throws InterruptedException {
        if (number < 0) throw new IllegalArgumentException("Number can't be negative");
        int factorial = 1;
        for (int i = 2; i <= number; i++) {
            factorial *= i;
            TimeUnit.MILLISECONDS.sleep(200);
        }
        return factorial;
    }

}

Si ejecutamos el programa veremos cómo se envían al ejecutor todas la tareas de cálculo de una vez y el hilo principal queda bloqueado hasta que todas hayan terminado.