天天看點

大資料技術之_18_大資料離線平台_03_資料處理+工具代碼導入+業務 ETL 實作+建立資料庫表

十六、資料處理

16.1、ETL 操作

  • 功能:清洗、過濾、補全
  • 資料來源:存儲在 HDFS 上的日志檔案
  • 資料處理方式:MapReduce
  • 資料儲存位置:HBase

16.2、HBase 設計

16.2.1、每天1張表

  即按天分表,一天的資料存放于一張表中,rowkey 采用随機值,不需要有特定規律,盡可能的散列。

16.2.2、倒序或在字首上加數字

  rowkey 的設計要具體問題具體分析,有時會采取倒序的原則,有時會采取 rowkey 前加上一個随機的數字。(該數字一般要和 HregionServer 的數量求模運算)

16.2.3、預分區

  根據業務預估資料量,提前建好預分區,避免 region 頻繁拆分合并造成的性能浪費。

16.3、MapReduce 分析過程

  操作流程:HBase 讀取資料 -> InputFormat -> map -> shuffle -> reduce -> OutputFormat -> Mysql

16.4、Hive 分析過程

  • 資料源:使用 Hive external table 建立關聯 HBase 中的資料表
  • 資料結果:儲存于 HDFS 上(或者儲存到 Hive 結果表中)
  • 操作流程:Hive external table -> UDF編寫 -> HQL 分析語句編寫 -> 儲存到 Hive 結果表中(其實也就是在HDFS上) -> Sqoop - 導出資料 -> Mysql

16.5、Mysql 表結構設計

16.5.1、常用關系型資料庫表模型

  在多元分析的商業智能解決方案中,根據

事實表

次元表

的關系,又可将常見的模型分為

星型模型

雪花型模型

。在設計邏輯型資料的模型的時候,就應考慮資料是按照星型模型還是雪花型模型進行組織。

  • 星型模型

    星型架構是一種非正規化的結構,多元資料集的每一個次元都直接與事實表相連接配接,不存在漸變次元,是以資料有一定的備援,如在地域次元表中,存在國家 A 省 B 的城市 C 以及國家 A 省 B 的城市 D 兩條記錄,那麼國家 A 和省 B 的資訊分别存儲了兩次,即存在備援。

大資料技術之_18_大資料離線平台_03_資料處理+工具代碼導入+業務 ETL 實作+建立資料庫表
  • 雪花模型

    當有一個或多個維表沒有直接連接配接到事實表上,而是通過其他維表連接配接到事實表上時,其圖解就像多個雪花連接配接在一起,故稱雪花模型。雪花模型是對星型模型的擴充。它對星型模型的維表進一步階層化,原有的各維表可能被擴充為小的事實表,形成一些局部的 " 層次" 區域,這些被分解的表都連接配接到主次元表而不是事實表。如下圖,将地域維表又分解為國家、省份、城市等維表。它的優點是:通過最大限度地減少資料存儲量以及聯合較小的維表來改善查詢性能。

    雪花型結構去除了資料備援

大資料技術之_18_大資料離線平台_03_資料處理+工具代碼導入+業務 ETL 實作+建立資料庫表

雪花模型在加載資料集時,ETL 操作在設計上更加複雜,而且由于附屬模型的限制,

不能并行化

星形模型加載次元表,不需要再次元之間添加附屬模型,是以 ETL 就相對簡單,而且可以實作高度的并行化。

16.5.2、表結構

  • 次元表:dimension_table
  • 事實表:stats_table
  • 輔助表:主要用于協助 ETL、資料分析等操作擷取其他非日志資料,例如:儲存會員 id 等

十七、工具代碼導入

代碼結構圖

大資料技術之_18_大資料離線平台_03_資料處理+工具代碼導入+業務 ETL 實作+建立資料庫表

部分示例代碼如下:

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.z</groupId>
    <artifactId>transformer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>transformer</name>
    <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.2</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-yarn-server-resourcemanager -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-yarn-server-resourcemanager</artifactId>
            <version>2.7.2</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.2</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-client -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.3.1</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-server -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>1.3.1</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.hive/hive-exec -->
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>1.2.1</version>
        </dependency>

        <!-- mysql start -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.27</version>
        </dependency>
        <!-- mysql end -->

        <!-- 使用者浏覽器解析 -->
        <dependency>
            <groupId>cz.mallat.uasparser</groupId>
            <artifactId>uasparser</artifactId>
            <version>0.6.1</version>
        </dependency>

        <!-- json包 -->
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.6.2</version>
        </dependency>
    </dependencies>
    <profiles>
        <profile>
            <!-- 唯一id,表示本地 -->
            <id>local</id>
            <activation>
                <!-- maven編譯的時候,預設環境,該參數為true隻能存在一個 -->
                <activeByDefault>true</activeByDefault>
            </activation>
            <build>
                <!-- 插件資訊 -->
                <plugins>
                    <plugin>
                        <!-- 将指定包的java檔案進行編譯打包操作 -->
                        <groupId>org.codehaus.mojo</groupId>
                        <artifactId>build-helper-maven-plugin</artifactId>
                        <version>1.4</version>
                        <executions>
                            <execution>
                                <id>add-source</id>
                                <phase>generate-sources</phase>
                                <goals>
                                    <goal>add-source</goal>
                                </goals>
                                <configuration>
                                    <sources>
                                        <source>${basedir}/src/main/java</source>
                                    </sources>
                                </configuration>
                            </execution>
                        </executions>
                    </plugin>
                </plugins>
            </build>
        </profile>

        <profile>
            <!-- 需要最終形成一個jar檔案 -->
            <id>dev</id>
            <build>
                <plugins>
                    <plugin>
                        <groupId>org.codehaus.mojo</groupId>
                        <artifactId>build-helper-maven-plugin</artifactId>
                        <version>1.4</version>
                        <executions>
                            <execution>
                                <id>add-source</id>
                                <phase>generate-sources</phase>
                                <goals>
                                    <goal>add-source</goal>
                                </goals>
                                <configuration>
                                    <sources>
                                        <source>${basedir}/src/main/java</source>
                                    </sources>
                                </configuration>
                            </execution>
                        </executions>
                    </plugin>

                    <plugin>
                        <!-- 将第三方的依賴包,一起打入到最終形成的jar檔案中 -->
                        <groupId>org.apache.maven.plugins</groupId>
                        <artifactId>maven-shade-plugin</artifactId>
                        <version>2.1</version>
                        <executions>
                            <execution>
                                <phase>package</phase>
                                <goals>
                                    <goal>shade</goal>
                                </goals>
                                <configuration>
                                    <artifactSet>
                                        <includes>
                                            <include>cz.mallat.uasparser:uasparser</include>
                                            <include>net.sourceforge.jregex:jregex</include>
                                            <include>mysql:mysql-connector-java</include>
                                        </includes>
                                    </artifactSet>
                                </configuration>
                            </execution>
                        </executions>
                    </plugin>
                </plugins>
            </build>
        </profile>
    </profiles>

    <build>
        <testSourceDirectory>src/test/java</testSourceDirectory>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.3</version>
                <configuration>
                    <source>1.7</source>
                    <target>1.7</target>
                </configuration>
            </plugin>
        </plugins>
        <pluginManagement>
            <plugins>
                <!--This plugin's configuration is used to store Eclipse m2e settings 
                    only. It has no influence on the Maven build itself. -->
                <plugin>
                    <groupId>org.eclipse.m2e</groupId>
                    <artifactId>lifecycle-mapping</artifactId>
                    <version>1.0.0</version>
                    <configuration>
                        <lifecycleMappingMetadata>
                            <pluginExecutions>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.codehaus.mojo</groupId>
                                        <artifactId>
                                            build-helper-maven-plugin
                                        </artifactId>
                                        <versionRange>[1.4,)</versionRange>
                                        <goals>
                                            <goal>add-source</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore></ignore>
                                    </action>
                                </pluginExecution>
                            </pluginExecutions>
                        </lifecycleMappingMetadata>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>
