现在的位置: 首页 > 综合 > 正文

大数据量情况程序处理技巧

2013年06月26日 ⁄ 综合 ⁄ 共 4396字 ⁄ 字号 评论关闭

批量更新:

PreparedStatement并不能减少sql的执行数目. 参数传入多少次,就会执行多少条sql.

在一个事务中,由于Statement与PrestatedStatement都使用的同一个连接. Statement不会再象过去每次都去获取连接. 这种情况下,反而可以利用Statement优化PreparedStatement. 减少sql的执行条数. 但是每条sql会编译,获得执行计划.




示例1:质量度更新:

比如关键词在审核的过程中先去查询A表,得到该词的历史质量度, 如果有,就更新B表中的质量度值.

	public void checkQuality() {
		// 先取得词的历史质量度
		Map<String, Long> qhmap = getQualityWithAccountidAndKey(
				needCheckQualityKeys, accountid);
		// QualityHistory表中能根据groupid-key查到历史质量度.则需要更新keyspend的值.否则什么都不做.
		if (qhmap != null && qhmap.size() > 0) {
			
                        List<KeySpend> kslist = new ArrayList<KeySpend>();
			for (CpcKeyDTO key : needCheckQualityKeys) {
				if (qhmap.get(key.getKey()) != null) {
					KeySpend keyspend = new KeySpend();
					keyspend.setCpcid(key.getCpcid());
					keyspend.setQuality(qhmap.get(key.getKey()));
					kslist.add(keyspend);
				}
			}
                        //会在server端执行kslist.size()条sql.
			batchProcessKeySpend(kslist);
		}

	}

调优后的方法:

		if (qhmap != null && qhmap.size() > 0) {
	
			HashMap<Long, ArrayList<Long>> keySpendMap = new HashMap<Long, ArrayList<Long>>();
                        //质量度有5个值, 所以每个值对应一条sql. 将来只需要5条sql
			for (CpcKeyDTO key : needCheckQualityKeys) {
				if (qhmap.get(key.getKey()) != null) {
					Long degree = qhmap.get(key.getKey());					
					
					ArrayList<Long> list = keySpendMap.get(degree);
					if(list == null)
					{
						list = new ArrayList<Long>();
						keySpendMap.put(degree, list);
					}
					list.add(key.getCpcid());							
				}
			}
			
			//使用IN进行优化。
 			for(Long degree : keySpendMap.keySet())
			{
				auditPersistentService.batchProcessKeySpend(degree, keySpendMap.get(degree));
			}			
		}	

上面调优之后,由以前的kslist条sql变成了现在的5条sql,但是这5条sql都会重新编译.

经测试:10个Key感觉差别不是特别明显. 都在16ms. 结论待定.


示例2: 在广告系统中配对的批量插入与批量更新的一个优化示例:

批量广告插入最初使用的是merge into.但是即使使用的PreparedStatement的sql,仍然要执行25W次.

比如应用程序中形成的了一个配对的集合,包含了25W个元素.通过jdbcTemplate的batchUpdate批量插入这些元素,并且还需要判重,使用的sql如下:

/* Formatted on 2011-8-2 18:54:00 (QP5 v5.114.809.3010) */
MERGE INTO CPC.CPCPARTNERAUDIT cpa
USING (SELECT ? AS OPID,? AS ACCOUNTID,? AS GROUPID,? AS IDEAID,? AS KEYID,? AS CHECKSTATUS,? AS CREATEDATE,? AS REFUSEREASON, ? AS ADMINUSERID,? AS ADMINUSERNAME,? AS AUDITREASON,? AS BACKUPIDEAID  FROM DUAL) cpai
ON (cpa.keyid=cpai.keyid AND cpa.ideaid=cpai.ideaid)
WHEN NOT MATCHED THEN
INSERT
VALUES (cpai.OPID, cpai.ACCOUNTID, cpai.GROUPID, cpai.IDEAID, cpai.KEYID,
cpai.CHECKSTATUS, cpai.CREATEDATE, cpai.REFUSEREASON, cpai.ADMINUSERID, cpai.ADMINUSERNAME,
cpai.AUDITREASON,cpai.BACKUPIDEAID)

为了在同时使用PreparedStatement的时候,将这25W条sql降下来,重构应用程序如下:在应用程序中形成ideaid-keylist的集合,针对每个idea对应的key做下面的操作,执行sql数目=idea的数目.

