3 CyclicBarrier¶
CyclicBarrier¶
La API de concurrencia de Java nos proporciona la clase CyclicBarrier
, que permite la sincronización de un número fijo de dos o más hilos que deben esperarse unos a los otros en un determinado punto de sincronización común antes de continuar su ejecución. La barrera (barrier) no dejará avanzar a los hilos que lleguen a un determinado punto de sincronización hasta que todos hayan llegado a dicho punto.
CyclicBarrier
Barrera de sincronización de un número predeterminado de hilos, que se reinicia automáticamente
La gran ventaja de CyclicBarrier
es que puede usar tanto una sincronización inicial como para una intermedia o una final.
El constructor de esta clase recibe un parámetro entero que indica el número de hilos que serán sincronizados (denominado internamente parties). Si pasamos un valor negativo para dicho parámetro se lanzará la excepción IllegalArgumentException
.
Un objeto CyclicBarrier
posee un contador interno para controlar cuántos hilos han llegado al punto de sincronización. Cuando uno de estos hilos llega al punto establecido, llama al método await()
para esperar al resto de los hilos, de manera que queda suspendido.
El método await(timeout, timeUnit)
está sobrecargado de manera que recibe un tiempo máximo de espera, transcurrido el cuál el hilo es reactivado automáticamente y se lanza la excepción TimeoutException
. Si el timeout
pasado al método es menor o igual que 0, el hilo no esperará.
Tanto await()
como await(timeout, timeUnit)
retornan el orden en el que el hilo ha llegado a la barrera, donde el primer en llegar recibe el valor participantes - 1
y el último en llegar recibe el valor 0
.
Cuando el último de los hilos llama al método await()
del objeto CyclicBarrier
, es decir, cuando el contador interno llega a 0
, se dice que se "abre la barrera" y el resto de hilos que estaban esperando en la misma son reactivados y continúan su ejecución.
Como la mayoría de los métodos bloqueantes, await()
y await(timeout, timeUnit)
lanzan la excepción InterruptedException
si el hilo es interrumpido mientras está esperando en la barrera o si ya había sido marcado para interrupción antes de llamar a estos métodos, de manera que el hilo es reactivado inmediatamente.
Pero entonces, ¿qué ocurre entonces con los otros hilos que estaban esperando en la barrera? Dado que un hilo ha sido interrumpido podemos afirmar que las condiciones que debían de darse para permitir avanzar a los hilos no van a poder darse, porque el contador interno de la barrera no podrá llegar a 0. En ese caso, podemos decir que la barrera "se ha roto", pasando a un estado interno conocido como broken.
Igualmente, si en un hilo que está ejecutando el método await(timeout, timeUnit)
se lanza la excepción TimeoutException
porque el hilo haya esperado "demasiado", la barrera quedará "rota".
Cuando una barrera "se rompe" los hilos que estuvieran esperando en dicha barrera son reactivados automáticamente lanzando la excepción BrokenBarrierException
. Podemos saber si una barrera está rota llamando al método isBroken()
.
Si un hilo llama al método await()
o await(timeout, timeUnit)
una vez que la barrera ha sido marcada como rota, el hilo no será bloqueado y se lanzará directamente la excepción BrokenBarrierException
.
Un aspecto interesante de la clase CyclicBarrier
es que su constructor puede recibir un parámetro adicional consistente en un objeto Runnable
que será ejecutado automáticamente cuando el último de los hilos participantes haya indicado que han llegado a la barrera ejecutando el método await()
o await(timeout, timeUnit)
. La tarea representada por el Runnable
será ejecutada en dicho hilo antes reactivar a los hilos que estaban esperando en la barrera para que continúen con su ejecución. Suele emplearse para actualizar un estado común a todos los hilos antes de que estos puedan continuar su ejecución.
Si se produjera una excepción no gestionada en el objeto Runnable
la excepción será propagada a dicho hilo y la barrera pasará al estado de rota, lanzando la excepción BrokenBarrierException
en el resto de hilos que estaban esperando en la barrera.
Debemos tener en cuenta que un objeto CyclicBarrier
corresponde, como su nombre indica, a una barrera cíclica. Esto quiere decir, que el contador interno es reseteado automáticamente a su valor inicial cuando todos los hilos participantes llegan a punto de sincronización y llaman al método await()
o await(timeout, timeUnit)
, siempre y cuando la barrera no se encuentre o pase al estado de rota. Por tanto podremos reutilizar el mismo objeto CyclicBarrier
para volver a sincronizar más adelante a los mismos hilos en un punto de sincronización posterior.
En algunas ocasiones complejas es necesario reiniciar el proceso de sincronización de la barrera como si nada hubiera ocurrido. Para ello se necesita resetear explícitamente el contador interno de la barrera a su valor inicial, para lo que podemos usar el método reset()
. Debemos tener en cuenta que en cuanto un hilo llama al método reset()
de la barrera los hilos que estuvieran esperando en la misma serán reactivados inmediatamente y se lanzará en ellos la excepción BrokenBarrierException
. En general la operación de reseteo puede ser compleja de gestionar por lo que puede resultar más conveniente simplemente crear un nuevo objeto CyclicBarrier
.
El objeto CyclicBarrier
presenta una serie de diferencias importantes respecto a CountDownLatch
:
CyclicBarrier
sincroniza hilos en un determinado punto de ejecución, mientrasCountDownLatch
bloquea el avance de hilos hasta que ocurra un determinado número de eventos, independientemente de en qué hilos se generen dichos eventos. Dicho de manera sencilla, cada hilo participante en unCyclicBarrier
sólo puede aportar un único evento (y no varios).- Una
CyclicBarrier
es por definición cíclica, mientras que unCountDownLatch
no puede ser reutilizado para una segunda sincronización una vez que su contador interno llega a0
. - Podemos resetear explícitamente una
CyclicBarrier
, mientras que losCountDownLatch
no pueden ser reseteados.
Otra característica importante de la clase CyclicBarrier
es que no se permite incrementar o decrementar dinámicamente el número de hilos que deben sincronizarse en la barrera, sino que la barrera siempre funcionará con el número de hilos que le hayamos indicado en el constructor.
La clase CyclicBarrier
posee también una serie de métodos informativos sobre el estado de la barrera, como:
getNumberWaiting()
: Retorna el número de hilos que están suspendidos en la barrera.getParties()
: Retorna el número de hilos participantes que van a ser sincronizados en la barrera.
Proyecto CyclicBarrier¶
En este proyecto desarrollaremos una aplicación que simula tres amigos que quedan en un bar para tomar dos cervezas. Como son bueno amigos, ninguno de ellos empieza a beber la primera cerveza hasta que todos han llegado al bar. De igual forma, hasta que todos no se han bebido la primera cerveza, ninguno comienza a beberse la segunda cerveza. Finalmente, cuando todos han terminado la segunda cerveza, los amigos se van cada uno a su casa. Para llevar a cabo la sincronización entre los amigos usaremos un objeto CyclicBarrier
y lo configuraremos de manera que cada vez que los amigos dejen de esperarse se lance un mensaje de cambio de actividad.
import java.util.concurrent.CyclicBarrier;
public class Main {
private static final int NUMBER_OF_FRIENDS = 3;
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(NUMBER_OF_FRIENDS,
new OpenedBarrierAction());
for (int i = 0; i < NUMBER_OF_FRIENDS; i++) {
new Thread(new Friend("Friend #" + i, cyclicBarrier), "Friend #" + i).start();
}
}
}
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.util.Objects;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
public class Friend implements Runnable {
private final String name;
private final CyclicBarrier cyclicBarrier;
private final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("HH:mm:ss");
public Friend(String name, CyclicBarrier cyclicBarrier) {
Objects.requireNonNull(name);
Objects.requireNonNull(cyclicBarrier);
this.name = name;
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
try {
goToPub();
} catch (InterruptedException e) {
System.out.printf("%s has been interrupted while going to the pub\n", name);
return;
}
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
System.out.printf("%s has been interrupted while waiting for friends in the pub\n", name);
return;
} catch (BrokenBarrierException e) {
System.out.printf("%s doesn't wait any more for friends in the pub because someone isn't coming\n", name);
}
try {
firstBeerInPub();
} catch (InterruptedException e) {
System.out.printf("%s has been interrupted while drinking the first beer\n", name);
return;
}
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
System.out.printf("%s has been interrupted while waiting for friends to finish their fist beer\n", name);
return;
} catch (BrokenBarrierException e) {
System.out.printf("%s doesn't wait any more for friends to finish their first beer because someone isn't going to finish it\n", name);
}
try {
secondBeerInPub();
} catch (InterruptedException e) {
System.out.printf("%s has been interrupted while drinking the second beer\n", name);
return;
}
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
System.out.printf("%s has been interrupted while waiting for friends to finish their second beer\n", name);
return;
} catch (BrokenBarrierException e) {
System.out.printf("%s doesn't wait any more for friends to finish their second beer because someone isn't going to finish it\n", name);
}
try {
goHome();
} catch (InterruptedException e) {
System.out.printf("%s has been interrupted while going back home\n", name);
}
}
private void goToPub() throws InterruptedException {
System.out.printf("%s -> %s is leaving home\n",
LocalTime.now().format(dateTimeFormatter), name);
TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(5) + 1);
System.out.printf("%s -> %s has arrived in the pub\n",
LocalTime.now().format(dateTimeFormatter), name);
}
private void firstBeerInPub() throws InterruptedException {
TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(5) + 1);
System.out.printf("%s -> %s has finished the first beer\n",
LocalTime.now().format(dateTimeFormatter), name);
}
private void secondBeerInPub() throws InterruptedException {
TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(5) + 1);
System.out.printf("%s -> %s has finished the first beer\n",
LocalTime.now().format(dateTimeFormatter), name);
}
private void goHome() throws InterruptedException {
System.out.printf("%s -> %s is leaving the pub\n",
LocalTime.now().format(dateTimeFormatter), name);
TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(5) + 1);
System.out.printf("%s -> %s is at home\n",
LocalTime.now().format(dateTimeFormatter), name);
}
}
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.util.Locale;
public class OpenedBarrierAction implements Runnable {
private final DateTimeFormatter dateTimeFormatter =
DateTimeFormatter.ofPattern("HH:mm:ss", Locale.getDefault());
@Override
public void run() {
System.out.printf("%s -> Phase change (executed in %s)\n",
LocalTime.now().format(dateTimeFormatter), Thread.currentThread().getName());
}
}
Si ejecutamos el programa veremos que unos amigos se esperan a los otros.