</project>           

複制

resources 目錄下檔案

core-site.xml

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
       Licensed under the Apache License, Version 2.0 (the "License");
  you may not use this file except in compliance with the License.
  You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License. See accompanying LICENSE file.
-->

<!-- Put site-specific property overrides in this file. -->

<configuration>
    <!-- 指定HDFS中NameNode的位址 -->
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://hadoop102:9000</value>
    </property>

    <!-- 指定Hadoop運作時産生檔案的存儲目錄 -->
    <property>
        <name>hadoop.tmp.dir</name>
        <value>/opt/module/hadoop-2.7.2/data/tmp</value>
    </property>

    <property>
        <name>hadoop.proxyuser.admin.hosts</name>
        <value>*</value>
    </property>

    <property>
        <name>hadoop.proxyuser.admin.groups</name>
        <value>*</value>
    </property>

    <property>
        <name>hadoop.proxyuser.httpfs.hosts</name>
        <value>*</value>
    </property>

    <property>
        <name>hadoop.proxyuser.httpfs.groups</name>
        <value>*</value>
    </property>

    <!-- 配置垃圾回收時間為1分鐘
    <property>
        <name>fs.trash.interval</name>
        <value>1</value>
    </property>
    -->

    <!-- 修改通路垃圾資源回收筒使用者名稱為 atguigu
    <property>
        <name>hadoop.http.staticuser.user</name>
        <value>atguigu</value>
    </property>
    -->
</configuration>           

複制

hbase-site.xml

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
-->
<configuration>
    <property>
        <name>hbase.rootdir</name>
        <value>hdfs://hadoop102:9000/hbase</value>
    </property>

    <property>
        <name>hbase.cluster.distributed</name>
        <value>true</value>
    </property>

    <!-- 0.98後的新變動,之前版本沒有.port,預設端口為16000 -->
    <property>
        <name>hbase.master.port</name>
        <value>16000</value>
    </property>

    <property>
        <name>hbase.zookeeper.quorum</name>
        <value>hadoop102:2181,hadoop103:2181,hadoop104:2181</value>
    </property>

    <property>
        <name>hbase.zookeeper.property.dataDir</name>
        <value>/opt/module/zookeeper-3.4.10/zkData</value>
    </property>

    <property>
        <name>hbase.coprocessor.region.classes</name>
        <value>com.china.hbase.CalleeWriteObserver</value>
    </property>

    <property>
        <name>zookeeper.session.timeout</name>
        <value>90000</value>
    </property>
</configuration>           

複制

hdfs-site.xml

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
  Licensed under the Apache License, Version 2.0 (the "License");
  you may not use this file except in compliance with the License.
  You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License. See accompanying LICENSE file.
-->

<!-- Put site-specific property overrides in this file. -->

<configuration>
    <!-- 指定HDFS副本的數量,預設是3個 -->
    <property>
        <name>dfs.replication</name>
        <value>1</value>
    </property>

    <!-- 指定Hadoop輔助名稱節點主機配置 -->
    <property>
        <name>dfs.namenode.secondary.http-address</name>
        <value>hadoop104:50090</value>
    </property>

    <!-- 關閉權限檢查-->
    <property>
        <name>dfs.permissions.enable</name>
        <value>false</value>
    </property>

    <property>
        <name>dfs.webhdfs.enabled</name>
        <value>true</value>
    </property>

    <!-- NameNode的本地目錄可以配置成多個,且每個目錄存放内容相同,增加了可靠性。
    <property>
        <name>dfs.namenode.name.dir</name>
        <value>file:///${hadoop.tmp.dir}/dfs/name1,file:///${hadoop.tmp.dir}/dfs/name2</value>
    </property>
    -->

    <!-- DataNode也可以配置成多個目錄,每個目錄存儲的資料不一樣。即:資料不是副本。
    <property>
        <name>dfs.datanode.data.dir</name>
        <value>file:///${hadoop.tmp.dir}/dfs/data1,file:///${hadoop.tmp.dir}/dfs/data2</value>
    </property>
    -->

    <!-- 白名單資訊
    <property>
        <name>dfs.hosts</name>
        <value>/opt/module/hadoop-2.7.2/etc/hadoop/dfs.hosts</value>
    </property>
    -->

    <!-- 黑名單資訊
    <property>
        <name>dfs.hosts.exclude</name>
        <value>/opt/module/hadoop-2.7.2/etc/hadoop/dfs.hosts.exclude</value>
    </property>
    -->
</configuration>           

複制

log4j.properties

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Define some default values that can be overridden by system properties
hadoop.root.logger=INFO,console
hadoop.log.dir=.
hadoop.log.file=hadoop.log

# Define the root logger to the system property "hadoop.root.logger".
log4j.rootLogger=${hadoop.root.logger}, EventCounter

# Logging Threshold
log4j.threshold=ALL

# Null Appender
log4j.appender.NullAppender=org.apache.log4j.varia.NullAppender

#
# Rolling File Appender - cap space usage at 5gb.
#
hadoop.log.maxfilesize=256MB
hadoop.log.maxbackupindex=20
log4j.appender.RFA=org.apache.log4j.RollingFileAppender
log4j.appender.RFA.File=${hadoop.log.dir}/${hadoop.log.file}

log4j.appender.RFA.MaxFileSize=${hadoop.log.maxfilesize}
log4j.appender.RFA.MaxBackupIndex=${hadoop.log.maxbackupindex}

log4j.appender.RFA.layout=org.apache.log4j.PatternLayout

# Pattern format: Date LogLevel LoggerName LogMessage
log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
# Debugging Pattern format
#log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n


#
# Daily Rolling File Appender
#

log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
log4j.appender.DRFA.File=${hadoop.log.dir}/${hadoop.log.file}

# Rollver at midnight
log4j.appender.DRFA.DatePattern=.yyyy-MM-dd

# 30-day backup
#log4j.appender.DRFA.MaxBackupIndex=30
log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout

