如何安全地在并行流中收集数据:避免同步 Lambda,改用 Collector

java 并行流中直接对非线程安全集合(如 arraylist)执行 `foreach` 会导致竞态条件;强行用 `synchronized` 包裹 lambda 不仅违背流式编程原则,还会严重损害性能。正确做法是使用线程安全的 `collectors.tolist()` 等内置收集器。

在 Java 中,IntStream.parallel().forEach(...) 是典型的有副作用(side-effecting)操

——它试图从并行线程中修改共享的 ArrayList 实例。而 ArrayList 本身不是线程安全的,多个线程同时调用 add() 可能导致元素丢失、索引越界,甚至 ConcurrentModificationException,因此最终 data.size() 很可能小于 100(如输出 97、92 等非确定值)。

虽然技术上可通过 synchronized 强制同步(例如 forEach(s -> synchronized(data) { data.add(s); })),但这是反模式

  • ❌ 违反 Stream 设计哲学:流应是无状态、不可变、函数式的数据处理管道;
  • ❌ 引入锁竞争:所有线程排队等待同一把锁,彻底抵消并行优势,性能甚至不如串行;
  • ❌ 掩盖根本问题:用副作用收集数据本就不该是首选方案。

✅ 正确解法是消除副作用,转为声明式收集——使用 collect() 配合线程安全的 Collector:

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class Q03 {
    public static void main(String[] args) {
        List data = IntStream.range(0, 100)
                .parallel()                 // 支持并行
                .boxed()                    // int → Integer
                .collect(Collectors.toList()); // 线程安全的归约操作
        System.out.println(data.size()); // 稳定输出:100
    }
}

Collectors.toList() 内部采用分段收集(fork-join 分治):每个线程先构建局部列表,再合并,全程无共享写冲突,天然支持并行且高效。类似地,还可选用:

  • Collectors.toUnmodifiableList()(返回不可变列表,更安全);
  • Collectors.toCollection(ArrayList::new)(若需特定实现类);
  • 自定义 Collector(适用于复杂聚合逻辑)。

⚠️ 注意事项:

  • forEachOrdered() 虽能保证顺序,但会禁用并行优化,不解决线程安全问题;
  • Vector 或 Collections.synchronizedList(new ArrayList()) 仍不能用于 parallel().forEach() —— 同步仅保护单次 add(),无法保证流操作整体原子性;
  • 若必须使用副作用(如日志调试),请限定为 System.out.println() 等无状态操作,并避免修改共享可变状态。

总结:不要同步 lambda,而要重构逻辑。Stream 的力量在于声明“做什么”,而非“怎么做”;让 collect() 承担并发协调职责,代码更简洁、健壮且高性能。