Java核心技术读书笔记12-Java SE 8引入的流


1.概述

Java 8提供了一种新的API来处理数据序列,在这个API中可以将数据想象成一股数据流,流过管道,而管道中则有各种的“筛网”来对流中每个流过的数据进行处理。这些筛网就是相关的方法,该API总体是一种声明式的编程,即只需在编写代码时告知你要干什么(你要使用什么“筛网”)而无需自己编写迭代器或循环代码去对每个元素进行判断/处理。同时由于Java 8引入了lambda表达式和方法引用,使得可以用函数式编程的模式去将方法作为参数以处理流中的元素。总的来说,流与lambda表达式/方法引用的配合可以使对元素序列的复杂操作简单化,就像编写SQL一样,在实现逻辑代码时,你只需要关心要做什么就可以。

1.1 流的构成:

1.数据源:流中数据的来源。

2.中间操作(Pipelining):对流中数据的处理,返回结果仍然是流,因此可以继续处理。

3.终止操作:同样是处理数据,但返回的结果不再是流。

1.2 流的特点:

1.无数据存储以及尽可能惰性求值:仍然要把流想象成一个通道,他只有当数据流过时才会对其进行处理,因此流本身是不存储值也不会改变数据源的,同时流的相关操作会尽可能只有在使用流的结果时才会真正执行。

2.并行化处理:使用流可以很简单的进行并行化对数据序列的处理,此时,数据会分成多段交由不同线程进行处理,最后再将结果返回(这个并行操作就是使用的java 7 的fork-join框架)。

3.不支持随机查找:数据通过流进行处理,不支持随机查找流中元素。

4.聚合操作:类似于SQL的分组、avg、count

5.内部迭代:相比于外部迭代将迭代操作交给用户,内部迭代只需用户告知完成什么即可,真正的迭代操作交给JVM来完成。

2.流的创建

可以使用数组、集合、文件以及元素序列等数据结构创建流,同时Java API中也提供了大量可以创建流的类。

        List list = new ArrayList<>(){{add(1); add(2); add(4); add(5);}};
        Integer[] nums = list.toArray(list.toArray(new Integer[0]));

        Stream collection2stream = list.stream(); //Collection接口方法
        Stream collection2parallelStream1 = list.parallelStream(); //同上,不过返回的是一个可并行流

        Stream array2stream = Stream.of(nums); //数组转成流
        Stream eleSeq2Stream = Stream.of(1, 2, 3);//因为接受参数为不定参,可以直接由元素构建流
        //Arrays类中也存在多个构建流的静态方法
        Stream array2stream2 = Arrays.stream(nums, 0, 1);//使用该类带三个参数的方法,可以由数组的[begin,end)下标范围的元素作为数据源
        Stream emptyStream = Stream.empty();//返回一个空数据源的流

2.1 无限流

Stream类中有两个可以创建无限流的方法:generate与iterate。两者的参数中都需要传入一个方法(实际上分别是Supplier与UnaryOperation类型的函数式接口),前者只需要一个方法作为参数并会无限次执行这个方法,并将结果通过流,后者则需要一个种子和一个方法,同样是无限次执行这个方法,不过每次会将前一次的返回结果作为这次的参数,而第一次执行则会将种子作为参数。

        Stream generateStream = Stream.generate(Math::random); //该流具有无限的随机数
        Stream iterate = Stream.iterate(0, FirstStreamTest::addOne); //该流为0,1,2...无限流

2.2 基本数据类型流

基本数据类型流为IntStream、LongStream和DoubleStream。分别对应(byte/short/int/boolean)、long、(float/double)。除了使用与对象流相同的创建方法外(of、stream),IntStream、LongStream还可以使用range与rangeClosed方法指定一个[from,to)或[from,to]范围生成一个范围内步长为1的数值元素的流。

对象流可以通过mapToInt、mapToLong与mapToDouble方法指定一个转换元素到相应类型的参数方法生成基本数据类型流。基本数据类型流转换成对象流则使用boxed()方法。

