laitimes

Zhihu Druid cluster optimization practice

author:Flash Gene

background

With the development of the business, the scale of the Druid cluster continues to grow; One beautiful evening, I turned on the computer and was about to watch a ball game, and suddenly, I was hit by a rush of alarm calls, breaking the silence, and the cluster query failure rate soared. . . At this time, you quickly open the cluster monitoring dashboard and find that the cluster load has been run away. For students in charge of the cluster, it should be a problem that everyone will encounter when the cluster develops to the early and middle stages.

This article mainly focuses on the problems encountered by Zhihu in the construction and development of the Druid cluster and how to improve stability. By reading the article, you can harvest the governance process of the cluster from rapid development to unstable and then stable operation, and every pit you have stepped on, I hope it will be helpful to you.

First of all, let's introduce the application of Druid in Zhihu, as of now, the status quo of Druid cluster in Zhihu has 70+ nodes; 300+ data sources; Storage size 600TB+.

Zhihu Druid cluster optimization practice

As shown in the figure above, Druid in Zhihu is mainly aimed at two scenarios: one is real-time access, which meets the needs of real-time indicator display and real-time analysis; The other is to import Hive data offline to accelerate Hive data query and analysis and improve analyst work efficiency. The main types of services used are A/B testing, channel management, APM, and data mail.

Introduction to Druid

This summary is mainly for Druid beginners, if you already know the Druid architecture well, you can skip this chapter by reading the next chapter.

Zhihu Druid cluster optimization practice

The overall architecture of Druid is shown in the diagram above, and there are 3 main routes:

  1. 实时摄入的过程: 实时数据会首先按行摄入 Real-time Nodes,Real-time Nodes 会先将每行的数据加入到1个 map 中,等达到一定的行数或者大小限制时,Real-time Nodes 就会将内存中的 map 持久化到磁盘中,Real-time Nodes 会按照segmentGranularity 将一定时间段内的小文件 merge 为一个大文件,生成 Segment,然后将 Segment 上传到 Deep Storage(HDFS,S3)中,Coordinator 知道有Segment 生成后,会通知相应的 Historical Node 下载对应的 Segment,并负责该Segment 的查询。
  2. The process of offline ingestion: The process of offline ingestion is relatively simple, that is, the segment is generated directly through the MR job, and the rest of the logic is the same as that of real-time ingestion.
  3. User query process: The user's query is sent directly to the Broker Node, which distributes the query to the Real-time node and the Historical node, and then merges the results and returns them to the user.

The main responsibilities of each node are as follows:

Historical Nodes

Historical 节点是整个 Druid 集群的骨干,主要负责加载不可变的 Segment,并负责 Segment的查询(注意,Segment 必须加载到 Historical 的内存中才可以提供查询)。 Historical 节点是无状态的,所以可以轻易的横向扩展和快速恢复。 Historical 节点 load 和 drop Segment 是依赖 ZK 的,但是即使 ZK 挂掉,Historical 依然可以对已经加载的 Segment 提供查询,只是不能再 load 新Segment,drop 旧 Segment。

Broker Nodes

The broker node is the entry point of Druid queries, and is mainly responsible for the distribution and merge of queries. In addition, the broker also performs LRU caching of query results for immutable segments.

Coordinator Nodes

Coordinator 节点主要负责 Segment 的管理。 Coordinator 节点会通知 Historical 节点加载新Segment,删除旧 Segment,复制 Segment,以及 Historical 间的负载均衡。

Coordinator 节点依赖 ZK 确定 Historical 的存活和集群 Segment 的分布。

Real-time Node

Real-time nodes are mainly responsible for real-time data ingestion, real-time data query, converting real-time data into segments, and assigning segments to historical nodes.

Zookeeper

Druid relies on ZK for service discovery, data topology awareness, and coordinator selection.

Metadata Storage

Metadata storage(Mysql) 主要用来存储 Segment 和配置的元数据。 当有新 Segment 生成时,就会将Segment的元信息写入 metadata store, Coordinator 节点会监控 Metadata store 从而知道何时 load 新 Segment,何时 drop 旧 Segment。 注意,查询时不会涉及 Metadata store。

Deep Storage

Deep storage (S3 and HDFS) 是作为 Segment 的永久备份,查询时同样不会涉及 Deep storage。

Zhihu Druid platform architecture evolution

