6 FixedThreadPool¶
FixedThreadPool¶
Como vimos en el apartado anterior, si usamos un ThreadPoolExecutor
creado mediante el método newCachedThreadPool()
de la clase Executors
corremos el riesgo de sobrecargar el sistema, especialmente si el número de tareas es alto y de larga duración.
Para evitar este problema, la clase Executors
proporciona el método estático factoría Executors.newFixedThreadPool(numberOfThreads)
para crear un ejecutor que use como máximo el número fijo de hilos que recibe como argumento.
Este método está sobrecargado Executors.newFixedThreadPool(numberOfThreads, threadFactory)
para recibir también un objeto que implemente la interfaz ThreadFactory
, cuyo método newThread(runnable)
será llamado cuando sea necesario crear un hilo para el threadpool del ejecutor.
Veamos la definición interna del método factoría para comprender su funcionamiento:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
Por defecto, los hilos del thread pool se van creando bajo demanda conforme son necesarios, hasta alcanzar el número de hilos indicado, pero una vez alcanzado dicho máximo dichos hilos no podrán ser eliminados.
Además, cuando el ejecutor recibe una tarea y todos los hilos de su threadpool están ocupados y dicho número corresponde al máximo permitido, el ejecutor no creará nuevos hilos, sino que la tarea será almacenada en en la LinkedBlockingQueue proporcionada, esperando a que haya algún hilo del threadpool disponible para ejecutarla.
Si queremos que en el threadpool se cree directamente el número de hilos con el que se ha configurado el ejecutor, podemos llamar a su método preStartAllCoreThreads()
, que creará e iniciará dichos hilos, que quedarán en espera de tener alguna tarea que ejecutar. También tenemos el método preStartCoreThreads()
, que crea e inicia un hilo del thead pool, quedando en espera de alguna tarea.
Gracias a estos métodos sobrescribimos la política por defecto de crear e iniciar los hilos solo cuando llega una nueva tarea y no hay ningún hilo disponible.
Cuando el ejecutor recibe una tarea y todos los hilos de su threadpool están ocupados y dicho número corresponde al máximo permitido, el ejecutor no creará nuevos hilos, sino que, por defecto, la tarea será almacenada en la LinkedBlockingQueue proporcionada, esperando a que haya algún hilo del threadpool disponible para ejecutarla.
El número recomendado de hilos depende normalmente del número de núcleos de procesamiento disponibles y del tipo de tareas que se envían (intensivas de CPU o intensivas de E/S). Podemos obtener el número de núcleos de procesamiento disponibles en la máquina en la que no estamos ejecutando mediante Runtime.getRuntime().availableProcessors()
.
Como norma general, si se van a enviar al ejecutor tareas intensivas de CPU (normalmente de corta duración), se recomienda usar un tamaño de threadpool igual al número de núcleos de procesamiento + 1 (para prevenir potenciales pausas). Sin embargo, si se van a enviar al ejecutor tareas intensivas de entrada y salida, como por ejemplo de acceso a Internet o a una base de datos, se recomienda usar un tamaño de threadpool mayor, dado que los hilos serán bloqueados en entrada / salida, permitiendo así a más tareas ejecutarse.
Un caso extremo de este tipo de ejecutor es el que se obtiene llamando al método estático factoría Executors.newSingleThreadExecutor()
, que crea un ejecutor cuyo threadpool sólo podrá contener como máximo un solo hilo, por lo que, en la práctica, las tareas son ejecutadas en dicho ejecutor de una en una de forma secuencial. El método también está sobrecargado Executor.newSingleThreadExecutor(threadFactory)
para recibir la factoría a usar para crear el hilo.
La definición interna de este método factoría es (más o menos) la siguiente:
public static ExecutorService newSingleThreadExecutor() {
return new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
Políticas de saturación¶
Llamamos saturación, cuando llega una tarea al ejecutor y todos los hilos de su thread pool están ocupados ejecutando alguna tarea. La política por defecto corresponderá a que la tarea esperará en una cola hasta que pueda ser ejecutada por alguno de los hilos del ejecutor.
Sin embargo, podemos configurar nuestro ejecutor para que implemente otras polícitas, usando el método setRejectedExecutionHandler(handler)
.
Así, tenemos predefinidos una serie de handlers que implementan distintas políticas de rechazo:
ThreadPoolExecutor.AbortPolicy
: Se lanza una excepciónRejectedExecutionException
.ThreadPoolExecutor.CallerRunsPolicy
: La tarea se ejecuta en el hilo que envió la tarea al ejecutor. De esta manera se implementa un mecanismo de control natural que hace que se ralentice la frecuencia de envío de tareas al ejecutor por parte del hilo emisor.ThreadPoolExecutor.DiscardPolicy
: Se descarta la tarea, que no es ejecutada.ThreadPoolExecutor.DiscardOldestPolicy
: Si el ejecutor no está terminado, la tarea más antigua en la cola de espera es descartada y no llegará a ejecutarse, y se trata de ejecutar de nuevo la tarea enviada.
Proyecto FixedThreadPool¶
En este proyecto simularemos el funcionamiento de un servidor web que procesa peticiones de varios clientes usando internamente un fixed thread pool executor de como máximo 5 hilos.
class Main {
public static void main(String[] args) {
Server server = new Server();
for (int i = 0; i < 50; i++) {
Task task = new Task("Task " + i);
server.execute(task);
try {
// The less time you sleep the greater the thread pool size gets.
// Try to reduce the time sleeping and see what happens to thread pool size.
Thread.sleep(100);
} catch (InterruptedException e) {
return;
}
}
try {
server.shutdown();
// Try shutdownNow instead and see what happens.
// server.shutdownNow();
} catch (InterruptedException exception) {
return;
}
Task task = new Task("Task sent after shutdown");
server.execute(task);
}
}
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
class Server {
private final ThreadPoolExecutor fixedThreadPool =
(ThreadPoolExecutor) Executors.newFixedThreadPool(5);
void execute(Task task) {
try {
fixedThreadPool.execute(task);
System.out.printf("Server -> Thread pool size: %d\n", fixedThreadPool.getPoolSize());
System.out.printf("Server -> Active threads count: %d\n", fixedThreadPool.getActiveCount());
} catch (RejectedExecutionException e) {
System.out.printf("Server -> Task rejected: %s\n", task.getName());
}
}
void shutdown() throws InterruptedException {
fixedThreadPool.shutdown();
if (fixedThreadPool.awaitTermination(5, TimeUnit.SECONDS)) {
System.out.printf("Server -> Terminated. Completed: %d\n",
fixedThreadPool.getCompletedTaskCount());
} else {
System.out.printf("Server -> Await termination timeout. Completed: %d\n",
fixedThreadPool.getCompletedTaskCount());
}
}
@SuppressWarnings("unused")
void shutdownNow() throws InterruptedException {
fixedThreadPool.shutdownNow();
if (fixedThreadPool.awaitTermination(5, TimeUnit.SECONDS)) {
System.out.printf("Server -> Terminated. Completed: %d\n",
fixedThreadPool.getCompletedTaskCount());
} else {
System.out.printf("Server -> Await termination timeout. Completed: %d\n",
fixedThreadPool.getCompletedTaskCount());
}
}
}
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
class Task implements Runnable {
private final String name;
private final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("HH:mm:ss");
Task(String name) {
this.name = name;
}
String getName() {
return name;
}
@Override
public void run() {
System.out.printf("%s -> %s -> Started at: %s\n",
Thread.currentThread().getName(), name, dateTimeFormatter.format(LocalDateTime.now()));
try {
work();
} catch (InterruptedException e) {
System.out.printf("%s -> %s -> Interrupted at: %s\n",
Thread.currentThread().getName(), name, dateTimeFormatter.format(LocalDateTime.now()));
return;
}
System.out.printf("%s -> %s -> Finished at: %s\n",
Thread.currentThread().getName(), name, dateTimeFormatter.format(LocalDateTime.now()));
}
private void work() throws InterruptedException {
TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(10));
}
}
Si ejecutamos el programa veremos que el servidor no puede procesar más de 5 peticiones a la vez.