Skip to content

7 Concurrent-aware collections que usan cerrojos

ArrayBlockingQueue y LinkedBlockingQueue

La interfaz BlockingQueue define una estructura de datos en forma de cola FIFO proporcionando operaciones optimizadas concurrent-aware.

BlockingQueue proporciona cuatro grupos de métodos, dependiendo de lo que queremos que ocurra si no se puede realizar la operación solicitada inmediatamente porque otro hilo está haciendo otra operación que lo imposibilita, pero puede realizarse más adelante. Así, tendremos métodos que lanzarán una excepción, otros que retornan un valor especial (null o false), otros que bloquean al hilo llamador indefinidamente y otros que bloquean al hilo llamador una cantidad máxima de tiempo antes de darse por vencido.

En la siguiente tabla se muestran las operaciones, los grupos y los métodos:

Lanzan un excepción Retornan valor especial Bloquean Bloquean con limite tiempo
Inserción add(e) offer(e) put(e) offer(e, time, unit)
Eliminación remove() poll() take() poll(time, unit)
Consulta element() peek() - -

Las estructuras de datos BlockingQueue no acepta el valor null como elemento, dado que dicho valor es usado como valor centinela para indicar fallos en las operaciones poll(). De hecho, si pasamos el valor null a los métodos add(), put() o offer() se lanzará la excepción NullPointerException.

Se puede definir un límite de capacidad a una BlockingQueue, de manera que si una vez alcanzada la capacidad máxima se intenta agregar un elemento mediante put(), el hilo llamador quede bloqueado, en espera de que haya espacio disponible.

Por su parte, si la cola está vacía y un hilo llama al método take() el hilo quedará bloqueada hasta que haya algún elemento en la cola.

La interfaz BlockingQueue es implementada por diferentes clases que usan distintos tipos de estructuras para almacenar los datos:

  • ArrayBlockingQueue: Una blocking queue FIFO, limitada en capacidad, y en la que los elementos se almacenan en un array. Una vez creada, no se puede modificar su capacidad. Corresponde al típico buffer de capacidad fija. Podemos activar un modo justo (fair mode) para la espera de los hilos bloqueados para la inserción con put() o la extracción con take().
  • LinkedBlockingQueue: Una blocking queue FIFO, que opcionalmente podemos limitar en capacidad, y en la que los elementos se almacenan como nodos enlazados. Los nodos son creados dinámicamente cuando se insertan los elementos, siempre y cuando no se haya llegado a la capacidad máxima, si ésta ha sido especificada.
  • PriorityBlockingQueue: Una blocking queue, no limitada en capacidad, en la que los elementos son ordenados atendiendo a un determinado criterio o prioridad. Los elementos deben implementar la interfaz Comparable para poder llevar a cabo la ordenación.
  • DelayQueue: Una blocking queue, no limitada en capacidad, en la que cada elemento tiene asociado un determinado delay o tiempo de expiración, antes del cual no pueden ser extraídos de la cola. Así, la cola está ordenada por tiempo de expiración.
  • SynchronousQueue: Una blocking queue en la que cada operación de inserción debe esperar a su correspondiente operación de extracción, y viceversa. En realidad la cola no almacena ningún elemento y por tanto no tiene ninguna capacidad, ya que simplemente actúa como intermediario de entrega de datos. Por tanto no se puede consultar elementos en la cola, ni iterar sobre ellos, y sólo pueden insertarse si otro hilo va a extraerlo.

Proyecto ArrayBlockingQueue

Este proyecto es similar al Proyecto WaitNotify pero haciendo uso de una estructura de datos ArrayBlockingQueue, en vez de usar cerrojos intrínsecos.

public class Main {

    public static void main(String[] args) {
        Bakery bakery = new Bakery();
        Thread doughnutProducerThread = new Thread(new DoughnutProducer(bakery), "Doughnut producer");
        Thread doughnutConsumerThread = new Thread(new DoughnutConsumer(bakery), "Doughnut consumer");
        doughnutProducerThread.start();
        doughnutConsumerThread.start();
    }

}
import java.util.concurrent.ArrayBlockingQueue;