# Pattern format: Date LogLevel LoggerName LogMessage
log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
# Debugging Pattern format
#log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n


#
# console
# Add "console" to rootlogger above if you want to use this 
#

log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n

#
# TaskLog Appender
#

#Default values
hadoop.tasklog.taskid=null
hadoop.tasklog.iscleanup=false
hadoop.tasklog.noKeepSplits=4
hadoop.tasklog.totalLogFileSize=100
hadoop.tasklog.purgeLogSplits=true
hadoop.tasklog.logsRetainHours=12

log4j.appender.TLA=org.apache.hadoop.mapred.TaskLogAppender
log4j.appender.TLA.taskId=${hadoop.tasklog.taskid}
log4j.appender.TLA.isCleanup=${hadoop.tasklog.iscleanup}
log4j.appender.TLA.totalLogFileSize=${hadoop.tasklog.totalLogFileSize}

log4j.appender.TLA.layout=org.apache.log4j.PatternLayout
log4j.appender.TLA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n

#
# HDFS block state change log from block manager
#
# Uncomment the following to suppress normal block state change
# messages from BlockManager in NameNode.
#log4j.logger.BlockStateChange=WARN

#
#Security appender
#
hadoop.security.logger=INFO,NullAppender
hadoop.security.log.maxfilesize=256MB
hadoop.security.log.maxbackupindex=20
log4j.category.SecurityLogger=${hadoop.security.logger}
hadoop.security.log.file=SecurityAuth-${user.name}.audit
log4j.appender.RFAS=org.apache.log4j.RollingFileAppender 
log4j.appender.RFAS.File=${hadoop.log.dir}/${hadoop.security.log.file}
log4j.appender.RFAS.layout=org.apache.log4j.PatternLayout
log4j.appender.RFAS.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
log4j.appender.RFAS.MaxFileSize=${hadoop.security.log.maxfilesize}
log4j.appender.RFAS.MaxBackupIndex=${hadoop.security.log.maxbackupindex}

#
# Daily Rolling Security appender
#
log4j.appender.DRFAS=org.apache.log4j.DailyRollingFileAppender 
log4j.appender.DRFAS.File=${hadoop.log.dir}/${hadoop.security.log.file}
log4j.appender.DRFAS.layout=org.apache.log4j.PatternLayout
log4j.appender.DRFAS.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
log4j.appender.DRFAS.DatePattern=.yyyy-MM-dd

#
# hadoop configuration logging
#

# Uncomment the following line to turn off configuration deprecation warnings.
# log4j.logger.org.apache.hadoop.conf.Configuration.deprecation=WARN

#
# hdfs audit logging
#
hdfs.audit.logger=INFO,NullAppender
hdfs.audit.log.maxfilesize=256MB
hdfs.audit.log.maxbackupindex=20
log4j.logger.org.apache.hadoop.hdfs.server.namenode.FSNamesystem.audit=${hdfs.audit.logger}
log4j.additivity.org.apache.hadoop.hdfs.server.namenode.FSNamesystem.audit=false
log4j.appender.RFAAUDIT=org.apache.log4j.RollingFileAppender
log4j.appender.RFAAUDIT.File=${hadoop.log.dir}/hdfs-audit.log
log4j.appender.RFAAUDIT.layout=org.apache.log4j.PatternLayout
log4j.appender.RFAAUDIT.layout.ConversionPattern=%d{ISO8601} %p %c{2}: %m%n
log4j.appender.RFAAUDIT.MaxFileSize=${hdfs.audit.log.maxfilesize}
log4j.appender.RFAAUDIT.MaxBackupIndex=${hdfs.audit.log.maxbackupindex}

#
# mapred audit logging
#
mapred.audit.logger=INFO,NullAppender
mapred.audit.log.maxfilesize=256MB
mapred.audit.log.maxbackupindex=20
log4j.logger.org.apache.hadoop.mapred.AuditLogger=${mapred.audit.logger}
log4j.additivity.org.apache.hadoop.mapred.AuditLogger=false
log4j.appender.MRAUDIT=org.apache.log4j.RollingFileAppender
log4j.appender.MRAUDIT.File=${hadoop.log.dir}/mapred-audit.log
log4j.appender.MRAUDIT.layout=org.apache.log4j.PatternLayout
log4j.appender.MRAUDIT.layout.ConversionPattern=%d{ISO8601} %p %c{2}: %m%n
log4j.appender.MRAUDIT.MaxFileSize=${mapred.audit.log.maxfilesize}
log4j.appender.MRAUDIT.MaxBackupIndex=${mapred.audit.log.maxbackupindex}

# Custom Logging levels

#log4j.logger.org.apache.hadoop.mapred.JobTracker=DEBUG
#log4j.logger.org.apache.hadoop.mapred.TaskTracker=DEBUG
#log4j.logger.org.apache.hadoop.hdfs.server.namenode.FSNamesystem.audit=DEBUG

# Jets3t library
log4j.logger.org.jets3t.service.impl.rest.httpclient.RestS3Service=ERROR

# AWS SDK & S3A FileSystem
log4j.logger.com.amazonaws=ERROR
log4j.logger.com.amazonaws.http.AmazonHttpClient=ERROR
log4j.logger.org.apache.hadoop.fs.s3a.S3AFileSystem=WARN

#
# Event Counter Appender
# Sends counts of logging messages at different severity levels to Hadoop Metrics.
#
log4j.appender.EventCounter=org.apache.hadoop.log.metrics.EventCounter

#
# Job Summary Appender 
#
# Use following logger to send summary to separate file defined by 
# hadoop.mapreduce.jobsummary.log.file :
# hadoop.mapreduce.jobsummary.logger=INFO,JSA
# 
hadoop.mapreduce.jobsummary.logger=${hadoop.root.logger}
hadoop.mapreduce.jobsummary.log.file=hadoop-mapreduce.jobsummary.log
hadoop.mapreduce.jobsummary.log.maxfilesize=256MB
hadoop.mapreduce.jobsummary.log.maxbackupindex=20
log4j.appender.JSA=org.apache.log4j.RollingFileAppender
log4j.appender.JSA.File=${hadoop.log.dir}/${hadoop.mapreduce.jobsummary.log.file}
log4j.appender.JSA.MaxFileSize=${hadoop.mapreduce.jobsummary.log.maxfilesize}
log4j.appender.JSA.MaxBackupIndex=${hadoop.mapreduce.jobsummary.log.maxbackupindex}
log4j.appender.JSA.layout=org.apache.log4j.PatternLayout
log4j.appender.JSA.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n
log4j.logger.org.apache.hadoop.mapred.JobInProgress$JobSummary=${hadoop.mapreduce.jobsummary.logger}
log4j.additivity.org.apache.hadoop.mapred.JobInProgress$JobSummary=false

#
# Yarn ResourceManager Application Summary Log 
#
# Set the ResourceManager summary log filename
yarn.server.resourcemanager.appsummary.log.file=rm-appsummary.log
# Set the ResourceManager summary log level and appender
yarn.server.resourcemanager.appsummary.logger=${hadoop.root.logger}
#yarn.server.resourcemanager.appsummary.logger=INFO,RMSUMMARY