INSERT INTO CPC.CPCPARTNERAUDIT (OPID,
                                 ACCOUNTID,
                                 GROUPID,
                                 IDEAID,
                                 KEYID,
                                 CHECKSTATUS,
                                 CREATEDATE,
                                 REFUSEREASON,
                                 ADMINUSERID,
                                 ADMINUSERNAME,
                                 AUDITREASON,
                                 BACKUPIDEAID)
   (SELECT   A.*
      FROM   (SELECT   GREATEST (I.OPID, C.OPID) AS OPID,
                       I.ACCOUNTID,
                       I.CPCGRPID,
                       I.CPCIDEAID,
                       C.CPCID,
                       1 AS CHECKSTATUS,
                       SYSDATE AS CREATEDATE,
                       '自动审核通过' AS REFUSEREASON,
                       0 AS ADMINUSERID,
                       '自动审核' AS ADMINUSERNAME,
                       '自动审核通过' AS AUDITREASON,
                       NULL AS BACKUPIDEAID
                FROM   CPC.CPCIDEA I, CPC.CPC C
               WHERE       I.CPCGRPID = C.CPCGRPID
                       AND I.CPCIDEAID = ?
                       AND C.CPCID IN (?)
              UNION ALL
              SELECT   GREATEST (I.OPID, C.OPID) AS OPID,
                       C.ACCOUNTID,
                       I.GROUPID,
                       I.AUDITIDEAID AS CPCIDEAID,
                       C.CPCID,
                       1 AS CHECKSTATUS,
                       SYSDATE AS CREATEDATE,
                       '自动审核通过' AS REFUSEREASON,
                       0 AS ADMINUSERID,
                       '自动审核' AS ADMINUSERNAME,
                       '自动审核通过' AS AUDITREASON,
                       I.IDEAID AS BACKUPIDEAID
                FROM   CPC.CPCIDEAMODI I, CPC.CPC C
               WHERE       I.GROUPID = C.CPCGRPID
                       AND I.AUDITIDEAID = ?
                       AND C.CPCID IN (?)) A
     WHERE   NOT EXISTS
                (SELECT   P.KEYID, P.IDEAID
                   FROM   CPC.CPCPARTNERAUDIT P
                  WHERE   A.CPCIDEAID = P.IDEAID AND A.CPCID = P.KEYID))

大数据量查询的二种方式:

1. 一种全部查询到内存,然后使用subList, subList的场景比如in参数的限制.

.....
int group = passedPartners.size() % IDS_PER_BATCH == 0 ? passedPartners
.size()
/ IDS_PER_BATCH
: (passedPartners.size()
/ IDS_PER_BATCH + 1);

for (int i = 0; i < group; i++) {
final List<CpcPartnerDTO> batchPassedPartners;
if ((i + 1) * IDS_PER_BATCH > passedPartners.size()) {
batchPassedPartners
= passedPartners.subList(i
* IDS_PER_BATCH, passedPartners.size());
}
else {
batchPassedPartners
= passedPartners.subList(i
* IDS_PER_BATCH, (i + 1) * IDS_PER_BATCH);
}
doInFacade(batchPassedPartners, AuditElement.PARTNER,
Action.INS_AUDIT_ELEMENT);
}
......

2. 直接使用数据库的分页sql获得子集.而不在内存中操作. 这样的好处是将子集放到一个中间结果里,每次再将中间结果合并到目标结果集.避免内存中同时出现二个大的结果集.

......
for (int pageNo = 1; pageNo <= totalPageCount; pageNo++) {
doPage(sql, pageNo, set);
}

......

private void doPage(String sql, int pageNo, Set<String> set) {
long t1 = System.currentTimeMillis();
final int startIndex = PageUtil.getStartOfPage(pageNo, pageSize);
String sqlLimit
= PageUtil.getLimitString(sql, true);
Object[] obj
= new Object[] { startIndex, startIndex + pageSize };
List
<String> list = jdbcTemplateCpc.query(sqlLimit, obj,
new RowMapper() {

@Override
public Object mapRow(ResultSet rs, int arg1)
throws SQLException {
return "" + rs.getString(2) + rs.getString(1);
}

});
set.addAll(list);
long t2 = System.currentTimeMillis();
logger.info(
"" + pageNo + "次查询数据量" + list.size() + ",set中现有"
+ set.size() + "条记录,耗时" + ((t2 - t1) / 1000) + "");
}

抱歉!评论已关闭.