一、由于具有多張寬表且字段較多,每個寬表資料大概為4000萬條,根據業務邏輯拼接别名,并每張寬表的固定字段進行left join 拼接SQL。這樣就能根據每個寬表的主列,根據每個寬表的不同字段關聯出一張新的集合。由于下來要進行分頁查詢,如果要使用SparkSQL進行分頁查詢,需要增加序号列,那麼就在剛才的Sql之前增加一句 create table tableName as SELECT ROW_NUMBER() OVER() as id,* from (拼接的SQL) 就可建立一張帶自增序列的,業務需要字段的幾張寬表的關聯集合,友善下來分頁。
for(int i=0;i<ColumnNames.size();i++){
SiCustomerLabelInfoModel Column = ColumnNames.get(i);
List<CiMdaSysTable> ciMdaSysTable = ciCustomerJDao.getMdaSysTableName(Column.getColumnName());
String alias = "t_" + ciMdaSysTable.get(0).getTableId();
String aliasColumn = alias + "." + Column.getColumnName();
String aliasTable = ciMdaSysTable.get(0).getTableName() +" "+ alias;
if(mainTable == null){
mainTable = aliasTable;
}
if(ciMdaSysTable.get(0).getUpdateCycle() == 1){
mainTable = aliasTable;
}
ColumnNameList.add(aliasColumn);
tableNameList.add(aliasTable);
}
String[] keyAlias = mainTable.split(" ");
String mainKeyColumn = keyAlias[1] + "." + keyColumn;
selectResult.append("select ").append(mainKeyColumn);
if(StringUtil.isNotEmpty(mainTable)){
fromTableName.append(" from ").append(mainTable);
}
Iterator<String> table = tableNameList.iterator();
while(table.hasNext()){
String tableName = table.next();
String[] tableAlias = tableName.split(" ");
String[] mainAlias = mainTable.split(" ");
String alias = tableAlias[1];
String mAlias = mainAlias[1];
if(!mainTable.equals(tableName)){
fromTableName.append(" left join ").append(tableName).append(" on ").append(mAlias).append(".").append(keyColumn)
.append(" = ").append(alias).append(".").append(keyColumn).append(" ");
}
}
fromTableName.append(" ) a");
Iterator<String> column = ColumnNameList.iterator();
while(column.hasNext()){
String columnName = column.next();
selectResult.append(",").append(columnName);
}
selectResult.append(fromTableName);
Createtable.append("create table ").append(cocDwName).append(" as SELECT ROW_NUMBER() OVER() as id,* from").append(" (").append(selectResult);
二、由于業務場景,需要将4000萬條資料最終寫入10個檔案,這裡通過聲明線程池pool,使用多線程的方法執行,有些人會擔心那不會資料錯亂嗎,不會。因為後面要用分頁sql,根據循環傳入的 i 的值進行處理。
private ExecutorService pools = Executors.newFixedThreadPool(15);
if(result = true){
String queryCount = "select count(*) from "+cocDwName;
int count = ciCustomerJDao.getDwTotolCount(queryCount);
log.info(""+keyColumn);
try {
for(int i=0;i<10;i++){
CreateDwFileThread jd = new CreateDwFileThread(jndiName,keyColumn,num,cocDwName,count,sysId,i);
Future fu = pools.submit(jd);
fus.add(fu);
}
long start = System.currentTimeMillis();
while (true) {
boolean done = true;
for (Future f : fus) {
if (!f.isDone()) {
done = false;
break;
}
}
if (!done) {
try {
Thread.sleep(1000 * 10);
} catch (InterruptedException e) {
log.error("sleep error", e);
e.printStackTrace();
}
continue;
} else {
break;
}
}
log.debug("wait tasks finish cost:" + (System.currentTimeMillis() - start));
}catch(Exception e){
result = false;
log.error("error", e);
}
}
三、根據第一步建立的表中的自增序列ID進行分頁,由于要多線程并發執行,是以不能使用傳統分頁的begin與end,根據步驟二中傳入的 i (這裡參數為partNumber)進行處理,根據循環,每條線程執行的開始資料必定以上條資料結束的條數為開始,每次将查詢出來的結果集通過list2File寫入檔案。這裡還有個while循環,因為分成10份還是有400萬條資料啊,還是覺得大,于是就又分成了10次~就是說每次查詢出40萬條寫入檔案,直到新加入400萬條flag傳回true退出循環。
while(flag == false){
pager.setPageSize(bufferedRowSize);
pager.setPageNum(pageNumber);
int begin = (pager.getPageNum() - 1) * pager.getPageSize()+createFileCount*partNumber;
int end = begin + pager.getPageSize();
if(end >= createFileCount*(partNumber+1)){
end = createFileCount*(partNumber+1);
}
StringBuffer sql = new StringBuffer() ;
sql.append(" select ").append(columns).append(" from ").append(cocDwName).append(" where id > ").append(begin).append(" and ").append(" id < ").append(end+1);
JdbcBaseDao jdbcBaseDao = (JdbcBaseDao) SystemServiceLocator.getInstance().getService("jdbcBaseDao");
String BackjndiName = PropertiesUtils.getProperties("JNDI_CI_BACK");
final String file = fileLocalPath + File.separator + dwName+ "_" + String.valueOf(partNumber)+ ".csv";
Log.info("---------sql;:"+ sql + "-------fileName:"+file);
List<Map<String, Object>> dataList = jdbcBaseDao.getBackSimpleJdbcTemplate().queryForList(sql.toString());
if (dataList.size() > 0) {
list2File(dataList, title, columns, file, encode, null, null);
pageNumber++;
}
if(end == createFileCount * partNumber + createFileCount){
flag = true;
}
有人會問你為啥不用ResultSet 直接放入400萬條資料 為啥還要分開每40萬條資料再分頁寫~ 我想說 我就是想這麼幹~ 啊哈哈。。。不過程式中貌似是有問題的 沒有考慮到的情景,是以還在推敲。。(Resultset 查出來400萬條不還是放在記憶體中,還是有可能記憶體溢出的,分頁寫大不了通過thriftserver多連接配接幾次spark嘛~ 不過代碼寫的很爛,還在提高哈~)