天天看點

ClickHouse 源碼閱讀 —— SQL的前世今生

ClickHouse 源碼閱讀 —— SQL的前世今生

注:以下分析基于開源 v19.15.2.2-stable 版本進行,社群最新版本代碼改動較大,但是總體思路是不變的。

使用者送出一條查詢SQL背後發生了什麼?

在傳統關系型資料庫中,SQL處理器的元件主要包括以下幾種:

• Query Parsing

負責進行詞法和文法分析,把程式從人類高可讀的格式(即SQL)轉化成機器高可讀的格式(AST,抽象文法樹)。

詞法分析指的是把SQL中的字元序列分解成一個個獨立的詞法單元——Token(<類型,值>)。

文法分析指的是從詞法分析器輸出的token中識别各類短語,并構造出一顆抽象文法樹。而按照構造抽象文法樹的方向,又可以把文法分析分成自頂向下和自底向上分析兩種。而ClickHouse采用的則是手寫一個遞歸下降的文法分析器。

• Query Rewrite

即通常我們說的"Logical Optimizer"或基于規則的優化器(Rule-Based Optimizer,即RBO)。

其負責應用一些啟發式規則,負責簡化和标準化查詢,無需改變查詢的語義。

常見操作有:謂詞和算子下推,視圖展開,簡化常量運算表達式,謂詞邏輯的重寫,語義的優化等。

• Query Optimizer

即通常我們所說的"Physical Optimizer",負責把内部查詢表達轉化成一個高效的查詢計劃,指導DBMS如何去取表,如何進行排序,如何Join。如下圖所示,一個查詢計劃可以被認為是一個資料流圖,在這個資料流圖中,表資料會像在管道中傳輸一樣,從一個查詢操作符(operator)傳遞到另一個查詢操作符。

ClickHouse 源碼閱讀 —— SQL的前世今生

一個查詢計劃

• Query Executor

查詢執行器,負責執行具體的查詢計劃,從存儲引擎中擷取資料并且對資料應用查詢計劃得到結果。

執行引擎也分為很多種,如經典的火山模型(Volcano Model),還有ClickHouse采用的向量化執行模型(Vectorization Model)。

ClickHouse 源碼閱讀 —— SQL的前世今生

(圖來自經典論文 Architecture Of Database System)

但不管是傳統的關系型資料庫,還是非關系型資料庫,SQL的解析和生成執行計劃過程都是大同小異的,而縱覽ClickHouse的源代碼,可以把使用者送出一條查詢SQL背後的過程總結如下:

1.服務端接收用戶端發來的SQL請求,具體形式是一個網絡包,Server的協定層需要拆包把SQL解析出來

2.Server負責初始化上下文與Network Handler,然後 Parser 對Query做詞法和文法分析,解析成AST

3.Interpreter的 SyntaxAnalyzer 會應用一些啟發式規則對AST進行優化重寫

4.Interpreter的 ExpressionAnalyzer 根據上下文資訊以及優化重寫後的AST生成實體執行計劃

5.實體執行計劃分發到本地或者分布式的executor,各自從存儲引擎中擷取資料,應用執行計劃

6.Server把執行後的結果以Block流的形式輸出到Socket緩沖區,Client從Socket中讀取即可得到結果

ClickHouse 源碼閱讀 —— SQL的前世今生

接收用戶端請求

我們要以服務端的視角來出發,首先來看server.cpp大概做什麼事情:

下面隻挑選重要的邏輯:

• 初始化上下文

• 初始化Zookeeper(ClickHouse的副本複制機制需要依賴ZooKeeper)

• 正常配置初始化

• 綁定服務端的端口,根據網絡協定初始化Handler,對用戶端提供服務

