Skip to content

5 CachedThreadPool

CachedThreadPool

Uno de los tipos de ThreadPoolExecutor que podemos crear es lo que se conoce como cached thread pool (grupo de hilos cacheado), que se caracteriza porque cuando recibe una nueva tarea que ejecutar, trata de ejecutarla en algún hilo del thread pool que esté "libre", y si todos los hilos del threadpool están ocupados ejecutando otras tareas, el ejecutor crea un nuevo hilo que es añadido al thread pool y ejecuta en él la tarea recibida.

Para crear un cached thread pool executor usaremos el método estático factoría Executors.newCachedThreadPool(), que retorna un objeto de la interfaz ExecutorService, al que haremos un cast a ThreadPoolExecutor.

ThreadPoolExecutor cachedThreadPoolExecutor = 
    (ThreadPoolExecutor) Executors.newCachedThreadPool();

Este método factoría está sobrecargado Executors.newCachedThreadPool(threadFactory) para recibir como argumento un objeto de la interfaz ThreadFactory (factoría de hilos) que será usado para construir los hilos del threadpool cuando sea necesario, permitiéndonos así personalizar dicha construcción.

Este tipo de ejecutor tiene el problema de que si enviamos muchas tareas a la vez o de manera muy seguida podremos sobrecargar el sistema, dado que el threadpool irá creciendo con cada nueva tarea recibida al estar todos sus hilos ocupados con otras tareas.

Por otra parte, tenemos la ventaja de que las tareas siempre serán ejecutadas inmediatamente en cuanto hayan sido recibidas por el ejecutor, tanto si finalmente son ejecutadas en un hilo ya existente en el threadpool como si se hace crecer éste creando y añadiendo un nuevo hilo para ejecutar la tarea.

Por tanto, cuanto más espaciadas en el tiempo lleguen en el tiempo y menos tiempo de ejecución requieran, mejor será el rendimiento de este tipo de ejecutor.

Veamos la definición interna del método Executors.newCachedThreadPool(), lo que nos servirá para entender correctamente su funcionamiento:

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

Como vemos, no es obligatorio mantener un número mínimo de hilos, no hay un número máximo de hilos, los hilos serán destruidos automáticamente tras 60 segundos de inactividad y la cola de trabajo corresponde a una SynchronousQueue<Runnable>, que se caracteriza porque la inserción en la cola de trabajo espera directamente a su extracción, dado que en realidad nunca habrá más de una tarea en la cola puesto que se extraerá inmediatamente de ella en cuanto llegue, creando un hilo adicional si todos los hilos actuales están ocupados.

Proyecto CachedThreadPool

En este proyecto simularemos el funcionamiento de un servidor web que procesa peticiones de varios clientes. Internamente el servidor hará uso de un cached thread pool executor para ejecutar las tareas que le vayan llegando.

class Main {

    public static void main(String[] args) {
        Server server = new Server();
        for (int i = 0; i < 50; i++) {
            Task task = new Task("Task " + i);
            server.execute(task);
            try {
                // The less time you sleep the greater the thread pool size gets.
                // Try to reduce the time sleeping and see what happens to thread pool size.
                Thread.sleep(100);
            } catch (InterruptedException e) {
                return;
            }
        }
        try {
            server.shutdown();
            // Try shutdownNow instead and see what happens.
            // server.shutdownNow();
        } catch (InterruptedException e) {
            return;
        }
        Task task = new Task("Task sent after shutdown");
        server.execute(task);
    }

}
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class Server {

    private final ThreadPoolExecutor cachedThreadPool =
            (ThreadPoolExecutor) Executors.newCachedThreadPool();

    void execute(Task task) {
        try {
            cachedThreadPool.execute(task);
            System.out.printf("Server -> Thread pool size: %d\n", cachedThreadPool.getPoolSize());
            System.out.printf("Server -> Active threads count: %d\n", cachedThreadPool.getActiveCount());
        } catch (Exception e) {
            System.out.printf("Server -> Task rejected: %s\n", task.getName());
        }
    }

    void shutdown() throws InterruptedException {
        cachedThreadPool.shutdown();
        if (cachedThreadPool.awaitTermination(5, TimeUnit.SECONDS)) {
            System.out.printf("Server -> Terminated. Completed: %d\n",
                    cachedThreadPool.getCompletedTaskCount());
        } else {
            System.out.printf("Server -> Await termination timeout. Completed: %d\n",
                    cachedThreadPool.getCompletedTaskCount());
        }
    }

    @SuppressWarnings("unused")
    void shutdownNow() throws InterruptedException {
        cachedThreadPool.shutdownNow();
        if (cachedThreadPool.awaitTermination(5, TimeUnit.SECONDS)) {
            System.out.printf("Server -> Terminated. Completed: %d\n",
                    cachedThreadPool.getCompletedTaskCount());
        } else {
            System.out.printf("Server -> Await termination timeout. Completed: %d\n",
                    cachedThreadPool.getCompletedTaskCount());
        }
    }

}
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

class Task implements Runnable {

    private final String name;
    private final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("HH:mm:ss");

    Task(String name) {
        this.name = name;
    }

    String getName() {
        return name;
    }

    @Override
    public void run() {
        System.out.printf("%s -> %s -> Started at: %s\n",
                Thread.currentThread().getName(), name, dateTimeFormatter.format(LocalDateTime.now()));
        try {
            work();
        } catch (InterruptedException e) {
            System.out.printf("%s -> %s -> Interrupted at: %s\n",
                    Thread.currentThread().getName(), name, dateTimeFormatter.format(LocalDateTime.now()));
            return;
        }
        System.out.printf("%s -> %s -> Finished at: %s\n",
                Thread.currentThread().getName(), name, dateTimeFormatter.format(LocalDateTime.now()));
    }

    private void work() throws InterruptedException {
        TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(10));
    }

}

Si ejecutamos la aplicación veremos que el servidor hace uso de un cached thread pool executor para ejecutar las tareas que le vayan llegando.