# To enable AppSummaryLogging for the RM, 
# set yarn.server.resourcemanager.appsummary.logger to 
# <LEVEL>,RMSUMMARY in hadoop-env.sh

# Appender for ResourceManager Application Summary Log
# Requires the following properties to be set
#    - hadoop.log.dir (Hadoop Log directory)
#    - yarn.server.resourcemanager.appsummary.log.file (resource manager app summary log filename)
#    - yarn.server.resourcemanager.appsummary.logger (resource manager app summary log level and appender)

log4j.logger.org.apache.hadoop.yarn.server.resourcemanager.RMAppManager$ApplicationSummary=${yarn.server.resourcemanager.appsummary.logger}
log4j.additivity.org.apache.hadoop.yarn.server.resourcemanager.RMAppManager$ApplicationSummary=false
log4j.appender.RMSUMMARY=org.apache.log4j.RollingFileAppender
log4j.appender.RMSUMMARY.File=${hadoop.log.dir}/${yarn.server.resourcemanager.appsummary.log.file}
log4j.appender.RMSUMMARY.MaxFileSize=256MB
log4j.appender.RMSUMMARY.MaxBackupIndex=20
log4j.appender.RMSUMMARY.layout=org.apache.log4j.PatternLayout
log4j.appender.RMSUMMARY.layout.ConversionPattern=%d{ISO8601} %p %c{2}: %m%n

# HS audit log configs
#mapreduce.hs.audit.logger=INFO,HSAUDIT
#log4j.logger.org.apache.hadoop.mapreduce.v2.hs.HSAuditLogger=${mapreduce.hs.audit.logger}
#log4j.additivity.org.apache.hadoop.mapreduce.v2.hs.HSAuditLogger=false
#log4j.appender.HSAUDIT=org.apache.log4j.DailyRollingFileAppender
#log4j.appender.HSAUDIT.File=${hadoop.log.dir}/hs-audit.log
#log4j.appender.HSAUDIT.layout=org.apache.log4j.PatternLayout
#log4j.appender.HSAUDIT.layout.ConversionPattern=%d{ISO8601} %p %c{2}: %m%n
#log4j.appender.HSAUDIT.DatePattern=.yyyy-MM-dd

# Http Server Request Logs
#log4j.logger.http.requests.namenode=INFO,namenoderequestlog
#log4j.appender.namenoderequestlog=org.apache.hadoop.http.HttpRequestLogAppender
#log4j.appender.namenoderequestlog.Filename=${hadoop.log.dir}/jetty-namenode-yyyy_mm_dd.log
#log4j.appender.namenoderequestlog.RetainDays=3

#log4j.logger.http.requests.datanode=INFO,datanoderequestlog
#log4j.appender.datanoderequestlog=org.apache.hadoop.http.HttpRequestLogAppender
#log4j.appender.datanoderequestlog.Filename=${hadoop.log.dir}/jetty-datanode-yyyy_mm_dd.log
#log4j.appender.datanoderequestlog.RetainDays=3

#log4j.logger.http.requests.resourcemanager=INFO,resourcemanagerrequestlog
#log4j.appender.resourcemanagerrequestlog=org.apache.hadoop.http.HttpRequestLogAppender
#log4j.appender.resourcemanagerrequestlog.Filename=${hadoop.log.dir}/jetty-resourcemanager-yyyy_mm_dd.log
#log4j.appender.resourcemanagerrequestlog.RetainDays=3

#log4j.logger.http.requests.jobhistory=INFO,jobhistoryrequestlog
#log4j.appender.jobhistoryrequestlog=org.apache.hadoop.http.HttpRequestLogAppender
#log4j.appender.jobhistoryrequestlog.Filename=${hadoop.log.dir}/jetty-jobhistory-yyyy_mm_dd.log
#log4j.appender.jobhistoryrequestlog.RetainDays=3

#log4j.logger.http.requests.nodemanager=INFO,nodemanagerrequestlog
#log4j.appender.nodemanagerrequestlog=org.apache.hadoop.http.HttpRequestLogAppender
#log4j.appender.nodemanagerrequestlog.Filename=${hadoop.log.dir}/jetty-nodemanager-yyyy_mm_dd.log
#log4j.appender.nodemanagerrequestlog.RetainDays=3           

複制

十八、業務 ETL 實作

18.1、功能

  • 過濾内容:過濾無效資料,比如缺少 uuid,缺少會話 ip,訂單事件中缺少訂單 id。
  • 補全内容:IP 位址資訊補全地域資訊(國家、省份、城市等)、浏覽器相關資訊補全,伺服器時間補全等等。

18.2、資料

18.2.1、上傳方式

  • Flume: 在Flume 工作正常的情況下,所有的日志均由 Flume 上傳寫入。(詳見第13.4章節)
  • Shell 手動:當 Flume 程序出現異常,需要手動執行腳本的上傳。(詳見第十五章節)

18.2.2、流程

  • 使用 MapReduce 通過 TextInputFormat 的方式将 HDFS 中的資料讀取到 map 中,最終通過 TableOutputFormat 到 HBase 中。

18.2.3、細節分析

日志解析

  日志存儲于 HDFS 中,一行一條日志,解析出操作行為中具體的 key-value 值,然後進行解碼操作。

IP位址解析/補全

浏覽器資訊解析

HBase rowkey 設計

注意規則

:盡可能的短小,占用記憶體少,盡可能的均勻分布。(即散列)

HBase 表的建立

  使用 Java API 建立。

18.3、代碼實作

關鍵類:

  LoggerUtil.java

示例代碼如下:

package com.z.transformer.util;

import java.net.URLDecoder;
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;

import com.z.transformer.common.EventLogConstants;
import com.z.transformer.util.IPSeekerExt.RegionInfo;

import cz.mallat.uasparser.UserAgentInfo;

public class LoggerUtil {

    // 日志輸出提示
    private static final Logger logger = Logger.getLogger(LoggerUtil.class);