int Server::main()
{
    // 初始化上下文
    global_context = std::make_unique<Context>(Context::createGlobal());
    global_context->setApplicationType(Context::ApplicationType::SERVER);
     
    // zk初始化
    zkutil::ZooKeeperNodeCache main_config_zk_node_cache([&] { return global_context->getZooKeeper(); });
    
    //其他config的初始化
    //...
    
    //綁定端口,對外提供服務
    auto address = make_socket_address(host, port);
    socket.bind(address, /* reuseAddress = */ true);

    //根據網絡協定建立不同的server類型
    //現在支援的server類型有: HTTP,HTTPS,TCP,Interserver,mysql
    //以TCP版本為例:
    create_server("tcp_port", [&](UInt16 port)
    {
        Poco::Net::ServerSocket socket;
        auto address = socket_bind_listen(socket, listen_host, port);
        servers.emplace_back(std::make_unique<Poco::Net::TCPServer>(
            new TCPHandlerFactory(*this),
            server_pool,
            socket,
            new Poco::Net::TCPServerParams));
     });
    
    //啟動server
    for (auto & server : servers)
            server->start();
   
}           

用戶端發來的請求是由各自網絡協定所對應的 Handler 來進行的,server在啟動的時候 Handler 會被初始化并綁定在指定端口中。我們以TCPHandler為例,看看服務端是如何處理用戶端發來的請求的,重點關注 TCPHandler::runImpl 的函數實作:

• 初始化輸入和輸出流的緩沖區

• 接受請求封包,拆包

• 執行Query(包括整個詞法文法分析,Query重寫,實體計劃生成和生成結果)

• 把Query結果儲存到輸出流,然後發送到Socket的緩沖區,等待發送回用戶端

void TCPHandler::runImpl()
{
    //執行個體化套接字對應的輸入和輸出流緩沖區
    in = std::make_shared<ReadBufferFromPocoSocket>(socket());
    out = std::make_shared<WriteBufferFromPocoSocket>(socket());
    
    while (1){
        // 接收請求封包
        receivePacket();
        
        // 執行Query    
        state.io = executeQuery(state.query, *query_context, false, state.stage, may_have_embedded_data);
    
        //根據Query種類來處理不同的Query
        //處理insert Query
        processInsertQuery();
        //并發處理普通Query
        processOrdinaryQueryWithProcessors();
        //單線程處理普通Query
        processOrdinaryQuery();
    }
    
}           

那CK處理用戶端發送過來的Query的具體邏輯是怎樣的呢?

我們可以在dbms/src/Interpreters/executeQuery.cpp 中一探究竟:

具體邏輯在 executeQueryImpl 函數中,挑選核心的邏輯進行講解:

static std::tuple<ASTPtr, BlockIO> executeQueryImpl()
{
    //構造Parser
    ParserQuery parser(end, settings.enable_debug_queries);
    ASTPtr ast;

    //把Query轉化為抽象文法樹
    ast = parseQuery(parser, begin, end, "", max_query_size);

    //生成interpreter執行個體
    auto interpreter = InterpreterFactory::get(ast, context, stage);

    // interpreter解析AST,結果是BlockIO
    res = interpreter->execute();

    //傳回結果是抽象文法樹和解析後的結果組成的二進制組
    return std::make_tuple(ast, res);
}           

該函數所做的事情:

• 建構Parser,把Query解析成AST(抽象文法樹)

• InterpreterFactory根據AST生成對應的Interpreter執行個體

• AST是由Interpreter來解析的,執行結果是一個BlockIO,BlockIO是對 BlockInputStream 和 BlockOutputStream 的一個封裝。

總結:

• 服務端調用 executeQuery 來處理client發送的Query,執行後的結果儲存在state這個結構體的io成員中。

每一條Query都會對應一個state結構體,記錄了這條Query的id,處理狀态,壓縮算法,Query的文本和Query所處理資料對應的IO流等元資訊。

• 然後服務端調用 processOrdinaryQuery 等方法把輸出流結果封裝成異步的IO流,發送到回client。

ClickHouse 源碼閱讀 —— SQL的前世今生

解析請求(Parser)

CK選擇采用手寫一個遞歸下降的Parser來對SQL進行解析,生成的結果是這個SQL對應的抽象文法樹(AST),抽象文法樹由表示各個操作的節點(IAST)表示。而本節主要介紹Parser背後的核心邏輯:

詞法分析和文法分析的核心邏輯可以在parseQuery.cpp的 tryParseQuery 中一覽無餘。

該函數利用lexer将掃描Query字元流,将其分割為一個個的Token, token_iterator 即一個Token流疊代器,然後parser再對Token流進行解析生成AST抽象文法樹。

