
Hive SQL tuning skills that big data practitioners must know

Abstract: In the field of big data, Hive SQL is widely used for data query and analysis in data warehouses. However, because of large data volumes and complex query requirements, the performance of Hive SQL queries is often unsatisfactory. This paper studies Hive SQL performance optimization in depth, proposes a series of feasible tuning schemes, and gives the corresponding optimization cases with the SQL code before and after optimization. Appropriate optimization strategies and techniques can significantly improve the execution efficiency and response speed of Hive SQL.

Keywords: Hive SQL; performance optimization; tuning schemes; optimization cases

1. Introduction

With the advent of the era of big data, data analysis and mining are becoming more and more important. Hive plays an important role as a data warehouse tool in the Hadoop ecosystem. However, due to the large amount of data and the complexity of the query, Hive SQL queries tend to be less efficient to execute. Therefore, an in-depth understanding of Hive SQL tuning techniques is essential for data engineers and data analysts.

2. Start with self-reflection

In many cases, Hive SQL runs slowly because developers do not know the data they use well enough and have some bad usage habits.

For example, for a sales detail transaction table, scanning a year of partitions and scanning a week of partitions differ by roughly two orders of magnitude in computation and IO overhead, and the elapsed times are bound to differ as well. As developers, we need to think carefully about what the business actually needs and try not to waste computing and storage resources.

For example, in a query such as select col1, col2 from your_table, a WHERE condition should be added whenever possible to remove irrelevant rows, so as to reduce the amount of data that the whole MapReduce job has to process and distribute, as in the sketch below.
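
A minimal sketch of both ideas combined, with illustrative table and column names: select only the columns you need and restrict the scan to a one-week partition range.

-- Scan only one week of partitions and only the columns actually needed
SELECT order_id, amount
FROM sales_detail
WHERE dt BETWEEN '2021-01-01' AND '2021-01-07';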

Also ask whether the public aggregation layer built by the data warehouse team already meets your needs. Common metrics, such as those used in management cockpits, are usually already covered by a well-designed public layer and can be used directly.

3. Query optimization

3.1 Keep SQL statements as atomic as possible

Try to avoid packing complex logic into a single SQL statement; use intermediate tables to complete the complex logic step by step. Splitting a job sensibly reduces the waste of resources, and the impact on downstream timeliness, when the job has to be re-run because of a problem, as sketched below.
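
A minimal sketch of this splitting, with hypothetical table names: materialize the expensive intermediate result once, then build the final output from it, so a failed final step can be re-run without redoing the heavy part.

-- Step 1: materialize the expensive intermediate result
CREATE TABLE tmp_user_order_stat AS
SELECT uid, count(1) AS order_cnt, sum(amount) AS order_amt
FROM order_detail
WHERE dt = '2021-01-01'
GROUP BY uid;

-- Step 2: the final report reads only the much smaller intermediate table
INSERT OVERWRITE TABLE rpt_user_level
SELECT uid, CASE WHEN order_amt > 10000 THEN 'high' ELSE 'normal' END AS user_level
FROM tmp_user_order_stat;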

3.2 Use appropriate data types

Choosing the right data type can reduce storage space and improve query efficiency. For example, converting a string type to an integer type can save storage space and speed up queries.

Optimization case

Before:

SELECT * FROM table WHERE age = '30';            

Optimized:

SELECT * FROM table WHERE age = 30;           

3.3 Avoid full table scanning

To avoid scanning the entire table, you can use the WHERE clause to filter out the required data rows, or use the LIMIT clause to limit the number of returned results.

Negative case

All historical data is recalculated every day by scanning the entire table; the number of map tasks exceeds 200,000.

Select * from table where dt<='{TX_DATE}'

Optimization Case 1

Before:

-- Before optimization: when the filter condition of the joined table is written in the WHERE clause, the full tables are joined first and the partition is filtered afterwards.

select a.* from test1 a left join test2 b on a.uid = b.uid where a.ds='2020-08-10' and b.ds='2020-08-10'            

Optimized:

select a.* from test1 a left join test2 b on (b.uid is not null and a.uid = b.uid and b.ds='2020-08-10') where a.ds='2020-08-10'           

Optimization case 2

Using the max function to find the latest partition of a table causes a full table scan.

Before:

Select max(dt) from table           

Optimized:

Use a custom approach (SHOW PARTITIONS or hdfs dfs -ls) instead of max(dt)
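
For example (the warehouse path below is illustrative), either of the following gets the latest partition from the metastore or from HDFS without reading any table data:

-- List the table's partitions from the metastore and pick the latest one
SHOW PARTITIONS table;

-- Or list the partition directories directly on HDFS
-- hdfs dfs -ls /user/hive/warehouse/table/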

3.4 Using Partitions

Data partitioning is a technology that groups and stores data according to a field, which can effectively reduce the amount of data scanned during queries. You can use partition fields to filter data and query only the target partition, speeding up the query.

Optimization case

Before:

SELECT * FROM table WHERE date = '2021-01-01' AND region = 'A';           

Optimized:

SELECT * FROM table WHERE partition_date = '2021-01-01' AND partition_region = 'A';           

Negative case

The date is hard-coded in the query, so two years of log data are scanned in a single pass. The number of map tasks exceeds 200,000 and keeps growing until the job can no longer finish.

Select * from table where src_mark='23' and dt between '2020-05-16' and '{TX_DATE}' and scr_code is not null

3.5 Using Indexes

In Hive SQL, you can create indexes to speed up query operations. By creating indexes on key fields, you can reduce the time spent scanning and filtering data and improve query performance.

Optimization case

Before:

SELECT * FROM table WHERE region = 'A' AND status = 'ACTIVE';           

Optimized:

CREATE INDEX idx_region_status ON TABLE table (region, status) AS 'COMPACT' WITH DEFERRED REBUILD;
ALTER INDEX idx_region_status ON table REBUILD;
SELECT * FROM table WHERE region = 'A' AND status = 'ACTIVE';

3.6 Query Rewrite

Query rewriting is a technique that improves the performance of a query by changing the structure of a query statement or using an optimized query method. You can optimize the query by rewriting the subquery or using JOIN instead of the IN/EXISTS subquery.

Optimization case

Before:

SELECT * FROM table1 WHERE id IN (SELECT id FROM table2 WHERE region = 'A');           

Optimized:

SELECT * FROM table1 t1 JOIN (SELECT id FROM table2 WHERE region = 'A') t2 ON t1.id = t2.id;           

3.7 Predicate push-down

Predicate pushdown is a technique that applies filter conditions to query plans as early as possible (i.e., the WHERE predicate logic in SQL statements is executed as far in advance as possible) to reduce the amount of data processed downstream. By pushing filter conditions to the data source, you can reduce the amount of query data and improve query performance.

Optimization case

Before:

select a.*,b.* from a join b  on a.name=b.name where a.age>30           

Optimized:

SELECT a.*, b.* FROM ( SELECT * FROM a WHERE age > 30 ) a JOIN b ON a.name = b.name           

3.8 Avoid COUNT DISTINCT

A COUNT DISTINCT operation is completed by a single Reduce task. When the amount of data this one Reduce task has to process is too large, the whole job becomes hard to finish; rewriting it as GROUP BY followed by COUNT spreads the deduplication across many reducers.

Optimization case

Before:

select count(distinct uid) from test where ds='2020-08-10' and uid is not null           

Optimized:

select count(a.uid) from (select uid from test where uid is not null and ds = '2020-08-10' group by uid) a           

3.9 Use WITH ... AS

Besides the shuffle caused by joins, another thing that slows down Hive queries is subqueries, so minimize the subqueries in a SQL statement. WITH ... AS extracts a subquery in advance (similar to a temporary table) so that every part of the whole query can reuse its result, which prevents Hive from repeatedly computing the same subquery in different places.

Optimization case

Before:

select a.* from test1 a left join test2 b on a.uid = b.uid where a.ds='2020-08-10' and b.ds='2020-08-10'           

Optimized:

with b as (select uid from test2 where ds = '2020-08-10' and uid is not null) select a.* from test1 a left join b on a.uid = b.uid where a.ds='2020-08-10' and a.uid is not null

3.10 Large Tables Join Small Tables

When writing a query statement with a Join operation, there is an important rule to follow: a table or subquery with fewer records should be placed to the left of the Join operator. Doing so can help reduce the amount of data, improve query efficiency, and effectively reduce the probability of memory overflow errors.

If a MapJoin is not specified, or if the conditions for MapJoin are not met, the Hive parser will convert the Join operation to a Common Join. This means that the join operation will be completed in the Reduce phase, which may cause data skew. To avoid this, you can use MapJoin to fully load the small table into memory and perform the Join operation on the Map side, thus avoiding leaving the Join operation to the Reducer stage. This strategy effectively reduces the risk of data skew.

Optimization case

-- Enable automatic conversion to MapJoin

set hive.auto.convert.join = true;  -- the default is true

--Threshold setting for large tables and small tables (by default, below 25M is considered to be small tables):

set hive.mapjoin.smalltable.filesize=25000000;           
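
As an illustration with hypothetical table names, a MapJoin hint can also make the intent explicit and force the small table to be loaded into memory on the Map side (the hint matters mainly when automatic conversion is disabled):

-- Broadcast small_table to every mapper and join on the Map side
SELECT /*+ MAPJOIN(b) */ a.*, b.dim_name
FROM big_table a
JOIN small_table b ON a.dim_id = b.dim_id;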

3.11 Large Table Join Large Table

3.11.1 Empty key filtering

Sometimes a join operation times out because the amount of data for certain keys is too large. Rows with the same key are sent to the same reducer, which then runs out of memory. In this case, we need to analyze these abnormal keys carefully; in many cases the corresponding rows are abnormal data, so we should filter them out appropriately in the SQL statement.
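
A minimal sketch of this filtering, with illustrative table names: drop rows whose join key is NULL before the join so they never enter the shuffle.

-- Filter out NULL keys before joining so they are never shuffled
SELECT a.*
FROM (SELECT * FROM log_table WHERE uid IS NOT NULL) a
JOIN user_table b ON a.uid = b.uid;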

3.11.2 Empty key conversion

Sometimes rows with an empty key are not abnormal data, even though there are many of them, and they must be kept in the join result. In that case, you can assign a random value to the empty-key field of table A so that these rows are distributed evenly and randomly across different reducers.
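
A sketch of the conversion, with illustrative table and column names: replace a NULL key with a salted random value that cannot match the other side, so the rows survive the LEFT JOIN but no longer pile up on a single reducer.

-- Replace NULL keys with a random value that will not match user_table
SELECT a.*
FROM log_table a
LEFT JOIN user_table b
  ON CASE WHEN a.uid IS NULL THEN concat('hive_null_', cast(rand() AS string)) ELSE a.uid END = b.uid;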

3.12 Avoid Cartesian products

If you use a where condition instead of adding a valid ON condition or using an invalid ON condition when you perform a join operation, you may encounter situations where the correlation column contains a large number of null values or duplicate values. This can cause Hive to use only one reducer to complete operations, causing Cartesian product and data bloat problems. Therefore, when making joins, it is important to ensure that you use valid association conditions so that operational performance is not affected by null or duplicate values of the data.

Optimization case

Before:

SELECT * FROM A, B;            

-- In the pre-optimization SQL code, implicit joins were used, and the join conditions were not explicitly specified, resulting in Cartesian products

Optimized:

SELECT * FROM A CROSS JOIN B;           

In the optimized SQL code, an explicit CROSS JOIN is used, so the intent to return every combination of rows from tables A and B is stated clearly. By explicitly specifying the join type, accidental Cartesian product operations caused by missing or invalid join conditions are easier to spot and avoid, which improves query efficiency.

4. Data Loading and Transformation

4.1 Use Compression Format

During the data loading process, select the appropriate data storage format (for structured data, you can choose a columnar storage format such as Parquet or ORC; for unstructured data, you can choose formats such as TextFile or SequenceFile), which can improve query performance and reduce storage space.

Optimization case

Before:

LOAD DATA INPATH '/path/to/data' INTO TABLE table;           

Optimized:

-- LOAD DATA cannot change the file format, so define the target table as ORC and insert into it
CREATE TABLE table_orc STORED AS ORC AS SELECT * FROM table;

4.2 Data Transformation and Filtering

Transforming and filtering the data before it loads can reduce the amount of data and speed up queries. For example, you can use Hive's built-in functions to cleanse and transform data to meet specific query requirements.

Optimization case

Before:

SELECT * FROM table WHERE name LIKE '%John%';            

Optimized:

SELECT * FROM table WHERE name = 'John';           

4.3 Multiple INSERTs in a single scan of the table

By default, Hive scans the table once for each statement that reads it. Therefore, if you need to perform several operations on data in the same Hive table, it is recommended to use a single scan and perform all the operations from that scan.

For example, you may need to query one table several times and load the results into other tables. In the following example, my_table is a partitioned table whose partition field is dt; we need to query two specific partition dates and load the records into two different tables.

INSERT INTO temp_table_20201115 SELECT * FROM my_table WHERE dt ='2020-11-15';
INSERT INTO temp_table_20201116 SELECT * FROM my_table WHERE dt ='2020-11-16';           

In the queries above, Hive scans the table twice. To avoid this, we can use the following form:

FROM my_table
INSERT INTO temp_table_20201115 SELECT * WHERE dt ='2020-11-15'
INSERT INTO temp_table_20201116 SELECT * WHERE dt ='2020-11-16'           

This ensures that only one scan is performed on the my_table table, which can greatly reduce the time and resources for execution.

5. Performance evaluation and optimization

5.1 Use the EXPLAIN command

Use the EXPLAIN command to analyze the query plan and evaluate the performance of the query. By looking at the resource consumption in your query plan, you can identify potential performance issues and optimize accordingly.

Optimization case

Before:

EXPLAIN SELECT * FROM table WHERE age = 30;            

Optimized:

EXPLAIN SELECT * FROM table WHERE age = 30 AND partition = 'partition1';            

5.2 Adjust the degree of parallelism and resource allocation

Adjusting the parallelism and resource allocation of Hive queries according to the configuration and resources of the cluster can improve the concurrency and overall performance of Hive queries. By setting the hive.exec.parallel parameter to true, you can enable concurrent execution of independent stages. However, in a shared cluster, note that jobs with more stages running in parallel will drive cluster utilization up. This setting is recommended when the amount of data is large and the SQL statement contains many stages; for small data volumes and simple statements, enabling it may not bring any speedup.

Optimization case

Before:

SET hive.exec.parallel=false;  -- default: independent stages run one after another

Optimized:

SET hive.exec.parallel=true; SET hive.exec.parallel.thread.number=8; SET hive.exec.reducers.max=10;

6. Data skew

The task progress stays at 99% (or 100%) for a long time, and the task monitoring page shows that only a few (one or a handful of) reduce subtasks have not finished. These unfinished reduce subtasks differ markedly from the others in the amount of data they process: the number of records handled by a single reduce subtask can differ from the average by a factor of 3 or more, and the running time of the longest unfinished reduce subtask far exceeds the average. The main causes can be summarized as follows:

6.1 Data skew caused by null values

When a field in the data warehouse contains a large number of null values (NULL), the data is unevenly distributed, which has a negative impact on data analysis and computation. The null values unbalance calculation and aggregation operations; for example, aggregate functions (such as SUM, COUNT, AVG, etc.) do not count nulls, which can make results inconsistent, and because all the null keys hash to the same reducer, some reduce subtasks are overloaded while others are lightly loaded. This hurts the overall performance of the task and can leave the job stuck at 99% (or 100%) progress for a long time with a few reduce subtasks unfinished.

Optimization solution

First, you can simply prevent null values from participating in the join, that is, keep them out of the shuffle phase altogether (see the sketch under 3.11.1 above).

Second, because all null keys produce the same hash result during the shuffle, we can assign a random value to each null key so that their hash results differ and they are routed to different reducers (see the sketch under 3.11.2 above).

6.2 Data skew caused by different data types

In a data warehouse, fields of different data types can have very different value ranges and distributions. For example, one field may be an enumeration with only a few fixed values, while another may be a continuous value with a wide range. When fields of different data types are involved in calculation and aggregation operations and their data distributions differ significantly, data skew occurs: some reduce subtasks are overloaded while others are lightly loaded, which hurts the overall performance of the task and can leave the job stuck at 99% (or 100%) progress with a few reduce subtasks unfinished.

Optimization solution

If the join key field is a string in one table and an int in the other, the default hash is computed according to the int type. We can simply cast the int column to string so that the key is a string on both sides and the hash is distributed according to the string values.
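
A sketch of the cast, with illustrative table and column names:

-- Cast the int-typed key to string so both sides are hashed on the same type
SELECT a.*
FROM user_string_tbl a
LEFT JOIN user_int_tbl b
  ON a.user_id = CAST(b.user_id AS STRING);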

6.3 Data skew caused by large files that cannot be split

In the Hadoop distributed computing framework, data is normally split into multiple blocks for parallel processing. However, when a large file uses a format that cannot be split, the whole file is read by a single map task, while other tasks receive much less data. This leaves some tasks overloaded and others lightly loaded, which hurts the overall performance of the job.

Optimization solution

There is no particularly good way to resolve this kind of skew; the only real option is to convert files stored in a compression format that does not support splitting, such as GZIP, into a format that does, such as bzip2.

Therefore, when compressing files, in order to avoid read skew caused by large files that cannot be split, use compression codecs that support file splitting, such as bzip2 (or LZO with an index).
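
For example (a sketch; the exact codecs available depend on your cluster), job output can be written with the splittable bzip2 codec:

-- Write job output with the splittable bzip2 codec
set hive.exec.compress.output=true;
set mapreduce.output.fileoutputformat.compress=true;
set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.BZip2Codec;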

6.4 Data skew caused by data bloat

Data bloat is usually caused by a lot of redundancy, duplication, or splitting of some data in the warehouse. When this data is used for computing and aggregation operations, some reduce subtasks are overloaded and others are lightly loaded, which affects the overall performance of the task.

Optimization solution

In Hive, you can control how such a job is split by configuring the hive.new.job.grouping.set.cardinality parameter (default 30): for multi-dimensional aggregations such as GROUPING SETS / ROLLUP / CUBE, if the number of key combinations the statement expands into exceeds this value, an additional job is started to handle the combinations beyond it. If the grouped columns are heavily skewed, you can adjust this value appropriately, as sketched below.
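
A sketch with hypothetical column names and a purely illustrative threshold: with three grouping sets, lowering the parameter to 2 forces Hive to split the aggregation into an extra job.

set hive.new.job.grouping.set.cardinality=2;

SELECT province, city, count(1) AS cnt
FROM order_detail
GROUP BY province, city
GROUPING SETS ((province), (city), (province, city));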

6.5 Data skew caused by table joining

In a data warehouse, table joins are common operations used to associate and merge data from different tables. However, when the data of connection keys is not evenly distributed in different tables, the amount of data corresponding to some connection keys in the connection result is much larger than that of other connection keys. This causes some reduce tasks to be overloaded and others to be lightly loaded, which affects the overall performance of the task.

Optimization solution

A common approach is to put the smaller table into the distributed cache and distribute it to the nodes where the Map tasks run, so that the join is completed in the Map phase (MapJoin). This avoids the Shuffle and therefore the data skew.

6.6 Data skew when the amount of data genuinely cannot be reduced

In some cases, the amount of data itself is very large, such as large data sets in certain business scenarios, or the accumulation of historical data. In this case, even if measures such as data preprocessing and data partitioning are taken, the amount of data cannot be reduced.

Optimization solution

The most straightforward approach is to increase the memory available to the reduce tasks.

The reduce memory size is adjusted with the mapreduce.reduce.memory.mb configuration, as sketched below.
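
A sketch of the settings (the values are illustrative and should respect your cluster's container limits); the JVM heap is usually raised together with the container memory:

-- Raise the reduce container memory and its JVM heap accordingly
set mapreduce.reduce.memory.mb=8192;
set mapreduce.reduce.java.opts=-Xmx6554m;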

7. Merge small files

In HDFS, each file, directory, and block object occupies about 150 bytes of NameNode metadata memory, so a large number of small files consumes a large amount of memory. This severely strains the NameNode and in turn limits the ability of the whole cluster to scale. From Hive's point of view, small files lead to a large number of Map tasks, each of which starts a separate JVM; the initialization, startup, and execution of all these tasks consume a great deal of computing resources, and each small file also requires its own disk IO operation, which seriously hurts performance.

For this reason, I strongly recommend avoiding data sources that contain a large number of small files. Instead, we should merge small files to reduce the number of disk IOs during the query process and improve query efficiency. By merging small files, we can merge multiple small files into one larger file, thus reducing the number of IO visits to the disk. This reduces the consumption of system resources and improves query performance.

Therefore, when building a data warehouse, you should use large files to store data as much as possible to avoid the creation of a large number of small files. If you already have a large number of small files, you can consider merging small files to optimize data storage and query performance. This can improve the efficiency of Hive queries, reduce the waste of resources, and ensure the stability and scalability of the system.

7.1 Hive merges small file parameters

-- Whether to merge small files at the end of a map-only job; the default is true

set hive.merge.mapfiles = true;           

-- Whether to merge small files at the end of a MapReduce job; the default is false

set hive.merge.mapredfiles = true;           

-- The size of the merged file, default is 256000000 bytes

set hive.merge.size.per.task = 256000000;           

-- When the average size of the output file is less than this value, start a separate map-reduce task to merge the file, which is 16000000 bytes by default

set hive.merge.smallfiles.avgsize = 256000000;           

7.2 Small-file merge parameter for the Spark engine (so consider switching from MR to Spark)

--Whether to merge small files, the default is true

conf spark.sql.hive.mergeFiles=true;           

8. Conclusion

This paper introduces the essential Hive SQL tuning skills for big data practitioners, including query optimization, data partitioning and indexing, data loading and transformation, etc. With a deep understanding of the Hive SQL language and optimization strategies, developers can improve query efficiency and performance. Through the optimization cases and the SQL code before and after optimization, the practical application effect of each optimization scheme is demonstrated.

Attached: Practical cases

1. Background

A company's online platform generates a large amount of user data every day, including user behavior, order information, etc. To better analyze user behavior and business trends, we need to perform complex query operations on the data. The original Hive SQL statement had a performance bottleneck in execution, so we decided to optimize it.

2. Original SQL statements

The original Hive SQL statement is as follows:

SELECT * FROM user_data WHERE user_id IN (SELECT user_id FROM order_data WHERE order_date >= '2022-01-01')           

The purpose of this query statement is to extract, from the user_data table, the data of all users who have placed orders since 2022-01-01 according to the order_data table. Because both the user_data and order_data tables contain a large amount of data, this query takes a long time to execute and has a performance bottleneck.

3. Optimize the strategy

To solve the performance bottleneck of the original SQL statement, we adopt the following optimization strategies:

Use the Spark compute engine: Spark is an efficient distributed computing framework that can be integrated with Hive SQL to improve query efficiency. We'll use the Spark compute engine to execute the query.

Use JOIN operation: You can use JOIN operation to connect two tables to reduce data transmission and computing overhead. We'll use the JOIN operation to join user_data table and order_data table.

Use filters: During a query, you can use filters to reduce the amount of data that is processed. We'll use filters to filter out eligible user data.

4. Optimized SQL statements

Based on the above optimization strategies, the optimized Hive SQL statement is as follows:

SELECT u.* FROM user_data u JOIN (SELECT user_id FROM order_data WHERE order_date >= '2022-01-01') o ON u.user_id = o.user_id           

This query statement uses the JOIN operation to join the user_data table and the subquery results, and filters out the user data that meets the requirements by filtering conditions. At the same time, we used the Spark compute engine to execute the query.

5. Performance comparison

We compare the performance of the SQL statements before and after optimization. Here are the results of the performance comparison:

Execution time: The execution time of the optimized SQL statement is reduced by about 50% compared with that of the original SQL statement.

Data Transfer: The optimized SQL statement reduces the amount of data transferred and improves the efficiency of data processing.

Memory consumption: The optimized SQL statements use the Spark compute engine, which can make better use of memory resources and improve query performance.

Through comparison, it can be seen that the optimized SQL statements have achieved significant improvements in execution time, data transfer volume, and memory consumption.