CharSequence接口的codePoints和chars方法分别按Unicode码点和UTF-16编码的代码单元生成IntStream

3.流元素的映射与过滤

3.1 映射

对流进行的中间操作会再次产生一个所有元素经过操作的流,映射指的是将一个集中的元素通过一个对应关系映射到另一个集中,这里抛开数学定义,可以简单的记为一个集合内的一个值通过方法可以得到另一个集合内的另一个值,如映射f可以表示为:f:x -> y,记作:y = f(x)。在这里原流和处理过后的流就对应两个集合,而施加操作的方法就是映射,这个映射可以对原流的每个元素进行处理得到一个新流中的元素。映射操作使用map方法完成:

        List list = new ArrayList<>(){{add(1); add(2); add(4); add(5);}};
        Integer[] nums = list.toArray(list.toArray(new Integer[0]));
        Stream stream = list.stream().map(x -> x + 1); //将原流中的元素都加1形成新流

如果一个流中的元素也是流,并且你想将这些流合并一起作为结果那么就可以使用mapflat方法

        Stream stream1 = Stream.of(1, 2, 3);
        Stream stream2 = Stream.of(4, 5, 6);
        Stream stream3 = Stream.of(7, 8, 9);
        Stream> stream = Stream.of(stream1, stream2, stream3); //((1, 2, 3), (4, 5, 6),(7, 8, 9))
        Stream flatSream = stream.flatMap(x -> x); //这个映射将结果流中的流摊开到一起形成:(1, 2, 3, 4, 5, 6, 7, 8, 9))
3.2 过滤

对流的过滤操作是filter,filter依然需要提供一个方法作为参数(实际上是Predicate接口),但是这个方法的返回值必须是boolean,之后filter会把仅能返回true的元素添加到新流中。

        Stream integerStream = flatSream.filter(x -> x >= 8);//仅要大于8的元素,结果为8、9
        long count = integerStream.count(); //2 该方法是一个终止操作,返回的是流中元素个数

4.抽取子流与合并流

对于一个流的抽取、以及两个流的合并操作

        Stream generateStream = Stream.generate(Math::random).limit(100); //抽取该无限流的前100个随机数
        Stream skipStream = Stream.of(1, 2, 3).skip(1);//跳过第一个元素
        Stream.concat(generateStream, skipStream); //合并两个流,第一个流不可以是无限的,否则合并操作也将会一直进行

5.其它中间操作

包括去重、排序以及peek方法
去重方法distinct使用的是元素的equals方法
排序方法sorted依然是两种,要么流中元素实现了comparable接口,要么传入一个比较器
peek方法返回的流与原流完全相同,但是会对每个流中元素调用方法。

6.约简操作与Optional

流的约简操作即是指对流的终止操作,这种操作一般会根据流中的元素返回一个结果,如取最大值、取流中的第一个值以及之前的count方法。
有的约简操作还会返回一个Optional对象,该对象为了可能返回的null值以及对null值处理准备的,因此返回Optional对象是一个相比直接返回值更安全的方式,其直接要么包装了T类型的返回结果对象,要么没有包装任何对象。

6.1 约简操作

你可以使用max(按方法取最大值)、count(统计元素个数)、anyMatch(若流中任何元素能满足给定条件返回true)的简单的约简操作返回结果。

若果你想进行一个累计的约简操作可以使用reduce方法,并在方法中指定一个参数方法作为约简操作,该方法能处理两个同类型元素(可以理解成将操作插入到元素中:x op y op z...),最后reduce将返回一个泛型类型为约简操作结果类型的Optional。例如,你希望得到一个元素为整型的流所有元素总和。

        Optional reduce = Stream.of(1, 2, 3, 4).reduce(Integer::sum);

通常用于约简的操作应该是可结合的:(x op y)op z == x op (y op z)
这样可以更好的使用并行流加速处理数据。

