7 Concurrent-aware collections que usan cerrojos¶
ArrayBlockingQueue y LinkedBlockingQueue¶
La interfaz BlockingQueue
define una estructura de datos en forma de cola FIFO proporcionando operaciones optimizadas concurrent-aware.
BlockingQueue
proporciona cuatro grupos de métodos, dependiendo de lo que queremos que ocurra si no se puede realizar la operación solicitada inmediatamente porque otro hilo está haciendo otra operación que lo imposibilita, pero puede realizarse más adelante. Así, tendremos métodos que lanzarán una excepción, otros que retornan un valor especial (null
o false
), otros que bloquean al hilo llamador indefinidamente y otros que bloquean al hilo llamador una cantidad máxima de tiempo antes de darse por vencido.
En la siguiente tabla se muestran las operaciones, los grupos y los métodos:
Lanzan un excepción | Retornan valor especial | Bloquean | Bloquean con limite tiempo | |
---|---|---|---|---|
Inserción | add(e) | offer(e) | put(e) | offer(e, time, unit) |
Eliminación | remove() | poll() | take() | poll(time, unit) |
Consulta | element() | peek() | - | - |
Las estructuras de datos BlockingQueue
no acepta el valor null
como elemento, dado que dicho valor es usado como valor centinela para indicar fallos en las operaciones poll()
. De hecho, si pasamos el valor null
a los métodos add()
, put()
o offer()
se lanzará la excepción NullPointerException
.
Se puede definir un límite de capacidad a una BlockingQueue
, de manera que si una vez alcanzada la capacidad máxima se intenta agregar un elemento mediante put()
, el hilo llamador quede bloqueado, en espera de que haya espacio disponible.
Por su parte, si la cola está vacía y un hilo llama al método take()
el hilo quedará bloqueada hasta que haya algún elemento en la cola.
La interfaz BlockingQueue
es implementada por diferentes clases que usan distintos tipos de estructuras para almacenar los datos:
ArrayBlockingQueue
: Una blocking queue FIFO, limitada en capacidad, y en la que los elementos se almacenan en un array. Una vez creada, no se puede modificar su capacidad. Corresponde al típico buffer de capacidad fija. Podemos activar un modo justo (fair mode) para la espera de los hilos bloqueados para la inserción conput()
o la extracción contake()
.LinkedBlockingQueue
: Una blocking queue FIFO, que opcionalmente podemos limitar en capacidad, y en la que los elementos se almacenan como nodos enlazados. Los nodos son creados dinámicamente cuando se insertan los elementos, siempre y cuando no se haya llegado a la capacidad máxima, si ésta ha sido especificada.PriorityBlockingQueue
: Una blocking queue, no limitada en capacidad, en la que los elementos son ordenados atendiendo a un determinado criterio o prioridad. Los elementos deben implementar la interfazComparable
para poder llevar a cabo la ordenación.DelayQueue
: Una blocking queue, no limitada en capacidad, en la que cada elemento tiene asociado un determinado delay o tiempo de expiración, antes del cual no pueden ser extraídos de la cola. Así, la cola está ordenada por tiempo de expiración.SynchronousQueue
: Una blocking queue en la que cada operación de inserción debe esperar a su correspondiente operación de extracción, y viceversa. En realidad la cola no almacena ningún elemento y por tanto no tiene ninguna capacidad, ya que simplemente actúa como intermediario de entrega de datos. Por tanto no se puede consultar elementos en la cola, ni iterar sobre ellos, y sólo pueden insertarse si otro hilo va a extraerlo.
Proyecto ArrayBlockingQueue¶
Este proyecto es similar al Proyecto WaitNotify pero haciendo uso de una estructura de datos ArrayBlockingQueue
, en vez de usar cerrojos intrínsecos.
public class Main {
public static void main(String[] args) {
Bakery bakery = new Bakery();
Thread doughnutProducerThread = new Thread(new DoughnutProducer(bakery), "Doughnut producer");
Thread doughnutConsumerThread = new Thread(new DoughnutConsumer(bakery), "Doughnut consumer");
doughnutProducerThread.start();
doughnutConsumerThread.start();
}
}
import java.util.concurrent.ArrayBlockingQueue;
public class Bakery {
private static final int TRAY_CAPACITY = 10;
private final ArrayBlockingQueue<Integer> tray = new ArrayBlockingQueue<>(TRAY_CAPACITY);
public void addToTray(Integer doughnut) throws InterruptedException {
tray.put(doughnut);
System.out.printf("Producer puts doughnut #%d on the tray\n", doughnut);
}
public Integer extractFromTray() throws InterruptedException {
Integer doughnut = tray.take();
System.out.printf("Consumer extracts doughnut #%d from tray\n", doughnut);
return doughnut;
}
}
import java.util.Objects;
import java.util.concurrent.TimeUnit;
public class DoughnutConsumer implements Runnable {
private final Bakery bakery;
public DoughnutConsumer(Bakery bakery) {
Objects.requireNonNull(bakery);
this.bakery = bakery;
}
@Override
public void run() {
Integer doughnut;
while (!Thread.currentThread().isInterrupted()) {
try {
doughnut = bakery.extractFromTray();
} catch (InterruptedException e) {
System.out.println("Consumer has been interrupted while extracting from tray");
return;
}
try {
eat(doughnut);
} catch (InterruptedException e) {
System.out.println("Consumer has been interrupted while eating");
return;
}
}
System.out.println("Consumer has been interrupted");
}
private void eat(int doughnut) throws InterruptedException {
System.out.printf("Consumer is eating doughnut #%d\n", doughnut);
TimeUnit.SECONDS.sleep(30);
}
}
import java.util.Objects;
import java.util.concurrent.TimeUnit;
public class DoughnutProducer implements Runnable {
private final Bakery bakery;
private int doughnutNumber;
public DoughnutProducer(Bakery bakery) {
Objects.requireNonNull(bakery);
this.bakery = bakery;
}
@Override
public void run() {
int doughnut;
while (!Thread.currentThread().isInterrupted()) {
try {
doughnut = makeDoughnut();
} catch (InterruptedException e) {
System.out.println("Producer has been interrupted while making a doughnut");
return;
}
try {
bakery.addToTray(doughnut);
} catch (InterruptedException e) {
System.out.println("Producer has been interrupted while adding a doughnut to the tray");
return;
}
}
System.out.println("Producer has been interrupted");
}
private int makeDoughnut() throws InterruptedException {
int doughnut = ++doughnutNumber;
System.out.printf("Producer is making doughnut #%d\n", doughnut);
TimeUnit.SECONDS.sleep(1);
return doughnut;
}
}
Proyecto DelayedQueue¶
Este proyecto es parecido al Proyecto ArrayBlockingQueue pero haciendo que no sea posible extraer un donut de la bandeja si éste no lleva en ella al menos tres segundos (para dar tiempo a que el glaseado se condense adecuadamente). Por tanto, el consumidor será bloqueado si no hay en la bandeja ningún donut que cumpla dicha característica. Para proporcionar dicha funcionalidad usaremos una DelayedQueue
.
public class Main {
public static void main(String[] args) {
Bakery bakery = new Bakery();
Thread doughnutProducerThread = new Thread(new DoughnutProducer(bakery), "Doughnut producer");
Thread doughnutConsumerThread = new Thread(new DoughnutConsumer(bakery), "Doughnut consumer");
doughnutProducerThread.start();
doughnutConsumerThread.start();
}
}
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
public class Doughnut implements Delayed {
private static final long DONUT_GLAZE_DELAY_MILLIS = 3000;
private final int number;
private final long startTime;
public Doughnut(int number) {
this.number = number;
this.startTime = System.currentTimeMillis() + DONUT_GLAZE_DELAY_MILLIS;
}
public int getNumber() {
return number;
}
@Override
public long getDelay(TimeUnit unit) {
long diff = startTime - System.currentTimeMillis();
return unit.convert(diff, TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
return (int) (this.startTime - ((Doughnut) o).startTime);
}
}
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.DelayQueue;
public class Bakery {
final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("HH:mm:ss");
private final DelayQueue<Doughnut> tray = new DelayQueue<>();
public void addToTray(Doughnut doughnut) {
tray.put(doughnut);
System.out.printf("%s -> Producer puts doughnut #%d on the tray\n",
LocalTime.now().format(dateTimeFormatter), doughnut.getNumber());
}
public Doughnut extractFromTray() throws InterruptedException {
System.out.printf("%s -> Consumer tries to extract a doughnut from tray\n",
LocalTime.now().format(dateTimeFormatter));
Doughnut doughnut = tray.take();
System.out.printf("%s -> Consumer extracts doughnut #%d from tray\n",
LocalTime.now().format(dateTimeFormatter), doughnut.getNumber());
return doughnut;
}
}
import java.util.Objects;
import java.util.concurrent.TimeUnit;
public class DoughnutConsumer implements Runnable {
private final Bakery bakery;
public DoughnutConsumer(Bakery bakery) {
Objects.requireNonNull(bakery);
this.bakery = bakery;
}
@Override
public void run() {
Doughnut doughnut;
while (!Thread.currentThread().isInterrupted()) {
try {
doughnut = bakery.extractFromTray();
} catch (InterruptedException e) {
System.out.println("Consumer has been interrupted while extracting from tray");
return;
}
try {
eat(doughnut);
} catch (InterruptedException e) {
System.out.println("Consumer has been interrupted while eating");
return;
}
}
System.out.println("Consumer has been interrupted");
}
private void eat(Doughnut doughnut) throws InterruptedException {
System.out.printf("Consumer is eating doughnut #%d\n", doughnut.getNumber());
TimeUnit.SECONDS.sleep(1);
}
}
import java.util.Objects;
import java.util.concurrent.TimeUnit;
public class DoughnutProducer implements Runnable {
private final Bakery bakery;
private int doughnutNumber;
public DoughnutProducer(Bakery bakery) {
Objects.requireNonNull(bakery);
this.bakery = bakery;
}
@Override
public void run() {
Doughnut doughnut;
while (!Thread.currentThread().isInterrupted()) {
try {
doughnut = makeDoughnut();
} catch (InterruptedException e) {
System.out.println("Producer has been interrupted while making a doughnut");
return;
}
bakery.addToTray(doughnut);
}
System.out.println("Producer has been interrupted");
}
private Doughnut makeDoughnut() throws InterruptedException {
int doughnut = ++doughnutNumber;
System.out.printf("Producer is making doughnut #%d\n", doughnut);
TimeUnit.SECONDS.sleep(4);
return new Doughnut(doughnut);
}
}
Si ejecutamos la aplicación veremos que no rd posible extraer un donut de la bandeja si éste no lleva en ella al menos tres segundos.
Proyecto SynchronousQueue¶
Este proyecto es parecido al Proyecto ArrayBlockingQueue pero haciendo que el productor sea bloqueado en la bandeja cada vez de agrega un donuts a ella, hasta que el cliente lo extraiga de la misma. De esta manera el productor se asegura que no produce mas de un donut por adelantado.
public class Main {
public static void main(String[] args) {
Bakery bakery = new Bakery();
Thread doughnutProducerThread = new Thread(new DoughnutProducer(bakery), "Doughnut producer");
Thread doughnutConsumerThread = new Thread(new DoughnutConsumer(bakery), "Doughnut consumer");
doughnutProducerThread.start();
doughnutConsumerThread.start();
}
}
import java.util.concurrent.SynchronousQueue;
public class Bakery {
private final SynchronousQueue<Integer> tray = new SynchronousQueue<>();
public void addToTray(Integer doughnut) throws InterruptedException {
System.out.printf("Producer puts doughnut #%d on the tray\n", doughnut);
tray.put(doughnut);
}
public Integer extractFromTray() throws InterruptedException {
Integer doughnut = tray.take();
System.out.printf("Consumer extracts doughnut #%d from tray\n", doughnut);
return doughnut;
}
}
import java.util.Objects;
import java.util.concurrent.TimeUnit;
public class DoughnutConsumer implements Runnable {
private final Bakery bakery;
public DoughnutConsumer(Bakery bakery) {
Objects.requireNonNull(bakery);
this.bakery = bakery;
}
@Override
public void run() {
Integer doughnut;
while (!Thread.currentThread().isInterrupted()) {
try {
doughnut = bakery.extractFromTray();
} catch (InterruptedException e) {
System.out.println("Consumer has been interrupted while extracting from tray");
return;
}
try {
eat(doughnut);
} catch (InterruptedException e) {
System.out.println("Consumer has been interrupted while eating");
return;
}
}
System.out.println("Consumer has been interrupted");
}
private void eat(int doughnut) throws InterruptedException {
System.out.printf("Consumer is eating doughnut #%d\n", doughnut);
TimeUnit.SECONDS.sleep(20);
}
}
import java.util.Objects;
import java.util.concurrent.TimeUnit;
public class DoughnutProducer implements Runnable {
private final Bakery bakery;
private int doughnutNumber;
public DoughnutProducer(Bakery bakery) {
Objects.requireNonNull(bakery);
this.bakery = bakery;
}
@Override
public void run() {
int doughnut;
while (!Thread.currentThread().isInterrupted()) {
try {
doughnut = makeDoughnut();
} catch (InterruptedException e) {
System.out.println("Producer has been interrupted while making a doughnut");
return;
}
try {
bakery.addToTray(doughnut);
} catch (InterruptedException e) {
System.out.println("Producer has been interrupted while adding a doughnut to the tray");
return;
}
}
System.out.println("Producer has been interrupted");
}
private int makeDoughnut() throws InterruptedException {
int doughnut = ++doughnutNumber;
System.out.printf("Producer is making doughnut #%d\n", doughnut);
TimeUnit.SECONDS.sleep(1);
return doughnut;
}
}
Si ejecutamos la aplicación veremos que los donuts se entregan "en mano" uno a uno entre el productor y el consumidor.
LinkedTransferQueue¶
Una TransferQueue
es una blocking queue, que opcionalmente podemos limitar en capacidad, con una funcionalidad adicional a la BlockingQueue
, la de permitir al hilo que realiza la inserción bloquearse esperando a que un consumidor extraiga dicho elemento, es decir, esperar a que la transferencia se hace efectiva (de ahí su nombre).
Para ello, además de los métodos habituales de inserción proporcionados por la interfaz BlockingQueue
, la interfaz TransferQueue
proporciona el método transfer(e)
para tal fin, que hará que el hilo que lo ejecute será bloqueado hasta que algún otro hilo llame al método take()
o al método poll()
para obtener dicho elemento. Si cuando se llama al método transfer(e)
ya había elementos en la lista, el hilo será bloqueado mientras otros hilos procesan todos los elementos anteriores de la cola y sólo será desbloqueado cuando otro hilo extraiga de la cola elemento transferido.
Es posible también llamar al método hasWaitingConsumer()
para consultar si hay algún consumidor esperando a que haya elementos en la cola.
La clase que implementa esta interfaz es LinkedTransferQueue
, en la que los elementos se almacenan como nodos enlazados.
La clase LinkedTransferQueue
proporciona un modo de funcionamiento similar al de SynchronousQueue
. Sin embargo, a diferencia de ésta, LinkedTransferQueue
proporciona otros modos de funcionamiento, en concreto los proporcionados por la interfaz BlockingQueue
, por lo que nos permite decidir en cada momento qué modo usar.
Proyecto LinkedTransferQueue¶
Este proyecto es parecido al Proyecto ArrayBlockingQueue pero haciendo que el productor sea bloqueado en la bandeja cada tres donuts, hasta que el cliente extraiga de la misma el tercer donut. De esta manera el productor se asegura que no produce mas de tres donuts por adelantado.
public class Main {
public static void main(String[] args) {
Bakery bakery = new Bakery();
Thread doughnutProducerThread = new Thread(new DoughnutProducer(bakery), "Doughnut producer");
Thread doughnutConsumerThread = new Thread(new DoughnutConsumer(bakery), "Doughnut consumer");
doughnutProducerThread.start();
doughnutConsumerThread.start();
}
}
import java.util.concurrent.LinkedTransferQueue;
public class Bakery {
private final LinkedTransferQueue<Integer> tray = new LinkedTransferQueue<>();
public void addToTray(Integer doughnut) {
System.out.printf("Producer puts doughnut #%d on the tray\n", doughnut);
tray.put(doughnut);
}
public void addToTrayAndBlock(Integer doughnut) throws InterruptedException {
System.out.printf("Producer puts doughnut #%d on the tray and waits to the consumer to extract it\n", doughnut);
tray.transfer(doughnut);
}
public Integer extractFromTray() throws InterruptedException {
Integer doughnut = tray.take();
System.out.printf("Consumer extracts doughnut #%d from tray\n", doughnut);
return doughnut;
}
}
import java.util.Objects;
import java.util.concurrent.TimeUnit;
public class DoughnutProducer implements Runnable {
private final Bakery bakery;
private int doughnutNumber;
public DoughnutProducer(Bakery bakery) {
Objects.requireNonNull(bakery);
this.bakery = bakery;
}
@Override
public void run() {
int doughnut;
while (!Thread.currentThread().isInterrupted()) {
try {
doughnut = makeDoughnut();
} catch (InterruptedException e) {
System.out.println("Producer has been interrupted while making a doughnut");
return;
}
try {
if (doughnut % 3 == 0) {
bakery.addToTrayAndBlock(doughnut);
} else {
bakery.addToTray(doughnut);
}
} catch (InterruptedException e) {
System.out.println("Producer has been interrupted while adding a doughnut to the tray");
return;
}
}
System.out.println("Producer has been interrupted");
}
private int makeDoughnut() throws InterruptedException {
int doughnut = ++doughnutNumber;
System.out.printf("Producer is making doughnut #%d\n", doughnut);
TimeUnit.SECONDS.sleep(1);
return doughnut;
}
}
import java.util.Objects;
import java.util.concurrent.TimeUnit;
public class DoughnutProducer implements Runnable {
private final Bakery bakery;
private int doughnutNumber;
public DoughnutProducer(Bakery bakery) {
Objects.requireNonNull(bakery);
this.bakery = bakery;
}
@Override
public void run() {
int doughnut;
while (!Thread.currentThread().isInterrupted()) {
try {
doughnut = makeDoughnut();
} catch (InterruptedException e) {
System.out.println("Producer has been interrupted while making a doughnut");
return;
}
try {
if (doughnut % 3 == 0) {
bakery.addToTrayAndBlock(doughnut);
} else {
bakery.addToTray(doughnut);
}
} catch (InterruptedException e) {
System.out.println("Producer has been interrupted while adding a doughnut to the tray");
return;
}
}
System.out.println("Producer has been interrupted");
}
private int makeDoughnut() throws InterruptedException {
int doughnut = ++doughnutNumber;
System.out.printf("Producer is making doughnut #%d\n", doughnut);
TimeUnit.SECONDS.sleep(1);
return doughnut;
}
}
LinkedBlockingDequeue¶
La interfaz BlockingDequeue
define una estructura de datos en forma de cola bidireccional proporcionando operaciones optimizadas concurrent-aware. Se caracteriza porque se pueden realizar inserciones y extracciones en ambos extremos de la cola.
BlockingDequeue
proporciona cuatro grupos de métodos, dependiendo de lo que queremos que ocurra si no se puede realizar la operación solicitada inmediatamente porque otro hilo está haciendo otra operación que lo imposibilita, pero puede realizarse más adelante. Así, tendremos métodos que lanzarán una excepción, otros que retornan un valor especial (null
o false
), otros que bloquean al hilo llamador indefinidamente y otros que bloquean al hilo llamador una cantidad máxima de tiempo antes de darse por vencido.
En las siguientes tablas se muestran las operaciones, los grupos, los extremos y los métodos:
Extremo head | Lanzan un excepción | Retornan valor especial | Bloquean | Bloquean con límite tiempo |
---|---|---|---|---|
Inserción | addFirst(e) | offerFirst(e) | putFirst(e) | offerFirst(e, time, unit) |
Eliminación | removeFirst(e) | pollFirst() | takeFirst() | pollFirst(time, unit) |
Consulta | getFirst() | peekFirst() | - | - |
Extremo tail | Lanzan un excepción | Retornan valor especial | Bloquean | Bloquean con límite tiempo |
---|---|---|---|---|
Inserción | addLast(e) | offerLast(e) | putLast(e) | offerLast(e, time, unit) |
Eliminación | removeLast(e) | pollLast() | takeLast() | pollLast(time, unit) |
Consulta | getLast() | peekLast() | - | - |
Al igual que BlockingQueue
, las estructuras de datos BlockingDequeue
no aceptan el valor null
como elemento.
Una implementación de BlockingDeque
pude ser usada directamente como una cola BlockingQueue
, ya que hereda de ella, haciendo métodos equivalentes. Por ejemplo, add(e)
es equivalente a addLst(e)
, remove()
es equivalente a removeFirst()
y element()
es equivalente a getFirst()
.
La clase que implementa esta interfaz es LinkedBlockingDequeue
, que opcionalmente podemos limitar en capacidad, y en la que los elementos se almacenan como nodos enlazados.
ConcurrentHashMap¶
La interfaz ConcurrentMap
define una estructura de datos mapa concurrent-aware, que garantiza la atomicidad de las operaciones.
Proporciona métodos del tipo check-then-act (comprueba y después actúa) que son realizados atómicamente, como:
replace(K key, V oldValue, V newValue)
: Sólo reemplaza el valor asociado a la clave si en ese momento contiene el valoroldValue
.putIfAbsent(K key, V value)
: Sólo asocia el valor a la clave si ésta no tiene ningún valor asociado.computeIfAbsent(K key, Function<? super K,? extends V> mappingFunction)
: Sólo si la clave no tiene asociado ningún valor ejecuta la función recibida como argumento y asocia el valor retornado por ésta a la clave.computeIfPresent(K key, Function<? super K,? extends V> mappingFunction)
: Sólo si la clave tiene asociado algún valor ejecuta la función recibida como argumento y asocia el valor retornado por ésta a la clave.
La clase principal que implementa la interfaz ConcurrentMap
es ConcurrentHashMap
, que usa internamente una tabla *hash* para almacenar los datos.
Para la mayoría de las operaciones de actualización ConcurrentHashMap
usa operaciones CAS (compare and swap), por los que proporciona buen rendimiento. Sin embargo si se produce una colisión en el hash entre dos elementos, usará un cerrojo. Por este motivo no se puede clasificar totalmente como lock-free.