ASTPtr tryParseQuery()
{
    //Token為lexer詞法分析後的基本機關,詞法分析後生成的是Token流
    Tokens tokens(pos, end, max_query_size);
    IParser::Pos token_iterator(tokens);
    ASTPtr res;
    //Token流經過文法分析生成AST抽象文法樹
    bool parse_res = parser.parse(token_iterator, res, expected);
    return res;

}           

我們可以看到,文法分析的核心就在于parser執行的parse方法。parse 方法具體的實作在 ParserQuery.cpp 的 parseImpl 中。

bool ParserQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
    ParserQueryWithOutput query_with_output_p(enable_explain);
    ParserInsertQuery insert_p(end);
    ParserUseQuery use_p;
    ParserSetQuery set_p;
    ParserSystemQuery system_p;

    bool res = query_with_output_p.parse(pos, node, expected)
        || insert_p.parse(pos, node, expected)
        || use_p.parse(pos, node, expected)
        || set_p.parse(pos, node, expected)
        || system_p.parse(pos, node, expected);

    return res;
}           

我們可以看到,這個方法粗略地把Query分為了五種,但是本質上可以歸納為兩種(第一種為有結果輸出,對應show,select,create等語句;第二種為無結果輸出,對應insert,use,set和與系統相關的語句(如exit))

• QueryWithOutput

• InsertQuery

• UseQuery

• SetQuery

• SystemQuery

每一種Query都自定義了其專屬的Parser,是以代碼邏輯是當接收到一個Query輸入的時候,會嘗試各種Query的Parser,直到成功為止。

我們可以select語句對應的parser進行分析:

核心邏輯可以總結為:

1.先給出select語句中可能出現的關鍵詞

2.在詞法分析生成的Token流中爬取這些關鍵詞

3.如果成功爬取,則 setExpression 函數會組裝該關鍵字對應的AST節點

每一種SQL語句(如select,drop,insert,create)都有對應的AST類,并且分别包含了這些語句中特有的關鍵字。

bool ParserSelectQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
    //建立AST樹節點
    auto select_query = std::make_shared<ASTSelectQuery>();
    node = select_query;
    
    //select語句中會出現的關鍵詞
    ParserKeyword s_select("SELECT");
    ParserKeyword s_distinct("DISTINCT");
    ParserKeyword s_from("FROM");
    ParserKeyword s_prewhere("PREWHERE");
    ParserKeyword s_where("WHERE");
    ParserKeyword s_group_by("GROUP BY");
    ParserKeyword s_with("WITH");
    ParserKeyword s_totals("TOTALS");
    ParserKeyword s_having("HAVING");
    ParserKeyword s_order_by("ORDER BY");
    ParserKeyword s_limit("LIMIT");
    ParserKeyword s_settings("SETTINGS");
    ParserKeyword s_by("BY");
    ParserKeyword s_rollup("ROLLUP");
    ParserKeyword s_cube("CUBE");
    ParserKeyword s_top("TOP");
    ParserKeyword s_with_ties("WITH TIES");
    ParserKeyword s_offset("OFFSET");
 
    //...
    //依次對Token流爬取上述關鍵字
    ParserTablesInSelectQuery().parse(pos, tables, expected)
   
    //根據文法分析結果設定AST的Expression屬性,可以了解為如果SQL存在該關鍵字,這個關鍵字都會轉化為AST上的一個節點
    select_query->setExpression(ASTSelectQuery::Expression::WITH, std::move(with_expression_list));
    select_query->setExpression(ASTSelectQuery::Expression::SELECT, std::move(select_expression_list));
    select_query->setExpression(ASTSelectQuery::Expression::TABLES, std::move(tables));
    select_query->setExpression(ASTSelectQuery::Expression::PREWHERE, std::move(prewhere_expression));
    select_query->setExpression(ASTSelectQuery::Expression::WHERE, std::move(where_expression));
    select_query->setExpression(ASTSelectQuery::Expression::GROUP_BY, std::move(group_expression_list));
    select_query->setExpression(ASTSelectQuery::Expression::HAVING, std::move(having_expression));
    select_query->setExpression(ASTSelectQuery::Expression::ORDER_BY, std::move(order_expression_list));
    select_query->setExpression(ASTSelectQuery::Expression::LIMIT_BY_OFFSET, std::move(limit_by_offset));
    select_query->setExpression(ASTSelectQuery::Expression::LIMIT_BY_LENGTH, std::move(limit_by_length));
    select_query->setExpression(ASTSelectQuery::Expression::LIMIT_BY, std::move(limit_by_expression_list));
    select_query->setExpression(ASTSelectQuery::Expression::LIMIT_OFFSET, std::move(limit_offset));
    select_query->setExpression(ASTSelectQuery::Expression::LIMIT_LENGTH, std::move(limit_length));
    select_query->setExpression(ASTSelectQuery::Expression::SETTINGS, std::move(settings));
        
}           