当流中元素为空时,返回的Optional也将为空。在这里可以提供一个参数作为幺元值e,该值应该满足e op other = other。此时返回的不再是Optional而是e的类型流为空时将返回这个值。

        Integer reduce = Stream.of(1, 2, 3, 4).reduce(0, Integer::sum);

当使用并行流时,对于多组操作返回的结果应该进行合并,这是需要提供第三个方法参数,该方法接收两个参数,作用是合并不同线程的最终结果。

        Integer reduce = Stream.of(1, 2, 3, 4).reduce(0, Integer::sum, (res1, res2) -> res1 + res2);

第三种reduce是为了多线程组合结果的,主要说一下第一种和第二种。第一种reduce中的方法必须是两个和流元素类型相同的参数,而之后本次计算的结果又会当做下一次计算的第一个参数,这就意味着你无法使用这个方法返回不同类型的结果,如:(str1, str2)-> str1.length() + str2.length()
返回的整型结果将无法作为下次计算的字符串类型的str1。
想要满足这种需求只能使用提供初始值的reduce,因为返回的结果为初值类型且其累计方法第一个参数类型和该类型相同,即为每次累计的结果,而第二个参数才为流中元素:(total, str)-> total + str.length()。

6.2 Optional对象对null值处理

使用Optional的get方法可以返回约简操作的结果,但结果不存在的时候会抛出异常。所以最好使用该对象提供一些能够对null值的处理方法,包括
orElse:该方法值为空的时候提供一个默认值
orElseGet:该方法提供一个方法参数(Supplier函数式接口),值为空时调用这个方法并返回犯法结果
orElseThrow:该方法提供一个方法参数(exceptionSupplier函数式接口),值为空时抛出这个方法结果
ifPresent:该方法提供一个方法参数(Consumer函数式接口),值不空时将值传递给这个方法,且不返回任何值
map:该方法提供一个方法参数(Function函数式接口),值不空时调用这个方法并返回其结果,该方法结果也是一个Optional

6.3 创建Optional对象以及处理Optional的值

Optional对象的创建可以使用of、empty、与ofNullable方法。
of方法:有一个参数,返回的是将这个参数包装成值的Optional对象,参数若为null则抛出异常
empty方法无参,返回的是空Optional对象
ofNullable方法:与of类似,参数若为null则返回空Optional对象

对Optional值进行处理的方法包括map与flatMap方法
map方法:参数为一个方法,返回的结果是对参数方法结果自动包装后的Optional,即参数方法为:返回值类型 -> Optional<返回值类型>
flatMap方法:参数为一个方法,返回的结果是参数方法的结果,结果必须是Optional类型,即参数方法为:Optional<返回值类型> -> Optional<返回值类型>

6.4 基本数据类型流的操作与相应的Optional

基本数据类型流在使用上与对象流类似,不过有几点差异:
1.toArray方法:该方法不需要提供参数,且可以直接返回基本数据类型数组。
2.提供了无参的约简方法(终止操作)avg/max/min/sum。
3.这些方法会返回OptionalInt|Long|Double,这三个类与Optional类似,但提供的是getAsInt/getAsLong/getAsDouble方法而不是get。
4.提供了summaryStatistics方法,可以直接返回Int|Long|DoubleSummaryStatistics对象。他们可以报告流的综合、最值与均值。

Random类具有ints/longs/doubles方法可以返回由随机数构成的基本数据类型流。

7.元素的访问与收集

7.1 访问元素

对于流中的元素可以使用forEach与forEachOrdered方法直接访问,这两个方法都是终止方法并且参数都是方法(Consumer函数接口),该参数方法可以处理每个元素,例如你可以将System.out.println方法作为参数传入以打印每个元素。两个方法的不同之处主要是对于并行流的,Order方法可能会按任意顺序处理并行流元素,若要求必须顺序处理则可以使用forEachOrdered方法,但使用该方法会丧失并行处理的部分乃至全部优势。
当然对于返回流元素也可以使用iterator方法返回一个传统的迭代器遍历元素。

