产业气象站|并行流ParallelStream中隐藏的陷阱


产业气象站|并行流ParallelStream中隐藏的陷阱
文章图片
前提
这篇文章介绍一下日常开发中并行流ParallelStream中隐藏的陷阱 , 这个问题其实离我们很近 , 特别是喜欢使用JDK1.8+的流式编程的伙伴 , 应该会深有感触 。 标题中所谓的"陷阱" , 其实并不是ParallelStream自身的陷阱 , 而一般是开发者错误使用ParallelStream给自己埋下的陷阱 。
一个故意而为的例子下面举一个故意而为的例子 , 实际上应该不会有类似的业务代码:
publicclassParallelStreamMain{publicstaticvoidmain(String[]args)throwsException{List&ltList&gtarray=newArrayList&lt&gt()Listitem1=newArrayList&lt&gt()Listitem2=newArrayList&lt&gt()Listtarget=newArrayList&lt&gt(100)array.add(item1)array.add(item2)array.parallelStream().forEach(x-&gt{for(inti=0i&lt100000i++){target.add(i)}})System.out.println(target.size())}}
某一次执行结果为:163913 。 如果不停地执行这个main方法 , 最终都会得到一个非200000的结果 , 这里的问题就在于使用了并行流parallelStream()方法 。 ParallelStream底层使用了Fork/Join框架实现 , 也就是应用了线程池ForkJoinPool把并行流中的节点抽象为ForkJoinTask进行计算 , 背后用到的"任务窃取"等原理这里就不进行展开 , 只需要明确:
ForkJoinPool一般使用Runtime.getRuntime().availableProcessors()(此值一般认为是物理机器的逻辑核心数量)作为并行度(parallelism) , 简单认为是可并发执行的任务数 , 并不是工作线程数 。 多核机器中 , 使用ParallelStream在流的节点中的所有操作都相当于在「一个多线程环境中」进行操作 , 里面的所有操作都会产生不可预期的结果 , 例如可能会数组越界、添加元素丢失、部分下标index的引用为NULL等等 。 一个仿真例子写这篇文章不是有意为之 , 其实很早之前笔者曾经遇到一个比较隐蔽的生产故障 , 其中有一段访问量比较低的代码大致如下:
@DataprivatestaticclassOrderDTO{privateStringorderIdprivateOrderStatusorderStatusprivateBigDecimalamountprivateLongcustomerId}@DataprivatestaticclassOrder{privateLongidprivateStringorderIdprivateIntegerorderStatusprivateBigDecimalamountprivateLongcustomerIdprivateOffsetDateTimecreateTimeprivateOffsetDateTimeeditTime}publicvoidgroupByOrderStatus(LongcustomerId){Listorders=orderDao.selectByCustomerId(customerId)ListorderDTOList=newArrayList&lt&gt()orders.parallelStream().forEach(order-&gt{OrderDTOdto=newOrderDTO()......orderDTOList.add(dto)})Map&ltString,List&gtcollect=orderDTOList.stream().collect(Collectors.groupingBy(item-&gtitem.getOrderStatus().getCode()))......}
该方法的功能是通过客户ID查询订单列表 , 然后把订单列表转化为OrderDTO列表 , 然后再按照订单状态字段进行分组 。 通过生产日志和测试回归发现 , 上面的代码段中groupByOrderStatus()方法会偶发空指针异常 。
【产业气象站|并行流ParallelStream中隐藏的陷阱】初次出现问题的时候 , 由于开发者通过Lambda表达式把多处代码压缩为1行 , 所以从异常栈比较难排查具体发生问题的代码 , 后面把Lambda表达式以句点起点拆分为多行上线后观察一段时间 , 最终定位到发生空指针异常的代码段为Collectors.groupingBy(item-&gtitem.getOrderStatus().getCode()) , 也就是OrderDTO实例中的orderStatus为空对象 。 这里显然 , groupByOrderStatus()方法其实是被封闭在线程栈中调用 , 本不应该有多个线程去并发修改其中的内容 , 这里只剩下一个疑点:使用了parallelStream() 。 后来直接把parallelStream()修改为stream()重新上线 , 该空指针问题不再复现 。