Using JuiceFS to build dynamic Flink images

Author: Flash Gene

1 Background

Thanks to its reliability and ease of use, Flink has become one of the most popular stream processing frameworks and dominates the field of stream computing. Zhihu introduced Flink as early as 2018, and it is now one of the most important components inside Zhihu, with more than 4,000 real-time Flink tasks processing petabytes of data every day.

Flink can be deployed in several ways, which fall roughly into standalone, Flink on YARN, and Flink on Kubernetes. Zhihu currently uses the native Kubernetes deployment mode officially provided by Flink. Running on Kubernetes means dealing with container images, and because Flink tasks have many different dependencies, building images for Flink is a headache in its own right.

2 Flink Images and Dependency Handling

Flink tasks fall roughly into two categories. The first is Flink SQL tasks, whose dependencies are roughly as follows:

  1. Official connector jar packages, such as flink-hive-connector, flink-jdbc-connector, flink-kafka-connector, etc.;
  2. Unofficial or internally implemented connector jar packages;
  3. The user's UDF jar packages: for complex computational logic, users may implement their own UDFs.

The second category is Flink jar tasks, which depend on the Flink job jar written by the user in addition to the three kinds of dependencies above.

Every Flink task has a different set of dependencies, and it is not practical to build a separate image for each task, so we currently handle dependencies as follows:

  1. Dependencies are classified as stable or unstable.
  2. Stable dependencies include components (such as Flink and the JDK) and the official connector packages. They change only when Flink is upgraded or a bug is fixed, so we put them into the image when it is built;
  3. Unstable dependencies include third-party connectors and the user's own jar packages. Third-party connectors are not officially maintained by Flink, so they are more likely to need fixes. The user's own jars differ for every task, and users change and resubmit them often. We inject such unstable dependencies dynamically: they are stored in a distributed file system and downloaded into the container by the pre command when the container starts.
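The classification above can be sketched in a few lines of Python. This is only an illustration, not the platform's actual code: the `Dependency` type, the kind labels, the `/opt/flink/usrlib` target directory, the example URL, and the use of `curl` for the download are all assumptions made for the example.

```python
import shlex
from dataclasses import dataclass
from typing import List, Tuple

# Hypothetical labels for the dependency kinds that are baked into the image.
STABLE_KINDS = {"component", "official-connector"}


@dataclass(frozen=True)
class Dependency:
    name: str      # file name of the artifact, e.g. "my-udf.jar"
    kind: str      # "component", "official-connector", "third-party-connector" or "user-jar"
    url: str = ""  # download location; only needed for unstable dependencies


def split_dependencies(deps: List[Dependency]) -> Tuple[List[Dependency], List[Dependency]]:
    """Separate dependencies built into the image from those fetched at startup."""
    stable = [d for d in deps if d.kind in STABLE_KINDS]
    unstable = [d for d in deps if d.kind not in STABLE_KINDS]
    return stable, unstable


def build_pre_command(unstable: List[Dependency], target_dir: str = "/opt/flink/usrlib") -> str:
    """Build one shell command line that downloads every unstable dependency."""
    steps = [f"mkdir -p {shlex.quote(target_dir)}"]
    for dep in unstable:
        dest = f"{target_dir}/{dep.name}"
        # curl is an assumed download tool; any simple single-command client would do.
        steps.append(f"curl -fsSL -o {shlex.quote(dest)} {shlex.quote(dep.url)}")
    return " && ".join(steps)


if __name__ == "__main__":
    deps = [
        Dependency("flink-dist.jar", "component"),
        Dependency("my-udf.jar", "user-jar", "http://files.example.internal/deps/my-udf.jar"),
    ]
    _, unstable = split_dependencies(deps)
    print(build_pre_command(unstable))
```

Keeping the pre command to a chain of simple shell steps matches the constraint that the container's pre command is best suited to simple commands.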

With this approach, the Flink image can load dependencies dynamically. The startup process of a Flink job is roughly as follows:

[Figure: startup process of a Flink job]

3 File system selection

3.1 Pain points of storing dependencies on HDFS

We had long used HDFS as the file system for storing Flink's dependencies, but we ran into the following pain points:

  1. The NameNode is under heavy load during peak task hours, so containers stall when they request file metadata from the NameNode while downloading dependencies. Some small batch tasks need only ten-odd seconds to run, yet downloading their dependencies can take several minutes because the NameNode is overloaded;
  2. We deploy Flink clusters in multiple data centers, but HDFS exists only as one large cluster in the offline data center, so files are pulled across data centers and consume leased-line bandwidth;
  3. Some special Flink tasks do not depend on HDFS at all: they use no checkpoints and neither read nor write HDFS. Because the Flink container's dependencies are stored on HDFS, however, these tasks still cannot run without it.

3.2 Pain points of using object storage

Later we replaced HDFS with object storage, which solved some of the HDFS pain points, but we soon found a new problem: single-threaded downloads from object storage are slow. In general, there are two ways to speed up object storage downloads:

  1. Download the file in segments using multiple threads. However, the container's pre command is really only suitable for running relatively simple shell commands.
  2. Put a caching proxy layer in front of the object storage. The proxy does the acceleration, and the client can still read with a single thread. The drawback is that an additional proxy component has to be maintained for the object storage system, and its stability has to be guaranteed.
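To show what option 1 involves beyond a one-line shell command, here is a minimal sketch of the planning step of a segmented download: splitting a file of known size into byte ranges that separate threads could each request with an HTTP `Range` header. The function names are hypothetical, and the sketch omits the actual requests, retries, and reassembly of the parts.

```python
from typing import List, Tuple


def split_ranges(size: int, parts: int) -> List[Tuple[int, int]]:
    """Split a file of `size` bytes into at most `parts` inclusive byte ranges."""
    if size <= 0 or parts <= 0:
        return []
    parts = min(parts, size)           # never create empty ranges
    base, extra = divmod(size, parts)  # the first `extra` ranges get one more byte
    ranges = []
    start = 0
    for i in range(parts):
        length = base + (1 if i < extra else 0)
        ranges.append((start, start + length - 1))
        start += length
    return ranges


def range_header(start: int, end: int) -> str:
    """Format an inclusive byte range as the value of an HTTP Range header."""
    return f"bytes={start}-{end}"


if __name__ == "__main__":
    for start, end in split_ranges(10, 3):
        print(range_header(start, end))
```

Each worker would fetch one range and write it at the matching offset of the output file, which is the kind of coordination that is awkward to express in a container pre command.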

3.3 Trying JuiceFS

As it happened, the company was running an internal proof of concept (POC) of JuiceFS at the time, so an object storage proxy layer was readily available. We ran a series of tests on it and found that JuiceFS fully meets our needs in this scenario. The following points were a pleasant surprise:

  1. JuiceFS ships with an S3 gateway that is compatible with the S3 object storage protocol, which let us go live quickly without any changes. The S3 gateway itself is stateless, so it is easy to scale;
  2. JuiceFS has built-in cache acceleration. In our tests, single-threaded file reads were about 4x faster once JuiceFS was proxying the object storage;
  3. JuiceFS can be mounted as a local file system, so later we can try mounting the dependencies directly into a container directory;
  4. JuiceFS can be deployed with metadata and data storage decoupled. For data we use our existing object storage, for which the cloud vendor guarantees eleven nines of availability. For metadata we use TiKV, because our colleagues on the online architecture team have rich experience developing and operating TiKV, so the SLA is well assured. As a result, the availability and scalability of JuiceFS are very strong.

3.4 Bringing JuiceFS online

Bringing JuiceFS online involved the following stages:

  1. Data migration. We needed to synchronize the data originally stored on HDFS and object storage to JuiceFS. JuiceFS provides a data synchronization tool and Flink's dependencies are not particularly large, so this part was finished quickly;
  2. Changing the address from which the Flink image pulls dependencies. Because JuiceFS is compatible with the object storage protocol, we only needed to change the original object storage endpoint to the address of the JuiceFS S3 gateway on the platform side.
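The endpoint change in stage 2 can be pictured with a small sketch. It assumes the dependencies are addressed by path-style URLs (`endpoint/bucket/key`); the host names, port, bucket, and key below are made up for the example, and `switch_endpoint` is a hypothetical helper rather than anything from the platform or from JuiceFS.

```python
from urllib.parse import urlsplit, urlunsplit


def switch_endpoint(url: str, new_endpoint: str) -> str:
    """Return `url` with its scheme and host replaced by those of `new_endpoint`."""
    old = urlsplit(url)
    new = urlsplit(new_endpoint)
    # Bucket and object key live in the path, so they are carried over unchanged.
    return urlunsplit((new.scheme, new.netloc, old.path, old.query, old.fragment))


if __name__ == "__main__":
    # Both addresses are placeholders, not real endpoints.
    old_url = "https://oss.example.internal/flink-deps/jobs/42/my-udf.jar"
    gateway = "http://juicefs-gateway.example.internal:9000"
    print(switch_endpoint(old_url, gateway))
```

Because only the endpoint differs, the download logic inside the container can stay exactly as it was.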

With JuiceFS in place, the startup flow of a Flink task is roughly as follows:

[Figure: Flink task startup flow after adopting JuiceFS]

Compared with HDFS, container startup time is now predictable, and the speed at which containers download dependencies is no longer affected by peak business hours. Compared with native object storage, containers download dependencies about 4x faster.

4 Outlook

It took less than half a month to go from first evaluating JuiceFS to running it in production. The main reason is that the JuiceFS documentation is very complete, which saved us many detours. In addition, members of the JuiceFS community answered every question we asked, so the rollout went very smoothly.

In the future, we will consider applying JuiceFS to data lake and algorithm model loading scenarios to make our use of data more flexible and efficient.

Author: Hu Mengyu

Source: https://zhuanlan.zhihu.com/p/435402052