7.2 收集到数组

将流中的元素收集到数组可以使用toArray方法返回一个Object[],若想返回一个正确类型的数组,可以传入一个数组的构造器函数,如:

        Stream.of(1, 2, 3).toArray(Integer[]::new);
7.3 使用收集器

如果想将元素收集到某种数据结构中,可以使用collect方法并传入一个Collector类型的参数。
你可以直接使用Collectors的静态方法返回一个List、Set、Collection、字符串等类型。

        List list = Stream.of(1, 2, 3).collect(Collectors.toList()); //收集到list
        Set set = Stream.of(1, 2, 3).collect(Collectors.toSet()); //收集到set
        String str = Stream.of("1", "2", "3").collect(Collectors.joining()); //收集并拼接成一个字符串
        String str2 = Stream.of("1", "2", "3").collect(Collectors.joining("-")); //带有分隔符,str2为 1-2-3

若想控制收集到的集合的种类,也可以使用Collectors的toCollection方法,然后在方法中指定一个集合的构造器。

        TreeSet treeSet = Stream.of(1, 2, 3).collect(Collectors.toCollection(TreeSet::new));

若果你想对流中的元素做某方面的统计,可以将流约简为一个Int|Long|DoubleSummaryStatistics对象,这三个对象分别对应(byte/short/int)|long|(float/double)。
为了获得这个统计对象你可以使用Collectors中的summarizingInt|Long|Double方法的一个,这三个方法都需要一个将元素映射成对应类型的方法。
得到这个统计对象之后你可以调用这个对象的方法获得平均值、最大值、总值等数据。
例如,你希望对一个包含字符串的流中元素的平均长度做统计,那么你首先要提供将字符串映射成其长度的方法,然后将这些长度约简成SummaryStatistics对象,最后调用getAverage方法返回结果。

        IntSummaryStatistics intSummaryStatistics = Stream.of("", "I love you", "hua hua", "very very very much").collect(Collectors.summarizingInt(String::length));
        System.out.println(intSummaryStatistics.getAverage()); //9.0


7.4 收集元素到映射表中

