一加科技|面试必须要掌握的内容:多线程与Spring容器事务机制

一加科技|面试必须要掌握的内容:多线程与Spring容器事务机制


Spring 负责所有底层事务管理细节 , 并为不同的事务 API 提供一致的编程模型 , 但有多少人真正了解它在多线程环境中的行为方式?是否可以在多个线程中打开事务并写入数据?
让我们退一步思考一下EntityManager 。
EntityManager的工作是与一个会话或被它管理的对象的缓存一起进行的 。 这意味着它有一个状态 , 而在几个线程之间共享状态会导致竞争条件;
所以 , 第一条规则是每个线程使用一个EntityManager 。
事实上 , Spring负责保持每个线程的事务性上下文 。
假设我们想并行处理一个对象列表并将其存储在数据库中 。 我们想把这些对象分成专门的小块 , 并把每个小块传递给一个单独线程中的处理方法 。 然后 , 每个线程中处理的结果应该被收集起来并呈现给用户 。
我将从定义一个负责处理的服务接口开始:
<font><i>/**
* Service interface defining the contract for object identifiers processing
*/</i></font><font><b>public</b> <b>interface</b> ProcessingService {    </font><font><i>/**
    * Processes the list of objects identified by id and returns a an identifiers
    * list of the successfully processed objects
    *
    * @param objectIds List of object identifiers
    *
    * @return identifiers list of the successfully processed objects
    */</i></font><font>
   List<Integer> processObjects(List objectIds);
</font>

这项服务的默认实现是基于数据库存储的 。 然而 , 这个例子是非常简单的:
<font><i>/**
* Service implementation for database related ids processing
*/</i></font><font>@Service(</font><font>\"ProcessingDBService\"</font><font>)<b>public</b> <b>class</b> ProcessingDBService implements ProcessingService {    
   <b>private</b> <b>static</b> <b>final</b> Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   
   @Transactional
   @Override    <b>public</b> List processObjects(List objectIds) {        </font><font><i>// Process and save to DB</i></font><font>
       
   logger.info(</font><font>\"Running in thread \"</font><font> + Thread.currentThread().getName() + </font><font>\" with object ids \"</font><font> + objectIds.toString());        
   <b>return</b> objectIds.stream().collect(Collectors.toList());
   
</font>

现在 , 我们希望有一个选项 , 可以在块chunk(类似队列一样数据块)和并行进程中运行这个处理 。
为了保持代码的干净和与运行时环境的解耦 , 我们将使用装饰器模式 , 如下所示 。
<font><i>/**
* Service implementation for parallel chunk processing
*/</i></font><font>
@Service
@Primary
@ConditionalOnProperty(prefix = </font><font>\"service\"</font><font> name = </font><font>\"parallel\"</font><font> havingValue = https://mparticle.uc.cn/api//"true\"</font><font>)
<b>public</b> <b>class</b> ProcessingServiceParallelRunDecorator implements ProcessingService {

   <b>private</b> ProcessingService delegate;
   
   <b>public</b> ProcessingServiceParallelRunDecorator(ProcessingService delegate) {
       <b>this</b>.delegate = delegate;
   

   </font><font><i>/**
    * In a real scenario it should be an external configuration
    */</i></font><font>
   <b>private</b> <b>int</b> batchSize = 10;

   @Override
   <b>public</b> List<Integer> processObjects(List objectIds) {        List<List<Integer>> chuncks = getBatches(objectIds batchSize);        List<List<Integer>> processedObjectIds = chuncks.parallelStream().map(delegate::processObjects)
               .collect(Collectors.toList());

       <b>return</b> flatList(processedObjectIds);