天天看點

Zipkin原理學習 -- Druid 追蹤多種資料庫 SQL 執行

在上一篇部落格《Zipkin原理學習 -- 日誌追蹤 MySQL 執行語句》中,我們已經學習了 Zipkin 官方提供的、針對 MySQL 資料庫 SQL 語句執行的追蹤攔截器。現在我們基於資料庫連接池 Druid 的 Filter 機制,寫一個能支援多種資料庫(MySQL、PostgreSQL、Oracle、SQL Server 等)的 SQL 執行日誌追蹤攔截器。

Druid 過濾器 Filter

DruidDataSource支援通過Filter-Chain模式進行擴充,類似Servlet的Filter,擴充十分方便,你可以攔截任何JDBC的方法。

有兩種配置Filter的方式,一種是配置filters屬性,一種是配置proxyFilters屬性。filters和proxyFilters的配置是組合關系,而不是替換關系。

配置filters屬性

配置filters屬性比較簡單,filters的類型是字元串,多個filter使用逗号隔開。例如:

<!-- DruidDataSource bean whose filters are enabled through the comma-separated "filters" property. -->
<bean id="dataSource" class="com.alibaba.druid.pool.DruidDataSource"
      init-method="init" destroy-method="close">
      <property name="url" value="jdbc:derby:memory:spring-test;create=true" />
      <property name="initialSize" value="1" />
      <property name="maxActive" value="20" />
      <!-- Each entry is a filter alias or a fully qualified class name; "stat" is the alias of StatFilter. -->
      <property name="filters" value="stat,log4j" />
  </bean>      

filters屬性的配置使用别名或者全類名,stat是com.alibaba.druid.filter.stat.StatFilter的别名。其他内置Filter的别名可參見 Druid 文件中的「内置Filter的别名」一節。

配置proxyFilters屬性

proxyFilters的類型是List,使用proxyFilters配置,可以有更多的配置選項。

<!-- Filter declared as its own bean so it can be referenced from the data source below. -->
<bean id="stat-filter" class="com.alibaba.druid.filter.stat.StatFilter">
  </bean>

  <!-- DruidDataSource bean whose filters are supplied as bean references through the "proxyFilters" list. -->
  <bean id="dataSource" class="com.alibaba.druid.pool.DruidDataSource"
      init-method="init" destroy-method="close">
      <property name="url" value="jdbc:derby:memory:spring-test;create=true" />
      <property name="initialSize" value="1" />
      <property name="maxActive" value="20" />
      <property name="proxyFilters">
          <list>
              <!-- Reference to the stat-filter bean defined above. -->
              <ref bean="stat-filter" />
          </list>
      </property>
  </bean>      

Zipkin 日志追蹤過濾器 TracingStatementFilter

TracingStatementFilter 繼承抽象類 FilterEventAdapter,並覆寫(override)以下方法:

// Hook signatures that TracingStatementFilter overrides. The bodies are empty in this listing.
// Per the surrounding article, they run before a statement executes, after it executes,
// or when execution throws.

// Before-hook for an update statement; receives the statement proxy and its SQL text.
protected void statementExecuteUpdateBefore(StatementProxy statement, String sql) {
    }

    // After-hook for an update statement; additionally receives the update count.
    protected void statementExecuteUpdateAfter(StatementProxy statement, String sql, int updateCount) {
    }

    // Before-hook for a query statement.
    protected void statementExecuteQueryBefore(StatementProxy statement, String sql) {
    }

    // After-hook for a query statement; additionally receives the result set proxy.
    protected void statementExecuteQueryAfter(StatementProxy statement, String sql, ResultSetProxy resultSet) {
    }

    // Before-hook for a generic execute call.
    protected void statementExecuteBefore(StatementProxy statement, String sql) {
    }

    // After-hook for a generic execute call; additionally receives the boolean result.
    protected void statementExecuteAfter(StatementProxy statement, String sql, boolean result) {
    }

    // Before-hook for a batch execution; no SQL text is passed to this hook.
    protected void statementExecuteBatchBefore(StatementProxy statement) {
    }

    // After-hook for a batch execution; receives the int[] batch result.
    protected void statementExecuteBatchAfter(StatementProxy statement, int[] result) {
    }

    // Error hook; receives the statement, its SQL text and the Throwable that was raised.
    protected void statement_executeErrorAfter(StatementProxy statement, String sql, Throwable error) {
    }      

覆寫以上方法後,在執行SQL語句之前、之後或抛出異常時都會呼叫對應的方法,這樣就可以追蹤 SQL 語句的執行過程了。

完整 Zipkin 日志追蹤代碼:

public class TracingStatementFilter extends FilterEventAdapter {


    private String zipkinServiceName;

    public TracingStatementFilter(String zipkinServiceName){
        this.zipkinServiceName = zipkinServiceName;
    }

