Skip to content

1 Introducción a los Streams

Introducción

Un stream (flujo) en Java es una secuencia de elementos que se pueden procesar (mapear, filtrar, transformar, reducir y recolectar), de forma secuencial o paralela, mediante una cadena de operaciones especificadas a través de expresiones lambda. Introducidos en Java 8, los streams permiten optimizar la forma de procesar grandes colecciones de datos.

La interfaz Stream<T> representa un flujo de elementos de tipo T aunque también se definen interfaces concretas para los tipos primitivos, como IntStream, LongStream, etc.

Un stream es una abstracción que representa un flujo de datos pero no una estructura de datos, ya que los elementos no son almacenados en el stream, sino tan solo procesador por él. De hecho, no se puede acceder individualmente a un determinado elemento del stream, sino que se define la fuente de datos origen del stream y la secuencia de operaciones que se deben aplicar sobre sus elementos, especificadas de forma funcional mediante expresiones lambda.

Más aún, la fuente de datos de origen del stream no se ve afectada por las operaciones realizadas dentro del stream. Por ejemplo, si se filtran algunos elementos de datos del stream, no se eliminan realmente de la fuente de datos origen, simplemente se omiten en el stream a partir de ese momento y ya no se tienen en cuenta la siguiente operación incluida en la secuencia de operaciones del stream. Por tanto, los datos con los que trabajamos no se ven afectados por el stream.

Los streams solo gestionan datos transitorios en memoria, lo que implica que si la aplicación falla dichos datos se perderán.

Un stream puede ser finito, es decir, tener un número finito de elementos, o infinito, si genera un número infinito de elementos. Algunas operaciones permiten restringir el número de elementos procesados, como limit() o findFirst().

Pipeline

Una vez hayamos creado un stream a partir de una fuentes de datos, podemos ejecutar sobre él cero o más operaciones intermedias, y forzosamente, una operación final. A esta cadena de operaciones se le conoce como pipeline. Un pipeline tiene los siguientes elementos en el siguiente orden:

  1. Una función generadora del stream.
  2. Cero o más operaciones intermedias.
  3. Un operación terminal.

Debemos tener en cuenta que cada operación intermedia del pipeline genera un nuevo stream resultante de aplicar la operación indicada al stream anterior de la cadena.

  • Operaciones intermedias (aggregate operations): producen como resultado un nuevo stream. Se usan para transformar, filtrar y clasificar los elementos del stream. Puede ser:
    • Operaciones sin estado: al aplicarlas, el procesamiento de un elemento del stream es independiente de cualquier otro elemento del mismo. Por ejemplo, la operación de filtrado es sin estado, ya que el filtro de cada elemento sólo depende de una condición, no de ningún otro elemento del stream.
    • Operaciones con estado: al aplicarlas, el procesamiento de un elemento del stream depende de algún otro elemento del mismo. Por ejemplo, la operación de ordenación es con estado, ya que para posicionar un elemento es necesario compararlo con el resto.
  • Operaciones terminales (terminal operations): procesan todos los elementos del stream para generar un resultado o un efecto secundario. De hecho, no retornan un stream. Después de su ejecución, el stream original no puede ser usado de nuevo, produciendo una excepción si se intenta. De ahí que se denominen operaciones terminales. Por tanto, un determinado stream puede ser usado una sola vez; si necesitamos procesar la misma fuente de datos, deberemos crear un nuevo stream con ella como origen.

Otra característica importantísima del pipeline es que es perezoso (lazy), lo que quiere decir que las operaciones intermedias sólo son ejecutadas cuando las requiere la operación terminal que se esté ejecutando.

Por defecto, los elementos de un stream son procesados secuencialmente de uno en uno en el mismo hilo de ejecución. Es lo que se conoce como stream secuencial. Sin embargo, podemos convertir un stream secuencial en un stream paralelo con tan sólo llamar a su método parallel(). Los elementos de los streams paralelos son agrupados en conjuntos y se usa un grupo de hilos en ejecución, denominados common fork-join pool, para procesar estos conjuntos de elementos en hilos de ejecución independientes.

Debemos tener en cuenta que al llamar al método parallel() se convierte el stream completo en paralelo, no solo desde el punto en el que se llama al método.

Debemos tener en cuenta que las operaciones intermedias con estado no utilizarán todas las posibilidades de paralelismo existentes, dada su naturaleza en lo relativo a lo dependencia entre elementos.

Modelo MapReduce

En Java, los streams utilizan el modelo MapReduce, que es un modelo de programación utilizado para procesar conjuntos de datos muy grandes y que ha sido adoptado por la programación funcional. Este modelo se basa en los siguientes tipos de operaciones:

  • Transformación (map): filtra o crea copias modificadas de los elementos originales. Todas las operaciones intermedias de los streams corresponden a operaciones de transformación.
  • Reducción (reduce): genera un resultado resumen de todos los elementos, por ejemplo, la suma o la media aritmética. Las operaciones terminales de la clase Stream corresponde a operaciones de reducción. De hecho, la clase Stream implementa dos operaciones de reducción diferentes:

    • De reducción pura: implementada en las diferentes versiones del método reduce, que procesa un flujo de elementos para obtener un único valor.
    • De reducción mutable: implementada en las diferentes versiones del método collect, que procesa un flujo de elementos para generar una estructura de datos mutable, como por ejemplo, una colección.

Creación de un stream a partir de una fuente de datos

