起因
最近在公司做到一块业务是需要创建任务然后下发任务。下发任务时需要批量给各个部门下发、涉及到数据量大,每个任务中的表格如果再配置下发规则执行起了就会更慢。本着多一事不如少一事的原则。不太想管,但扛不住PO天天催,没办法只能赶鸭子上架进行重构了。
第一版
第一版功能设计时就考虑到了数据量会很大,想到操作时会很耗时总不能前端一直挂起让等着,故同步方式肯定不行,所以最终采用异步、单线程的方式下发,反正功能时效性又不强,数据量大异步下发就等久点而已。
结果显而易见,客户要求下发任务必须要要求不能超过三个小时。而经过我自己的测试,只能用离谱🫨来形容。
测试机配置如下:
没错,就是6.5个小时,当初设计该功能时,都没想到会用这么久,那还愣着干啥,改呗。
第二版
经过评审,采用多线程的方式下发,并优化每次下发前根据规则查询表格中数据的次数。改造后不能失去事物控制(下发成功的不回滚,只对失败的线程进行回滚,提高二次下发效率),还需要对下发失败的部门进行记录,以便重新下发。
下面只展示部分核心代码。
该方法主要是查询下发任务的任务信息,并根据部门计算共用多少线程执行下发操作,每个线程具体负责多少部门,计算后创建线程池,并调用目标方法,并收集目标方法的返回值,用于判断所有线程是否全部完成,完成则更新任务状态为已下发,否则为下发失败。并记录失败部门数据,等待下次重新下发,下次重新下发时,查询该任务下是否存在下发失败的部门,如果存在优先下发失败部门,下发成功后删除失败部门记录。具体的下发方法由handleDelivery()调用。
public void handleDelivery(MissionDeliveryEvent missionDeliveryEvent) {
/**
* 判断本次下发任务中是否有下发失败的部门,如果有,则从缓存中获取,重新下发,重新下发时只下发失败的部门。
* 例如一共200部门两个线程下发,共两个线程,其中一个100部门下发成功,另一个因异常失败回滚,重新下发时只下发失败的这100个。
*/
List<BasicDataMissionDept> missionDeptList;
List<Object> redisDepts = redisUtil.lGet(MISSION_DOWN_FAIL_DEPTS.concat(missionId), 0, -1);
//判断当前有无下发失败的部门
if (CollectionUtils.isNotEmpty(redisDepts)){
missionDeptList = Convert.toList(BasicDataMissionDept.class,redisDepts);
//删除缓存避免失败的部门重复下发
redisUtil.del(MISSION_DOWN_FAIL_DEPTS.concat(missionId));
}else{
// 先获取任务关联的部门信息,根据下发部门计算下发线程数量
LambdaQueryWrapper<BasicDataMissionDept> deptWrapper = new LambdaQueryWrapper<>();
deptWrapper.eq(BasicDataMissionDept::getMissionId, missionId);
missionDeptList = basicDataMissionDeptService.list(deptWrapper);
}
//对集合进行拆分、拆分后的集合数也是线程数,代表每个线程所续下发的部门
List<List<BasicDataMissionDept>> partition = ListUtils.partition(missionDeptList, BATCH_COUNT);
int threadCount = partition.size();
ExecutorService executorService = ThreadUtil.newExecutor(threadCount);
List<Future<Boolean>> futures = new ArrayList<>();
log.info("线程池创建成功,线程数量为:{}", threadCount);
for (int i = 0; i < threadCount; i++){
int finalI = i;
Callable<Boolean> booleanCallable = () -> doDownloadMission(basicDataMission, missionDeliveryEvent, partition.get(finalI), user);
futures.add(executorService.submit(booleanCallable));
}
//收集所有线程的执行结果
boolean[] booleans = this.handleThreadResult(futures);
log.info("多线程执行结果:{}",booleans);
// 处理每个线程结果
if (BooleanUtil.and(booleans)){
// TODO业务代码,该部分可根据业务做状态修改、流程驱动等操作
}
}
executorService.shutdown();
}catch (Exception e){
log.info("任务下发前置操作出现异常",e);
updateMissionStatus(missionId, null, MissionStatusEnum.DELIVERY_FAIL);
}
/** 具体执行下发
* @param missionDeliveryEvent
* @param missionDeptList
* @param user
* @return: void
* @author: ZaNgVVB
* @date: 2024/5/15 15:14
*/
@SuppressWarnings(value={"all"})
private boolean doDownloadMission(BasicDataMission basicDataMission ,MissionDeliveryEvent missionDeliveryEvent, List<BasicDataMissionDept> missionDeptList,User user){
return transactionTemplate.execute(status -> {
Object savepoint = status.createSavepoint();
// 创建填报记录
BasicDataMissionFill missionFill = createMissionFill(basicDataMission, user);
//当前线程执行结果,成功为true,失败为false,在future中收集使用。
boolean executeflag = true;
try {
//具体业务代码
}
} catch (Exception e) {
log.error("任务下发失败,原因:{}", e.getMessage(), e);
//回滚
status.rollbackToSavepoint(savepoint);
//更新任务状态为下发失败
updateMissionStatus(missionId, null, MissionStatusEnum.DELIVERY_FAIL);
//记录下发失败部门
failedMissionFillList.add(missionFill);
//遍历存入redis中
missionDeptList.forEach(dept -> redisUtil.lSet(MISSION_DOWN_FAIL_DEPTS.concat(missionId),dept));
executeflag = false;
}
return executeflag;
});
}