    /**
     * 解析給定的日志行,如果解析成功傳回一個有值的 map 集合,如果解析失敗,傳回一個 empty 集合
     * 
     * @param logText
     * @return
     */
    public static Map<String, String> handleLogText(String logText) {
        Map<String, String> result = new HashMap<String, String>();
        // 1、開始解析
        // hadoop 叢集中預設隻有 org.apache.commons.lang.StringUtils 所在的 jar 包,如果使用其他
        // StringUtils,hadoop 叢集中需要導入該 StringUtils 依賴的 jar 包方可使用
        if (StringUtils.isNotBlank(logText)) {
            // 日志行非空,可以進行解析
            String[] splits = logText.trim().split(EventLogConstants.LOG_SEPARTIOR); // 日志分隔符
                                                                                        // ^A
            // 192.168.25.102^A1555318954.798^A/what.png?u_nu=1&u_sd=6D4F89C0-E17B-45D0-BFE0-059644C1878D&c_time=......
            if (splits.length == 3) {
                // 日志格式是正确的,進行解析
                String ip = splits[0].trim();
                // 将 ip 位址封裝進 Map 集合中
                result.put(EventLogConstants.LOG_COLUMN_NAME_IP, ip);
                long serverTime = TimeUtil.parseNginxServerTime2Long(splits[1].trim());
                if (serverTime != -1L) {
                    // 表示伺服器時間解析正确,而且 serverTime 就是對于的毫秒級的時間戳
                    // 将 serverTime 封裝進 Map 集合中
                    result.put(EventLogConstants.LOG_COLUMN_NAME_SERVER_TIME, String.valueOf(serverTime));
                }

                // 擷取請求體
                String requestBody = splits[2].trim();
                int index = requestBody.indexOf("?"); // ? 符号所在的索引位置
                if (index >= 0 && index != requestBody.length() - 1) {
                    // 在請求參數中存在 ?,而且 ? 不是最後一個字元的情況,則截取?後面的内容
                    requestBody = requestBody.substring(index + 1);
                } else {
                    requestBody = null;
                }

                if (StringUtils.isNotBlank(requestBody)) {
                    // 非空,開始處理請求參數
                    handleRequestBody(result, requestBody);

                    // 開始補全 ip 位址
                    RegionInfo info = IPSeekerExt.getInstance().analysisIp(result.get(EventLogConstants.LOG_COLUMN_NAME_IP)); // 使用者ip位址
                    if (info != null) {
                        result.put(EventLogConstants.LOG_COLUMN_NAME_COUNTRY, info.getCountry()); // 國家
                        result.put(EventLogConstants.LOG_COLUMN_NAME_PROVINCE, info.getProvince()); // 省份
                        result.put(EventLogConstants.LOG_COLUMN_NAME_CITY, info.getCity()); // 城市
                    }

                    // 開始補全浏覽器資訊
                    UserAgentInfo uaInfo = UserAgentUtil.analyticUserAgent(result.get(EventLogConstants.LOG_COLUMN_NAME_USER_AGENT)); // 浏覽器user agent參數
                    if (uaInfo != null) {
                        // 浏覽器名稱
                        result.put(EventLogConstants.LOG_COLUMN_NAME_BROWSER_NAME, uaInfo.getUaFamily()); // 浏覽器名稱
                        // 浏覽器版本号
                        result.put(EventLogConstants.LOG_COLUMN_NAME_BROWSER_VERSION, uaInfo.getBrowserVersionInfo()); // 浏覽器版本
                        // 浏覽器所在作業系統
                        result.put(EventLogConstants.LOG_COLUMN_NAME_OS_NAME, uaInfo.getOsFamily()); // 作業系統名稱
                        // 浏覽器所在作業系統的版本
                        result.put(EventLogConstants.LOG_COLUMN_NAME_OS_VERSION, uaInfo.getOsName()); // 作業系統版本
                    }

                } else {
                    // logger
                    logger.debug("請求參數為空:" + logText);
                    result.clear(); // 清空
                }
            } else {
                // log記錄一下
                logger.debug("日志行内容格式不正确:" + logText);
            }
        } else {
            logger.debug("日志行内容為空,無法進行解析:" + logText);
        }
        return result;
    }

    /**
     * 處理請求參數<br/>
     * 處理結果儲存到參數 result 集合(Map 集合)
     * 
     * @param clientInfo
     *            儲存最終使用者行為資料的 map 集合
     * @param requestBody
     *            請求參數中,使用者行為資料,格式為:
     *            u_nu=1&u_sd=6D4F89C0-E17B-45D0-BFE0-059644C1878D&c_time=
     *            1450569596991&ver=1&en=e_l&pl=website&sdk=js&b_rst=1440*900&
     *            u_ud=4B16B8BB-D6AA-4118-87F8-C58680D22657&b_iev=Mozilla%2F5.0%
     *            20(Windows%20NT%205.1)%20AppleWebKit%2F537.36%20(KHTML%2C%
     *            20like%20Gecko)%20Chrome%2F45.0.2454.101%20Safari%2F537.36&l=
     *            zh-CN&bf_sid=33cbf257-3b11-4abd-ac70-c5fc47afb797_11177014
     */
    private static void handleRequestBody(Map<String, String> clientInfo, String requestBody) {
        // 将請求參數體按照 & 切割
        String[] parameters = requestBody.split("&");
        for (String parameter : parameters) {
            // 循環處理參數,parameter 格式為: c_time=1450569596991  = 隻會出現一次
            String[] params = parameter.split("=");
            String key, value = null;
            try {
                // 使用 utf-8 解碼
                key = URLDecoder.decode(params[0].trim(), "utf-8");
                value = URLDecoder.decode(params[1].trim(), "utf-8");
                // 添加到結果集合  Map 中
                clientInfo.put(key, value);
            } catch (Exception e) {
                logger.warn("解碼失敗:" + parameter, e);
            }
        }
    }
}           

複制

18.3.1、日志解析

18.3.2、IP位址解析/補全

使用淘寶接口解析IP位址

  官網:http://ip.taobao.com/

  示例:REST API:http://ip.taobao.com/service/getIpInfo.php?ip=123.125.71.38

  限制:10QPS(Query Per Second)

使用第三方 IP 庫

  通過檔案中已經存放的 IP 和地區的映射進行 IP 解析,由于更新不及時,可能會導緻某些 IP 解析不正确(小機率事件)。(推薦使用:純真IP位址資料庫)

使用自己的 IP 庫

  通過第三方的 IP 庫,逐漸生成自己的 IP 庫,自主管理。

IP 庫表設計

  startip(起始ip)

  endip(結束ip)

  country(國家)

  province(省份)

  city(城市)

尖叫提示

:判斷某個 IP 是否在某個地域的起始 IP 和結束 IP 區間。

IP 與 long 的互轉的工具類:

示例代碼如下:

    // 将 127.0.0.1 形式的 IP 位址轉換成十進制整數
    public long IpToLong(String strIp){
        long[] ip = new long[4];
        int position1 = strIp.indexOf(".");
        int position2 = strIp.indexOf(".", position1 + 1);
        int position3 = strIp.indexOf(".", position2 + 1);
        // 将每個.之間的字元串轉換成整型  
        ip[0] = Long.parseLong(strIp.substring(0, position1));
        ip[1] = Long.parseLong(strIp.substring(position1 + 1, position2 - position1 - 1));
        ip[2] = Long.parseLong(strIp.substring(position2 + 1, position3 - position2 - 1));
        ip[3] = Long.parseLong(strIp.substring(position3 + 1));
        // 進行左移位處理
        return (ip[0] << 24) + (ip[1] << 16) + (ip[2] << 8) + ip[3];
    }

    // 将十進制整數形式轉換成 127.0.0.1 形式的 ip 位址
    public String LongToIp(long ip) {
        StringBuilder sb = new StringBuilder();
        // 直接右移 24 位
        sb.append(ip >> 24);
        sb.append(".");
        // 将高 8 位置 0,然後右移 16
        sb.append((ip & 0x00FFFFFF) >> 16);
        sb.append(".");
        // 将高 16 位置0 ,然後右移 8 位
        sb.append((ip & 0x0000FFFF) >> 8);
        sb.append(".");
        // 将高 24 位置 0
        sb.append((ip & 0x000000FF));
        return sb.toString();
    }           

