一、需求場景
很多資料開發者在使用MaxCompute開發過程中需要統計每個賬号所屬任務的費用使用情況以及每個任務耗時來做任務的合理性規劃和調整。但是在使用MaxCompute的時候通常情況下大多數使用者通過DataWorks标準模式下使用MaxCompute,這樣在MaxCompute提供的中繼資料視圖資訊中将記錄所有的生産作業執行賬号為同一個主賬号,隻有小部分的開發作業執行賬号為個人RAM子賬号。那麼如何去做到各個賬戶的費用分攤和任務時間成本的統計是大部分MaxCompute使用者所關注的問題。本文主要介紹如何通過MaxCompute中繼資料統計賬号費用及任務耗時,同時定時通過釘釘推送到客戶群。
二、需求實作
1、目前任務的費用可以通過賬單詳情中的用量明細來查詢,但是沒有歸屬到對應的子賬号。我們需要通過中繼資料Information_Schema視圖中的曆史使用資訊TASKS_HISTORY來統計。
2、任務耗時需要通過中繼資料來統計。
三、MaxCompute賬号費用及任務耗時TOPN統計
1、中繼資料介紹
MaxCompute的Information Schema提供了項目中繼資料及使用曆史資料等資訊
注意:
(1)目前Information Schema提供的是目前項目的中繼資料視圖,不支援跨項目的中繼資料通路。如果需要對多個項目的中繼資料進行統一查詢、分析,需要分别擷取各個項目中的中繼資料并整合在一起進行跨項目中繼資料分析。
(2)中繼資料及作業曆史資料儲存在Information_Schema空間下,如需對曆史資料進行快照備份或獲得超過14天的作業曆史,您可以定期将Information Schema的資料儲存備份到使用者指定項目空間。
2、如何根據中繼資料去實作賬号費用TOPN統計
(1)中繼資料下載下傳
中繼資料Information_Schema視圖中的曆史使用資訊TASKS_HISTORY記錄MaxCompute項目内已完成的作業曆史,保留近14天資料。需要通過中繼資料來做任務費用統計,是以需要定期将Information Schema的資料儲存備份到使用者指定項目空間。
開始使用前,需要以Project Owner身份安裝Information Schema的權限包,獲得通路本項目中繼資料的權限。安裝方式有如下兩種:
a、在MaxCompute用戶端(odpscmd)中執行如下指令。
odps@myproject1>install package information_schema.systables;
b、在DataWorks中的資料開發 > 臨時查詢中執行如下語句。
install package information_schema.systables;
Information Schema的視圖包含了項目級别的所有使用者資料,預設Project Owner可以通路檢視。如果項目内其他使用者或角色通路檢視,需要進行授權。
文法如下。
grant actions on package <pkgName> to user <username>;
grant actions on package <pkgName> to role <role_name>;
(2)中繼資料下載下傳備份
--建立中繼資料資料備份表information_history
use project1;
CREATE TABLE IF NOT EXISTS project1.information_history
(
task_catalog STRING
,task_schema STRING
,task_name STRING
,task_type STRING
,inst_id STRING
,`status` STRING
,owner_id STRING
,owner_name STRING
,result STRING
,start_time DATETIME
,end_time DATETIME
,input_records BIGINT
,output_records BIGINT
,input_bytes BIGINT
,output_bytes BIGINT
,input_tables STRING
,output_tables STRING
,operation_text STRING
,signature STRING
,complexity DOUBLE
,cost_cpu DOUBLE
,cost_mem DOUBLE
,settings STRING
,ds STRING
)
STORED AS ALIORC
;
--定時将資料寫入備份表information_history
use project1;
insert into table project1.information_history
select * from information_schema.tasks_history where ds ='${datetime1}';
注意:${datetime1}為DataWorks排程參數,參數配置如下:datetime1=${yyyymmdd}
如果要統計多個項目空間的中繼資料需要分别去各個項目空間安裝中繼資料package。之後在其他工作空間執行相同的操作。把各個工作空間的中繼資料備份資料插入到同一個表中做集中統計分析。
本文将所有project中繼資料維護在project1.information_history表中。
use project2;
insert into table project1.information_history
select * from information_schema.tasks_history where ds ='${datetime1}';
備注:${datetime1}為DataWorks排程參數,參數配置如下:datetime1=${yyyymmdd}之後遇到的所有參數都是如此後續不在重複描述。
![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLicmbw5CZzITMlNjYwQGO3EzMhhjYxUzN0UmNxAjM4EWZ1IWZy8CX5d2bs92Yl1iclB3bsVmdlR2LcNWaw9CXt92Yu4GZjlGbh5yYjV3Lc9CX6MHc0RHaiojIsJye.png)
(3)通過中繼資料計算賬号所屬任務費用
通過DataWorks标準模式下使用MaxCompute,這樣在MaxCompute提供的中繼資料視圖資訊中将記錄所有的生産作業執行賬号為同一個主賬号,隻有小部分的開發作業執行賬号為個人RAM子賬号。中繼資料視圖TASKS_HISTORY中的字段settings記錄上層排程或使用者傳入的資訊,以JSON格式存儲。包含字段:useragent、bizid、skynet_id和skynet_nodename。通過該字段可以具體到建立任務的子賬号資訊。
a、維護一張子賬号明細表user_ram,記錄需要統計的賬号及賬号ID
CREATE TABLE IF NOT EXISTS project1.user_ram
(
user_id STRING
,user_name STRING
)
STORED AS ALIORC
;
b、賬号所屬任務消費(按量付費)TOPN統計
CREATE TABLE IF NOT EXISTS project1.cost_topn
(
cost_sum DECIMAL(38,5)
,task_owner STRING
)
PARTITIONED BY
(
ds STRING
)
STORED AS ALIORC
;
---中繼資料TOPN按量付費消費統計
set odps.sql.decimal.odps2=true;
insert into table project1.cost_topn PARTITION (ds = '${datetime1}')
SELECT
nvl(cost_sum,0) cost_sum
,CASE WHEN a.task_owner='13************' OR a.task_owner='23************' OR a.task_owner='21************' THEN b.user_name
ELSE a.task_owner
END task_owner
---注釋部分為賬号ID
FROM (
SELECT inst_id
,owner_name
,task_type
,a.input_bytes
,a.cost_cpu
,a.STATUS
,CASE WHEN a.task_type = 'SQL' THEN CAST(a.input_bytes/1024/1024/1024 * a.complexity * 0.3 AS DECIMAL(18,5) )
WHEN a.task_type = 'SQLRT' THEN CAST(a.input_bytes/1024/1024/1024 * a.complexity * 0.3 AS DECIMAL(18,5) )
WHEN a.task_type = 'CUPID' AND a.STATUS='Terminated'THEN CAST(a.cost_cpu/100/3600 * 0.66 AS DECIMAL(18,5) )
ELSE 0
END cost_sum
,a.settings
,GET_JSON_OBJECT(settings, "$.SKYNET_ONDUTY") OWNER
,CASE WHEN GET_JSON_OBJECT(a.settings, "$.SKYNET_ONDUTY") IS NULL THEN owner_name
ELSE GET_JSON_OBJECT(a.settings, "$.SKYNET_ONDUTY")
END task_owner
FROM project1.information_history a
WHERE ds = '${datetime1}'
) a
LEFT JOIN project1.user_ram b
ON a.task_owner = b.user_id
;
備注:
task_type = 'SQL' 為SQL任務、task_type = 'SQLRT' 為查詢加速(MCQA)任務,task_type = 'CUPID' 為Spark任務,其他計費任務如MapReduce、Lightning(互動式分析)、Mras計算公式如下,詳細介紹請參考
計算費用(按量計費)MapReduce作業的計費公式為:
MapReduce作業當日計算費用=當日總計算時×單價(0.46元/計算時)
一個執行成功的MapReduce作業計算時=作業運作時間(小時)×作業調用的Core數量。
Lightning查詢作業的計費公式為:
一次Lightning查詢作業費用=查詢輸入資料量×單價(0.03元/GB)
Mars作業的計費公式為:
Mars作業當日計算費用=當日總計算時×單價(0.66元/計算時)
3、如何根據中繼資料去實作任務耗時TOPN統計
---建立任務耗時TOPN表time_topn
CREATE TABLE IF NOT EXISTS project1.time_topn
(
inst_id STRING
,cost_time BIGINT
,task_owner STRING
)
PARTITIONED BY
(
ds STRING
)
STORED AS ALIORC
;
---任務耗時TOPN統計
INSERT INTO TABLE project1.time_topn PARTITION(ds = '${datetime1}')
SELECT inst_id
,cost_time
,CASE WHEN a.task_owner='13**********' OR a.task_owner='23**********' OR a.task_owner='21**********' THEN b.user_name
ELSE a.task_owner
END task_owner
FROM (
SELECT inst_id
-- ,task_type
-- ,status
,datediff(a.end_time, a.start_time, 'ss') AS cost_time
,CASE WHEN GET_JSON_OBJECT(a.settings, "$.SKYNET_ONDUTY") IS NULL THEN owner_name
ELSE GET_JSON_OBJECT(a.settings, "$.SKYNET_ONDUTY")
END task_owner
FROM project1.information_history a
WHERE ds = '${datetime1}'
) a
LEFT JOIN project1.user_ram b
ON a.task_owner = b.user_id
;
4、通過釘釘機器人推送到釘群
(1)群機器人開發API
a、擷取自定義機器人webhook
打開機器人管理頁面。以PC端為例,打開PC端釘釘,點選頭像,選擇“機器人管理”。
在機器人管理頁面選擇“自定義”機器人,輸入機器人名字并選擇要發送消息的群,同時可以為機器人設定機器人頭像。
完成必要的安全設定(至少選擇一種),勾選 我已閱讀并同意《自定義機器人服務及免責條款》,點選“完成”。安全設定目前有3種方式,設定說明參考
安全設定完成安全設定後,複制出機器人的Webhook位址,可用于向這個群發送消息,格式如下:
https://oapi.dingtalk.com/robot/send?access_token=XXXXXX
注意:請保管好此Webhook 位址,不要公布在外部網站上,洩露後有安全風險。
(2)釘群消息推送demo
a、代碼實作
package com.alibaba.sgri.message;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import com.aliyun.odps.Instance;
import com.aliyun.odps.Odps;
import com.aliyun.odps.OdpsException;
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.data.ResultSet;
import com.aliyun.odps.task.SQLTask;
import com.dingtalk.api.DefaultDingTalkClient;
import com.dingtalk.api.DingTalkClient;
import com.dingtalk.api.request.OapiRobotSendRequest;
import com.dingtalk.api.response.OapiRobotSendResponse;
import com.taobao.api.ApiException;
/**
* @class: OdpsMessageSendNew
* @description:
* @author: Liujianwei
* @date: 2020-10-16 18:26:12
**/
public class test {
public static void main(String[] args) throws ApiException {
if (args.length < 1) {
System.out.println("請輸入日期參數");
System.exit(0);
}
System.out.println("開始讀取資料");
DingTalkClient client = new DefaultDingTalkClient(
"https://oapi.dingtalk"
+ ".com/robot/send?access_token=Webhook位址\n");
OapiRobotSendRequest request = new OapiRobotSendRequest();
request.setMsgtype("markdown");
OapiRobotSendRequest.Markdown markdown = new OapiRobotSendRequest.Markdown();
//這裡的日期作為參數
markdown.setText(getContent(args[0]));
markdown.setTitle("任務消費TOPN");
request.setMarkdown(markdown);
OapiRobotSendResponse response = client.execute(request);
System.out.println("消息發送成功");
}
/**
* 讀取ODPS,擷取要發送的資料
*
* @return
*/
public static String getContent(String day) {
Odps odps = createOdps();
StringBuilder sb = new StringBuilder();
try {
//==================這是任務消費=====================
String costTopnSql = "select sum(cost_sum)cost_sum,task_owner from cost_topn where ds='" + day + "' " + "group by task_owner order by cost_sum desc limit 5;";
Instance costInstance = SQLTask.run(odps, costTopnSql);
costInstance.waitForSuccess();
ResultSet costTopnRecords = SQLTask.getResultSet(costInstance);
sb.append("<font color=#FF0000 size=4>").append("任務消費TOPN(").append(day).append(
")[按照阿裡雲按量付費計算]").append("</font>").append("\n\n");
AtomicInteger costIndex = new AtomicInteger(1);
costTopnRecords.forEach(item -> {
sb.append(costIndex.getAndIncrement()).append(".").append("賬号:");
sb.append("<font color=#2E64FE>").append(item.getString("task_owner")).append("\n\n").append("</font>");
sb.append(" ").append(" ").append("消費:").append("<font color=#2E64FE>").append(item.get("cost_sum"))
.append("元").append(
"</font>").append("\n\n")
.append("</font>");
});
//==================這是任務耗時=====================
String timeTopnSql = "select * from time_topn where ds='" + day + "' ORDER BY cost_time DESC limit 5;";
Instance timeInstance = SQLTask.run(odps, timeTopnSql);
timeInstance.waitForSuccess();
ResultSet timeTopnRecords = SQLTask.getResultSet(timeInstance);
sb.append("<font color=#FF8C00 size=4>").append("任務耗時TOPN(").append(day).append(")")
.append("\n\n").append("</font>");
AtomicInteger timeIndex = new AtomicInteger(1);
timeTopnRecords.forEach(item -> {
sb.append(timeIndex.getAndIncrement()).append(".").append("任務:");
sb.append("<font color=#2E64FE>").append(item.getString("inst_id")).append("\n\n").append("</font>");
sb.append(" ").append("賬号:").append("<font color=#2E64FE>").append(item.getString("task_owner")).append("\n\n").append("</font>");
sb.append(" ").append("耗時:").append("<font color=#2E64FE>").append(item.get("cost_time"))
.append("秒").append(
"</font>").append("\n\n");
});
} catch (OdpsException | IOException e) {
e.printStackTrace();
}
return sb.toString();
}
/**
* 建立ODPS
*
* @return
*/
public static Odps createOdps() {
String project = "******";
String access_id = ""******";";
String access_key = ""******";";
String endPoint = "http://service.odps.aliyun.com/api";
Account account = new AliyunAccount(access_id, access_key);
Odps odps = new Odps(account);
odps.setEndpoint(endPoint);
odps.setDefaultProject(project);
return odps;
}
}
備注:自定義釘釘群機器人開發API參考:
釘釘開發平台b、pom檔案參考
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>DingTalk_Information</groupId>
<artifactId>DingTalk_Information</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>odps-sdk-core</artifactId>
<version>0.35.5-public</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.15</version>
<exclusions>
<exclusion>
<groupId>com.sun.jmx</groupId>
<artifactId>jmxri</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jdmk</groupId>
<artifactId>jmxtools</artifactId>
</exclusion>
<exclusion>
<groupId>javax.jms</groupId>
<artifactId>jms</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>alibaba-dingtalk-service-sdk</artifactId>
<version>1.0.1</version>
</dependency>
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>odps-jdbc</artifactId>
<version>3.0.1</version>
<classifier>jar-with-dependencies</classifier>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.4.1</version>
<configuration>
<!-- get all project dependencies -->
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<!-- MainClass in mainfest make a executable jar -->
<archive>
<manifest>
<mainClass>com.alibaba.sgri.message.test</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<!-- bind to the packaging phase -->
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
c、定時排程執行
程式排程可以打包送出伺服器去設定定時排程。本文打包程式送出至DataWorks執行定時排程,同時中繼資料擷取也是每日排程執行。
上傳jar包為MaxCompute資源,然後引用資源執行jar包且配置定時排程。
上傳MaxCompute資源及引用參考
MaxCompute資源,這裡不做詳細介紹
各項目中繼資料采集、任務消費和耗時TOPN計算及釘群機器人推送上下遊節點排程配置如下:
DataWorks節點上下遊配置參考
節點上下文四、每日釘群推送任務消費及耗時TOPN效果展示
五、相關費用統計參考文檔
1、MaxCompute賬單分析最佳實踐:
MaxCompute賬單分析最佳實踐2、檢視賬單詳情:
檢視賬單詳情3、在DataWorks标準模式下統計個人賬号使用資源情況:
在DataWorks标準模式下統計個人賬号使用資源情況4、利用InformationSchema與阿裡雲交易和賬單管理API實作MaxCompute費用對賬分攤統計:
利用InformationSchema與阿裡雲交易和賬單管理API實作MaxCompute費用對賬分攤統計六、MaxCompute開發者社群交流群
歡迎加入“MaxCompute開發者社群2群”,點選連結
MaxCompute開發者社群2群申請申請加入或掃描以下二維碼加入。