多线程执行批量任务并支持事物-随记

起因

最近在公司做到一块业务是需要创建任务然后下发任务。下发任务时需要批量给各个部门下发、涉及到数据量大,每个任务中的表格如果再配置下发规则执行起了就会更慢。本着多一事不如少一事的原则。不太想管,但扛不住PO天天催,没办法只能赶鸭子上架进行重构了。

第一版

第一版功能设计时就考虑到了数据量会很大,想到操作时会很耗时总不能前端一直挂起让等着,故同步方式肯定不行,所以最终采用异步、单线程的方式下发,反正功能时效性又不强,数据量大异步下发就等久点而已。

结果显而易见,客户要求下发任务必须要要求不能超过三个小时。而经过我自己的测试,只能用离谱🫨来形容。

测试机配置如下:

CPU

内存

CPU占用

下发部门

下发表格数据

耗时

i9-13900H

32G

95%⬆️

1498

160000

8小时

没错,就是6.5个小时,当初设计该功能时,都没想到会用这么久,那还愣着干啥,改呗。

第二版

经过评审,采用多线程的方式下发,并优化每次下发前根据规则查询表格中数据的次数。改造后不能失去事物控制(下发成功的不回滚,只对失败的线程进行回滚,提高二次下发效率),还需要对下发失败的部门进行记录,以便重新下发。

下面只展示部分核心代码。

该方法主要是查询下发任务的任务信息,并根据部门计算共用多少线程执行下发操作,每个线程具体负责多少部门,计算后创建线程池,并调用目标方法,并收集目标方法的返回值,用于判断所有线程是否全部完成,完成则更新任务状态为已下发,否则为下发失败。并记录失败部门数据,等待下次重新下发,下次重新下发时,查询该任务下是否存在下发失败的部门,如果存在优先下发失败部门,下发成功后删除失败部门记录。具体的下发方法由handleDelivery()调用。

public void handleDelivery(MissionDeliveryEvent missionDeliveryEvent)  {
      
            /**
             * 判断本次下发任务中是否有下发失败的部门,如果有,则从缓存中获取,重新下发,重新下发时只下发失败的部门。
             * 例如一共200部门两个线程下发,共两个线程,其中一个100部门下发成功,另一个因异常失败回滚,重新下发时只下发失败的这100个。
             */

            List<BasicDataMissionDept> missionDeptList;
            List<Object> redisDepts = redisUtil.lGet(MISSION_DOWN_FAIL_DEPTS.concat(missionId), 0, -1);
            //判断当前有无下发失败的部门
            if (CollectionUtils.isNotEmpty(redisDepts)){
                missionDeptList =  Convert.toList(BasicDataMissionDept.class,redisDepts);
                //删除缓存避免失败的部门重复下发
                redisUtil.del(MISSION_DOWN_FAIL_DEPTS.concat(missionId));
            }else{
                // 先获取任务关联的部门信息,根据下发部门计算下发线程数量
                LambdaQueryWrapper<BasicDataMissionDept> deptWrapper = new LambdaQueryWrapper<>();
                deptWrapper.eq(BasicDataMissionDept::getMissionId, missionId);
                missionDeptList = basicDataMissionDeptService.list(deptWrapper);
            }
            //对集合进行拆分、拆分后的集合数也是线程数,代表每个线程所续下发的部门
            List<List<BasicDataMissionDept>> partition = ListUtils.partition(missionDeptList, BATCH_COUNT);

            int threadCount = partition.size();

            ExecutorService executorService = ThreadUtil.newExecutor(threadCount);
            List<Future<Boolean>> futures = new ArrayList<>();
            log.info("线程池创建成功,线程数量为:{}", threadCount);
            for (int i = 0; i < threadCount; i++){
                int finalI = i;
                Callable<Boolean> booleanCallable = () -> doDownloadMission(basicDataMission, missionDeliveryEvent, partition.get(finalI), user);
                futures.add(executorService.submit(booleanCallable));
            }

            //收集所有线程的执行结果
            boolean[] booleans = this.handleThreadResult(futures);

            log.info("多线程执行结果:{}",booleans);
            // 处理每个线程结果
            if (BooleanUtil.and(booleans)){
                // TODO业务代码,该部分可根据业务做状态修改、流程驱动等操作
                
                }
            }
            executorService.shutdown();
        }catch (Exception e){
            log.info("任务下发前置操作出现异常",e);
            updateMissionStatus(missionId, null, MissionStatusEnum.DELIVERY_FAIL);
        }


	/** 具体执行下发
	     * @param missionDeliveryEvent
	     * @param missionDeptList
	     * @param user
	     * @return: void
	     * @author: ZaNgVVB
	     * @date: 2024/5/15 15:14
	     */
	    @SuppressWarnings(value={"all"})
	    private boolean doDownloadMission(BasicDataMission basicDataMission ,MissionDeliveryEvent missionDeliveryEvent, List<BasicDataMissionDept> missionDeptList,User user){
	      
	        return transactionTemplate.execute(status -> {
	            Object savepoint = status.createSavepoint();
	            // 创建填报记录
	            BasicDataMissionFill missionFill = createMissionFill(basicDataMission, user);
	            //当前线程执行结果,成功为true,失败为false,在future中收集使用。
	            boolean executeflag = true;
	            try {
	               
					//具体业务代码
	                }
	            } catch (Exception e) {
	                log.error("任务下发失败,原因:{}", e.getMessage(), e);
	                //回滚
	                status.rollbackToSavepoint(savepoint);
	                //更新任务状态为下发失败
	                updateMissionStatus(missionId, null, MissionStatusEnum.DELIVERY_FAIL);
	                //记录下发失败部门
	                failedMissionFillList.add(missionFill);
	                //遍历存入redis中
	                missionDeptList.forEach(dept -> redisUtil.lSet(MISSION_DOWN_FAIL_DEPTS.concat(missionId),dept));
	                executeflag = false;
	            }
	            return executeflag;
	        });
	    }

LICENSED UNDER CC BY-NC-SA 4.0
Comment