複制

18.3.3、浏覽器資訊解析

  • 依賴查詢:http://mvnrepository.com/
  • 依賴工具:uasparser 第三方浏覽器資訊解析工具

18.3.4、ETL代碼編寫

建立類:

  AnalysisDataMapper.java

  AnalysisDataRunner.java

  目标:讀取 HDFS 中的資料,清洗後寫入到 HBase 中。

核心思路梳理:

  • Step1、建立 AnalysisDataMapper 類,複寫 map 方法。
  • Step2、在 map 方法中通過 LoggerUtil.handleLogText 方法将目前行資料解析成

    Map<String, String>

    集合 clientInfo。
  • Step3、擷取目前行日志資訊的事件類型,并根據擷取到的事件類型去枚舉類型中比對生成 EventEnum 對象,如果沒有比對到對應的事件類型,則傳回 null。
  • Step4、判斷如果無法處理給定的事件類型,則使用 log4j 輸出。
  • Step5、如果可以處理指定事件類型,則開始處理事件,建立

    handleEventData(Map<String, String> clientInfo, EventEnum event, Context context, Text value)

    方法處理事件。
  • Step6、在 handleEventData 方法中,我們需要過濾掉那些資料不合法的 Event 事件,通過

    filterEventData(Map<String, String> clientInfo, EventEnum event)

    方法過濾。

    過濾規則:如果是 java_server 過來的資料,則會員 id 必須存在,如果是 website 過來的資料,則會話 id 和使用者 id 必須存在。

  • Step7、如果沒有通過過濾,則通過日志輸出目前資料,如果通過過濾,則開始準備輸出資料,建立方法

    outPutData(Map<String, String> clientInfo, Context context)

  • Step8、outputData 方法中,我們可以删除一些無用的資料,比如浏覽器資訊的原始資料(因為已經解析過了)。同時需要建立一個生成 rowKey 的方法

    generateRowKey(String uuid, long serverTime, Map<String, String> clientInfo)

    ,通過該方法生成的 rowKey 之後,添加内容到 HBase 表中。
  • Step9、generateRowKey 方法主要用于 rowKey 的生成,通過拼接:時間+uuid的crc32編碼+資料内容的hash碼的crc32編碼,作為 rowKey,一共 12 個位元組。

示例代碼如下:

AnalysisDataMapper.java

package com.z.transformer.mr.etl;

import java.io.IOException;
import java.util.Map;
import java.util.zip.CRC32;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;

import com.z.transformer.common.EventLogConstants;
import com.z.transformer.common.EventLogConstants.EventEnum;
import com.z.transformer.util.LoggerUtil;
import com.z.transformer.util.TimeUtil;

public class AnalysisDataMapper extends Mapper<Object, Text, NullWritable, Put> {
    // Object 是偏移量,Text 表示輸入,NullWritable, Put 可以互換

    // 如果無法處理給定的事件類型,則使用 log4j 輸出, Logger 可以在運作 jar 包的控制台輸出
    private static final Logger logger = Logger.getLogger(AnalysisDataMapper.class);

    private CRC32 crc1 = null;
    private CRC32 crc2 = null;
    private byte[] family = null;
    private long currentDayInMills = -1;

    /**
     * 初始化資料
     */
    @Override
    protected void setup(Mapper<Object, Text, NullWritable, Put>.Context context)
            throws IOException, InterruptedException {
        crc1 = new CRC32();
        crc2 = new CRC32();
        this.family = EventLogConstants.BYTES_EVENT_LOGS_FAMILY_NAME;
        currentDayInMills = TimeUtil.getTodayInMillis();
    }

    // 1、覆寫 map 方法
    @Override
    protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        // 2、将原始資料通過 LoggerUtil 解析成 Map 鍵值對
        Map<String, String> clientInfo = LoggerUtil.handleLogText(value.toString());

        // 2.1、如果解析失敗,則 Map 集合中無資料,通過日志輸出目前資料
        if (clientInfo.isEmpty()) {
            logger.debug("日志解析失敗:" + value.toString());
            return;
        }

