JDK8 特性(三)

2022/5/25 JDK8

# 1 Stream流-续

# 1.7 并行的Stream流

  • 串行的Stream流

    • 目前我们使用的Stream流是串行的,就是在一个线程上执行。
  • 并行的Stream流

    • parallelStream其实就是一个并行执行的流。它通过默认的ForkJoinPool,可能提高多线程任务的速度。

# 获取并行Stream流的两种方式

  1. 直接获取并行的流
public class StreamParallelTest {
    @Test
    public void parallel(){
        ArrayList<Integer> list = new ArrayList<>();     
        // 直接获取并行的流     
        Stream<Integer> stream = list.parallelStream();
    }
}
1
2
3
4
5
6
7
8
  1. 将串行流转成并行流
public class StreamParallelTest {
    @Test
    public void serialToParallel(){
        Stream.of(9, 1, 34, 5, 3, 21, 56, 9)
                // 转成并行流
                .parallel()
                .filter(i -> {
                    System.out.println(Thread.currentThread() + "::" + i);
                    return i > 50;
                })
                .forEach(System.out::println);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13

# 串行和并行效率的比较

  • 使用for循环,串行Stream流,并行Stream流来对10亿个数字求和。看消耗的时间。
public class StreamParallelTest {
    private static final int TIMES = 1000000000;
    long start;

    @Before
    public void init(){
        start = System.currentTimeMillis();
    }

    @Test
    public void useFor(){
        // 消耗时间: 339ms
        int sum = 0;
        for (int i = 0; i < TIMES; i++) {
            sum += i;
        }
    }

    @Test
    public void useSerialStream(){
        // 获取足够长度的串行流进行加和
        // 消耗时间: 631ms
        LongStream.rangeClosed(0, TIMES)
                .reduce(0, Long::sum);
    }

    @Test
    public void useParallelStream(){
        // 获取足够长度的并行流进行加和
        // 消耗时间: 317ms
        LongStream.rangeClosed(0, TIMES)
                .parallel()
                .reduce(0, Long::sum);
    }

    @After
    public void destroy(){
        System.out.println("消耗时间: " +
                (System.currentTimeMillis() - start) +
                "ms");
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
  • 我们可以看到parallelStream的效率是最高的。
  • Stream并行处理的过程会分而治之,也就是将一个大任务切分成多个小任务,这表示每个任务都是一个操作。

# ParallelStream线程安全问题

public class StreamParallelTest {
    @Test
    public void parallelStreamNotice(){
        List<Integer> list = new ArrayList<>(1000);

        // 并行线程不安全
        // list.size() = 947
        IntStream.range(0, 1000)
                .parallel()
                .forEach(i -> list.add(i));
        System.out.println("list.size() = " + list.size());

        // 解决线程安全问题方案一: 使用同步代码块
        Object lock = new Object();
        IntStream.range(0, 1000)
                .parallel()
                .forEach(i -> {
                    synchronized (lock){
                        list.add(i);
                    }
                });
        System.out.println("list.size() = " + list.size());

        // 解决线程安全问题方案二: 使用线程安全的集合
        // 使用 Vector<Integer> 集合
        Vector<Integer> v = new Vector<>();
        IntStream.range(0, 1000)
                .parallel()
                .forEach(i -> v.add(i));
        System.out.println("v.size() = " + v.size());

        // 解决线程安全问题方案二: 使用线程安全的集合
        // 使用集合工具类提供的线程安全的集合
        List<Integer> synchronizedList = Collections.synchronizedList(list);
        IntStream.range(0, 1000)
                .parallel()
                .forEach(i -> synchronizedList.add(i));
        System.out.println("synchronizedList.size() = " + synchronizedList.size());

        // 解决线程安全问题方案三: 调用Stream流的collect/toArray
        // 使用Stream流的collect
        List<Integer> collect = IntStream.range(0, 1000)
                .parallel()
                .boxed()
                .collect(Collectors.toList());
        System.out.println("collect.size() = " + collect.size());

        // 解决线程安全问题方案三: 调用Stream流的collect/toArray
        // 使用Stream流的toArray
        Integer[] integers = IntStream.range(0, 1000)
                .parallel()
                .boxed()
                .toArray(Integer[]::new);
        System.out.println("integers.length = " + integers.length);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56

# 1.8 Fork/Join框架介绍

  • parallelStream使用的是Fork/Join框架。Fork/Join框架自JDK 7引入。Fork/Join框架可以将一个大任务拆分为很多小 任务来异步执行。 Fork/Join框架主要包含三个模块:
    1. 线程池:ForkJoinPool
    2. 任务对象:ForkJoinTask
    3. 执行任务的线程:ForkJoinWorkerThread

# Fork/Join原理-分治法

  • ForkJoinPool主要用来使用分治法(Divide-and-Conquer Algorithm)来解决问题。典型的应用比如快速排序算法,ForkJoinPool需要使用相对少的线程来处理大量的任务。比如要对1000万个数据进行排序,那么会将这个任务分割成 两个500万的排序任务和一个针对这两组500万数据的合并任务。以此类推,对于500万的数据也会做出同样的分割处理,到最后会设置一个阈值来规定当数据规模到多少时,停止这样的分割处理。比如,当元素的数量小于10时,会停止分割,转而使用插入排序对它们进行排序。那么到最后,所有的任务加起来会有大概2000000+个。问题的关键在于,对于一个任务而言,只有当它所有的子任务完成之后,它才能够被执行。

# Fork/Join原理-工作窃取算法

  • Fork/Join最核心的地方就是利用了现代硬件设备多核,在一个操作时候会有空闲的cpu,那么如何利用好这个空闲的cpu就成了提高性能的关键,而这里我们要提到的工作窃取(work-stealing)算法就是整个Fork/Join框架的核心理念 Fork/Join工作窃取(work-stealing)算法是指某个线程从其他队列里窃取任务来执行。

  • 那么为什么需要使用工作窃取算法呢?假如我们需要做一个比较大的任务,我们可以把这个任务分割为若干互不依赖的子任务,为了减少线程间的竞争,于是把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应,比如A线程负责处理A队列里的任务。但是有的线程会先把自己队列里的 任务干完,而其他线程对应的队列里还有任务等待处理。干完活的线程与其等着,不如去帮其他线程干活,于是它就去其他线程的队列里窃取一个任务来执行。而在这时它们会访问同一个队列,所以为了减少窃取任务线程和被窃取任 务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永 远从双端队列的尾部拿任务执行。

  • 工作窃取算法的优点是充分利用线程进行并行计算,并减少了线程间的竞争,其缺点是在某些情况下还是存在竞争,比如双端队列里只有一个任务时。并且消耗了更多的系统资源,比如创建多个线程和多个双端队列。

  • 上文中已经提到了在Java 8引入了自动并行化的概念。它能够让一部分Java代码自动地以并行的方式执行,也就是我们使用了ForkJoinPool的ParallelStream。

  • 对于ForkJoinPool通用线程池的线程数量,通常使用默认值就可以了,即运行时计算机的处理器数量。可以通过设置系统属性:java.util.concurrent.ForkJoinPool.common.parallelism=N(N为线程数量),来调整ForkJoinPool的线程数量,可以尝试调整成不同的参数来观察每次的输出结果。

# Fork/Join案例

  • 需求:使用Fork/Join计算1-10000的和,当一个任务的计算数量大于3000时拆分任务,数量小于3000时计算。

public class ForkJoinDemo {

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        ForkJoinPool pool = new ForkJoinPool();
        SumRecursiveTask task = new SumRecursiveTask(1, 10000L);
        Long result = pool.invoke(task);
        System.out.println("result = " + result);
        long end = System.currentTimeMillis();

        System.out.println("消耗的时间为: " + (end - start));
    }

}

class SumRecursiveTask extends RecursiveTask<Long> {

    private static final long THRESHOLD = 3000L;
    private final long start;
    private final long end;

    public SumRecursiveTask(long start, long end){
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        long length = end - start;

        if (length <= THRESHOLD) {
            // 任务不用再拆分了.可以计算了
            long sum = 0;
            for (long i = start; i <= end; i++) {
                sum += i;
            }
            return sum;
        } else {
            // 数量大于预定的数量,任务还需要再拆分
            long middle = (start + end) / 2;

            SumRecursiveTask left = new SumRecursiveTask(start, middle);
            left.fork();
            SumRecursiveTask right = new SumRecursiveTask(middle + 1, end);
            right.fork();
            return left.join() + right.join();
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49

# 1.9 小结

  1. parallelStream是线程不安全的
  2. parallelStream适用的场景是CPU密集型的,只是做到别浪费CPU,假如本身电脑CPU的负载很大,那还到处用并行流,那并不能起到作用
  3. I/O密集型磁盘I/O、网络I/O都属于I/O操作,这部分操作是较少消耗CPU资源,一般并行流中不适用于I/O密集型的操作,就比如使用并流行进行大批量的消息推送,涉及到了大量I/O,使用并行流反而慢了很多
  4. 在使用并行流的时候是无法保证元素的顺序的,也就是即使你用了同步集合也只能保证元素都正确但无法保证其中的顺序