Skip to content

11 InvokeAny()

invokeAny()

Una de las estrategias de resolución de problemas con programación concurrente consiste en ejecutar simultáneamente varias tareas que resuelven el mismo problema y quedarnos con el resultado de la primera tarea que lo resuelva. Por ejemplo, supongamos que queremos ordenar un array, por lo que creamos una tarea por cada algoritmo de ordenación que hayamos implementado. Podemos lanzar todas estas tareas y quedarnos con el resultado del algoritmo que tarde menos en ordenar el array.

Para llevar a cabo esta estrategia deberemos agrupar los objetos Callable correspondientes a las distintas tareas en una lista de tareas y enviar dicha lista al ejecutor mediante el método invokeAny(callableList).

El método invokeAny(callableList) de la clase ThreadPoolExecutor recibe una lista de tareas, inicia su ejecución y retorna el resultado de la primera tarea que finalice sin lanzar una excepción. En cuanto una tarea finaliza, el resto de tareas son marcadas para cancelación automáticamente, marcando como interrumpidos los hilos del threadpool del ejecutor de aquellas que se estén ejecutando.

El hilo que llama a invokeAny(callableList) es bloqueado hasta que la primera tarea finalice sin lanzar una excepción o a que todas finalicen con una excepción.

Si todas las tareas enviadas al ejecutor mediante el método invokeAny() lanzaran una excepción, el propio método generará la excepción ExecutionException y el resultado del método sería indefinido.

No podemos olvidar que el método invokeAny(callableList) no retorna un Future, sino directamente un valor del tipo V correspondiente al tipo retornado por los callables de la lista enviada al ejecutar a través de dicho método.

La clase ThreadPoolExecutor proporciona una versión sobrecargada del método invokeAny(callableList, time, timeUnit), para que podamos especificar el tiempo máximo que queremos que el hilo llamador esté bloqueado esperando a que finalice alguna de las tareas sin lanzar una excepción. Si transcurrido dicho tiempo ninguna tarea ha finalizado satisfactoriamente, pero aún quedan tareas en ejecución o por ejecutarse, se lanzará las excepción TimeoutException.

Proyecto InvokeAny

En este proyecto vamos a desarrollar una aplicación para la autenticación de un usuario en el sistema. La comprobación se llevará a cabo mediante dos métodos: comprobación sobre una base de datos local y comprobación sobre una base de datos remota. En realidad, en cuanto una de las dos autenticaciones tengan éxito y nos indique que el usuario es un usuario registrado en el sistema, el resultado de la otra comprobación no nos será de interés, y de hecho será cancelada. Finalmente mostraremos por pantalla el método que ha autenticado al usuario, si es que alguno lo ha hecho. Si ninguno lo ha hecho, se mostrará un mensaje de que el usuario no se ha autenticado.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

class Main {

    public static void main(String[] args) {
        String username = "username";
        String password = "password";
        RemoteDatabase remoteDatabase = new RemoteDatabase();
        LocalDatabase localDatabase = new LocalDatabase();
        AuthenticationTask remoteDatabaseTask = 
            new AuthenticationTask(remoteDatabase, username, password);
        AuthenticationTask localDatabaseTask = 
            new AuthenticationTask(localDatabase, username, password);
        List<AuthenticationTask> tasks = new ArrayList<>();
        tasks.add(remoteDatabaseTask);
        tasks.add(localDatabaseTask);
        ThreadPoolExecutor fixedThreadPoolExecutor =
                (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
        try {
            AuthenticationSystem authenticationSystem = 
                fixedThreadPoolExecutor.invokeAny(tasks);
            System.out.printf("User authenticated by %s\n", 
                            authenticationSystem.getName());
        } catch (InterruptedException ignored) {
        } catch (ExecutionException e) {
            System.out.print("User not authenticated\n");
        } finally {
            fixedThreadPoolExecutor.shutdown();
        }
    }

}
public interface AuthenticationSystem {
    String getName();
    @SuppressWarnings("unused")
    boolean authenticate(String username, String password) throws InterruptedException;
}
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

public class LocalDatabase implements AuthenticationSystem {

    @Override
    public String getName() {
        return "Local database";
    }

    @Override
    public boolean authenticate(String username, String password) 
        throws InterruptedException {
        int searchDuration = ThreadLocalRandom.current().nextInt(5) + 1;
        System.out.print("Local database -> Authenticating...\n");
        search(searchDuration);
        boolean authenticated = ThreadLocalRandom.current().nextBoolean();
        if (authenticated) {
            System.out.printf("Local database -> Authenticated in %d seconds\n", 
                            searchDuration);
        } else {
            System.out.printf("Local database -> Not authenticated in %d seconds\n", 
                            searchDuration);
        }
        return authenticated;
    }

    private void search(int searchDuration) throws InterruptedException {
        try {
            TimeUnit.SECONDS.sleep(searchDuration);
        } catch (InterruptedException e) {
            System.out.print("Local database -> Authentication cancelled\n");
            throw new InterruptedException();
        }
    }

}
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

public class RemoteDatabase implements AuthenticationSystem {

    @Override
    public String getName() {
        return "Remote database";
    }

    @Override
    public boolean authenticate(String username, String password) 
        throws InterruptedException {
        int searchDuration = ThreadLocalRandom.current().nextInt(5) + 1;
        System.out.print("Remote database -> Authenticating...\n");
        search(searchDuration);
        boolean authenticated = ThreadLocalRandom.current().nextBoolean();
        if (authenticated) {
            System.out.printf("Remote database -> Authenticated in %d seconds\n", 
                            searchDuration);
        } else {
            System.out.printf("Remote database -> Not authenticated in %d seconds\n", 
                            searchDuration);
        }
        return authenticated;
    }

    private void search(int searchDuration) throws InterruptedException {
        try {
            TimeUnit.SECONDS.sleep(searchDuration);
        } catch (InterruptedException e) {
            System.out.print("Remote database -> Authentication cancelled\n");
            throw new InterruptedException();
        }
    }

}
import java.util.concurrent.Callable;

class AuthenticationTask implements Callable<AuthenticationSystem> {

    private final AuthenticationSystem authenticationSystem;
    private final String username;
    private final String password;

    AuthenticationTask(AuthenticationSystem authenticationSystem, String username, 
                    String password) {
        this.authenticationSystem = authenticationSystem;
        this.username = username;
        this.password = password;
    }

    @Override
    public AuthenticationSystem call() throws InterruptedException {
        boolean authenticated = authenticationSystem.authenticate(username, password);
        if (!authenticated) {
            throw new RuntimeException("Authentication failed");
        }
        return authenticationSystem;
    }

}