        // 3、根據解析後的資料,生成對應的 Event 事件類型(通過枚舉類型的别名來解析)
        EventEnum event = EventEnum.valueOfAlias(clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_EVENT_NAME));
        if (event == null) {
            // 4、無法處理的事件,直接輸出事件類型
            logger.debug("無法比對對應的事件類型:" + clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_EVENT_NAME));
        } else {
            // 5、處理具體的事件
            handleEventData(clientInfo, event, context, value);
            // clientInfo 資料集, event 事件類型, context 上下文(通過上下文寫入到HBase), value 目前行的資料(可能會有新的過濾操作)
        }
    }

    /**
     * 處理具體的事件的方法
     * 
     * @param clientInfo
     * @param event
     * @param context
     * @param value
     * @throws InterruptedException
     * @throws IOException
     */
    public void handleEventData(Map<String, String> clientInfo, EventEnum event, Context context, Text value)
            throws IOException, InterruptedException {
        // 6、如果事件成功通過過濾,則準備處理具體事件
        if (filterEventData(clientInfo, event)) {
            outPutData(clientInfo, context);
        } else {
            // 如果事件沒有通過過濾,則通過日志輸出目前資料
            logger.debug("事件格式不正确:" + value.toString());
        }
    }

    /**
     * 6、如果事件成功通過過濾,則準備處理具體事件(我們的 HBase 隻存成功通過過濾的事件)
     * 
     * @param clientInfo
     * @param event
     * @return
     */
    public boolean filterEventData(Map<String, String> clientInfo, EventEnum event) {
        // 事件資料全局過濾(具體全局過濾條件視情況而定,這裡的 “伺服器時間” 和 “平台” 是例子)
        boolean result = StringUtils.isNotBlank(clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_SERVER_TIME))
                && StringUtils.isNotBlank(clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_PLATFORM));
        // 後面幾乎全部是&&操作,隻要有一個 false,那麼該 Event 事件就無法處理

        // public static final String PC_WEBSITE_SDK = "website";
        // public static final String JAVA_SERVER_SDK = "java_server";

        // 先确定平台
        switch (clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_PLATFORM)) {
        // Java Server 平台發來的資料
        case EventLogConstants.PlatformNameConstants.JAVA_SERVER_SDK:
            result = result && StringUtils.isNotBlank(clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_MEMBER_ID)); // 先判斷會員 ID 是否存在
            // 再确定事件
            switch (event) {
            case CHARGEREFUND:
                // 退款事件
                // ......
                break;
            case CHARGESUCCESS:
                // 訂單支付成功事件
                result = result && StringUtils.isNotBlank(clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_ORDER_ID));
                break;
            default:
                logger.debug("無法處理指定事件:" + clientInfo);
                result = false;
                break;
            }
            break;

        // WebSite 平台發來的資料
        case EventLogConstants.PlatformNameConstants.PC_WEBSITE_SDK:
            // 再确定事件
            switch (event) {
            case CHARGEREQUEST:
                // 下單事件
                result = result && StringUtils.isNotBlank(clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_ORDER_ID))
                        && StringUtils.isNotBlank(clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_ORDER_CURRENCY_TYPE))
                        && StringUtils.isNotBlank(clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_ORDER_PAYMENT_TYPE))
                        && StringUtils.isNotBlank(clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_ORDER_CURRENCY_AMOUNT));
                break;
            case EVENT:
                // Event 事件
                result = result && StringUtils.isNotBlank(clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_EVENT_CATEGORY))
                        && StringUtils.isNotBlank(clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_EVENT_ACTION));
                break;
            case LAUNCH:
                // Launch 通路事件
                // ......
                break;
            case PAGEVIEW:
                // PV 事件
                result = result && StringUtils.isNotBlank(clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_CURRENT_URL));
                break;
            default:
                logger.debug("無法處理指定事件:" + clientInfo);
                result = false;
                break;
            }
            break;

        default:
            result = false;
            logger.debug("無法确定的資料來源:" + clientInfo);
            break;
        }

        return result;
    }

    /**
     * 7 和 8、如果事件成功通過過濾,則輸出事件到 HBase 的方法
     * 
     * @param clientInfo
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    public void outPutData(Map<String, String> clientInfo, Context context) throws IOException, InterruptedException {
        String uuid = clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_UUID);
        long serverTime = Long.valueOf(clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_SERVER_TIME));

        // 因為浏覽器資訊已經解析完成,是以此時删除原始的浏覽器資訊
        clientInfo.remove(EventLogConstants.LOG_COLUMN_NAME_USER_AGENT);

        // 建立 rowKey
        byte[] rowkey = generateRowKey(uuid, serverTime, clientInfo);
        Put put = new Put(rowkey);

        for (Map.Entry<String, String> entry : clientInfo.entrySet()) {
            if (StringUtils.isNotBlank(entry.getKey()) && StringUtils.isNotBlank(entry.getValue())) {
                put.addColumn(family, Bytes.toBytes(entry.getKey()), Bytes.toBytes(entry.getValue()));
            }
        }

        context.write(NullWritable.get(), put);
    }

    /**
     * 9、為向 HBase 中寫入資料依賴 Put 對象,Put 對象的建立依賴 RowKey,是以如下方法
     * 
     * rowKey=時間+uuid的crc32編碼+資料内容的hash碼的crc32編碼
     * 
     * @return
     */
    public byte[] generateRowKey(String uuid, long serverTime, Map<String, String> clientInfo) {
        // 先清空 crc1 和  crc2 集合中的資料内容
        crc1.reset();
        crc2.reset();

        // 時間=目前資料通路伺服器的時間-當天00:00點的時間戳 ,得到最大值是8位數字=3600*24*1000=86400000 ,可以用int來存儲,大小是 4個位元組
        byte[] timeBytes = Bytes.toBytes(serverTime - this.currentDayInMills);

        // uuid 的 crc 編碼
        if (StringUtils.isNotBlank(uuid)) {
            this.crc1.update(Bytes.toBytes(uuid));
        }
        byte[] uuidBytes = Bytes.toBytes(this.crc1.getValue());

        // 資料内容的 hash 碼的 crc 編碼
        this.crc2.update(Bytes.toBytes(clientInfo.hashCode()));
        byte[] clientInfoBytes = Bytes.toBytes(this.crc2.getValue());

        // 綜合位元組數組
        byte[] buffer = new byte[timeBytes.length + uuidBytes.length + clientInfoBytes.length];
        // 數組合并
        System.arraycopy(timeBytes, 0, buffer, 0, timeBytes.length);
        System.arraycopy(uuidBytes, 0, buffer, timeBytes.length, uuidBytes.length);
        System.arraycopy(clientInfoBytes, 0, buffer, uuidBytes.length, clientInfoBytes.length);

        return buffer;
    }
}           

複制

AnalysisDataRunner.java

package com.z.transformer.mr.etl;

import java.io.File;
import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.z.transformer.common.EventLogConstants;
import com.z.transformer.common.GlobalConstants;
import com.z.transformer.util.TimeUtil;

public class AnalysisDataRunner implements Tool {
    private Configuration conf = null;

