案例一:
场景: 当表中数据量过大,一次性获取,效率低。改用多线程查询,这里用到了业务字段“月份”,每年12个月份,起12个线程,分别获取对应月份下的数据。
List<String> monthList = Arrays.asList("1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12");
// 定义线程数
int threadNum = monthList.size();
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(threadNum);
CountDownLatch curEndLock = new CountDownLatch(threadNum);
BlockingQueue<Future<List<CurrentStaffCostCpInfoDTO>>> queue = new LinkedBlockingQueue<>();
for (String month : monthList) {
// 使用Future接收每个线程查询对应月份的结果
Future<List<CurrentStaffCostCpInfoDTO>> future = fixedThreadPool.submit(() -> {
long l1 = System.currentTimeMillis();
List<CurrentStaffCostCpInfoDTO> queryList = currentStaffCostComputeMapper.listDetail(year, calVersion, month);
curEndLock.countDown();
log.warn("人力成本-生成法人报表-多线程查询现员明细,耗时:{},ThreadName:{},当前执行顺序:{}", (System.currentTimeMillis() - l1) / 1000, Thread.currentThread().getName(), curEndLock.getCount());
return queryList;
});
queue.add(future);
}
// 等待所有查询线程执行完毕
boolean await;
try {
await = curEndLock.await(10, TimeUnit.MINUTES);
} catch (InterruptedException e) {
throw ExceptionUtils.create("获取现员人力成本核算明细数据await中断异常.", e, null, null);
} finally {
fixedThreadPool.shutdown();
}
if (!await) {
log.warn("获取核算数据超时了!");
}
/* 汇总结果 */
for (Future<List<CurrentStaffCostCpInfoDTO>> future : queue) {
List<CurrentStaffCostCpInfoDTO> currentThreadList = null;
try {
currentThreadList = future.get();
} catch (InterruptedException e) {
throw ExceptionUtils.create("获取现员人力成本核算明细数据get中断异常.", e, null, null);
} catch (ExecutionException e) {
throw ExceptionUtils.create("获取现员人力成本核算明细数据get执行异常.", e, null, null);
}
if (CollectionUtils.isNotEmpty(currentThreadList)) {
currentStaffGroupList.addAll(currentThreadList);
}
}
案例二:
场景: 当做报表时,需要从多个数据源取数,并进行汇总计算时,顺时查询各个数据源,数据量大,会导致查询过慢,使用多线程并发查询各个数据源。
// 4个数据源,定义4个线程
int threadNum = 4;
ExecutorService executor = Executors.newFixedThreadPool(threadNum);
CountDownLatch endLock = new CountDownLatch(threadNum);
// 定义4个接收查询结果的List
List<CurrentStaffCostCpInfoDTO> currentStaffGroupList = new ArrayList<>();
List<StuLaborCostInfoDTO> studentDetailGroupList = new ArrayList<>();
List<NewOrgLaborCostDTO> newOrgGroupList = new ArrayList<>();
List<OutsourceLaborCostDTO> outsourceGroupList = new ArrayList<>();
executor.execute(() -> {
// 现员人力成本核算明细
long l = System.currentTimeMillis();
putMap(year, calVersion, currentStaffGroupList);
log.warn("人力成本-生成法人报表-查询现员核算明细耗时:{}", (System.currentTimeMillis() - l) / 1000);
endLock.countDown();
});
executor.execute(() -> {
// 大学生人力成本核算明细
List<StuLaborCostInfoDTO> stuList = stuLaborCostComputeMapper.listDetail(year, calVersion);
if (CollectionUtils.isNotEmpty(stuList)) {
studentDetailGroupList.addAll(stuList);
}
endLock.countDown();
});
executor.execute(() -> {
// 新增组织成本核算
List<NewOrgLaborCostDTO> newOrgList = newOrgLaborCostMapper.listDetail(year, calVersion);
if (CollectionUtils.isNotEmpty(newOrgList)) {
newOrgGroupList.addAll(newOrgList);
}
endLock.countDown();
});
executor.execute(() -> {
// 外包成本核算
List<OutsourceLaborCostDTO> outsourceList = outsourceLaborCostMapper.listDetail(year, calVersion);
if (CollectionUtils.isNotEmpty(outsourceList)) {
outsourceGroupList.addAll(outsourceList);
}
endLock.countDown();
});
// 等待子线程的逻辑执行完毕
boolean await;
try {
await = endLock.await(5, TimeUnit.MINUTES);
log.warn("获取核算数据完成,总耗时:{}s", (System.currentTimeMillis() - beginTime) / 1000);
} catch (InterruptedException e) {
throw ExceptionUtils.create("获取核算数据await中断异常.", e, null, null);
} finally {
// 关闭线程池
executor.shutdown();
}
if (!await) {
log.warn("获取核算数据超时了!");
}
案例三:
场景: 当处理导入等大数据量入库时:mybatis的分批分量入库明显效率过低,应改用 多线程jdbc批量提交事务 来优化数据入库过慢问题
public void batchInsertStuLaborCostComputeItem(List<StuLaborCostComputeItem> stuLaborCostComputeItems) {
// 切分,每1200条数据执行一条线程
Map<Integer, List<StuLaborCostComputeItem>> hashMap = DataSplit.splitData(stuLaborCostComputeItems, 1200);
ExecutorService service = Executors.newFixedThreadPool(55);
for (Map.Entry<Integer, List<StuLaborCostComputeItem>> entity : hashMap.entrySet()) {
List<StuLaborCostComputeItem> splitEntity = entity.getValue();
try {
// 发起多个线程进行保存
service.execute(() -> {
if (CollectionUtils.isNotEmpty(splitEntity)) {
laborCostComputeJDBC.insertStuLaborCostComputeItem(splitEntity);
}
});
} catch (Exception e) {
log.error("保存异常", e);
}
}
service.shutdown();
}
public void insertStuLaborCostComputeItem(List<StuLaborCostComputeItem> list) {
Connection conn = null;
PreparedStatement pst = null;
try {
conn = connectionSource.getConnection();
String sql = "INSERT INTO STU_LABOR_COST_COMPUTE_ITEM" +
"(ID, VERSION, CREATE_USER, CREATION_DATE, IS_DELETED, LAST_UPDATE_DATE, LAST_UPDATE_USER, " +
"CODE, VALUE, SLC_ID)" +
" values(?,0,?,sysdate,0,sysdate,?," +
" ?,?,?)";
pst = conn.prepareStatement(sql);
boolean autoCommit = conn.getAutoCommit();
// 关闭自动提交
conn.setAutoCommit(false);
long l = System.currentTimeMillis();
int batchCount = 800;// 提交数量,到达这个数量就提交
for (int index = 0; index < list.size(); index++) {
StuLaborCostComputeItem stuLaborCostComputeItem = list.get(index);
pst.setString(1, stuLaborCostComputeItem.getId());
pst.setString(2, stuLaborCostComputeItem.getCreater());
pst.setString(3, stuLaborCostComputeItem.getLastUpdated());
// CODE, VALUE, SLC_ID
pst.setString(4, stuLaborCostComputeItem.getCode());
pst.setString(5, stuLaborCostComputeItem.getValue() == null ? null : getEncryptProvider().encrypt(stuLaborCostComputeItem.getValue()));
pst.setString(6, stuLaborCostComputeItem.getStuLaborCostCompute().getId());
pst.addBatch();
if (index != 0 && index % batchCount == 0) {
pst.executeBatch();
// 达到batchCount时,提交事务
conn.commit();
}
}
pst.executeBatch();
// 未达到batchCount时,提交事务
conn.commit();
conn.setAutoCommit(autoCommit);
log.warn("JDBC提交耗时:"+(System.currentTimeMillis()-l));
} catch (Exception e) {
log.error("保存出现异常:", e);
try {
if (conn != null) {
conn.rollback();
}
} catch (SQLException throwables) {
log.error("回滚异常:", e);
throw new RuntimeException("回滚异常:", e);
}
throw new RuntimeException("保存出现异常:", e);
} finally {
connectionSource.close(pst, conn);
}
}
案例四:
场景: 当多线程处理不同月份业务数据计算时,共享对象,变量,会被不同子线程所修改,导致计算结果错乱。多线程中,每份业务对象,业务变量应是独立的,否则会被修改
Integer threadNum = 12;
ExecutorService executor = Executors.newFixedThreadPool(threadNum);
CountDownLatch endLock = new CountDownLatch(threadNum);
// 定义月份
List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12);
for (Integer month : list) {
executor.execute(()->{
//利用Gson实现深拷贝。(共享对象userMiddleAtom,并发下会被多个线程修改)
Gson gson = new Gson();
UserMiddleAtom userMiddleAtom2 = gson.fromJson(gson.toJson(userMiddleAtom), UserMiddleAtom.class);
Map<String, MiddleAtom> mp = userMiddleAtom2.getMiddleAtomHashMap();
// 写入月份
MiddleAtom atomR005 = mp.get("R005");
if (null != atomR005) {
String value = String.valueOf(month);
atomR005.setValue(value);
mp.put("R005", atomR005);
computingMiddle("R005", value, mp, userMiddleAtom2.getJumMaps(), false);
}
// 写入对应人数
double sumEmpNum = 0;
sumEmpNum = this.getSumEmpNum(costCompute, month, sumEmpNum);
MiddleAtom atomR003 = mp.get("R003");
if (null != atomR003) {
String value = String.valueOf(sumEmpNum);
atomR003.setValue(value);
mp.put("R003", atomR003);
computingMiddle("R003", value, mp, userMiddleAtom2.getJumMaps(), false);
}
// 跳表计算
for (LCDataAtom fore : lcDataAtom.getDatas()) {
batchComputing(fore, userMiddleAtom2, mp);
}
computingCache.set(getKey(lcDataAtom, String.valueOf(month)), userMiddleAtom2, 30, TimeUnit.DAYS);
endLock.countDown();
});
}
//等待子线程的逻辑执行完毕
boolean await = false;
try {
await = endLock.await(3, TimeUnit.MINUTES);
} catch (InterruptedException e) {
log.warn("人力成本核算工号:{},计算12个月awaitTermination异常:",userCode,e);
}finally {
//关闭线程池
executor.shutdown();
}
if(!await){
log.warn("复制12个月份的模板缓存,时间超时了.");
}
五种常见的深拷贝方式:
Java对象深拷贝详解(List深拷贝)
Q.E.D.