Java 8: Streams

Posted on Tue 01 April 2014 in coding

Esta é a segunda parte numa série de posts que escrevo a apresentar as features da nova versão do Java. Na primeira escrevi sobre lambdas (ou funções anónimas). Nesta, falo sobre um conceito novo na linguagem que utiliza lambdas intensivamente - Streams.

Stream é uma nova interface que permite executar múltiplas operações em cadeia sobre um conjunto de elementos. As operações são organizadas num pipeline de dados constituído por uma origem (i.e.: um array, uma função geradora, I/O...), zero ou mais operações intermédias e uma operação final de consumo.

Para quem usa Linux e sistemas parecidos, a ideia é a mesma de quando se utiliza vários processos separados por pipes. Para obter os 10 maiores ficheiros/pastas de uma diretoria, por exemplo, podemos executar o seguinte comando:

du -s * | sort -n | tail

Utilizando ainda o exemplo do post anterior, vamos reimplementar a classe Chatserver com apenas um simples getter e um construtor com alguns dados de inicialização:

class Chatserver {
    private List<User> users;

    public Chatserver() {
        users = new ArrayList<>();
        users.add(new User("Anabela", 'f', 25, false));
        users.add(new User("Beatriz", 'f', 22, true));
        users.add(new User("Cláudio", 'm', 17, false));
        users.add(new User("Daniela", 'f', 16, true));
        users.add(new User("Eduardo", 'm', 19, true));
        users.add(new User("Fabiana", 'f', 26, true));
        users.add(new User("Gonçalo", 'm', 23, false));
    }

    List<User> getUsers() {
        return new ArrayList<>(users);
    }
}

Todas as coleções têm um novo método stream() que, como se deve esperar, devolve um novo stream associado:

Stream<User> stream = cs.getUsers().stream();

Os objetos Stream têm dois tipos diferentes de operações:

  • operações intermédias - que aplicam transformações em todos os elementos, devolvendo uma nova stream;
  • operações terminais - que consomem a stream, produzindo um resultado final (ou efeitos secundários, como imprimir para a consola).

Para filtrar os utilizadores segundo uma certa condição (o que foi feito no post anterior), pode-se utilizar o método filter(), uma operação intermédia:

cs.getUsers().stream()
             .filter(User::isOnline)

Este método devolve uma nova stream, e assim, podemos aplicar mais operações:

cs.getUsers().stream()
             .filter(User::isOnline)
             .filter(user -> user.getGender() == 'f')

Obtemos assim as miúdas que estão online (don't judge me). Para imprimir os resultados, podemos utilizar a operação terminal forEach() que efetua uma ação a cada elemento do stream. Neste caso a ação é simplesmente invocar System.out.println().

cs.getUsers().stream()
             .filter(User::isOnline)
             .filter(user -> user.getGender() == 'f')
             .forEach(System.out::println);

Output:

User{name=Beatriz, gender=f, age=22, online=true}
User{name=Daniela, gender=f, age=16, online=true}
User{name=Fabiana, gender=f, age=26, online=true}

Existem outras operações interessantes como é o caso de sorted() que ordena os elementos segundo um dado comparador e map() que aplica uma função em todos os elementos. Ex:

cs.getUsers().stream()
             .filter(User::isOnline)
             .filter(user -> user.getGender() == 'f')
             .sorted((u, v) -> u.getAge() - v.getAge())
             .map(User::description)
             .forEach(System.out::println);

describe() é um método que devolve uma String com um texto bonito para descrever um utilizador. Output:

Daniela, f, 16: online
Beatriz, f, 22: online
Fabiana, f, 26: online

Dá para fazer todo o tipo de coisas porreiras com isto já com a biblioteca padrão, como por exemplo, podemos apresentar estatísticas simples sobre a idade dos utilizadores mapeando, a todos os elementos, o método que devolve a idade seguido de summaryStatistics():

System.out.println(
        cs.getUsers().stream()
        .mapToInt(User::getAge)
        .summaryStatistics());

Output:

IntSummaryStatistics{count=7, sum=148, min=16, 
average=21.142857, max=26}

Podemos usar collect() com um Collector para guardar os elementos finais numa estrutura de dados:

List<User> list;
list = cs.getUsers().stream()
         .filter(User::isOnline)
         .sorted((u, v) -> u.getAge() - v.getAge())
         .collect(Collectors.toList());

for (User u: list) {
    System.out.println(u);
}

Output:

User{name=Daniela, gender=f, age=16, online=true}
User{name=Eduardo, gender=m, age=19, online=true}
User{name=Beatriz, gender=f, age=22, online=true}
User{name=Fabiana, gender=f, age=26, online=true}

Já chega de chats e utilizadores. Para introduzir, na minha opinião, uma das melhores funcionalidades dos Streams, vou mudar de problema.

Quer-se contar quantos números primos há abaixo de 10 milhões. Como não me apetece pensar muito, vou determinar se um número é ou não primo mediante um teste exaustivo de divisibilidade (experimentando dividir por todos os números de 2 até à sua raíz quadrada):

IntPredicate isPrime;
isPrime = n -> IntStream.rangeClosed(2, (int)Math.sqrt(n))
        .allMatch(i -> n % i != 0) && n > 1;

IntPredicate é uma interface funcional em que o método test() espera um inteiro e devolve um booleano. rangeClosed() cria uma range (estilo Python) - uma sequência de números inteiros que começa no primeiro argumento até ao número dado como o segundo argumento (inclusivé - daí se chamar "Closed").

allMatch() devolve true caso a condição "número não é divisível por i (n % i != 0) se verifique para todos os i's pertencentes à sequência.

Agora para contar primos abaixo de 10 000 000, basta criar uma sequência até 10 milhões, filtrar pela condição de ser primo e contar os elementos. Ou seja,

long nPrimes = IntStream.range(0, 10_000_000)
                        .filter(isPrime)
                        .count();
System.out.println(nPrimes);

Se executarem, dá 664 579 números primos e, no meu computador, demora aproximadamente 14 segundos para devolver o resultado.

Num mundo em que os computadores são cada vez mais multi-core, é um desperdício utilizar apenas um core para fazer resolver este tipo de problemas embaraçosamente paralelos.

O Java 8 possibilita-nos paralelizar as operações das streams, automagicamente, sem haver necessidade de lidar com threads e essas coisas. Basta invocar o método parallel() numa stream já existente, ou então, parallelStream() quando se quer obter um stream a partir de um collection.

Abaixo está o programa completo, já ajustado para processar a stream em paralelo:

public class PrimeTest {
    public static void main(String[] args) {

        IntPredicate isPrime = n -> IntStream
            .rangeClosed(2, (int)Math.sqrt(n))
            .allMatch(i -> n % i != 0) && n > 1;

        long nPrimes = IntStream.range(0, 10_000_000)
            .parallel()
            .filter(isPrime)
            .count();

        System.out.println(nPrimes);
    }
}

Este programa demora-me ~7.8 segundos, ou seja, um speedup de 1.8×. Um resultado consistente com o facto de ter um computador dual-core.

E é tudo por agora. Espero que tenham gostado :)