
Application Monitoring with CAT: Reading the cat-client Source Code (Part 1)

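  CAT is a Java-based real-time application monitoring platform developed by Dianping. It covers both real-time application monitoring and business monitoring, and it is very useful for catching production problems early. (I wonder how many of you are using it.)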

  Using a tool is only the first step. After using it, I wanted to understand how it works underneath, hence this source-code reading.

  Today we look at the cat-client module, focusing on the caller (client) side.

Open the docs and first read the usage instructions, background, and reference material. OK, on to the main topic.

First, a rough look at the directory structure:

[Figure: cat-client directory structure]
Next, let's start from an example, and here that means the unit test.

import com.dianping.cat.Cat;
import com.dianping.cat.message.Message;
import com.dianping.cat.message.Trace;
import com.dianping.cat.message.Transaction;
import org.junit.Assert;
import org.junit.Test;

public class CatTest {

    @Test
    public void test() {
        Transaction trans = Cat.newTransaction("logTransaction", "logTransaction");
        // The messages returned by newEvent/newTrace/newHeartbeat are never complete()d in this test
        Cat.newEvent("logEvent", "logEvent");
        Cat.newTrace("logTrace", "logTrace");
        Cat.newHeartbeat("logHeartbeat", "logHeartbeat");
        Throwable cause = new Throwable();
        Cat.logError(cause);
        Cat.logError("message", cause);
        Cat.logTrace("logTrace", "<trace>");
        Cat.logTrace("logTrace", "<trace>", Trace.SUCCESS, "data");
        Cat.logMetric("logMetric", "test", "test");
        Cat.logMetricForCount("logMetricForCount");
        Cat.logMetricForCount("logMetricForCount", 4);
        Cat.logMetricForDuration("logMetricForDuration", 100);
        Cat.logMetricForSum("logMetricForSum", 100);
        Cat.logMetricForSum("logMetricForSum", 100, 100);
        Cat.logEvent("RemoteLink", "Call", Message.SUCCESS, "Cat-0a010680-384736-2061");
        Cat.logEvent("EventType", "EventName");
        Cat.logHeartbeat("logHeartbeat", "logHeartbeat", Message.SUCCESS, null);
        trans.setStatus(Transaction.SUCCESS);
        // trans.setStatus(cause);
        trans.complete();

        Assert.assertTrue(Cat.isInitialized());
    }
}

As you can see, CAT lists its main features in this unit test. It records transactions, events, traces, errors, metrics, and heartbeats.

We will only discuss how a few of these message types are handled, though.

Let's start with the first call, which creates a transaction:

Cat.newTransaction("logTransaction", "logTransaction");
// Step into the method: 1. get the producer; 2. create a transaction
    public static Transaction newTransaction(String type, String name) {
        return Cat.getProducer().newTransaction(type, name);
    }
// Getting the producer: check whether CAT is initialized and initialize it if not (we won't go any deeper here for now)
    public static MessageProducer getProducer() {
        checkAndInitialize();

        return s_instance.m_producer;
    }
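// 2. Create a transaction: 1. get the context, creating it if there is none; 2. if messages can be recorded, create a DefaultTransaction right away; 3. start it and return the transaction instance for the caller to use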
    @Override
    public Transaction newTransaction(String type, String name) {
        // this enable CAT client logging cat message without explicit setup
        if (!m_manager.hasContext()) {
            m_manager.setup();
        }

        if (m_manager.isMessageEnabled()) {
            DefaultTransaction transaction = new DefaultTransaction(type, name, m_manager);

            m_manager.start(transaction, false);
            return transaction;
        } else {
            return NullMessage.TRANSACTION;
        }
    }
// 2.1. How the current context is set up
    @Override
    public void setup() {
        Context ctx;

        if (m_domain != null) {
            ctx = new Context(m_domain.getId(), m_hostName, m_domain.getIp());
        } else {
            ctx = new Context("Unknown", m_hostName, "");
        }

        m_context.set(ctx);
    }
// 2.2. Check whether the context has been initialized
    @Override
    public boolean hasContext() {
        return m_context.get() != null;
    }
// 2.3. How does the context stay thread-safe? Each thread gets its own Context through a ThreadLocal
    private ThreadLocal<Context> m_context = new ThreadLocal<Context>(); 
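// 2.4. Start a transaction: 1. get the context; 2. start the transaction in the context; 3. if it is a tagged transaction, register it in m_taggedTransactions. If there is no context (CAT not initialized yet), the warning is logged only once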
    @Override
    public void start(Transaction transaction, boolean forked) {
        Context ctx = getContext();

        if (ctx != null) {
            ctx.start(transaction, forked);

            if (transaction instanceof TaggedTransaction) {
                TaggedTransaction tt = (TaggedTransaction) transaction;

                m_taggedTransactions.put(tt.getTag(), tt);
            }
        } else if (m_firstMessage) {
            m_firstMessage = false;
            m_logger.warn("CAT client is not enabled because it's not initialized yet");
        }
    }
// 2.4.1. Get the context
    private Context getContext() {
        if (Cat.isInitialized()) {
            Context ctx = m_context.get();

            if (ctx != null) {
                return ctx;
            } else {
                if (m_domain != null) {
                    ctx = new Context(m_domain.getId(), m_hostName, m_domain.getIp());
                } else {
                    ctx = new Context("Unknown", m_hostName, "");
                }

                m_context.set(ctx);
                return ctx;
            }
        }

        return null;
    }
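// 2.4.2. Context.start(): 1. if the stack is empty, the transaction becomes the root message of m_tree; otherwise it is added as a child of the transaction on top of the stack (forked transactions are not linked here); 2. unless forked, push the transaction onto the stack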
        public void start(Transaction transaction, boolean forked) {
            if (!m_stack.isEmpty()) {
                // Do NOT make strong reference from parent transaction to forked transaction.
                // Instead, we create a "soft" reference to forked transaction later, via linkAsRunAway()
                // By doing so, there is no need for synchronization between parent and child threads.
                // Both threads can complete() anytime despite the other thread.
                if (!(transaction instanceof ForkedTransaction)) {
                    Transaction parent = m_stack.peek();
                    addTransactionChild(transaction, parent);
                }
            } else {
                m_tree.setMessage(transaction);
            }

            if (!forked) {
                m_stack.push(transaction);
            }
        }
// 2.4.3. The Context constructor: a message tree plus a stack of open transactions
        public Context(String domain, String hostName, String ipAddress) {
            m_tree = new DefaultMessageTree();        // create the message tree
            m_stack = new Stack<Transaction>();        // stack of currently open transactions

            Thread thread = Thread.currentThread();
            String groupName = thread.getThreadGroup().getName();

            m_tree.setThreadGroupName(groupName);
            m_tree.setThreadId(String.valueOf(thread.getId()));
            m_tree.setThreadName(thread.getName());

            m_tree.setDomain(domain);
            m_tree.setHostName(hostName);
            m_tree.setIpAddress(ipAddress);
            m_length = 1;
            m_knownExceptions = new HashSet<Throwable>();
        }
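To make the start logic above concrete, here is a simplified model in plain Java. A ThreadLocal holds one context per thread, and the context's stack decides whether a new transaction becomes the tree root or a child of the transaction currently on top. It is a sketch, not CAT's code. `Node`, `TxContext`, and `ContextDemo` are hypothetical names, and forked and tagged transactions are left out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical stand-in for a transaction node in the message tree.
final class Node {
    final String name;
    final List<Node> children = new ArrayList<>();

    Node(String name) {
        this.name = name;
    }
}

// Simplified per-thread context: one tree root plus a stack of open transactions.
final class TxContext {
    private static final ThreadLocal<TxContext> CURRENT = ThreadLocal.withInitial(TxContext::new);

    private final Deque<Node> stack = new ArrayDeque<>();
    private Node root;

    static TxContext current() {
        return CURRENT.get();                  // each thread lazily gets its own instance
    }

    static void clear() {
        CURRENT.remove();
    }

    Node start(String name) {
        Node node = new Node(name);
        if (stack.isEmpty()) {
            root = node;                       // first transaction becomes the root of the tree
        } else {
            stack.peek().children.add(node);   // nested one becomes a child of the current transaction
        }
        stack.push(node);
        return node;
    }

    void end() {
        stack.pop();
    }

    Node root() {
        return root;
    }
}

final class ContextDemo {
    // Builds outer -> inner on the calling thread; returns how many children the root has.
    static int buildTree() {
        TxContext ctx = TxContext.current();
        ctx.start("outer");
        ctx.start("inner");
        ctx.end();
        ctx.end();
        int children = ctx.root().children.size();
        TxContext.clear();
        return children;
    }

    // Returns true if another thread sees a different TxContext instance.
    static boolean threadsAreIsolated() {
        TxContext mine = TxContext.current();
        TxContext[] seen = new TxContext[1];
        Thread other = new Thread(() -> seen[0] = TxContext.current());
        other.start();
        try {
            other.join();                      // join() makes seen[0] visible to this thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        TxContext.clear();
        return seen[0] != mine;
    }
}
```

ThreadLocal.withInitial stands in for the explicit null check in getContext(). The effect is the same: each thread lazily gets its own context.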
        
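// DefaultModuleInitializer.execute() (reached from Cat.initialize, shown below, through ModuleInitializer): expand the modules via expandAll, then initialize each one that is not initialized yet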

   @Override
   public void execute(ModuleContext ctx, Module... modules) {
      Set<Module> all = new LinkedHashSet<Module>();

      info(ctx, "Initializing top level modules:");

      for (Module module : modules) {
         info(ctx, "   " + module.getClass().getName());
      }

      try {
         expandAll(ctx, modules, all);

         for (Module module : all) {
            if (!module.isInitialized()) {
               executeModule(ctx, module, m_index++);
            }
         }
      } catch (Exception e) {
         throw new RuntimeException("Error when initializing modules! Exception: " + e, e);
      }
   }
// executeModule(): initialize a single module
   private synchronized void executeModule(ModuleContext ctx, Module module, int index) throws Exception {
      long start = System.currentTimeMillis();

      // set flag to avoid re-entrance
      module.setInitialized(true);

      info(ctx, index + " ------ " + module.getClass().getName());

      // execute itself after its dependencies
      module.initialize(ctx);

      long end = System.currentTimeMillis();
      info(ctx, index + " ------ " + module.getClass().getName() + " DONE in " + (end - start) + " ms.");
   }
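execute() expands the requested modules and their dependencies into an ordered LinkedHashSet. executeModule() then sets the initialized flag before calling initialize(), so a module is never entered twice. The sketch below reproduces that pattern with a hypothetical `SimpleModule` type. This article does not show CAT's own expandAll() or Module interface, so the depth-first expansion here is an assumption. The sketch also assumes the dependency graph has no cycles.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical module with explicit dependencies.
final class SimpleModule {
    final String name;
    final List<SimpleModule> dependencies;
    boolean initialized;

    SimpleModule(String name, SimpleModule... dependencies) {
        this.name = name;
        this.dependencies = Arrays.asList(dependencies);
    }
}

final class SimpleModuleInitializer {
    final List<String> order = new ArrayList<>();   // records the initialization order

    void execute(SimpleModule... modules) {
        Set<SimpleModule> all = new LinkedHashSet<>();
        expandAll(Arrays.asList(modules), all);      // dependencies land before their dependents
        for (SimpleModule module : all) {
            if (!module.initialized) {
                module.initialized = true;             // set the flag first to avoid re-entrance
                order.add(module.name);                // stands in for module.initialize(ctx)
            }
        }
    }

    // Depth-first: add each module's dependencies, then the module itself (no cycle detection).
    private void expandAll(List<SimpleModule> modules, Set<SimpleModule> all) {
        for (SimpleModule module : modules) {
            expandAll(module.dependencies, all);
            all.add(module);
        }
    }
}
```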
   
// CAT initialization
 
    // this should be called during application initialization time
    public static void initialize(File configFile) {
        PlexusContainer container = ContainerLoader.getDefaultContainer();

        initialize(container, configFile);
    }
    public static void initialize(PlexusContainer container, File configFile) {
        ModuleContext ctx = new DefaultModuleContext(container);
        // looks up the org.unidal.initialization.Module implementation registered in components.xml under the given role hint
        Module module = ctx.lookup(Module.class, CatClientModule.ID);

        if (!module.isInitialized()) {
            ModuleInitializer initializer = ctx.lookup(ModuleInitializer.class);

            ctx.setAttribute("cat-client-config-file", configFile);
            initializer.execute(ctx, module);
        }
    }      
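getProducer() calls checkAndInitialize() on every use, so CAT can initialize itself lazily the first time anything is logged. The body of checkAndInitialize() is not shown here. The following is therefore only a generic sketch of thread-safe lazy initialization, using double-checked locking on a volatile field. `LazyClient`, `LazyClientDemo`, and `INIT_COUNT` are hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class LazyClient {
    static final AtomicInteger INIT_COUNT = new AtomicInteger();   // how often init really ran

    private static volatile LazyClient instance;

    final String producer;

    private LazyClient() {
        INIT_COUNT.incrementAndGet();
        producer = "producer";
    }

    // Similar in spirit to Cat.getProducer(): initialize on first use, then reuse.
    static String getProducer() {
        return checkAndInitialize().producer;
    }

    // Double-checked locking: lock only while the instance is still missing.
    private static LazyClient checkAndInitialize() {
        LazyClient local = instance;
        if (local == null) {
            synchronized (LazyClient.class) {
                local = instance;
                if (local == null) {
                    local = new LazyClient();
                    instance = local;
                }
            }
        }
        return local;
    }
}

final class LazyClientDemo {
    // Calls getProducer() from several threads at once; returns how often init ran.
    static int initCountAfterConcurrentCalls(int threads) {
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(LazyClient::getProducer);
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return LazyClient.INIT_COUNT.get();
    }
}
```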

// The Module configured in components.xml, which loads CatClientModule

<component>
    <role>org.unidal.initialization.Module</role>
    <role-hint>cat-client</role-hint>
    <implementation>com.dianping.cat.CatClientModule</implementation>
</component>

// Log output configured in plexus.xml

<plexus>
    <components>
        <component>
            <role>org.codehaus.plexus.logging.LoggerManager</role>
            <implementation>org.unidal.lookup.logger.TimedConsoleLoggerManager</implementation>
            <configuration>
                <dateFormat>MM-dd HH:mm:ss.SSS</dateFormat>
                <showClass>true</showClass>
                <logFilePattern>cat_{0,date,yyyyMMdd}.log</logFilePattern>
                <baseDirRef>CAT_HOME</baseDirRef>
                <defaultBaseDir>/data/applogs/cat</defaultBaseDir>
            </configuration>
        </component>
    </components>
</plexus>      
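The logFilePattern value `cat_{0,date,yyyyMMdd}.log` is written in java.text.MessageFormat syntax, so presumably one log file is produced per day. The sketch below assumes TimedConsoleLoggerManager feeds the current date into the pattern (its source is not shown here) and shows how the pattern expands. `LogFileNameDemo` is a hypothetical helper.

```java
import java.text.MessageFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.GregorianCalendar;
import java.util.Locale;

final class LogFileNameDemo {
    static final String PATTERN = "cat_{0,date,yyyyMMdd}.log";   // copied from plexus.xml above

    // Expands the pattern for the given date, in the JVM's default time zone.
    static String fileNameFor(Date date) {
        return new MessageFormat(PATTERN, Locale.ROOT).format(new Object[] {date});
    }

    public static void main(String[] args) {
        Date day = new GregorianCalendar(2024, Calendar.JANUARY, 15).getTime();
        System.out.println(fileNameFor(day));   // cat_20240115.log
    }
}
```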

// Taking logEvent as an example: how an event is processed

Cat.logEvent("RemoteLink", "Call", Message.SUCCESS, "Cat-0a010680-384736-2061");
// Step into the method
    public static void logEvent(String type, String name, String status, String nameValuePairs) {
        Cat.getProducer().logEvent(type, name, status, nameValuePairs);
    }
    // DefaultMessageProducer, logEvent
    @Override
    public void logEvent(String type, String name, String status, String nameValuePairs) {
        Event event = newEvent(type, name);

        if (nameValuePairs != null && nameValuePairs.length() > 0) {
            event.addData(nameValuePairs);
        }

        event.setStatus(status);
        event.complete();
    }
    // DefaultEvent.complete(): mark the event completed and hand it to the message manager
    @Override
    public void complete() {
        setCompleted(true);

        if (m_manager != null) {
            m_manager.add(this);
        }
    }
    // DefaultMessageManager.add(): add the message to the current context
    @Override
    public void add(Message message) {
        Context ctx = getContext();

        if (ctx != null) {
            ctx.add(message);
        }
    }
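    // DefaultMessageManager.Context.add(): the actual add. With no open transaction, the message goes into a copy of the tree and is flushed on its own; otherwise it becomes a child of the current transaction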
    public void add(Message message) {
        if (m_stack.isEmpty()) {
            MessageTree tree = m_tree.copy();

            tree.setMessage(message);
            flush(tree);
        } else {
            Transaction parent = m_stack.peek();

            addTransactionChild(message, parent);
        }
    }
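Context.add() therefore has two paths. With no open transaction, the message is placed into a copy of the tree and flushed by itself. Inside a transaction, it is attached to the transaction on top of the stack and only leaves the client when that tree is flushed. Here is a toy model of just that branch, with strings instead of Message objects and hypothetical names (`AddDemo`, `flushed`):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

final class AddDemo {
    // Open transactions, each represented only by its list of child messages.
    final Deque<List<String>> stack = new ArrayDeque<>();
    // Messages that were flushed on their own (stand-in for flush(tree)).
    final List<String> flushed = new ArrayList<>();

    void add(String message) {
        if (stack.isEmpty()) {
            flushed.add(message);          // no transaction open: send as its own tree
        } else {
            stack.peek().add(message);     // otherwise: child of the current transaction
        }
    }
}
```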
    
// DefaultMessageManager.flush(): send (flush) the message tree

    public void flush(MessageTree tree) {
        if (tree.getMessageId() == null) {
            tree.setMessageId(nextMessageId());
        }

        MessageSender sender = m_transportManager.getSender();

        if (sender != null && isMessageEnabled()) {
            sender.send(tree);

            reset();
        } else {
            m_throttleTimes++;

            if (m_throttleTimes % 10000 == 0 || m_throttleTimes == 1) {
                m_logger.info("Cat Message is throttled! Times:" + m_throttleTimes);
            }
        }
    }
    
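// TcpSocketSender.send(): sending the data

    // Offer the tree to a queue (m_atomicTrees for atomic messages, m_queue otherwise); if the offer fails, handle the overflow in logQueueFullInfo()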
    @Override
    public void send(MessageTree tree) {
        if (isAtomicMessage(tree)) {
            boolean result = m_atomicTrees.offer(tree, m_manager.getSample());

            if (!result) {
                logQueueFullInfo(tree);
            }
        } else {
            boolean result = m_queue.offer(tree, m_manager.getSample());

            if (!result) {
                logQueueFullInfo(tree);
            }
        }
    }
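    // logQueueFullInfo(): report the overflow to m_statistics and log an error on the first overflow and every 1000th after that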
    private void logQueueFullInfo(MessageTree tree) {
        if (m_statistics != null) {
            m_statistics.onOverflowed(tree);
        }

        int count = m_errors.incrementAndGet();

        if (count % 1000 == 0 || count == 1) {
            m_logger.error("Message queue is full in tcp socket sender! Count: " + count);
        }

        tree = null;
    }
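send() never blocks the calling business thread. The tree is offered to a bounded queue, and when the queue is full the tree is dropped and counted with an AtomicInteger. An error is logged only on the first overflow and every 1000th after that. The sketch below shows the same idea with a plain LinkedBlockingQueue. CAT's queue takes a second argument (m_manager.getSample()) in offer(), which this sketch does not model. `BoundedSender` and `logged` are hypothetical.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

final class BoundedSender {
    private final BlockingQueue<String> queue;
    private final AtomicInteger errors = new AtomicInteger();   // safe to bump from many producer threads
    final List<String> logged = new CopyOnWriteArrayList<>();   // stand-in for m_logger.error(...)

    BoundedSender(int capacity) {
        queue = new LinkedBlockingQueue<>(capacity);
    }

    // Never blocks the caller: offer() returns false at once when the queue is full.
    void send(String tree) {
        if (!queue.offer(tree)) {
            logQueueFullInfo();
        }
    }

    // Counts every overflow but only logs the first one and every 1000th after that.
    private void logQueueFullInfo() {
        int count = errors.incrementAndGet();
        if (count % 1000 == 0 || count == 1) {
            logged.add("Message queue is full! Count: " + count);
        }
    }

    int overflowCount() {
        return errors.get();
    }

    int queued() {
        return queue.size();
    }
}
```

With a capacity of 2 and 1002 sends, two trees are queued, 1000 are dropped, and only two log lines are written (counts 1 and 1000).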

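    // Context.addTransactionChild(): called when the stack is not empty; adds the message as a child of the current transaction, first truncating and flushing the tree if the message falls into a later hour or the tree has reached the maximum length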
    private void addTransactionChild(Message message, Transaction transaction) {
        long treePeriod = trimToHour(m_tree.getMessage().getTimestamp());
        long messagePeriod = trimToHour(message.getTimestamp() - 10 * 1000L); // 10 seconds extra time allowed

        if (treePeriod < messagePeriod || m_length >= m_configManager.getMaxMessageLength()) {
            m_validator.truncateAndFlush(this, message.getTimestamp());
        }

        transaction.addChild(message);
        m_length++;
    }
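The truncation condition compares hours. The tree is cut and flushed when the new message falls into a later hour than the root (after subtracting a 10-second grace period), or when the tree has reached the maximum length. trimToHour() is not shown above. This sketch assumes it rounds an epoch-millisecond timestamp down to the start of its hour. `TruncationRule` and `shouldTruncate` are hypothetical names.

```java
final class TruncationRule {
    private static final long HOUR = 60 * 60 * 1000L;

    // Assumed behaviour of trimToHour: round down to the start of the hour (epoch millis).
    static long trimToHour(long timestamp) {
        return timestamp - timestamp % HOUR;
    }

    // Mirrors the condition in addTransactionChild().
    static boolean shouldTruncate(long rootTimestamp, long messageTimestamp, int length, int maxLength) {
        long treePeriod = trimToHour(rootTimestamp);
        long messagePeriod = trimToHour(messageTimestamp - 10 * 1000L);   // 10 seconds of grace
        return treePeriod < messagePeriod || length >= maxLength;
    }
}
```

With the root at 10:59:55, a message at 11:00:05 still joins the same tree. One at 11:00:20 triggers truncation.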
    // DefaultTransaction.addChild(): append the child message (a null child is logged as an error)
    @Override
    public DefaultTransaction addChild(Message message) {
        if (m_children == null) {
            m_children = new ArrayList<Message>();
        }

        if (message != null) {
            m_children.add(message);
        } else {
            Cat.logError(new Exception("null child message"));
        }
        return this;
    }
          

// DefaultTransaction.complete(): the final commit of a transaction

trans.complete();
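// Step into the method: calling complete() on an already completed transaction is treated as an error and recorded as a cat/BadInstrument event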
    @Override
    public void complete() {
        try {
            if (isCompleted()) {
                // complete() was called more than once
                DefaultEvent event = new DefaultEvent("cat", "BadInstrument");

                event.setStatus("TransactionAlreadyCompleted");
                event.complete();
                addChild(event);
            } else {
                m_durationInMicro = (System.nanoTime() - m_durationStart) / 1000L;

                setCompleted(true);        // guards against running this branch again

                if (m_manager != null) {
                    m_manager.end(this);
                }
            }
        } catch (Exception e) {
            // ignore
        }
    }
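    // DefaultMessageManager.end(): for a standalone transaction, end it in the context; once the root is done, remove the ThreadLocal context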
    @Override
    public void end(Transaction transaction) {
        Context ctx = getContext();

        if (ctx != null && transaction.isStandalone()) {
            if (ctx.end(this, transaction)) {
                m_context.remove();
            }
        }
    }
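    // DefaultMessageManager.Context.end(): pop the stack up to the ending transaction, validating along the way; when the stack becomes empty, copy the tree and flush it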
    public boolean end(DefaultMessageManager manager, Transaction transaction) {
        if (!m_stack.isEmpty()) {
            Transaction current = m_stack.pop();

            if (transaction == current) {
                m_validator.validate(m_stack.isEmpty() ? null : m_stack.peek(), current);
            } else {
                while (transaction != current && !m_stack.empty()) {
                    m_validator.validate(m_stack.peek(), current);

                    current = m_stack.pop();
                }
            }

            if (m_stack.isEmpty()) {
                MessageTree tree = m_tree.copy();

                m_tree.setMessageId(null);
                m_tree.setMessage(null);

                if (m_totalDurationInMicros > 0) {
                    adjustForTruncatedTransaction((Transaction) tree.getMessage());
                }

                manager.flush(tree);
                return true;
            }
        }

        return false;
    }
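Context.end() pops until it reaches the transaction being ended. Ending a parent therefore also unwinds any child that was never completed (the real code runs m_validator.validate() on each one it pops). The tree is flushed only once the stack is empty. Here is a toy model of that stack handling, tracking transactions by name. `EndDemo` and `unwound` are hypothetical, and names are compared with equals() where CAT compares object identity.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

final class EndDemo {
    final Deque<String> stack = new ArrayDeque<>();
    final List<String> unwound = new ArrayList<>();   // transactions popped without their own end()

    void start(String name) {
        stack.push(name);
    }

    // Returns true when the stack is empty afterwards, i.e. the tree would be flushed.
    boolean end(String name) {
        if (stack.isEmpty()) {
            return false;
        }
        String current = stack.pop();
        while (!current.equals(name) && !stack.isEmpty()) {
            unwound.add(current);        // the real code validates here, e.g. marks it not completed
            current = stack.pop();
        }
        return stack.isEmpty();
    }
}
```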
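    // Validate the transaction tree, recursing into nested transactions: unfinished standalone transactions are marked as not completed, unfinished forked/tagged ones are linked as run-away messages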
    public void validate(Transaction parent, Transaction transaction) {
        if (transaction.isStandalone()) {
            List<Message> children = transaction.getChildren();
            int len = children.size();

            for (int i = 0; i < len; i++) {
                Message message = children.get(i);

                if (message instanceof Transaction) {
                    validate(transaction, (Transaction) message);
                }
            }

            if (!transaction.isCompleted() && transaction instanceof DefaultTransaction) {
                // missing transaction end, log a BadInstrument event so that
                // developer can fix the code
                markAsNotCompleted((DefaultTransaction) transaction);
            }
        } else if (!transaction.isCompleted()) {
            if (transaction instanceof DefaultForkedTransaction) {
                // link it as run away message since the forked transaction is not completed yet
                linkAsRunAway((DefaultForkedTransaction) transaction);
            } else if (transaction instanceof DefaultTaggedTransaction) {
                // link it as run away message since the forked transaction is not completed yet
                markAsRunAway(parent, (DefaultTaggedTransaction) transaction);
            }
        }
    }
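    // Adjust for a truncated transaction: add a TruncatedTransaction/TotalDuration event carrying the total duration across all truncated parts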
    private void adjustForTruncatedTransaction(Transaction root) {
        DefaultEvent next = new DefaultEvent("TruncatedTransaction", "TotalDuration");
        long actualDurationInMicros = m_totalDurationInMicros + root.getDurationInMicros();

        next.addData(String.valueOf(actualDurationInMicros));
        next.setStatus(Message.SUCCESS);
        root.addChild(next);

        m_totalDurationInMicros = 0;
    }
// Send the final data: DefaultMessageManager.flush(), the same method shown earlier
    public void flush(MessageTree tree) {
        if (tree.getMessageId() == null) {
            tree.setMessageId(nextMessageId());
        }

        MessageSender sender = m_transportManager.getSender();

        if (sender != null && isMessageEnabled()) {
            sender.send(tree);

            reset();
        } else {
            m_throttleTimes++;

            if (m_throttleTimes % 10000 == 0 || m_throttleTimes == 1) {
                m_logger.info("Cat Message is throttled! Times:" + m_throttleTimes);
            }
        }
    }
    // Messages are recorded only if all of these conditions hold
    @Override
    public boolean isMessageEnabled() {
        return m_domain != null && m_domain.isEnabled() && m_context.get() != null && m_configManager.isCatEnabled();
    }
// TcpSocketSender.send(), as shown earlier: put the MessageTree into m_atomicTrees or m_queue
    @Override
    public void send(MessageTree tree) {
        if (isAtomicMessage(tree)) {
            boolean result = m_atomicTrees.offer(tree, m_manager.getSample());

            if (!result) {
                logQueueFullInfo(tree);
            }
        } else {
            boolean result = m_queue.offer(tree, m_manager.getSample());

            if (!result) {
                logQueueFullInfo(tree);
            }
        }
    }
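// DefaultMessageManager.reset(): after sending, clear the thread-local state so the thread starts clean next time; while a truncated transaction is still pending (m_totalDurationInMicros != 0), only the known exceptions are cleared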
    @Override
    public void reset() {
        // destroy current thread local data
        Context ctx = m_context.get();

        if (ctx != null) {
            if (ctx.m_totalDurationInMicros == 0) {
                ctx.m_stack.clear();
                ctx.m_knownExceptions.clear();
                m_context.remove();
            } else {
                ctx.m_knownExceptions.clear();
            }
        }
    }
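    // JDK ThreadLocal.remove(): deletes the current thread's entry (here, the Context); the
    // Context's own collections (stack, known exceptions) are cleared separately in reset()
    public void remove() {
        ThreadLocalMap m = getMap(Thread.currentThread());
        if (m != null)
            m.remove(this);
    }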
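    // DefaultMessageManager.end() again, as shown earlier: once the root transaction ends it also calls m_context.remove(), making sure the context is removed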
    @Override
    public void end(Transaction transaction) {
        Context ctx = getContext();

        if (ctx != null && transaction.isStandalone()) {
            if (ctx.end(this, transaction)) {
                m_context.remove();
            }
        }
    }      
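Both reset() and end() call m_context.remove() instead of leaving the Context in place. That matters because application threads are usually pooled, and a value left in a ThreadLocal is still there for the next task that runs on the same thread. The small JDK-only demo below (hypothetical names) shows the stale value and the fix on a single-thread executor.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ThreadLocalLeakDemo {
    private static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

    // Runs two tasks on the same pooled thread and returns what the second task sees.
    static String secondTaskSees(boolean removeAfterUse) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Runnable firstTask = () -> {
            CONTEXT.set("context-of-task-1");
            if (removeAfterUse) {
                CONTEXT.remove();                 // what reset()/end() do with m_context
            }
        };
        Callable<String> secondTask = CONTEXT::get;
        try {
            pool.submit(firstTask).get();
            return pool.submit(secondTask).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

Without remove(), the second task inherits "context-of-task-1". With it, the second task sees null.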

// Once trees are in the queue, the TcpSocketSender thread polls them and sends them to the CAT server

    @Override
    public void run() {
        m_active = true;

        while (m_active) {
            ChannelFuture channel = m_manager.channel();

            if (channel != null && checkWritable(channel)) {
                try {
                    MessageTree tree = m_queue.poll();

                    if (tree != null) {
                        sendInternal(tree);
                        tree.setMessage(null);
                    }

                } catch (Throwable t) {
                    m_logger.error("Error when sending message over TCP socket!", t);
                }
            } else {
                long current = System.currentTimeMillis();
                long oldTimestamp = current - HOUR;

                while (true) {
                    try {
                        MessageTree tree = m_queue.peek();

                        if (tree != null && tree.getMessage().getTimestamp() < oldTimestamp) {
                            MessageTree discardTree = m_queue.poll();

                            if (discardTree != null) {
                                m_statistics.onOverflowed(discardTree);
                            }
                        } else {
                            break;
                        }
                    } catch (Exception e) {
                        m_logger.error(e.getMessage(), e);
                        break;
                    }
                }
                
                try {
                    Thread.sleep(5);
                } catch (Exception e) {
                    // ignore it
                    m_active = false;
                }
            }
        }
    }      
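run() is the consumer half of a producer/consumer pair. Business threads only enqueue trees, and this one sender thread drains the queue while the channel is writable. Otherwise it drops trees older than an hour and sleeps 5 ms. Below is a stripped-down model of both branches. The names (`SenderDemo`, `Item`, `discardOlderThan`, `sendAll`) are hypothetical, and a list stands in for the TCP channel. Unlike the loop above, the sketch waits up to 5 ms inside poll() instead of checking channel writability.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class SenderDemo implements Runnable {
    static final long HOUR = 60 * 60 * 1000L;

    // Hypothetical queued item: a payload plus the timestamp of its root message.
    static final class Item {
        final String payload;
        final long timestamp;

        Item(String payload, long timestamp) {
            this.payload = payload;
            this.timestamp = timestamp;
        }
    }

    final BlockingQueue<Item> queue = new LinkedBlockingQueue<>(1000);
    final List<String> sent = new CopyOnWriteArrayList<>();   // stands in for the TCP channel
    private volatile boolean active = true;

    // Like the "channel not writable" branch: drop items older than one hour from the head.
    static int discardOlderThan(BlockingQueue<Item> queue, long now) {
        int dropped = 0;
        Item head;
        while ((head = queue.peek()) != null && head.timestamp < now - HOUR) {
            if (queue.poll() != null) {
                dropped++;
            }
        }
        return dropped;
    }

    @Override
    public void run() {
        while (active) {
            try {
                Item item = queue.poll(5, TimeUnit.MILLISECONDS);   // wait briefly instead of spinning
                if (item != null) {
                    sent.add(item.payload);
                }
            } catch (InterruptedException e) {
                active = false;                                     // stop, like the original's sleep catch
                Thread.currentThread().interrupt();
            }
        }
    }

    void shutdown() {
        active = false;
    }

    // Starts the sender thread, enqueues the payloads, waits until all are sent, then stops it.
    static List<String> sendAll(String... payloads) {
        SenderDemo sender = new SenderDemo();
        Thread thread = new Thread(sender, "sender-demo");
        thread.start();
        for (String payload : payloads) {
            sender.queue.offer(new Item(payload, System.currentTimeMillis()));
        }
        try {
            while (sender.sent.size() < payloads.length) {
                Thread.sleep(1);
            }
            sender.shutdown();
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sender.sent;
    }
}
```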

  That completes the whole CAT instrumentation flow. The key techniques are:

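    1. ThreadLocal holds the instrumentation context per thread, which keeps it thread-safe.

    2. A LinkedBlockingQueue holds the message trees and bridges the producer (business) threads and the consumer (sender) thread.

    3. AtomicInteger is used for counters so that the counts stay accurate under concurrency.

    4. A heartbeat thread sends the local machine's status to the CAT server.

    5. Lazy loading and the singleton pattern.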
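Item 4 refers to a heartbeat thread that periodically reports the machine's status to the CAT server, but its code is not part of this walk-through. As a rough illustration only, the sketch below runs a periodic task on a ScheduledExecutorService and samples a few JVM figures through the standard Runtime API. The `Heartbeat` class and the report format are assumptions, as is collecting reports in a list instead of sending them.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class Heartbeat {
    final List<String> reports = new CopyOnWriteArrayList<>();   // stand-in for sending to the server
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "heartbeat-demo");
        t.setDaemon(true);                                        // never keeps the JVM alive
        return t;
    });

    void start(long periodMillis) {
        scheduler.scheduleAtFixedRate(this::report, 0, periodMillis, TimeUnit.MILLISECONDS);
    }

    void stop() {
        scheduler.shutdownNow();
    }

    // Samples a few JVM figures; a real client would report much more.
    private void report() {
        Runtime rt = Runtime.getRuntime();
        long usedMb = (rt.totalMemory() - rt.freeMemory()) / (1024 * 1024);
        reports.add("processors=" + rt.availableProcessors() + ", usedHeapMb=" + usedMb);
    }

    // Runs the heartbeat until at least `beats` reports exist, then stops it.
    static List<String> collect(int beats, long periodMillis) {
        Heartbeat hb = new Heartbeat();
        hb.start(periodMillis);
        try {
            while (hb.reports.size() < beats) {
                Thread.sleep(periodMillis);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            hb.stop();
        }
        return hb.reports;
    }
}
```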

  ...and so on. Finally, a diagram:

[Figure: summary diagram of the cat-client flow]

Don't fear today's hardship; believe in tomorrow: it will be even harder!
