18 ExecutorCompletionService¶
ExecutorCompletionService¶
Normalmente, cuando ejecutamos tareas concurrentes mediante un ejecutor, le enviamos la tarea Runnable
o Callable
mediante alguno de sus métodos, que nos retorna un objeto Future
, con el que podremos, más adelante, obtener el resultado retornado por la tarea. Pero tanto el envío como el procesado del resultado se realiza en el mismo hilo (en nuestros proyectos anteriores en el hilo principal correspondiente a la clase Main
). Además, la obtención del valor mediante el método get()
es bloqueante, por lo que se suele decir que se trata de un modelo de "procesamiento futuro síncrono" (synchronous future processing).
Sin embargo, en algunas ocasiones es necesario independizar el envío de tareas al ejecutor en un hilo y el procesamiento de los resultados en otro hilo distinto o simplemente nos gustaría realizar un procesamiento asíncrono del resultado (en contraste con el procesamiento futuro asíncrono descrito en el párrafo anterior).
Para ello, la API de Concurrencia de Java proporciona un mecanismo, conocido como CompletionService
, que nos permite crear un objeto que actúa como servicio intermedio de almacenaje de los objetos Future
correspondientes a tareas Callable
que han completado su ejecución. De esta manera el procesamiento de los resultados puede llevarse a cabo asíncronamente en hilos diferentes a los que enviaron las tareas.
La clase principal de este mecanismo es ExecutorCompletionService
, que usa internamente un objeto ejecutor que implemente la interfaz ExecutorService
y que deberemos suministrarlo en la construcción. Además, la clase ExecutorCompletionService
implementa la interfaz CompletionService
, que establece los métodos para enviar tareas al servicio y para extraer objetos Future
del mismo.
De esta manera, el objeto ExecutorCompletionService
actúa como puente entre una serie de hilos productores que le envían tareas a través de su método submit()
, y otra serie de hilos consumidores que extraen los objetos Future
resultantes de la ejecución de las tareas, por ejemplo a través de su método poll()
para procesar resultados.
Este mecanismo tiene la limitación de que los hilos consumidores de resultados tan solo puede obtener los objetos Future
de aquellas tareas que hayan finalizado su ejecución, por lo que únicamente pueden ser usados para obtener el resultado de las tareas y no para otras operaciones de control, como por ejemplo la cancelación de las tareas. De hecho, cuando un hilo consumidor extraiga un Future
de la cola del ExecutorCompletionService
dicho Future
forzosamente habrá sido completado (isDone()
retornará true
), por lo que si se llama al método get()
sobre dicho Future
nunca quedará bloqueado.
La interfaz CompletionService
proporciona los siguientes métodos para enviar tareas:
submit(callable)
: Envía unCallable
al ejecutor para que sea ejecutado. El resultado de la tarea será encolado en el servicio.submit(runnable, returnValue)
: Envía unRunnable
al ejecutor. Una vez concluida su ejecución elreturnValue
indicado se establece como valor delFuture
que es encolado en el servicio.
¡OJO!
Aunque estos métodos retornan un Future
, no debemos usarlo, ya que todo el propósito de usar un CompletionService
es que los hilos productores no actúen como consumidores de los resultados.
Por otra parte, la interfaz CompletionService
proporciona los siguientes métodos para obtener los objetos Future
correspondiente al resultado de la ejecución de las tareas:
Future<V> poll()
: Extrae de la cola interna del servicio el objetoFuture
correspondiente al resultado de la siguiente tarea completada onull
si no hay ninguno.Future<V> poll(long timeout, TimeUnit unit)
: Extrae de la cola interna el objetoFuture
correspondiente al resultado de la siguiente tarea completada, esperando si es necesario el tiempo máximo pasado como argumento. Lanza la excepciónInterruptedException
si el hilo es interrumpido mientras esperaba.Future<V> take()
: Extrae de la cola interna el objetoFuture
correspondiente al resultado de la siguiente tarea completada, bloqueando el hilo hasta que la cola tenga algún elemento. Lanza la excepciónInterruptedException
si el hilo es interrumpido mientras esperaba.
Proyecto ExecutorCompletionService¶
En este proyecto vamos a desarrollar una aplicación en la que varios hilos solicitantes solicitan el cálculo de factoriales a un ExecutorCompletionService
. Por otro lado, un hilo consumidor de resultados va obteniendo del ExecutorCompletionService
los resultados uno a uno y los muestra por pantalla.
La tarea de cálculo que se envía está representada por la clase Task
, que implementa la interfaz Callable<FactorialCalculation>
. El resultado del cálculo está representado por la clase FactorialCalculation
, que contiene información sobre quién la solicitó, cuándo, con qué valor, y el resultado obtenido o la excepción lanzada durante el cálculo.
Para representar el hecho de que haya un resultado o se haya producido una excepción definimos una clase ResultOrThrowable<T>
.
import java.util.concurrent.*;
class Main {
public static void main(String[] args) {
ExecutorService executor = Executors.newCachedThreadPool();
CompletionService<FactorialCalculation> completionService
= new ExecutorCompletionService<>(executor);
Thread taskProducer1 = new Thread(new TaskProducer("Task producer 1", completionService));
Thread taskProducer2 = new Thread(new TaskProducer("Task producer 2", completionService));
Thread taskConsumer = new Thread(new TaskConsumer(completionService));
taskProducer1.start();
taskProducer2.start();
taskConsumer.start();
try {
TimeUnit.SECONDS.sleep(15);
taskConsumer.interrupt();
taskConsumer.join();
} catch (InterruptedException ignored) {
} finally {
executor.shutdownNow();
}
}
}
class ResultOrThrowable<T> {
private final T result;
private final Throwable throwable;
private ResultOrThrowable(T result, Throwable throwable) {
this.result = result;
this.throwable = throwable;
}
T getResult() {
return result;
}
Throwable getThrowable() {
return throwable;
}
static <T> ResultOrThrowable<T> newResult(T result) {
return new ResultOrThrowable<>(result, null);
}
static <T> ResultOrThrowable<T> newThrowable(Throwable throwable) {
return new ResultOrThrowable<>(null, throwable);
}
}
class FactorialCalculation {
private final String requester;
private final int value;
private final String when;
private final ResultOrThrowable<Integer> resultOrThrowable;
FactorialCalculation(String requester, int value, String when,
ResultOrThrowable<Integer> resultOrException) {
this.requester = requester;
this.value = value;
this.when = when;
this.resultOrThrowable = resultOrException;
}
String getRequester() {
return requester;
}
int getValue() {
return value;
}
String getWhen() {
return when;
}
Integer getResult() {
return resultOrThrowable.getResult();
}
Throwable getThrowable() {
return resultOrThrowable.getThrowable();
}
}
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
class Task implements Callable<FactorialCalculation> {
private final String requester;
private final int value;
private final String when;
Task(String requester, int value, String when) {
this.requester = requester;
this.value = value;
this.when = when;
}
@Override
public FactorialCalculation call() {
Integer factorial;
ResultOrThrowable<Integer> resultOrThrowable;
try {
factorial = factorial(value);
resultOrThrowable = ResultOrThrowable.newResult(factorial);
} catch (Exception e) {
resultOrThrowable = ResultOrThrowable.newThrowable(e);
}
return new FactorialCalculation(requester, value, when, resultOrThrowable);
}
private Integer factorial(int number) throws InterruptedException {
if (number < 0) throw new IllegalArgumentException("Number can't be negative");
int factorial = 1;
for (int i = 2; i <= number; i++) {
factorial *= i;
}
TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(5) + 1);
return factorial;
}
}
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
class TaskProducer implements Runnable {
private final DateTimeFormatter dateTimeFormatter =
DateTimeFormatter.ofPattern("HH:mm:ss");
private final String name;
private final CompletionService<FactorialCalculation> completionService;
TaskProducer(String name, CompletionService<FactorialCalculation> completionService) {
this.name = name;
this.completionService = completionService;
}
@Override
public void run() {
for (int i = 0; i < 10; i++) {
int value = ThreadLocalRandom.current().nextInt(15) - 5;
String when = dateTimeFormatter.format(LocalTime.now());
System.out.printf("%s -> %s requests factorial(%d) at %s\n",
Thread.currentThread().getName(), name, value, when);
completionService.submit(new Task(name, value, when));
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
System.out.printf("%s -> %s interrupted\n",
Thread.currentThread().getName(), name);
return;
}
}
System.out.printf("%s -> %s finished\n",
Thread.currentThread().getName(), name);
}
}
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
class TaskConsumer implements Runnable {
private final CompletionService<FactorialCalculation> completionService;
TaskConsumer(CompletionService<FactorialCalculation> completionService) {
this.completionService = completionService;
}
@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
Future<FactorialCalculation> taskResultFuture;
try {
taskResultFuture = completionService.take();
if (taskResultFuture != null) {
FactorialCalculation factorialCalculation = taskResultFuture.get();
if (factorialCalculation.getThrowable() == null) {
showResult(factorialCalculation);
} else {
showError(factorialCalculation);
}
}
} catch (InterruptedException e) {
System.out.printf("%s -> Task consumer interrupted\n",
Thread.currentThread().getName());
return;
} catch (ExecutionException e) {
System.out.printf("%s -> Task consumer calculation poll - Error calculating factorial\n",
Thread.currentThread().getName());
}
}
System.out.printf("%s -> Task consumer finished\n",
Thread.currentThread().getName());
}
private void showError(FactorialCalculation factorialCalculation) {
System.out.printf("%s -> Task consumer calculation poll - factorial(%d) threw %s, requested by %s at %s\n",
Thread.currentThread().getName(), factorialCalculation.getValue(),
factorialCalculation.getThrowable().getClass().getSimpleName(), factorialCalculation.getRequester(),
factorialCalculation.getWhen());
}
private void showResult(FactorialCalculation factorialCalculation) {
System.out.printf("%s -> Task consumer calculation poll - factorial(%d) = %d requested by %s at %s\n",
Thread.currentThread().getName(), factorialCalculation.getValue(),
factorialCalculation.getResult(), factorialCalculation.getRequester(),
factorialCalculation.getWhen());
}
}