使用Collectors的toMap方法可以返回一个Map,使用该方法需要提供两个方法用于为每个元素产生对应的key与value。

       Map map = Stream.of("abc", "def").collect(Collectors.toMap(Collectors.toMap(str -> str.substring(0, 1), Function.identity()));//key为串对应的首字符, value为串本身

再收集成map时,如果产生的key相同则会抛出一个异常,为了处理这种情况,可以再传入第三个参数,该参数方法是一个两个参数的方法用于处理两个key相同的情况。

第三个参数方法的两个参数为流中的元素经过第二个参数方法处理产生的值,而不是产生的key。

        Stream.of("abc", "def", "abc").collect(Collectors.toMap(str -> str.substring(0, 1), Function.identity(), (str1, str2) -> str1)); //若出现相同元素则只保留一个

如果我们想把冲突的元素合并成一个Set集,该怎么操作呢?首先,我们先用第二个参数方法将元素处理成一个单例Set,当发生key冲突时,再由第三个参数方法将冲突元素对应的集合合并起来,代码如下:

        Map> collect = Stream.of("abc", "abd", "bbq", "aaa").collect(Collectors.toMap(
                str -> str.substring(0, 1),
                str -> Collections.singleton(str), //对于每个元素都会返回单例集合
                (char1, char2) -> {
                    Set union = new HashSet<>(char1); //元素生成的key冲突,由单例集合生成Set,再将两个Set合并起来
                    union.addAll(char2);
                    return union;
                }));

如果想要得道具体类型的map,可以将构造器作为第四个参数传入。

    public static void main(String[] args) {
        Map> collect = Stream.of("abc", "abd", "bbq", "aaa").collect(Collectors.toMap(
                str -> str.substring(0, 1),
                str -> Collections.singleton(str), //对于每个元素都会返回单例集合
                (char1, char2) -> {
                    Set union = new HashSet<>(char1); //元素生成的key冲突,由单例集合生成Set,再将两个Set合并起来
                    union.addAll(char2);
                    return union;
                },
                TreeMap::new));

每一个toMap方法都有一个对应的toConcurrentMap方法获得支持并发的Map,并发Map就支持并行集合处理。当使用并行流时,共享的映射表比合并映射表更加高效。

7.5 分组收集

在上一个部分,可以看到将字符串的首字符作为key,首字母相同的串放置到一个集合中的收集map方法。这个方法比较复杂相当于手写实现了很多代码。实际上这是一个根据某种条件进行分组的功能——具有相同条件的为一组,收集器提供了groupingBy方法来支持这个功能,该方法需要传入一个方法参数作为分类函数(Function函数式接口),该方法返回的结果作为每个组的key,收集元素时,具有相同key的将作为同一个组。

        Map> collect = Stream.of("abc", "abd", "bbq", "aaa").collect(Collectors.groupingBy(str -> str.substring(0, 1)));

如果分类函数返回的是布尔值时,则组只会有true、false对应的两个。此时应该使用更高效的partitioningBy方法达到相同效果。

7.6 下游收集器

直接使用groupingBy方法将会把一个组作为List返回,若需要对这个组进一步处理,则可以将一个收集器作为第二个参数传递而groupingBy。这个过程称为下游收集器。其使用方法与之前传入到connect方法中的收集器用法类似,这个收集器将作用到一组的元素上并决定返回Map的value类型。
例如,你可以传入Collectors的toSet()方法,返回一个Set而不是List,或者counting方法统计一组的元素总数。也可以使用summingInt|Double|Long方法根据某个对元素能返回数值的方法对一组元素求和并返回一个summarizingInt|Double|Long作为Map的value。例如所有按字符串首字母分组的字符串总长度:

        Map collect = Stream.of("abc", "abd", "bbq", "aaa").collect(
                Collectors.groupingBy(str -> str.substring(0, 1),
                        Collectors.summarizingInt(String::length))); 

我们也可以使用收集器的mapping方法传入两个参数,第一个参数为一个方法参数可以对组中的所有元素进行处理,这类似与流的map方法过程。第二个参数为一个收集器,对过滤后的元素进行收集。
例如:字符串的每一组希望能够存储字符串的长度而不是字符串本身。那么就需要使用下游收集器收集组元素并使用mapping方法对组中每个元素进行处理。

        Map> collect = Stream.of("abc", "abd", "bbq", "aaa").collect(
                Collectors.groupingBy(str -> str.substring(0, 1),
                        Collectors.mapping(String::length, Collectors.toList())));

8.并行流

并行流使用java 7的fork-join框架来通过多线程并行执行任务完成对流中元素的处理。

8.1 并行流的创建

可以使用Collection接口的parallelStream方法从任何集合中获取一个并行流。
也可以通过流的parallel方法将调用者转换为一个并行流。

        List list = new ArrayList<>(){{add(1); add(2); add(4); add(5);}};
        Stream parallelStream = list.parallelStream();
        Stream parallelStream2 = Stream.of(1, 2, 3).parallel();
8.2 并行执行

只要终结方法执行时,并行流的所有中间操作都会并行化处理。
流并行执行时,其目标是返回结果的顺序与顺序执行一致,而各个并行部分的操作应该可以以任意顺序执行。
如果某些对流的操作对顺序不感兴趣可以调用一次unordered方法以使这些方法获益,如distinct、limit。
不是所有流都应该转变为并行的,使并行流可以正常工作需要满足大量条件:
1.数据应在内存中
2.流应该可以被高效的分为若干个子部分(如由数组与平衡二叉树支撑的流)
3.流操作的工作量应该有较大的规模
4.流操作不应被阻塞

不要修改在某项流操作后会将元素返回到流的数据源。