    public TracingStatementFilter(){

    }


    @Override
    protected void statementExecuteUpdateBefore(StatementProxy statement, String sql) {

        super.statementExecuteUpdateBefore(statement,sql);

        Before(statement,sql);

    }

    @Override
    protected void statementExecuteUpdateAfter(StatementProxy statement, String sql, int updateCount) {

        super.statementExecuteUpdateAfter(statement,sql,updateCount);

        After(statement,sql);

    }

    @Override
    protected void statementExecuteQueryBefore(StatementProxy statement, String sql) {

        super.statementExecuteQueryBefore(statement,sql);

        Before(statement,sql);

    }

    @Override
    protected void statementExecuteQueryAfter(StatementProxy statement, String sql, ResultSetProxy resultSet) {

        super.statementExecuteQueryAfter(statement,sql,resultSet);

        After(statement,sql);


    }

    @Override
    protected void statementExecuteBefore(StatementProxy statement, String sql) {

        super.statementExecuteBefore(statement,sql);
        Before(statement,sql);

    }

    @Override
    protected void statementExecuteAfter(StatementProxy statement, String sql, boolean result) {

        super.statementExecuteAfter(statement,sql,result);

        After(statement,sql);




    }

    @Override
    protected void statementExecuteBatchBefore(StatementProxy statement) {

        super.statementExecuteBatchBefore(statement);

        Before(statement , null);



    }

    @Override
    protected void statementExecuteBatchAfter(StatementProxy statement, int[] result) {

        super.statementExecuteBatchAfter(statement , result);
        After(statement , null);
    }

    @Override
    protected void statement_executeErrorAfter(StatementProxy statement, String sql, Throwable error) {

        super.statement_executeErrorAfter(statement , sql , error);
        ErrorAfter(statement , sql , error);

    }


    protected void Before(StatementProxy statement, String sql) {

        try {
            Span span = ThreadLocalSpan.CURRENT_TRACER.next();
            if (span == null || span.isNoop()) {
                return;
            }

            if(sql == null){
                sql = statement.getLastExecuteSql();
            }
            // Allow span names of single-word statements like COMMIT

            int spaceIndex = sql.indexOf(' ');
            span.kind(Span.Kind.CLIENT).name(spaceIndex == -1 ? sql : sql.substring(0, spaceIndex));
            span.tag("sql.query", sql);

            parseServerIpAndPort(statement,span);
            span.start();
        }catch (Exception e){

        }

    }

    protected void After(StatementProxy statement, String sql) {

        try {
            Span span = ThreadLocalSpan.CURRENT_TRACER.remove();
            if (span == null || span.isNoop()) {
                return;
            }
            span.finish();
            return ;
        }catch (Exception e){

        }

    }

    protected void ErrorAfter(StatementProxy statement, String sql, Throwable error) {

        try {
            Span span = ThreadLocalSpan.CURRENT_TRACER.remove();
            if (span == null || span.isNoop()) {
                return;
            }

            if (error instanceof SQLException) {
                span.tag("error", Integer.toString(((SQLException) error).getErrorCode()));
            }
            span.finish();

            return ;
        }catch (Exception e){

        }

    }


    public void parseServerIpAndPort(StatementProxy statement, Span span) {
        try {
            URI url = URI.create(statement.getConnection().getMetaData().getURL().substring(5));
            if (getZipkinServiceName() == null || "".equals(getZipkinServiceName())) {
                try {
                    zipkinServiceName = "DB"+url.getPath();
                }catch (Exception e){
                    ;
                }
            }
            span.remoteServiceName(getZipkinServiceName());
            String host = url.getHost();
            if (host != null) {
                span.remoteIpAndPort(host, url.getPort());
            }
        } catch (Exception e) {
            // remote address is optional
        }
    }

    public String getZipkinServiceName() {
        return zipkinServiceName;
    }

    public void setZipkinServiceName(String zipkinServiceName) {
        this.zipkinServiceName = zipkinServiceName;
    }
}      

使用配置示例:

<!-- Tracing filter bean; zipkinServiceName is injected through its setter. -->
<bean id="tracingStatementFilter" class="brave.druid.TracingStatementFilter">
    <property name="zipkinServiceName" value="dbServer" /></bean>

    <!-- DruidDataSource bean that registers the tracing filter through the "proxyFilters" list. -->
    <bean id="dataSource"  class="com.alibaba.druid.pool.DruidDataSource"
       init-method="init" destroy-method="close">
      <property name="url" value="jdbc:derby:memory:spring-test;create=true" />
      <property name="initialSize" value="1" />
      <property name="maxActive" value="20" />
      <property name="proxyFilters">
          <list>
              <!-- Reference to the tracingStatementFilter bean defined above. -->
              <ref bean="tracingStatementFilter" />
          </list>
      </property>
    </bean>