Skip to content

5 Exchanger

Exchanger

La API de concurrencia de Java nos proporciona un tipo adicional de barrera, a través de la clase Exchanger<T> (intercambiador), que permite definir un punto de sincronización entre dos hilos de manera que el primero de los dos en llegar es bloqueado en espera del otro, de manera que cuando ambos llegan a dicho punto se sincronización intercambian entre ellos un determinado objeto de tipo T.

Exchanger

Barrera de sincronización entre dos hilos para el intercambio de una estructura de datos

Ambos hilos tendrán definido un objeto del tipo T cuyas direcciones de memoria serán intercambiadas al llegar al punto de sincronización.

Para poder llevar a cabo el intercambio ambos hilos deben compartir el objeto Exchanger<T> y tener definido un objeto propio del tipo T. Al llegar al punto de sincronización, cada hilo llamará al método exchange(T_object) del objeto Exchanger pasándole el objeto T que aporta al intercambio. Si el otro hilo aún no ha llegado al punto de sincronización el hilo es suspendido hasta que el otro hilo ejecute el mismo método. Cuando ambos hilos hayan llamado al método exchange(T_object), el hilo que estuviera esperando (el primero que llegó) será reactivado y el otro ni siquiera será bloqueado, y como valor de retorno de la llamada a exchange(T_object), cada hilo recibirá el objeto proporcionando por el otro hilo, que normalmente asignará a la variable que albergaba su objeto. De esta manera, ambos hilos pueden continuar su ejecución, pero ahora su objeto en realidad corresponde al que contenía el otro hilo antes del intercambio.

El método exchange(T_object, timeout, timeUnit) está sobrecargado de manera que recibe un tiempo máximo de espera, transcurrido el cuál el hilo es reactivado automáticamente y se lanza la excepción TimeoutException. Si el timeout pasado al método es menor o igual que 0, el hilo no esperará.

Como la mayoría de los métodos bloqueantes, exchange(T_object) y exchange(T_object, timeout, timeUnit) lanzarán la excepción InterruptedException si el hilo es interrumpido mientras estaba esperando en dichos métodos, o si ya había sido marcado para interrupción antes de ejecutarlos, reactivando inmediatamente el hilo correspondiente.

Exchanger

Figura - Exchanger

Esta clase puede ser muy útil en problemas parecidos al del productor-consumidor, aunque con la limitación de poder sincronizar un único productor y un único consumidor, dado que la clase Exchanger<T> sólo funciona entre dos hilos.

Proyecto Exchanger

En este proyecto desarrollaremos una aplicación para simular el problema del productor-consumidor con un solo productor y un solo consumidor, que intercambiarán el buffer de datos una vez éste se haya llenado, para lo que hará uso de la clase Exchanger.

import java.util.List;
import java.util.concurrent.Exchanger;

public class Main {

    public static final int BUFFER_SIZE = 10;

    public static void main(String[] args) {
        Exchanger<List<Integer>> exchanger = new Exchanger<>();
        Thread doughnutProducerThread = new Thread(new DoughnutProducer(exchanger), 
                                                "Doughnut producer");
        Thread doughnutConsumerThread = new Thread(new DoughnutConsumer(exchanger), 
                                                "Doughnut consumer");
        doughnutProducerThread.start();
        doughnutConsumerThread.start();
    }
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class DoughnutProducer implements Runnable {

    private final Exchanger<List<Integer>> exchanger;
    private List<Integer> buffer = new ArrayList<>();
    private int doughnutNumber;

    public DoughnutProducer(Exchanger<List<Integer>> exchanger) {
        Objects.requireNonNull(exchanger);
        this.exchanger = exchanger;
    }

    @Override
    public void run() {
        Integer doughnut;
        while (!Thread.currentThread().isInterrupted()) {
            // Fill the buffer and then exchange.
            for (int i = 0; i < Main.BUFFER_SIZE; i++){
                try {
                    doughnut = makeDoughnut();
                } catch (InterruptedException e) {
                    System.out.println(
                        "Producer has been interrupted while making a doughnut");
                    return;
                }
                System.out.printf("Producer has made doughnut #%d\n", doughnut);
                buffer.add(doughnut);
            }
            System.out.println("Producer ready for exchange");
            try {
                buffer = exchanger.exchange(buffer, 20, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                System.out.println(
                    "Producer has been interrupted while exchanging buffer");
                return;
            } catch (TimeoutException e) {
                System.out.println("Producer can't wait for the consumer anymore");
                return;
            }
        }
        System.out.println("Producer has been interrupted");
    }

    private int makeDoughnut() throws InterruptedException {
        int doughnut = ++doughnutNumber;
        TimeUnit.SECONDS.sleep(1);
        return doughnut;
    }

}
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class DoughnutConsumer implements Runnable {

    private final Exchanger<List<Integer>> exchanger;
    private List<Integer> buffer = new ArrayList<>();

    public DoughnutConsumer(Exchanger<List<Integer>> exchanger) {
        Objects.requireNonNull(exchanger);
        this.exchanger = exchanger;
    }

    @Override
    public void run() {
        Integer doughnut;
        while (!Thread.currentThread().isInterrupted()) {
            // Exchage the empty buffer for a full one and then consume it.
            System.out.println("Consumer ready for exchange");
            try {
                buffer = exchanger.exchange(buffer, 20, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                System.out.println(
                    "Consumer has been interrupted while exchanging buffer");
                return;
            } catch (TimeoutException e) {
                System.out.println("Consumer can't wait for the producer anymore");
                return;
            }
            for (int i = 0; i < Main.BUFFER_SIZE; i++){
                doughnut = buffer.remove(0);
                try {
                    eat(doughnut);
                } catch (InterruptedException e) {
                    System.out.println("Consumer has been interrupted while eating");
                    return;
                }
            }
        }
        System.out.println("Consumer has been interrupted");
    }

    private void eat(int doughnut) throws InterruptedException {
        System.out.printf("Consumer is eating doughnut #%d\n", doughnut);
        TimeUnit.MILLISECONDS.sleep(500);
    }

}