整個Parser的流程圖:

ClickHouse 源碼閱讀 —— SQL的前世今生

執行請求(Interpreter)

解釋器(Interpreter)負責從抽象文法樹中建立查詢執行的流水線,整條流水線以 BlockInputStream 和 BlockOutputStream 進行組織。比方說"select"是基于"from"的Block輸出流來進行選擇的,選擇後的結果也會以Block輸出流的形式輸出到結果。 首先我們來看:

dbms/src/Interpreters/InterpreterFactory.cpp

每一種Query都會有對應的Interpreter,這個工廠方法就是根據AST的種類來執行個體化其對應的Interpreter,由其來具體執行對應AST的執行計劃:

std::unique_ptr<IInterpreter> InterpreterFactory::get(ASTPtr & query, Context & context, QueryProcessingStage::Enum stage)
{
    //舉個例子,如果該AST是由select語句轉化過來,
    if (query->as<ASTSelectQuery>())
    {
        /// This is internal part of ASTSelectWithUnionQuery.
        /// Even if there is SELECT without union, it is represented by ASTSelectWithUnionQuery with single ASTSelectQuery as a child.
        return std::make_unique<InterpreterSelectQuery>(query, context, SelectQueryOptions(stage));
    }
}           

我們就以 InterpreterSelectQuery 為例,了解其執行個體化的核心邏輯:

InterpreterSelectQuery::InterpreterSelectQuery()
 {
     //擷取AST
    auto & query = getSelectQuery();
    
    //對AST做進一步文法分析,對文法樹做優化重寫
    syntax_analyzer_result = SyntaxAnalyzer(context, options).analyze(
        query_ptr, source_header.getNamesAndTypesList(), required_result_column_names, storage, NamesAndTypesList());
    
    //每一種Query都會對應一個特有的表達式分析器,用于爬取AST生成執行計劃(操作鍊)
    query_analyzer = std::make_unique<SelectQueryExpressionAnalyzer>(
        query_ptr, syntax_analyzer_result, context,
        NameSet(required_result_column_names.begin(), required_result_column_names.end()),
        options.subquery_depth, !options.only_analyze); 
 }           

文法分析直接生成的AST轉化成執行計劃可能性能上并不是最優的,是以需要SyntaxAnalyzer 對其進行優化重寫,在其源碼中可以看到其涉及到非常多 基規則優化(rule based optimization) 的trick。

SyntaxAnalyzer 會逐個針對這些規則對查詢進行檢查,确定其是否滿足轉換規則,一旦滿足就會對其進行轉換。

SyntaxAnalyzerResultPtr SyntaxAnalyzer::analyze()
{
     // 剔除備援列
     removeDuplicateColumns(result.source_columns);
     
     // 根據settings中enable_optimize_predicate_expression配置判斷是否進行謂詞下移
     replaceJoinedTable(node);
     
     // 根據settings中distributed_product_mode配置重寫IN 與 JOIN 表達式
     InJoinSubqueriesPreprocessor(context).visit(query);
     
     // 優化Query内部的布爾表達式
     LogicalExpressionsOptimizer().perform();

     // 建立一個從别名到AST節點的映射字典 
     QueryAliasesVisitor(query_aliases_data, log.stream()).visit(query);
    
     // 公共子表達式的消除
     QueryNormalizer(normalizer_data).visit(query);
    
     // 消除select從句後的備援列
     removeUnneededColumnsFromSelectClause(select_query, required_result_columns, remove_duplicates);
     
     // 執行标量子查詢,并且用常量替代标量子查詢結果
     executeScalarSubqueries(query, context, subquery_depth);

     // 如果是select語句還會做下列優化:
    
     // 謂詞下移優化
     PredicateExpressionsOptimizer(select_query, settings, context).optimize();
     
     /// GROUP BY 從句的優化
     optimizeGroupBy(select_query, source_columns_set, context);
     
     /// ORDER BY 從句的備援項剔除
     optimizeOrderBy(select_query);
     
     /// LIMIT BY 從句的備援列剔除
     optimizeLimitBy(select_query);
     
     /// USING語句的備援列剔除
     optimizeUsing(select_query);
   
}           

