Spring 负责所有底层事务管理细节 , 并为不同的事务 API 提供一致的编程模型 , 但有多少人真正了解它在多线程环境中的行为方式?是否可以在多个线程中打开事务并写入数据?
让我们退一步思考一下EntityManager 。
EntityManager的工作是与一个会话或被它管理的对象的缓存一起进行的 。 这意味着它有一个状态 , 而在几个线程之间共享状态会导致竞争条件;
所以 , 第一条规则是每个线程使用一个EntityManager 。
事实上 , Spring负责保持每个线程的事务性上下文 。
假设我们想并行处理一个对象列表并将其存储在数据库中 。 我们想把这些对象分成专门的小块 , 并把每个小块传递给一个单独线程中的处理方法 。 然后 , 每个线程中处理的结果应该被收集起来并呈现给用户 。
我将从定义一个负责处理的服务接口开始:
<font><i>/**
* Service interface defining the contract for object identifiers processing
*/</i></font><font><b>public</b> <b>interface</b> ProcessingService { </font><font><i>/**
* Processes the list of objects identified by id and returns a an identifiers
* list of the successfully processed objects
*
* @param objectIds List of object identifiers
*
* @return identifiers list of the successfully processed objects
*/</i></font><font>
List<Integer> processObjects(List objectIds);
</font>
这项服务的默认实现是基于数据库存储的 。 然而 , 这个例子是非常简单的:
<font><i>/**
* Service implementation for database related ids processing
*/</i></font><font>@Service(</font><font>\"ProcessingDBService\"</font><font>)<b>public</b> <b>class</b> ProcessingDBService implements ProcessingService {
<b>private</b> <b>static</b> <b>final</b> Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@Transactional
@Override <b>public</b> List processObjects(List objectIds) { </font><font><i>// Process and save to DB</i></font><font>
logger.info(</font><font>\"Running in thread \"</font><font> + Thread.currentThread().getName() + </font><font>\" with object ids \"</font><font> + objectIds.toString());
<b>return</b> objectIds.stream().collect(Collectors.toList());
</font>
现在 , 我们希望有一个选项 , 可以在块chunk(类似队列一样数据块)和并行进程中运行这个处理 。
为了保持代码的干净和与运行时环境的解耦 , 我们将使用装饰器模式 , 如下所示 。
<font><i>/**
* Service implementation for parallel chunk processing
*/</i></font><font>
@Service
@Primary
@ConditionalOnProperty(prefix = </font><font>\"service\"</font><font> name = </font><font>\"parallel\"</font><font> havingValue = https://mparticle.uc.cn/api//"true\"</font><font>)
<b>public</b> <b>class</b> ProcessingServiceParallelRunDecorator implements ProcessingService {
<b>private</b> ProcessingService delegate;
<b>public</b> ProcessingServiceParallelRunDecorator(ProcessingService delegate) {
<b>this</b>.delegate = delegate;
</font><font><i>/**
* In a real scenario it should be an external configuration
*/</i></font><font>
<b>private</b> <b>int</b> batchSize = 10;
@Override
<b>public</b> List<Integer> processObjects(List objectIds) { List<List<Integer>> chuncks = getBatches(objectIds batchSize); List<List<Integer>> processedObjectIds = chuncks.parallelStream().map(delegate::processObjects)
.collect(Collectors.toList());
<b>return</b> flatList(processedObjectIds);
- 小米科技|性价比爆炸!2000元内唯一搭载联发科天玑8100的手机来了
- 一加科技|不止一代神U!最好的天玑8100手机怎么打造?一加交出答卷
- 河北网络广播电视台 |武汉飞训科技新计划,即将亮相北京教育装备展览会!
- 小米科技|小米11跌成“红米价”,2K屏+256GB大存储,还值得买吗?
- 小米科技|小米POCO C40将发布?搭载JR510处理器和支持MIUI Go操作系统!
- 高通骁龙|中国科技再次“突破”,创造业界最远纪录!外媒:不可思议!
- 一加科技|击败海尔和美的,连续12年全球出口第一,国产冰箱诞生隐形冠军
- 小米科技|iQOONeo6与iQOONeo5详细对比:有什么区别?
- 小米科技|都是3999元,小米11 Ultra对比荣耀Magic4,差距不是一星半点
- 小米科技|京东单方面宣布延迟发货之后,为什么网友们的怨气这么大?