Java permite muchas maneras de crear un stream, dependiendo de la fuente de datos origen deseada. Veamos algunas de estas fuentes:

  • Colección: se ejecuta el método stream() sobre una colección para crear un stream que tenga como fuente de datos de origen dicha colección. También tenemos disponible el método parallelStream() para que los elementos sean procesados en modo paralelo.

    Podemos crear un stream a partir del cualquier interfaz que extienda de Collection, como List, Set o Queue y cualquiera de las clases que implementen dichas interfaces. Por ejemplo, desde una lista:

    List<Integer> list = Arrays.asList(1, 2, 3, 4);
    Stream<Integer> stream = list.stream();
    

    o desde el Set correspondiente a las entradas de un Map:

    Map<Integer, String> map = new HashMap<>();
    Stream<Map.Entry<Integer, String>> stream = map.entrySet().stream();
    
  • Array: el método estático Arrays.stream(array) recibe un array, que actuará como fuente de datos origen del stream. Ejemplo:

    Integer array[] = {1, 2, 3, 4};
    Stream<Integer> stream = Arrays.stream(array);
    
  • Conjunto predeterminado de elementos: el método estático Stream.of() recibe un número variable de elementos que actuarán como fuente de datos origen del stream. Ejemplo

    Stream<Integer> stream = Stream.of(1, 2, 3, 4);
    
  • Función suministradora de objetos (interfaz funcional Supplier): el método estático Stream.generate() recibe un Supplier, es decir, una función suministradora de elementos, que actuará como fuente de datos origen infinita para el stream. Ejemplo:

    Stream<Integer> stream = Stream.generate(new Random()::nextInt);
    
  • Un valor inicial y una función que obtiene el siguiente elemento a partir del anterior: el método estático Stream.iterate(seed, unaryOperator) recibe un valor inicial y una función que recibe el elemento anterior y retorna el valor del nuevo elemento, que debe ser del mismo tipo. Se tratará de un stream infinito.

    Ejemplo:

    Stream<Integer> stream = Stream.iterate(0, x -> x + 5);
    

    Java 9 introdujo una nueva versión de este método Stream.iterate(seed, predicate, unaryOperator) que recibe un parámetro intermedia adicional correspondiente a un predicate que al dejar de cumplir hace que el stream no emita más valores, convirtiéndose en un stream finito. Ejemplo:

    Stream<Integer> stream = Stream.iterate(0, x -> x < 100, x -> x + 5);
    
  • Detectando patrones en una cadena: el método splitAsStream(cadena) de la clase Pattern permite dividir una cadena en base a un patrón y retornar un stream de subcadenas. Por ejemplo:

    Stream<String> stream = Pattern.compile(",".splitAsStream("Luis,Paco,Ricardo"));
    

    retorna un stream cuyos elementos serán Luis, Paco y Ricardo.

  • Generador de números aleatorios: el método ints(limiteInf, limiteSup) de la clase Random retorna un stream (un IntStream) cuya fuente de datos origen es el generador de números aleatorios contenidos entre limiteInf y limiteSup. Es especialmente útil para hacer pruebas. Ejemple:

    IntStream stream = new Random().ints(1, 100);
    
  • Métodos estáticos de la clase IntStream:

    • IntStream.range(start, end): retorna un IntStream ordenado cuyos elementos corresponden a los enteros que van desde start hasta end - 1, es decir, end está excluido. Ejemplo:

      IntStream stream = IntStream.range(1,8);
      
    • IntStream.rangeClosed(start, end): retorna un IntStream ordenado cuyos elementos corresponden a los enteros que van desde start hasta end incluido. Ejemplo:

      IntStream stream = IntStream.rangeClosed(1, 8);
      
  • Método chars() de String: retorna un IntStream cuyos elementos corresponden a los caracteres de la cadena. Ejemplo:

    IntStream stream = "Programación".chars();
    
  • Un valor inicial: Java 9 incorpora el método estático Stream.ofNullable(T value) que retorna un Stream<T> con el valor indicado o vacío si el valor proporcionado es null.

  • Un Optional: Java 9 incorpora el método stream() que retorna un Stream<T> con un único valor correspondiente al valor contenido en el optional o un stream vacío si el optional no tiene valor presente.
  • Un stream vacío: podemos crear un stream vacío mediante el método estático empty() de la interfaz Stream<T>.
  • Un stream builder: podemos crear un stream a partir de un objeto Stream.Builder<T> al que podamos agregar elementos mediante el método add(item) y posteriormente usar el método build() del mismo para obtener el objeto Stream<T>.
  • La concatenación de dos streams: El método estático Stream.concat(stream1, stream2) retorna un stream resultante de la concatenación de los dos streams recibidos. Ejemplo:

    Stream<String> stream1 = Stream.of("Luis", "Paco", "Ricardo");
    Stream<String> stream2 = Stream.of("Ana", "Lidia", "Esther");
    Stream<String> stream = Stream.concat(stream1, stream2);
    

Tratamiento individual de los elementos

En algunas ocasiones necesitamos realizar algún tratamiento sobre cada uno de los elementos del stream. En dicho caso debemos diferenciar entre operaciones terminales y no terminales. Las operaciones terminales no producirán un nuevo stream mientras que las operaciones intermedias sí que lo producirán.

  • void forEach(Consumer<? super T> action): operación terminal para tratar cada uno de los elementos del stream. Aplica la acción recibida en forma de Consumer a cada uno de los elementos del stream.
  • Stream<T> peek(Consumer<? super T> action): también se aplica la acción recibida en forma de Consumer a cada uno de los elementos del stream pero retorna un nuevo stream con los mismos elementos que el original, por lo que se trata de una operación intermedia. Este método se utiliza para tratar individualmente cada uno de los elementos del stream sin tener por ello que terminar la cadena de operaciones.