這裡挑選幾個簡單介紹一下:

• 公共子表達式消除(Common Subexpression Elimination)

如果表達式 x op y 先前被計算過,并且從先前的計算到現在其計算表達式對應的值沒有改變,那麼 x op y 就稱為公共子表達式。公共子表達式消除會搜尋所有相同計算表達式的執行個體,并分析是否值得用儲存計算值的單個變量來替換它們,以減少計算的開銷。

• 标量子查詢(Scala Subquery)的常量替換

标量子查詢就是傳回單一值的子查詢,和公共子表達式消除相似,可以用常量來替換SQL中所有的标量子查詢結果以減少計算開銷。

• 謂詞下移(Predicate Pushdown)

把外層查詢塊中的WHERE子句的謂詞下移到較低層查詢塊如視圖,以盡可能把過濾資料的操作移動到靠近資料源的位置。提前進行資料過濾能夠大幅減少網絡傳輸或者記憶體讀取通路的資料量,以提高查詢效率。

而 query_analyzer 的作用可以了解為解析優化重寫後的AST,然後對所要進行的操作組成一條操作鍊,即實體執行計劃,如:

ExpressionActionsChain chain;
analyzer.appendWhere(chain);
chain.addStep();
analyzer.appendSelect(chain);
analyzer.appendOrderBy(chain);
chain.finalize();           

上述代碼把where,select,orderby操作都加入到操作鍊中,接下來就可以從Storage層讀取Block,對Block資料應用上述操作鍊的操作。而執行的核心邏輯,就在對應Interpreter的 executeImpl 方法實作中,這裡以select語句的Interpreter來了解下讀取Block資料并且對block資料進行相應操作的流程。

void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputStreamPtr & prepared_input)
{
     // 對應Query的AST
     auto & query = getSelectQuery();
    
     AnalysisResult expressions;
     // 實體計劃,判斷表達式是否有where,aggregate,having,order_by,litmit_by等字段
     expressions = analyzeExpressions(
                getSelectQuery(),
                *query_analyzer,
                QueryProcessingStage::FetchColumns,
                options.to_stage,
                context,
                storage,
                true,
                filter_info);
    
     // 從Storage讀取資料
     executeFetchColumns(from_stage, pipeline, sorting_info, expressions.prewhere_info, expressions.columns_to_remove_after_prewhere);

     // eg:根據SQL的關鍵字在BlockStream流水線中執行相應的操作, 如where,aggregate,distinct都分别由一個函數負責執行
     executeWhere(pipeline, expressions.before_where, expressions.remove_where_filter);
     
     executeAggregation(pipeline, expressions.before_aggregation, aggregate_overflow_row, aggregate_final);
     
     executeDistinct(pipeline, true, expressions.selected_columns);    
    
}           

既然我們知道了執行計劃AnalysisResult(即實體執行計劃),接下來就需要從storage層中讀取資料來執行對應的操作,核心邏輯在 executeFetchColumns 中: 核心操作就是從storage層讀取所要處理列的Block,并組織成BlockStream。

void InterpreterSelectQuery::executeFetchColumns(
        QueryProcessingStage::Enum processing_stage, TPipeline & pipeline,
        const SortingInfoPtr & sorting_info, const PrewhereInfoPtr & prewhere_info, const Names & columns_to_remove_after_prewhere)
{   
    // 執行個體化Block Stream
    auto streams = storage->read(required_columns, query_info, context, processing_stage, max_block_size, max_streams)
    // 讀取列對應的Block,并且組織成Block Stream
    streams = {std::make_shared<NullBlockInputStream>(storage->getSampleBlockForColumns(required_columns))};
    streams.back() = std::make_shared<ExpressionBlockInputStream>(streams.back(), query_info.prewhere_info->remove_columns_actions); 
}           