Platform architecture V1.0

Zhihu Druid cluster optimization practice

The early Druid cluster architecture is shown in the figure above, which functionally supports real-time data ingestion and offline batch import:

  1. Real-time data ingestion: Based on Traquility, Kafka data is consumed, Druid is ingested in real time, and real-time query services are provided
  2. Offline data import: Based on the internal offline scheduling platform, data warehouse engineers can create offline scheduled import tasks
  3. Data storage: All data of the data source is stored in a large cluster, and the historical nodes are SSD disks, and the data sheet is backed up
  4. Monitoring system:

With the increasing number of services, the following pain points have emerged in the current technical solutions:

  1. Service query timeout: With the increasing demand for services, the concurrency of query services continues to increase, and high concurrency occurs, queries time out, and services interact with each other
  2. Poor data availability: Due to this architecture, the segments of the data source are not backed up on the historical node, and any historical node is down, and the data in the entire cluster cannot be checked.
  3. Real-time data loss: Real-time data ingestion based on Traquility that is lost beyond a time window
  4. Increased storage costs: In order to improve query performance, data is stored on SSD disks in the early stage of the business

Due to the focus on business expansion in the early stage, after the development reaches a certain stage, there are pain points caused by unreasonable cluster construction; Based on the root cause analysis of the above pain points, Druid Platform V2.0 came into being.

Platform architecture V2.0

Zhihu Druid cluster optimization practice

As shown in the figure above, the platform architecture V2.0 has made the following improvements compared with the earlier clusters:

  1. Real-time data ingestion: The Kafka Indexing Service solution solves the problem of data loss beyond the time window and implements exactly-once ingestion semantics
  2. Service isolation, hot and cold data tiering:

What is the basis for hot and cold stratification? Before determining the rules, the business query scenarios and query history were analyzed, and the data query in the last month accounted for about 97%. The proportion of data queries outside the month is relatively low, and the usage characteristics are used by users to query data at the end of the month or quarter

Zhihu Druid cluster optimization practice

As shown in the preceding figure, the core business and other services are stored in isolation, and hot and cold tiering is carried out internally.

  • Adopt a unified double copy
  • Data storage within one month: One copy of Hot Tier storage; Cold Tier stores one copy
  • One year of data: Cold Tier stores two copies

Earnings:

  • Dual copies of data: Improves data availability and prevents data from being unavailable due to the downtime of a HIstorical node
  • Service isolation: The query response speed is improved and the interaction between service queries is reduced
  • Hot and cold data tiering: Reduced data storage costs, with approximately 40% lower cluster costs

3. Monitoring System:

The main goal is to monitor the data loading and query behavior, so that problems can be found before, during, and after problems. As problems continue to emerge, the monitoring panel continues to iterate, and so far, the existing monitoring panel has been able to find and judge problems in a timely manner, and the core monitoring points are as follows: Coordinator scheduling monitoring

Zhihu Druid cluster optimization practice

Function: After the data source is loaded offline, the data cannot be viewed and queried for a long time. When a problem occurs, this metric is not output for a long time.

指标: 此监控指标代表着每次 Coordinator 对数据源 Segment 管理调度周期以及 Assign 到 Historical 的 SegmentRouter 查询并发监控

Zhihu Druid cluster optimization practice

Function: Set the concurrent query threshold alarm to detect high concurrency in time to prevent continuous high concurrency from affecting the cluster query response

Real-time monitoring of query time

Zhihu Druid cluster optimization practice

Function: The CPU load of the cluster continues to soar, affecting the query response of the cluster. Quickly locate which data source is caused by the query

In fact, at the beginning of the construction of the monitoring system, generally in accordance with the official documents, the basic core monitoring indicators will be displayed, due to the limited space, there is no detailed introduction here, but with the operation of the system, the problems are found to continue to iterate out of the indicators, they have typical characteristics:

a. Early warning before problems, such as concurrent query monitoring, can detect problems earlier than the business, intervene in advance, and reduce the impact on the business;

b. Stop losses in time in the problem, such as real-time CPU time monitoring for queries, and locate data sources in time when the cluster load continues to soar and affect business queries, and manually intervene to deal with them.

4. Cluster Management System:

The main functions of this system are to manage Druid clusters, record node status, display import task records, search query logs, detect query behavior, etc., as shown in the following figure:

Zhihu Druid cluster optimization practice

Benefits: It greatly improves the efficiency of cluster management and provides a basis for problem decision-making

This summary mainly describes the continuous iteration process of the Druid platform, from V1.0 With the emergence of problems and pain points, after solution research and specific situation analysis, the V2.0 cluster construction plan was produced and implemented, mainly from the aspects of business isolation, hot and cold data, monitoring system building, and management system construction to optimize the cluster construction, so as to reduce the cluster cost, quickly find and solve the problem, and greatly improve the stability of the cluster.

Stability optimization

With the completion of the evolution of the cluster architecture, most of the problems have been solved, and the availability of the cluster has been improved to a level, but the problem has come again, that is, the stability of the cluster should be stable at more than 99.9%.

Load the task Pending

Problem Description:

First of all, the real-time loading task segment has been in the Hand Off stage and cannot release the load worker resources, causing the new real-time task to fail to start and be in the Pending state. Second, for offline loading tasks, the task status is displayed as successful, but the data latency query is not visible

Problem Analysis:

Let's take a look at the entire flow of events from Assign to Historical after the Segment is generated, as shown in the following figure:

Zhihu Druid cluster optimization practice

As shown in the figure above, the key processes are described as follows:

  1. Worker 将生成的 Segment 文件写入持久化文件系统 HDFS 或 S3
  2. The worker writes the segment meta information to the database Mysql
  3. Coordinator 定期调度获取 Segment 元信息和 数据源 的规则Rule
  4. Coordinator 将需要加载或删除的 Segment 消息同步到 ZK
  5. Historical gets a message from ZK to load or remove a segment
  6. Historical pulls the segment file from the persistent file system
  7. Historical deletes the corresponding synchronization message from ZK

Coordinator 通过 ZK 与 Historical 进行 Segment Load 信息调度; 当 RealTime 实时加载任务所产生的所有 Segment 加载到 Historical 时才能释放Druid MiddleManager 上的 worker;

According to the online worker log, it has been determined that steps 1 and 2 have been completed. It is necessary to further analyze the Coordinator log and scheduling thread model to identify the key process problems:

The model responsible for segment management in the coordinator is a serial processing model, so the root cause of the problem is that a large number of segments (2W+ segments according to experience) are generated in a scheduling cycle, resulting in a long-term operation of this scheduling cycle, and the segments generated in this scheduling cycle cannot be assigned to historical, which ultimately leads to the failure to release the worker for real-time loading tasks. Causing a backlog of data.

Solution:

Standardize the amount of data imported by the business, reduce the number of segments, and import data for no more than 7 days each time.

Dirty query governance

Do you still remember the tranquility that was suddenly broken by the police at the beginning? After the adjustment of the Druid architecture, this kind of problem will not appear, but the standard has been further raised, and the availability of cluster queries should not be less than 99.9%, so it is unacceptable to continue to be below the SLA standard for 10 minutes.

First, describe the definition of dirty queries: some queries cause high cluster loads, causing normal queries to fail to respond, affecting the SLA of the cluster. For Druid, the so-called dirty queries mainly include: the query has no filter, and the data is fully scanned; JavaScripe regex matching queries; Retention of the next week or the next month, etc.

Problem Description:

Occasionally, the SLA of cluster queries is lower than 99%, causing some queries to time out

Problem Analysis:

Cluster query SLA reduction monitoring

Zhihu Druid cluster optimization practice

CPU Load 监控

Zhihu Druid cluster optimization practice

Real-time monitoring of CPU query time

Zhihu Druid cluster optimization practice

Locate the data source from this monitoring dashboard, and after query and analysis, the data source is retained and analyzed in the next month within nearly a month, which consumes computing resources.

Based on the monitoring, it can be concluded that the node resources are exhausted due to some dirty queries, resulting in a decrease in the SLA of cluster queries.

Solution:

Based on the query behavior detection in the cluster management system, the query behavior in recent months was analyzed at a fine-grained level, and it was found that 99.87% of the queries could be returned in seconds, and the common feature of dirty queries was that the results were returned in minutes. Therefore, a strategy for controlling dirty queries is formulated: the query timeout time is set uniformly at the broker level, and the timeout time for hot data is 1 min. The timeout period for cold data is 3 minutes.

Arrangement:

druid.server.http.maxIdleTime=PT1m
druid.server.http.defaultQueryTimeout=60000
druid.server.http.maxQueryTimeout=60000           

踩坑:如果线上的版本低于0.13.0 需要打 Patch(issues:) defaultQueryTimeout 参数才能生效。

修复 Historical 慢启动

Problem description

At present, the Cold Data Storage Historical (12*2T SATA HDD) node stores more than 50K segments, with a data scale of about 10TB+, and it will take about half an hour to restart whenever the node OOM goes down or the service is restarted. However, the consequence is data movement, affecting real-time loading tasks, and even data loss.

Problem analysis

According to the problem analysis and solution of (issue:), through log and source code analysis, the Historical initialization process is found to read the meta information of each column in the segment in the data source, and the segment file format is shown in the following figure:

Zhihu Druid cluster optimization practice

As shown in the figure above, it is the ColumnDescriptor that needs to go to disk to read each column; It can be understood that when the data node stores a large amount of data and stores it in multiple columns, a lot of random I/O operations will be generated. The HDD itself has about 100 maximum IOPS; As a result, a lot of time is spent on random IO operations.

With the existing amount of data, a restart takes more than 30 minutes, which actually leaves a relatively large hidden danger to the stability of the system.

solution

The build introduces a PR, because the community is based on this problem solved by 0.13+ or above, so the introduction of this PR to the build mainly solves the problem of interface incompatibility; And introduced to version 0.12.1, you can pick up patches that need this version.

The core idea of the solution is the typical Lazy loading pattern, as shown in the following code:

Map<String, Column> columns = new HashMap<>(); ===》Map<String, Supplier<Column>> columns = new HashMap<>();           

The Lazy loading mode is implemented through the Supplier feature in the Guava library, and<Column> the Supplier decorated Column object is not really initialized until the first get

Configuration:

druid.segmentCache.numBootstrapThreads=10
druid.segmentCache.lazyLoadOnStart=true           

Benefits: The restart time is reduced from 30 minutes to less than 3 minutes

This summary mainly describes the work done to achieve the goal of 99.9% cluster availability, and analyzes the causes of data loading pending problems, query stability problems, and data movement problems caused by slow starts.

Druid is recommended for use

After the journey of stepping on the pit, we have precipitated suggestions for the use of offline data import and query

Druid Data Import:

  1. Druid itself supports two modes: real-time import query and batch import, which currently relies on Tranquality service management, and batch import uses Hadoop MapReduce to import Hive table data. Real-time import can support real-time query, and batch import can only check the data when the import is completed.
  2. Druid supports pre-aggregation of imported data at a time granularity to reduce the amount of data stored and improve query performance.
  3. Druid segment 的 shard 大小应该控制在 500 MB 以下,防止单个 shard 过大导致 OOM。
  4. For bulk imports, the number of segments of data imported at a time should not exceed 7 days

Druid data query

  1. 查询主要分为维度查询(topN/groupby)和非维度查询 (timeseries)
  2. In dimension queries, avoid groupby querying multiple high-cardinality dimensions (count(distinct(dimension column))), which will cause memory explosion during the query, try not to import high-cardinality dimensions into Druid, if you can't avoid it, you can only use topN to query a single dimension
  3. For queries with large time ranges, you can split them into multiple queries with small time ranges and submit them in parallel to make full use of Druid's multi-node parallel query capabilities
  4. By default, the data needs to be configured with multiple replicas, the default is 2 replicas to ensure high data availability, and the data needs to distinguish between hot and cold data in the last month, cold data (stored on SSDs) one month ago, and cold standby data (stored in HDFS and cannot be queried)

summary

First of all, this article mainly focuses on the application process of Druid cluster in Zhihu, and describes the introduction of Druid and the application scenarios in Zhihu. In the process of landing, it has experienced the process of rapid development to stability pain point challenge; Finally, through the architecture iteration from platform architecture V1.0 to platform architecture V2.0, the pain points encountered in development were solved.

Secondly, in the stability optimization process introduced in this paper, the effective monitoring of the platform can play an efficient role in analyzing the problem. Once the problem is clear, actively participate in community discussions, which can improve the efficiency of problem solving.

Finally, I hope that readers can harvest the problems and solutions encountered in the whole construction process, which will be helpful for daily work!

Author: Jacky2020

Source: https://zhuanlan.zhihu.com/p/67607200