public class Bakery {

    private static final int TRAY_CAPACITY = 10;

    private final ArrayBlockingQueue<Integer> tray = new ArrayBlockingQueue<>(TRAY_CAPACITY);

    public void addToTray(Integer doughnut) throws InterruptedException {
        tray.put(doughnut);
        System.out.printf("Producer puts doughnut #%d on the tray\n", doughnut);
    }

    public Integer extractFromTray() throws InterruptedException {
        Integer doughnut = tray.take();
        System.out.printf("Consumer extracts doughnut #%d from tray\n", doughnut);
        return doughnut;
    }

}
import java.util.Objects;
import java.util.concurrent.TimeUnit;

public class DoughnutConsumer implements Runnable {

    private final Bakery bakery;

    public DoughnutConsumer(Bakery bakery) {
        Objects.requireNonNull(bakery);
        this.bakery = bakery;
    }

    @Override
    public void run() {
        Integer doughnut;
        while (!Thread.currentThread().isInterrupted()) {
            try {
                doughnut = bakery.extractFromTray();
            } catch (InterruptedException e) {
                System.out.println("Consumer has been interrupted while extracting from tray");
                return;
            }
            try {
                eat(doughnut);
            } catch (InterruptedException e) {
                System.out.println("Consumer has been interrupted while eating");
                return;
            }
        }
        System.out.println("Consumer has been interrupted");
    }

    private void eat(int doughnut) throws InterruptedException {
        System.out.printf("Consumer is eating doughnut #%d\n", doughnut);
        TimeUnit.SECONDS.sleep(30);
    }

}
import java.util.Objects;
import java.util.concurrent.TimeUnit;

public class DoughnutProducer implements Runnable {

    private final Bakery bakery;
    private int doughnutNumber;

    public DoughnutProducer(Bakery bakery) {
        Objects.requireNonNull(bakery);
        this.bakery = bakery;
    }

    @Override
    public void run() {
        int doughnut;
        while (!Thread.currentThread().isInterrupted()) {
            try {
                doughnut = makeDoughnut();
            } catch (InterruptedException e) {
                System.out.println("Producer has been interrupted while making a doughnut");
                return;
            }
            try {
                bakery.addToTray(doughnut);
            } catch (InterruptedException e) {
                System.out.println("Producer has been interrupted while adding a doughnut to the tray");
                return;
            }
        }
        System.out.println("Producer has been interrupted");
    }

    private int makeDoughnut() throws InterruptedException {
        int doughnut = ++doughnutNumber;
        System.out.printf("Producer is making doughnut #%d\n", doughnut);
        TimeUnit.SECONDS.sleep(1);
        return doughnut;
    }

}

Proyecto DelayedQueue

Este proyecto es parecido al Proyecto ArrayBlockingQueue pero haciendo que no sea posible extraer un donut de la bandeja si éste no lleva en ella al menos tres segundos (para dar tiempo a que el glaseado se condense adecuadamente). Por tanto, el consumidor será bloqueado si no hay en la bandeja ningún donut que cumpla dicha característica. Para proporcionar dicha funcionalidad usaremos una DelayedQueue.

public class Main {