讀取完Block Stream之後就是對其執行各種execute操作如 executeAggregation , executeWhere 操作,詳見 InterpreterSelectQuery::executeImpl 的代碼。

是以Interpreter的處理過程可以總結為:

• 對AST進行優化重寫

• 解析重寫後的AST并生成操作鍊(執行計劃)

• 從存儲引擎中讀取要處理的Block資料

• 對讀取的Block資料應用操作鍊上的操作

那我們讀取Block Stream并進行處理後,生成的結果如何寫回到storage層呢? 我們這裡以insert語句的Interpreter來了解下:

BlockIO InterpreterInsertQuery::execute()
{
     // table為存儲引擎接口
    StoragePtr table = getTable(query);
    BlockOutputStreamPtr out;
    
    // 從存儲引擎讀取Block Stream
    auto query_sample_block = getSampleBlock(query, table);
    out = std::make_shared<AddingDefaultBlockOutputStream>(
        out, query_sample_block, out->getHeader(), table->getColumns().getDefaults(), context);
    
    //執行結果封裝成BlockIO
    BlockIO res;
    res.out = std::move(out);    
}           

上面代碼中的StoragePtr實際上就是IStorage這個存儲引擎的接口

using StoragePtr = std::shared_ptr<IStorage>;           

無論是寫入還是讀取操作都是依靠底層存儲引擎(如MergeTree)的write和read接口來實作的,關于存儲引擎的細節實作這裡暫時不贅述,這裡我們隻需要知道我們從存儲引擎接口中以流方式讀取Block資料,而結果組織成BlockIO流輸出。Interpreter的流程總結如下:

ClickHouse 源碼閱讀 —— SQL的前世今生

傳回請求結果

TCPHandler::runImpl 中,執行完 executeQuery 之後需要調用各種processQuery的方法來給client傳回執行SQL後的結果。

我們以 TCPHandler::processOrdinaryQuery 為例做簡單分析:

void TCPHandler::processOrdinaryQuery()
{
    //把BlockStream封裝成異步的Stream,那麼從流中讀取資料将會是異步操作
    AsynchronousBlockInputStream async_in(state.io.in);
    
    while(true){
         Block block;
         //從IO流讀取block資料
         block = async_in.read();
         //發送block資料
         sendData(block);
    }
}           

Server負責在 sendData 函數中把輸出結果寫入到套接字輸出緩沖區中,client隻要從這個輸出緩沖區讀取就能夠得到結果。

void TCPHandler::sendData(const Block & block)
{
    //初始化OutputStream的參數
    initBlockOutput(block);

    // 調用BlockOutputStream的write函數,把Block寫到輸出流
    state.block_out->write(block);
    state.maybe_compressed_out->next();
    out->next();
}           

結語

了解ClickHouse背後SQL的查詢整個流程,不僅能讓資料庫使用者更清晰地認識到如何編寫最優化的SQL,也能夠讓資料庫核心開發者加深對資料庫體系結構的了解,提高開發效率。本文并沒有涉及到太深入的技術細節,諸如向量化執行引擎,SIMD,基于llvm的動态代碼生成,類MergeTree存儲引擎等CK的技術細節也沒有提及,隻是從宏觀角度給讀者介紹了執行SQL背後核心到底發生了什麼。後續我們會推出更多核心源碼解讀文章,敬請關注。

寫在最後

阿裡雲已經率先推出了ClickHouse的雲托管産品,産品首頁位址:

雲資料庫ClickHouse

,目前正在免費公測中,歡迎大家點選連結申請免費試用。

ClickHouse 源碼閱讀 —— SQL的前世今生

我們也開通了阿裡雲ClickHouse釘釘交流群,通過專業的資料庫專家為客戶提供咨詢、答疑服務。歡迎大家任選如下方式入群交流,我們将會定期推送ClickHouse最佳實踐、操作指南、原了解讀等深度文章。

• 方式一:使用釘釘搜尋群号 23300515

• 方式二:使用釘釘掃描下方二維碼

ClickHouse 源碼閱讀 —— SQL的前世今生