    public static void main(String[] args) {
        try {
            int resultCode = ToolRunner.run(new AnalysisDataRunner(), args);
            if (resultCode == 0) {
                System.out.println("Success!");
            } else {
                System.out.println("Fail!");
            }
            System.exit(resultCode);
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

    @Override
    public void setConf(Configuration conf) {
        // 先執行個體化 Configuration
        this.conf = HBaseConfiguration.create(conf);
    }

    @Override
    public Configuration getConf() {
        // 全局的通路方法
        return this.conf;
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = this.getConf();
        // 處理傳入的時間參數,預設或不合法時間則直接使用昨天日期
        this.processArgs(conf, args);

        // 開始建立 Job
        Job job = Job.getInstance(conf, "Event-ETL");

        // 設定 Job 參數
        job.setJarByClass(AnalysisDataRunner.class);

        // Mapper 參數設定
        job.setMapperClass(AnalysisDataMapper.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Put.class);

        // Reducer 參數設定
        job.setNumReduceTasks(0);

        // 設定資料輸入
        initJobInputPath(job);

        // 設定輸出到 HBase 的資訊
        initHBaseOutPutConfig(job);
        // job.setJar("target/transformer-0.0.1-SNAPSHOT.jar");

        // Job 送出
        return job.waitForCompletion(true) ? 0 : 1;
    }

    /**
     * 初始化 Job 資料輸入目錄
     * 
     * @param job
     * @throws IOException
     */
    private void initJobInputPath(Job job) throws IOException {
        Configuration conf = job.getConfiguration();
        // 擷取要執行ETL操作的那一天的資料
        String date = conf.get(GlobalConstants.RUNNING_DATE_PARAMES); // 2017-08-14
        // 格式化 HDFS 檔案路徑
        String hdfsPath = TimeUtil.parseLong2String(TimeUtil.parseString2Long(date), "yyyy/MM/dd");// 2017/08/14

        if (GlobalConstants.HDFS_LOGS_PATH_PREFIX.endsWith("/")) {
            hdfsPath = GlobalConstants.HDFS_LOGS_PATH_PREFIX + hdfsPath; // /event-logs/2017/08/14
        } else {
            hdfsPath = GlobalConstants.HDFS_LOGS_PATH_PREFIX + File.separator + hdfsPath; // /event-logs/2017/08/14
            // File.separator 的作用是:根據目前作業系統擷取對應的檔案分隔符,windows中是 \ ,Linux中是 /
        }

        FileSystem fs = FileSystem.get(conf);
        Path inPath = new Path(hdfsPath);

        if (fs.exists(inPath)) {
            FileInputFormat.addInputPath(job, inPath);
        } else {
            throw new RuntimeException("HDFS 中該檔案目錄不存在:" + hdfsPath);
        }
    }

    /**
     * 設定輸出到 HBase 的一些操作選項
     * 
     * @throws IOException
     */
    private void initHBaseOutPutConfig(Job job) throws IOException {
        Configuration conf = job.getConfiguration();
        // 擷取要執行ETL操作的那一天的資料
        String date = conf.get(GlobalConstants.RUNNING_DATE_PARAMES); // 2017-08-14
        // 格式化 HBase 表的字尾名
        String tableNameSuffix = TimeUtil.parseLong2String(TimeUtil.parseString2Long(date), TimeUtil.HBASE_TABLE_NAME_SUFFIX_FORMAT); // 20170814
        // 建構表名
        String tableName = EventLogConstants.HBASE_NAME_EVENT_LOGS + tableNameSuffix; // event_logs20170814

        // 指定輸出(初始化 ReducerJob)
        TableMapReduceUtil.initTableReducerJob(tableName, null, job);

        Connection conn = null;
        Admin admin = null;

        // 使用 HBase 的新 API
        conn = ConnectionFactory.createConnection(conf);
        admin = conn.getAdmin();

        // 建立表描述器(即通過表名執行個體化表描述器)
        TableName tn = TableName.valueOf(tableName);
        HTableDescriptor htd = new HTableDescriptor(tn);

        // 設定列族
        htd.addFamily(new HColumnDescriptor(EventLogConstants.EVENT_LOGS_FAMILY_NAME));
        // 判斷表是否存在
        if (admin.tableExists(tn)) {
            // 存在,則删除
            if (admin.isTableEnabled(tn)) {
                // 先将表設定為不可用
                admin.disableTable(tn);
            }
            // 再删除表
            admin.deleteTable(tn);
        }

        // 建立表,在建立的過程中可以考慮預分區操作
        // 假設預分區為 3個分區
        // byte[][] keySplits = new byte[3][];
        // keySplits[0] = Bytes.toBytes("1"); // (-∞, 1]
        // keySplits[1] = Bytes.toBytes("2"); // (1, 2]
        // keySplits[2] = Bytes.toBytes("3"); // (2, ∞]
        // admin.createTable(htd, keySplits);

        admin.createTable(htd);
        admin.close();
    }

    /**
     * 處理時間參數,如果沒有傳遞參數的話,則預設清洗前一天的。
     * 
     * Job腳本如下: bin/yarn jar ETL.jar com.z.transformer.mr.etl.AnalysisDataRunner -date 2017-08-14
     * 
     * @param args
     */
    private void processArgs(Configuration conf, String[] args) {
        String date = null;
        for (int i = 0; i < args.length; i++) {
            if ("-date".equals(args[i])) { // 找到 "-date" 标記
                date = args[i + 1]; // 擷取時間
                break;
            }
        }

        if (StringUtils.isBlank(date) || !TimeUtil.isValidateRunningDate(date)) {
            // 如果沒有傳遞參數,預設清洗昨天的資料然後存儲到 HBase 中
            date = TimeUtil.getYesterday();
        }
        // 将要清洗的目标時間字元串儲存到 conf 對象中(這樣全局中就可以引用)
        conf.set(GlobalConstants.RUNNING_DATE_PARAMES, date);
    }
}           

複制

18.4、測試

18.4.1、上傳測試資料

$ /opt/module/hadoop-2.7.2/bin/hdfs dfs -mkdir -p /event-logs/2015/12/20
$ /opt/module/hadoop-2.7.2/bin/hdfs dfs -put /opt/software/20151220.log /event-logs/2015/12/20           

複制

18.4.2、打包叢集運作

方案一:

  修改 etc/hadoop/hadoop-env.sh 中的 HADOOP_CLASSPATH 配置資訊。

例如:

    export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/opt/module/hbase/lib/*           

複制

方案二:

  使用 maven 插件:maven-shade-plugin,将第三方依賴的 jar 全部打包進去,需要在 pom.xml 中配置依賴。參考【章節 十七、工具代碼導入】中的 pom.xml 檔案。

參數設定:

    1、-P local clean package(不打包第三方jar)
    2、-P dev clean package install(打包第三方jar)(推薦使用這種)           

複制

打包成功後,将打好的 jar 包上傳至 Linux 上,然後執行指令,如下:

/opt/module/hadoop-2.7.2/bin/yarn jar /opt/software/transformer-0.0.1-SNAPSHOT.jar com.z.transformer.mr.etl.AnalysisDataRunner -date 2015-12-20           

複制

測試成功!截圖如下:

1、控制台

大資料技術之_18_大資料離線平台_03_資料處理+工具代碼導入+業務 ETL 實作+建立資料庫表

2、HBase 網頁端:http://hadoop102:16010/master-status

大資料技術之_18_大資料離線平台_03_資料處理+工具代碼導入+業務 ETL 實作+建立資料庫表

3、曆史伺服器:http://hadoop102:19888/jobhistory/attempts/job_1555404378493_0005/m/SUCCESSFUL

大資料技術之_18_大資料離線平台_03_資料處理+工具代碼導入+業務 ETL 實作+建立資料庫表

尖叫提示

:如果在打包的過程中 org.apache.maven.plugins 其中沒有包含所依賴的 jar 包,則需要在 HADOOP_CLASSPATH 添加所依賴的 jar 檔案。

例如:編寫代碼依賴了 HBase,但是打包 MR 任務的時候,沒有 include HBase 的相關 jar,則需要在指令行中執行如下指令:

export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/opt/module/hbase/lib/*           

複制

在執行代碼之前,我們先手動删除 hbase 上的表和命名空間,指令如下:

hbase(main):002:0> disable 'event_logs20151220'
hbase(main):003:0> drop 'event_logs20151220'
hbase(main):005:0> drop_namespace 'ns_ct'           

複制

問題:當我們檢視曆史伺服器中的 Logs 日志時,發現一個解碼失敗異常:java.lang.IllegalArgumentException: URLDecoder: Incomplete trailing escape (%) pattern,如下圖所示:

大資料技術之_18_大資料離線平台_03_資料處理+工具代碼導入+業務 ETL 實作+建立資料庫表

解決問題連結:https://www.cnblogs.com/chenmingjun/p/10719587.html

十九、建立資料庫表

19.1、使用 Navicat 工具

前提:需要在 Linux 中對 Mysql 的通路授權。

grant all on *.* to root@'%' identified by '123456';
flush privileges;
exit;           

複制

19.2、通過 SQL 檔案建構表

  參考連結:https://www.cnblogs.com/chenmingjun/p/10185797.html