    public static void main(String[] args) {
        Bakery bakery = new Bakery();
        Thread doughnutProducerThread = new Thread(new DoughnutProducer(bakery), "Doughnut producer");
        Thread doughnutConsumerThread = new Thread(new DoughnutConsumer(bakery), "Doughnut consumer");
        doughnutProducerThread.start();
        doughnutConsumerThread.start();
    }

}
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class Doughnut implements Delayed {

    private static final long DONUT_GLAZE_DELAY_MILLIS = 3000;

    private final int number;
    private final long startTime;

    public Doughnut(int number) {
        this.number = number;
        this.startTime = System.currentTimeMillis() + DONUT_GLAZE_DELAY_MILLIS;
    }

    public int getNumber() {
        return number;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        long diff = startTime - System.currentTimeMillis();
        return unit.convert(diff, TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed o) {
        return (int) (this.startTime - ((Doughnut) o).startTime);
    }

}
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.DelayQueue;

public class Bakery {

    final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("HH:mm:ss");
    private final DelayQueue<Doughnut> tray = new DelayQueue<>();

    public void addToTray(Doughnut doughnut) {
        tray.put(doughnut);
        System.out.printf("%s -> Producer puts doughnut #%d on the tray\n",
                LocalTime.now().format(dateTimeFormatter), doughnut.getNumber());
    }

    public Doughnut extractFromTray() throws InterruptedException {
        System.out.printf("%s -> Consumer tries to extract a doughnut from tray\n",
                LocalTime.now().format(dateTimeFormatter));
        Doughnut doughnut = tray.take();
        System.out.printf("%s -> Consumer extracts doughnut #%d from tray\n",
                LocalTime.now().format(dateTimeFormatter), doughnut.getNumber());
        return doughnut;
    }

}
import java.util.Objects;
import java.util.concurrent.TimeUnit;

public class DoughnutConsumer implements Runnable {

    private final Bakery bakery;

    public DoughnutConsumer(Bakery bakery) {
        Objects.requireNonNull(bakery);
        this.bakery = bakery;
    }

    @Override
    public void run() {
        Doughnut doughnut;
        while (!Thread.currentThread().isInterrupted()) {
            try {
                doughnut = bakery.extractFromTray();
            } catch (InterruptedException e) {
                System.out.println("Consumer has been interrupted while extracting from tray");
                return;
            }
            try {
                eat(doughnut);
            } catch (InterruptedException e) {
                System.out.println("Consumer has been interrupted while eating");
                return;
            }
        }
        System.out.println("Consumer has been interrupted");
    }

    private void eat(Doughnut doughnut) throws InterruptedException {
        System.out.printf("Consumer is eating doughnut #%d\n", doughnut.getNumber());
        TimeUnit.SECONDS.sleep(1);
    }

}
import java.util.Objects;
import java.util.concurrent.TimeUnit;

public class DoughnutProducer implements Runnable {

    private final Bakery bakery;
    private int doughnutNumber;

    public DoughnutProducer(Bakery bakery) {
        Objects.requireNonNull(bakery);
        this.bakery = bakery;
    }

    @Override
    public void run() {
        Doughnut doughnut;
        while (!Thread.currentThread().isInterrupted()) {
            try {
                doughnut = makeDoughnut();
            } catch (InterruptedException e) {
                System.out.println("Producer has been interrupted while making a doughnut");
                return;
            }
            bakery.addToTray(doughnut);
        }
        System.out.println("Producer has been interrupted");
    }

    private Doughnut makeDoughnut() throws InterruptedException {
        int doughnut = ++doughnutNumber;
        System.out.printf("Producer is making doughnut #%d\n", doughnut);
        TimeUnit.SECONDS.sleep(4);
        return new Doughnut(doughnut);
    }

}

Si ejecutamos la aplicación veremos que no rd posible extraer un donut de la bandeja si éste no lleva en ella al menos tres segundos.

Proyecto SynchronousQueue

Este proyecto es parecido al Proyecto ArrayBlockingQueue pero haciendo que el productor sea bloqueado en la bandeja cada vez de agrega un donuts a ella, hasta que el cliente lo extraiga de la misma. De esta manera el productor se asegura que no produce mas de un donut por adelantado.

public class Main {

    public static void main(String[] args) {
        Bakery bakery = new Bakery();
        Thread doughnutProducerThread = new Thread(new DoughnutProducer(bakery), "Doughnut producer");
        Thread doughnutConsumerThread = new Thread(new DoughnutConsumer(bakery), "Doughnut consumer");
        doughnutProducerThread.start();
        doughnutConsumerThread.start();
    }

}
import java.util.concurrent.SynchronousQueue;

public class Bakery {

    private final SynchronousQueue<Integer> tray = new SynchronousQueue<>();

    public void addToTray(Integer doughnut) throws InterruptedException {
        System.out.printf("Producer puts doughnut #%d on the tray\n", doughnut);
        tray.put(doughnut);
    }

    public Integer extractFromTray() throws InterruptedException {
        Integer doughnut = tray.take();
        System.out.printf("Consumer extracts doughnut #%d from tray\n", doughnut);
        return doughnut;
    }

}
import java.util.Objects;
import java.util.concurrent.TimeUnit;

public class DoughnutConsumer implements Runnable {

    private final Bakery bakery;

    public DoughnutConsumer(Bakery bakery) {
        Objects.requireNonNull(bakery);
        this.bakery = bakery;
    }

    @Override
    public void run() {
        Integer doughnut;
        while (!Thread.currentThread().isInterrupted()) {
            try {
                doughnut = bakery.extractFromTray();
            } catch (InterruptedException e) {
                System.out.println("Consumer has been interrupted while extracting from tray");
                return;
            }
            try {
                eat(doughnut);
            } catch (InterruptedException e) {
                System.out.println("Consumer has been interrupted while eating");
                return;
            }
        }
        System.out.println("Consumer has been interrupted");
    }

    private void eat(int doughnut) throws InterruptedException {
        System.out.printf("Consumer is eating doughnut #%d\n", doughnut);
        TimeUnit.SECONDS.sleep(20);
    }

}
import java.util.Objects;
import java.util.concurrent.TimeUnit;

public class DoughnutProducer implements Runnable {

    private final Bakery bakery;
    private int doughnutNumber;

    public DoughnutProducer(Bakery bakery) {
        Objects.requireNonNull(bakery);
        this.bakery = bakery;
    }

    @Override
    public void run() {
        int doughnut;
        while (!Thread.currentThread().isInterrupted()) {
            try {
                doughnut = makeDoughnut();
            } catch (InterruptedException e) {
                System.out.println("Producer has been interrupted while making a doughnut");
                return;
            }
            try {
                bakery.addToTray(doughnut);
            } catch (InterruptedException e) {
                System.out.println("Producer has been interrupted while adding a doughnut to the tray");
                return;
            }
        }
        System.out.println("Producer has been interrupted");
    }

    private int makeDoughnut() throws InterruptedException {
        int doughnut = ++doughnutNumber;
        System.out.printf("Producer is making doughnut #%d\n", doughnut);
        TimeUnit.SECONDS.sleep(1);
        return doughnut;
    }

}

Si ejecutamos la aplicación veremos que los donuts se entregan "en mano" uno a uno entre el productor y el consumidor.

LinkedTransferQueue

Una TransferQueue es una blocking queue, que opcionalmente podemos limitar en capacidad, con una funcionalidad adicional a la BlockingQueue, la de permitir al hilo que realiza la inserción bloquearse esperando a que un consumidor extraiga dicho elemento, es decir, esperar a que la transferencia se hace efectiva (de ahí su nombre).

Para ello, además de los métodos habituales de inserción proporcionados por la interfaz BlockingQueue, la interfaz TransferQueue proporciona el método transfer(e) para tal fin, que hará que el hilo que lo ejecute será bloqueado hasta que algún otro hilo llame al método take() o al método poll() para obtener dicho elemento. Si cuando se llama al método transfer(e) ya había elementos en la lista, el hilo será bloqueado mientras otros hilos procesan todos los elementos anteriores de la cola y sólo será desbloqueado cuando otro hilo extraiga de la cola elemento transferido.

Es posible también llamar al método hasWaitingConsumer() para consultar si hay algún consumidor esperando a que haya elementos en la cola.

La clase que implementa esta interfaz es LinkedTransferQueue, en la que los elementos se almacenan como nodos enlazados.

La clase LinkedTransferQueue proporciona un modo de funcionamiento similar al de SynchronousQueue. Sin embargo, a diferencia de ésta, LinkedTransferQueue proporciona otros modos de funcionamiento, en concreto los proporcionados por la interfaz BlockingQueue, por lo que nos permite decidir en cada momento qué modo usar.

Proyecto LinkedTransferQueue

Este proyecto es parecido al Proyecto ArrayBlockingQueue pero haciendo que el productor sea bloqueado en la bandeja cada tres donuts, hasta que el cliente extraiga de la misma el tercer donut. De esta manera el productor se asegura que no produce mas de tres donuts por adelantado.

public class Main {

    public static void main(String[] args) {
        Bakery bakery = new Bakery();
        Thread doughnutProducerThread = new Thread(new DoughnutProducer(bakery), "Doughnut producer");
        Thread doughnutConsumerThread = new Thread(new DoughnutConsumer(bakery), "Doughnut consumer");
        doughnutProducerThread.start();
        doughnutConsumerThread.start();
    }

}
import java.util.concurrent.LinkedTransferQueue;

public class Bakery {

    private final LinkedTransferQueue<Integer> tray = new LinkedTransferQueue<>();

    public void addToTray(Integer doughnut) {
        System.out.printf("Producer puts doughnut #%d on the tray\n", doughnut);
        tray.put(doughnut);
    }

    public void addToTrayAndBlock(Integer doughnut) throws InterruptedException {
        System.out.printf("Producer puts doughnut #%d on the tray and waits to the consumer to extract it\n", doughnut);
        tray.transfer(doughnut);
    }

    public Integer extractFromTray() throws InterruptedException {
        Integer doughnut = tray.take();
        System.out.printf("Consumer extracts doughnut #%d from tray\n", doughnut);
        return doughnut;
    }

}
import java.util.Objects;
import java.util.concurrent.TimeUnit;

public class DoughnutProducer implements Runnable {

    private final Bakery bakery;
    private int doughnutNumber;

    public DoughnutProducer(Bakery bakery) {
        Objects.requireNonNull(bakery);
        this.bakery = bakery;
    }

    @Override
    public void run() {
        int doughnut;
        while (!Thread.currentThread().isInterrupted()) {
            try {
                doughnut = makeDoughnut();
            } catch (InterruptedException e) {
                System.out.println("Producer has been interrupted while making a doughnut");
                return;
            }
            try {
                if (doughnut % 3 == 0) {
                    bakery.addToTrayAndBlock(doughnut);
                } else {
                    bakery.addToTray(doughnut);
                }
            } catch (InterruptedException e) {
                System.out.println("Producer has been interrupted while adding a doughnut to the tray");
                return;
            }
        }
        System.out.println("Producer has been interrupted");
    }

    private int makeDoughnut() throws InterruptedException {
        int doughnut = ++doughnutNumber;
        System.out.printf("Producer is making doughnut #%d\n", doughnut);
        TimeUnit.SECONDS.sleep(1);
        return doughnut;
    }

}
import java.util.Objects;
import java.util.concurrent.TimeUnit;

public class DoughnutProducer implements Runnable {

    private final Bakery bakery;
    private int doughnutNumber;

    public DoughnutProducer(Bakery bakery) {
        Objects.requireNonNull(bakery);
        this.bakery = bakery;
    }

    @Override
    public void run() {
        int doughnut;
        while (!Thread.currentThread().isInterrupted()) {
            try {
                doughnut = makeDoughnut();
            } catch (InterruptedException e) {
                System.out.println("Producer has been interrupted while making a doughnut");
                return;
            }
            try {
                if (doughnut % 3 == 0) {
                    bakery.addToTrayAndBlock(doughnut);
                } else {
                    bakery.addToTray(doughnut);
                }
            } catch (InterruptedException e) {
                System.out.println("Producer has been interrupted while adding a doughnut to the tray");
                return;
            }
        }
        System.out.println("Producer has been interrupted");
    }

    private int makeDoughnut() throws InterruptedException {
        int doughnut = ++doughnutNumber;
        System.out.printf("Producer is making doughnut #%d\n", doughnut);
        TimeUnit.SECONDS.sleep(1);
        return doughnut;
    }

}

LinkedBlockingDequeue

La interfaz BlockingDequeue define una estructura de datos en forma de cola bidireccional proporcionando operaciones optimizadas concurrent-aware. Se caracteriza porque se pueden realizar inserciones y extracciones en ambos extremos de la cola.

BlockingDequeue proporciona cuatro grupos de métodos, dependiendo de lo que queremos que ocurra si no se puede realizar la operación solicitada inmediatamente porque otro hilo está haciendo otra operación que lo imposibilita, pero puede realizarse más adelante. Así, tendremos métodos que lanzarán una excepción, otros que retornan un valor especial (null o false), otros que bloquean al hilo llamador indefinidamente y otros que bloquean al hilo llamador una cantidad máxima de tiempo antes de darse por vencido.

En las siguientes tablas se muestran las operaciones, los grupos, los extremos y los métodos:

Extremo head Lanzan un excepción Retornan valor especial Bloquean Bloquean con límite tiempo
Inserción addFirst(e) offerFirst(e) putFirst(e) offerFirst(e, time, unit)
Eliminación removeFirst(e) pollFirst() takeFirst() pollFirst(time, unit)
Consulta getFirst() peekFirst() - -
Extremo tail Lanzan un excepción Retornan valor especial Bloquean Bloquean con límite tiempo
Inserción addLast(e) offerLast(e) putLast(e) offerLast(e, time, unit)
Eliminación removeLast(e) pollLast() takeLast() pollLast(time, unit)
Consulta getLast() peekLast() - -

Al igual que BlockingQueue, las estructuras de datos BlockingDequeue no aceptan el valor null como elemento.

Una implementación de BlockingDeque pude ser usada directamente como una cola BlockingQueue, ya que hereda de ella, haciendo métodos equivalentes. Por ejemplo, add(e) es equivalente a addLst(e), remove() es equivalente a removeFirst() y element() es equivalente a getFirst().

La clase que implementa esta interfaz es LinkedBlockingDequeue, que opcionalmente podemos limitar en capacidad, y en la que los elementos se almacenan como nodos enlazados.

ConcurrentHashMap

La interfaz ConcurrentMap define una estructura de datos mapa concurrent-aware, que garantiza la atomicidad de las operaciones.

Proporciona métodos del tipo check-then-act (comprueba y después actúa) que son realizados atómicamente, como:

  • replace(K key, V oldValue, V newValue): Sólo reemplaza el valor asociado a la clave si en ese momento contiene el valor oldValue.
  • putIfAbsent(K key, V value): Sólo asocia el valor a la clave si ésta no tiene ningún valor asociado.
  • computeIfAbsent(K key, Function<? super K,? extends V> mappingFunction): Sólo si la clave no tiene asociado ningún valor ejecuta la función recibida como argumento y asocia el valor retornado por ésta a la clave.
  • computeIfPresent(K key, Function<? super K,? extends V> mappingFunction): Sólo si la clave tiene asociado algún valor ejecuta la función recibida como argumento y asocia el valor retornado por ésta a la clave.

La clase principal que implementa la interfaz ConcurrentMap es ConcurrentHashMap, que usa internamente una tabla *hash* para almacenar los datos.

Para la mayoría de las operaciones de actualización ConcurrentHashMap usa operaciones CAS (compare and swap), por los que proporciona buen rendimiento. Sin embargo si se produce una colisión en el hash entre dos elementos, usará un cerrojo. Por este motivo no se puede clasificar totalmente como lock-free.