上节初步介绍了 Java 8 中的函数式数据处理,对于 collect 方法,我们只是演示了其最基本的应用,它还有很多强大的功能,比如,可以分组统计汇总,实现类似数据库查询语言 SQL 中的 group by 功能。
具体都有哪些功能?有什么用?如何使用?基本原理是什么?本节进行详细讨论,我们先来进一步理解下 collect 方法。
理解 collect
在上节中,过滤得到 90 分以上的学生列表,代码是这样的:
- List<Student> above90List = students.stream()
- .filter(t->t.getScore()>90)
- .collect(Collectors.toList());
最后的 collect 调用看上去很神奇,它到底是怎么把 Stream 转换为 List<Student> 的呢?先看下 collect 方法的定义:
- <R, A> R collect(Collector<? super T, A, R> collector)
它接受一个收集器 collector 作为参数,类型是 Collector,这是一个接口,它的定义基本是:
- public interface Collector<T, A, R> {
- Supplier<A> supplier();
- BiConsumer<A, T> accumulator();
- BinaryOperator<A> combiner();
- Function<A, R> finisher();
- Set<Characteristics> characteristics();
- }
在顺序流中,collect 方法与这些接口方法的交互大概是这样的:
- //首先调用工厂方法supplier创建一个存放处理状态的容器container,类型为A
- A container = collector.supplier().get();
- //然后对流中的每一个元素t,调用累加器accumulator,参数为累计状态container和当前元素t
- for (T t : data)
- collector.accumulator().accept(container, t);
- //最后调用finisher对累计状态container进行可能的调整,类型转换(A转换为R),并返回结果
- return collector.finisher().apply(container);
combiner 只在并行流中有用,用于合并部分结果。characteristics 用于标示收集器的特征,Collector 接口的调用者可以利用这些特征进行一些优化,Characteristics 是一个枚举,有三个值:CONCURRENT, UNORDERED 和 IDENTITY_FINISH,它们的含义我们后面通过例子简要说明,目前可以忽略。
Collectors.toList() 具体是什么呢?看下代码:
- public static <T>
- Collector<T, ?, List<T>> toList() {
- return new CollectorImpl<>((Supplier<List<T>>) ArrayList::new, List::add,
- (left, right) -> { left.addAll(right); return left; },
- CH_ID);
- }
它的实现类是 CollectorImpl,这是 Collectors 内部的一个私有类,实现很简单,主要就是定义了两个构造方法,接受函数式参数并赋值给内部变量。对 toList 来说:
也就是说,collect(Collectors.toList()) 背后的伪代码如下所示:
- List<T> container = new ArrayList<>();
- for (T t : data)
- container.add(t);
- return container;
与 toList 类似的容器收集器还有 toSet, toCollection, toMap 等,我们来看下。
容器收集器
toSet
toSet 的使用与 toList 类似,只是它可以排重,就不举例了。toList 背后的容器是 ArrayList,toSet 背后的容器是 HashSet,其代码为:
- public static <T>
- Collector<T, ?, Set<T>> toSet() {
- return new CollectorImpl<>((Supplier<Set<T>>) HashSet::new, Set::add,
- (left, right) -> { left.addAll(right); return left; },
- CH_UNORDERED_ID);
- }
CH_UNORDERED_ID 是一个静态变量,它的特征有两个,一个是 IDENTITY_FINISH,表示返回结果即为 Supplier 创建的 HashSet,另一个是 UNORDERED,表示收集器不会保留顺序,这也容易理解,因为背后容器是 HashSet。
toCollection
toCollection 是一个通用的容器收集器,可以用于任何 Collection 接口的实现类,它接受一个工厂方法 Supplier 作为参数,具体代码为:
- public static <T, C extends Collection<T>>
- Collector<T, ?, C> toCollection(Supplier<C> collectionFactory) {
- return new CollectorImpl<>(collectionFactory, Collection<T>::add,
- (r1, r2) -> { r1.addAll(r2); return r1; },
- CH_ID);
- }
比如,如果希望排重但又希望保留出现的顺序,可以使用 LinkedHashSet,Collector 可以这么创建:
- Collectors.toCollection(LinkedHashSet: :new)
toMap
toMap 将元素流转换为一个 Map,我们知道,Map 有键和值两部分,toMap 至少需要两个函数参数,一个将元素转换为键,另一个将元素转换为值,其基本定义为:
- public static <T, K, U> Collector<T, ?, Map<K,U>> toMap(
- Function<? super T, ? extends K> keyMapper,
- Function<? super T, ? extends U> valueMapper)
返回结果为 Map<K,U>,keyMapper 将元素转换为键,valueMapper 将元素转换为值。比如,将学生流转换为学生名称和分数的 Map,代码可以为:
- Map < String,
- Double > nameScoreMap = students.stream().collect(Collectors.toMap(Student: :getName, Student: :getScore));
这里,Student::getName 是 keyMapper,Student::getScore 是 valueMapper。
实践中,经常需要将一个对象列表按主键转换为一个 Map,以便以后按照主键进行快速查找,比如,假定 Student 的主键是 id,希望转换学生流为学生 id 和学生对象的 Map,代码可以为:
- Map < String,
- Student > byIdMap = students.stream().collect(Collectors.toMap(Student: :getId, t - >t));
t->t 是 valueMapper,表示值就是元素本身,这个函数用的比较多,接口 Function 定义了一个静态函数 identity 表示它,也就是说,上面的代码可以替换为:
- Map < String,
- Student > byIdMap = students.stream().collect(Collectors.toMap(Student: :getId, Function.identity()));
上面的 toMap 假定元素的键不能重复,如果有重复的,会抛出异常,比如:
- Map < String,
- Integer > strLenMap = Stream.of("abc", "hello", "abc").collect(Collectors.toMap(Function.identity(), t - >t.length()));
希望得到字符串与其长度的 Map,但由于包含重复字符串 "abc",程序会抛出异常。这种情况下,我们希望的是程序忽略后面重复出现的元素,这时,可以使用另一个 toMap 函数:
- public static <T, K, U> Collector<T, ?, Map<K,U>> toMap(
- Function<? super T, ? extends K> keyMapper,
- Function<? super T, ? extends U> valueMapper,
- BinaryOperator<U> mergeFunction)
相比前面的 toMap,它接受一个额外的参数 mergeFunction,它用于处理冲突,在收集一个新元素时,如果新元素的键已经存在了,系统会将新元素的值与键对应的旧值一起传递给 mergeFunction 得到一个值,然后用这个值给键赋值。
对于前面字符串长度的例子,新值与旧值其实是一样的,我们可以用任意一个值,代码可以为:
- Map<String,Integer> strLenMap = Stream.of("abc","hello","abc").collect(
- Collectors.toMap(Function.identity(),
- t->t.length(), (oldValue,value)->value));
有时,我们可能希望合并新值与旧值,比如一个联系人列表,对于相同的联系人,我们希望合并电话号码,mergeFunction 可以定义为:
- BinaryOperator < String > mergeFunction = (oldPhone, phone) - >oldPhone + "," + phone;
toMap 还有一个更为通用的形式:
- public static <T, K, U, M extends Map<K, U>> Collector<T, ?, M> toMap(
- Function<? super T, ? extends K> keyMapper,
- Function<? super T, ? extends U> valueMapper,
- BinaryOperator<U> mergeFunction,
- Supplier<M> mapSupplier)
相比前面的 toMap,多了一个 mapSupplier,它是 Map 的工厂方法,对于前面两个 toMap,其 mapSupplier 其实是 HashMap::new。我们知道,HashMap 是没有任何顺序的,如果希望保持元素出现的顺序,可以替换为 LinkedHashMap,如果希望收集的结果排序,可以使用 TreeMap。
toMap 主要用于顺序流,对于并发流,Collectors 有专门的名称为 toConcurrentMap 的收集器,它内部使用 ConcurrentHashMap,用法类似,具体我们就不讨论了。
字符串收集器
除了将元素流收集到容器中,另一个常见的操作是收集为一个字符串。比如,获取所有的学生名称,用逗号连接起来,传统上,代码看上去像这样:
- StringBuilder sb = new StringBuilder();
- for (Student t: students) {
- if (sb.length() > 0) {
- sb.append(",");
- }
- sb.append(t.getName());
- }
- return sb.toString();
针对这种常见的需求,Collectors 提供了 joining 收集器:
- public static Collector<CharSequence, ?, String> joining()
- public static Collector<CharSequence, ?, String> joining(CharSequence delimiter)
- public static Collector<CharSequence, ?, String> joining(
- CharSequence delimiter, CharSequence prefix, CharSequence suffix)
第一个就是简单的把元素连接起来,第二个支持一个分隔符,第三个更为通用,可以给整个结果字符串加个前缀和后缀。比如:
- String result = Stream.of("abc","老马","hello")
- .collect(Collectors.joining(",", "[", "]"));
- System.out.println(result);
输出为:
- [abc, 老马, hello]
joining 的内部也利用了 StringBuilder,比如,第一个 joining 函数的代码为:
- public static Collector<CharSequence, ?, String> joining() {
- return new CollectorImpl<CharSequence, StringBuilder, String>(
- StringBuilder::new, StringBuilder::append,
- (r1, r2) -> { r1.append(r2); return r1; },
- StringBuilder::toString, CH_NOID);
- }
supplier 是 StringBuilder::new,accumulator 是 StringBuilder::append,finisher 是 StringBuilder::toString,CH_NOID 表示特征集为空。
分组
分组类似于数据库查询语言 SQL 中的 group by 语句,它将元素流中的每个元素分到一个组,可以针对分组再进行处理和收集,分组的功能比较强大,我们逐步来说明。
为便于举例,我们先修改下学生类 Student,增加一个字段 grade,表示年级,改下构造方法:
- public Student(String name, String grade, double score) {
- this.name = name;
- this.grade = grade;
- this.score = score;
- }
示例学生列表 students 改为:
- static List<Student> students = Arrays.asList(new Student[] {
- new Student("zhangsan", "1", 91d),
- new Student("lisi", "2", 89d),
- new Student("wangwu", "1", 50d),
- new Student("zhaoliu", "2", 78d),
- new Student("sunqi", "1", 59d)});
基本用法
最基本的分组收集器为:
- public static <T, K> Collector<T, ?, Map<K, List<T>>>
- groupingBy(Function<? super T, ? extends K> classifier)
参数是一个类型为 Function 的分组器 classifier,它将类型为 T 的元素转换为类型为 K 的一个值,这个值表示分组值,所有分组值一样的元素会被归为同一个组,放到一个列表中,所以返回值类型是 Map<K, List<T>>。 比如,将学生流按照年级进行分组,代码为:
- Map < String,
- List < Student >> groups = students.stream().collect(Collectors.groupingBy(Student: :getGrade));
学生会分为两组,第一组键为 "1",分组学生包括 "zhangsan", "wangwu" 和 "sunqi",第二组键为 "2",分组学生包括 "lisi", "zhaoliu"。
这段代码基本等同于如下代码:
- Map < String,
- List < Student >> groups = new HashMap < >();
- for (Student t: students) {
- String key = t.getGrade();
- List < Student > container = groups.get(key);
- if (container == null) {
- container = new ArrayList < >();
- groups.put(key, container);
- }
- container.add(t);
- }
- System.out.println(groups);
显然,使用 groupingBy 要简洁清晰的多,但它到底是怎么实现的呢?
基本原理
groupingBy 的代码为:
- public static <T, K> Collector<T, ?, Map<K, List<T>>>
- groupingBy(Function<? super T, ? extends K> classifier) {
- return groupingBy(classifier, toList());
- }
它调用了第二个 groupingBy 方法,传递了 toList 收集器,其代码为:
- public static <T, K, A, D>
- Collector<T, ?, Map<K, D>> groupingBy(Function<? super T, ? extends K> classifier,
- Collector<? super T, A, D> downstream) {
- return groupingBy(classifier, HashMap::new, downstream);
- }
这个方法接受一个下游收集器 downstream 作为参数,然后传递给下面更通用的函数:
- public static <T, K, D, A, M extends Map<K, D>>
- Collector<T, ?, M> groupingBy(Function<? super T, ? extends K> classifier,
- Supplier<M> mapFactory,
- Collector<? super T, A, D> downstream)
classifier 还是分组器,mapFactory 是返回 Map 的工厂方法,默认是 HashMap::new,downstream 表示下游收集器,下游收集器负责收集同一个分组内元素的结果。
对最通用的 groupingBy 函数返回的收集器,其收集元素的基本过程和伪代码为:
- //先创建一个存放结果的Map
- Map map = mapFactory.get();
- for (T t: data) {
- // 对每一个元素,先分组
- K key = classifier.apply(t);
- // 找存放分组结果的容器,如果没有,让下游收集器创建,并放到Map中
- A container = map.get(key);
- if (container == null) {
- container = downstream.supplier().get();
- map.put(key, container);
- }
- // 将元素交给下游收集器(即分组收集器)收集
- downstream.accumulator().accept(container, t);
- }
- // 调用分组收集器的finisher方法,转换结果
- for (Map.Entry entry: map.entrySet()) {
- entry.setValue(downstream.finisher().apply(entry.getValue()));
- }
- return map;
在最基本的 groupingBy 函数中,下游收集器是 toList,但下游收集器还可以是其他收集器,甚至是 groupingBy,以构成多级分组,下面我们来看更多的示例。
分组计数、找最大 / 最小元素
将元素按一定标准分为多组,然后计算每组的个数,按一定标准找最大或最小元素,这是一个常见的需求,Collectors 提供了一些对应的收集器,一般用作下游收集器,比如:
- //计数
- public static <T> Collector<T, ?, Long> counting()
- //计算最大值
- public static <T> Collector<T, ?, Optional<T>> maxBy(Comparator<? super T> comparator)
- //计算最小值
- public static <T> Collector<T, ?, Optional<T>> minBy(Comparator<? super T> comparator)
还有更为通用的名为 reducing 的归约收集器,我们就不介绍了,下面,看一些例子。
为了便于使用 Collectors 中的方法,我们将其中的方法静态导入,即加入如下代码:
- import static java.util.stream.Collectors. * ;
统计每个年级的学生个数,代码可以为:
- Map < String,
- Long > gradeCountMap = students.stream().collect(groupingBy(Student: :getGrade, counting()));
统计一个单词流中每个单词的个数,按出现顺序排序,代码示例为:
- Map<String, Long> wordCountMap =
- Stream.of("hello","world","abc","hello").collect(
- groupingBy(Function.identity(), LinkedHashMap::new, counting()));
获取每个年级分数最高的一个学生,代码可以为:
- Map<String, Optional<Student>> topStudentMap = students.stream().collect(
- groupingBy(Student::getGrade,
- maxBy(Comparator.comparing(Student::getScore))));
需要说明的是,这个分组收集结果是 Optional<Student>,而不是 Student,这是因为 maxBy 处理的流可能是空流,但对我们的例子,这是不可能的,为了直接得到 Student,可以使用 Collectors 的另一个收集器 collectingAndThen,在得到 Optional<Student> 后调用 Optional 的 get 方法,如下所示:
- Map<String, Student> topStudentMap = students.stream().collect(
- groupingBy(Student::getGrade,
- collectingAndThen(
- maxBy(Comparator.comparing(Student::getScore)),
- Optional::get)));
关于 collectingAndThen,我们待会再进一步讨论。
分组数值统计
除了基本的分组计数,还经常需要进行一些分组数值统计,比如求学生分数的和、平均分、最高分 / 最低分等,针对 int,long 和 double 类型,Collectors 提供了专门的收集器,比如:
- //求平均值,int和long也有类似方法
- public static <T> Collector<T, ?, Double>
- averagingDouble(ToDoubleFunction<? super T> mapper)
- //求和,long和double也有类似方法
- public static <T> Collector<T, ?, Integer>
- summingInt(ToIntFunction<? super T> mapper)
- //求多种汇总信息,int和double也有类似方法
- //LongSummaryStatistics包括个数、最大值、最小值、和、平均值等多种信息
- public static <T> Collector<T, ?, LongSummaryStatistics>
- summarizingLong(ToLongFunction<? super T> mapper)
比如,按年级统计学生分数信息,代码可以为:
- Map<String, DoubleSummaryStatistics> gradeScoreStat =
- students.stream().collect(
- groupingBy(Student::getGrade,
- summarizingDouble(Student::getScore)));
分组内的 map
对于每个分组内的元素,我们感兴趣的可能不是元素本身,而是它的某部分信息,在上节介绍的 Stream API 中,Stream 有 map 方法,可以将元素进行转换,Collectors 也为分组元素提供了函数 mapping,如下所示:
- public static <T, U, A, R>
- Collector<T, ?, R> mapping(Function<? super T, ? extends U> mapper,
- Collector<? super U, A, R> downstream)
交给下游收集器 downstream 的不再是元素本身,而是应用转换函数 mapper 之后的结果。比如,对学生按年级分组,得到学生名称列表,代码可以为:
- Map<String, List<String>> gradeNameMap =
- students.stream().collect(
- groupingBy(Student::getGrade,
- mapping(Student::getName, toList())));
- System.out.println(gradeNameMap);
输出为:
- {1=[zhangsan, wangwu, sunqi], 2=[lisi, zhaoliu]}
分组结果处理 (filter/sort/skip/limit)
对分组后的元素,我们可以计数,找最大 / 最小元素,计算一些数值特征,还可以转换后 (map) 再收集,那可不可以像上节介绍的 Stream API 一样,进行排序 (sort)、过滤(filter)、限制返回元素(skip/limit) 呢?Collector 没有专门的收集器,但有一个通用的方法:
- public static<T,A,R,RR> Collector<T,A,RR> collectingAndThen(
- Collector<T,A,R> downstream, Function<R,RR> finisher)
这个方法接受一个下游收集器 downstream 和一个 finisher,返回一个收集器,它的主要代码为:
- return new CollectorImpl<>(downstream.supplier(),
- downstream.accumulator(),
- downstream.combiner(),
- downstream.finisher().andThen(finisher),
- characteristics);
也就是说,它在下游收集器的结果上又调用了 finisher。利用这个 finisher,我们可以实现多种功能,下面看一些例子。
收集完再排序,可以定义如下方法:
- public static < T > Collector < T,
- ?,
- List < T >> collectingAndSort(Collector < T, ?, List < T >> downstream, Comparator < ?super T > comparator) {
- return Collectors.collectingAndThen(downstream, (r) - >{
- r.sort(comparator);
- return r;
- });
- }
比如,将学生按年级分组,分组内学生按照分数由高到低进行排序,利用这个方法,代码可以为:
- Map<String, List<Student>> gradeStudentMap =
- students.stream().collect(
- groupingBy(Student::getGrade,
- collectingAndSort(toList(),
- Comparator.comparing(Student::getScore).reversed())));
针对这个需求,也可以先对流进行排序,然后再分组。
收集完再过滤,可以定义如下方法:
- public static < T > Collector < T,
- ?,
- List < T >> collectingAndFilter(Collector < T, ?, List < T >> downstream, Predicate < T > predicate) {
- return Collectors.collectingAndThen(downstream, (r) - >{
- return r.stream().filter(predicate).collect(Collectors.toList());
- });
- }
比如,将学生按年级分组,分组后,每个分组只保留不及格的学生 (低于 60 分),利用这个方法,代码可以为:
- Map<String, List<Student>> gradeStudentMap =
- students.stream().collect(
- groupingBy(Student::getGrade,
- collectingAndFilter(toList(), t->t.getScore()<60)));
针对这个需求,也可以先对流进行过滤,然后再分组。
收集完,只返回特定区间的结果,可以定义如下方法:
- public static <T> Collector<T, ?, List<T>> collectingAndSkipLimit(
- Collector<T, ?, List<T>> downstream, long skip, long limit) {
- return Collectors.collectingAndThen(downstream, (r) -> {
- return r.stream().skip(skip).limit(limit).collect(Collectors.toList());
- });
- }
比如,将学生按年级分组,分组后,每个分组只保留前两名的学生,代码可以为:
- Map<String, List<Student>> gradeStudentMap =
- students.stream()
- .sorted(Comparator.comparing(Student::getScore).reversed())
- .collect(groupingBy(Student::getGrade,
- collectingAndSkipLimit(toList(), 0, 2)));
这次,我们先对学生流进行了排序,然后再进行了分组。
分区
分组的一个特殊情况是分区,就是将流按 true/false 分为两个组,Collectors 有专门的分区函数:
- public static <T> Collector<T, ?, Map<Boolean, List<T>>>
- partitioningBy(Predicate<? super T> predicate)
- public static <T, D, A> Collector<T, ?, Map<Boolean, D>>
- partitioningBy(Predicate<? super T> predicate,
- Collector<? super T, A, D> downstream)
第一个的下游收集器为 toList(),第二个可以指定一个下游收集器。
比如,将学生按照是否及格 (大于等于 60 分) 分为两组,代码可以为:
- Map < Boolean,
- List < Student >> byPass = students.stream().collect(partitioningBy(t - >t.getScore() >= 60));
按是否及格分组后,计算每个分组的平均分,代码可以为:
- Map<Boolean, Double> avgScoreMap = students.stream().collect(
- partitioningBy(t->t.getScore()>=60,
- averagingDouble(Student::getScore)));
多级分组
groupingBy 和 partitioningBy 都可以接受一个下游收集器,而下游收集器又可以是分组或分区。
比如,按年级对学生分组,分组后,再按照是否及格对学生进行分区,代码可以为:
- Map<String, Map<Boolean, List<Student>>> multiGroup =
- students.stream().collect(
- groupingBy(Student::getGrade,
- partitioningBy(t->t.getScore()>=60)));
小结
本节主要讨论了各种收集器,包括容器收集器、字符串收集器、分组和分区收集器等。
对于分组和分区,它们接受一个下游收集器,对同一个分组或分区内的元素进行进一步收集,下游收集器还可以是分组或分区,以构建多级分组,有一些收集器主要用于分组,比如 counting, maxBy, minBy, summarizingDouble 等。
mapping 和 collectingAndThen 也都接受一个下游收集器,mapping 在把元素交给下游收集器之前先进行转换,而 collectingAndThen 对下游收集器的结果进行转换,组合利用它们,可以构造更为灵活强大的收集器。
至此,关于 Java 8 中的函数式数据处理 Stream API,我们就介绍完了,Stream API 提供了集合数据处理的常用函数,利用它们,可以简洁地实现大部分常见需求,大大减少代码,提高可读性。
对于并发编程,Java 8 也提供了一个新的类 CompletableFuture,类似于 Stream API 对集合数据的流水线式操作,使用 CompletableFuture,可以实现对多个异步任务进行流水线式操作,它具体是什么呢?
(与其他章节一样,本节所有代码位于 https://github.com/swiftma/program-logic,位于包 shuo.laoma.java8.c93 下)
----------------
未完待续,查看最新文章,敬请关注微信公众号 "老马说编程"(扫描下方二维码),从入门到高级,深入浅出,老马和你一起探索 Java 编程及计算机技术的本质。用心原创,保留所有版权。
来源: http://www.cnblogs.com/swiftma/p/7409468.html