天天看点

shardingsphere源码分析(六)-- 归并引擎shardingsphere源码分析(六)-- 归并引擎

shardingsphere源码分析(六)-- 归并引擎

  • shardingsphere源码分析(六)-- 归并引擎
    • 官方介绍
    • debug
    • 总结

shardingsphere源码分析(六)-- 归并引擎

官方介绍

链接如下:

https://shardingsphere.apache.org/document/current/cn/features/sharding/principle/merge/

将从各个数据节点获取的多数据结果集,组合成为一个结果集并正确的返回至请求客户端,称为结果归并。

ShardingSphere 支持的结果归并从功能上分为遍历、排序、分组、分页和聚合 5 种类型,它们是组合而非互斥的关系。 从结构划分,可分为流式归并、内存归并和装饰者归并。流式归并和内存归并是互斥的,装饰者归并可以在流式归并和内存归并之上做进一步的处理。

  • 遍历归并

    它是最为简单的归并方式。 只需将多个数据结果集合并为一个单向链表即可。在遍历完成链表中当前数据结果集之后,将链表元素后移一位,继续遍历下一个数据结果集即可。

  • 排序归并

    由于在 SQL 中存在 ORDER BY 语句,因此每个数据结果集自身是有序的,因此只需要将数据结果集当前游标指向的数据值进行排序即可。 这相当于对多个有序的数组进行排序,归并排序是最适合此场景的排序算法。

  • 分组归并

    分组归并的情况最为复杂,它分为流式分组归并和内存分组归并。 流式分组归并要求 SQL 的排序项与分组项的字段以及排序类型(ASC 或 DESC)必须保持一致,否则只能通过内存归并才能保证其数据的正确性。

  • 聚合归并

    无论是流式分组归并还是内存分组归并,对聚合函数的处理都是一致的。 除了分组的 SQL 之外,不进行分组的 SQL 也可以使用聚合函数。 因此,聚合归并是在之前介绍的归并类的之上追加的归并能力,即装饰者模式。聚合函数可以归类为比较、累加和求平均值这 3 种类型。

  • 分页归并

    上文所述的所有归并类型都可能进行分页。 分页也是追加在其他归并类型之上的装饰器,ShardingSphere 通过装饰者模式来增加对数据结果集进行分页的能力。 分页归并负责将无需获取的数据过滤掉。

归并引擎的整体结构划分如下图。

shardingsphere源码分析(六)-- 归并引擎shardingsphere源码分析(六)-- 归并引擎

debug

运行examples/shardingsphere-jdbc-example/sharding-example/sharding-raw-jdbc-example/src/main/java/org/apache/shardingsphere/example/sharding/raw/jdbc/YamlRangeConfigurationExampleMain.java

查询sql,才会走归并引擎

// ShardingSpherePreparedStatement.java
public ResultSet executeQuery() throws SQLException {
	...
  	List<QueryResult> queryResults = this.executeQuery0();
    // 归并
    MergedResult mergedResult = this.mergeQuery(queryResults);
    ...
 }
           

先通过SPI初始化MergeEngine

// MergeEngine.java
public MergeEngine(DatabaseType databaseType, ShardingSphereSchema schema, ConfigurationProperties props, Collection<ShardingSphereRule> rules) {
    this.databaseType = databaseType;
    this.schema = schema;
    this.props = props;
    this.engines = OrderedSPIRegistry.getRegisteredServices(rules, ResultProcessEngine.class);
}
           

然后,调用merge 函数

// MergeEngine.java
public MergedResult merge(List<QueryResult> queryResults, SQLStatementContext<?> sqlStatementContext) throws SQLException {
    Optional<MergedResult> mergedResult = this.executeMerge(queryResults, sqlStatementContext);
    Optional<MergedResult> result = mergedResult.isPresent() ? Optional.of(this.decorate((MergedResult)mergedResult.get(), sqlStatementContext)) : this.decorate((QueryResult)queryResults.get(0), sqlStatementContext);
    return (MergedResult)result.orElseGet(() -> {
        return new TransparentMergedResult((QueryResult)queryResults.get(0));
    });
}
           

查询语句最终走到 ShardingDQLResultMerger 里

// ShardingDQLResultMerger.java
public MergedResult merge(List<QueryResult> queryResults, SQLStatementContext<?> sqlStatementContext, ShardingSphereSchema schema) throws SQLException {
    if (1 == queryResults.size()) {
        return new IteratorStreamMergedResult(queryResults);
    } else {
        Map<String, Integer> columnLabelIndexMap = this.getColumnLabelIndexMap((QueryResult)queryResults.get(0));
        SelectStatementContext selectStatementContext = (SelectStatementContext)sqlStatementContext;
        selectStatementContext.setIndexes(columnLabelIndexMap);
        // 判断要做哪种合并
        MergedResult mergedResult = this.build(queryResults, selectStatementContext, columnLabelIndexMap, schema);
        return this.decorate(queryResults, selectStatementContext, mergedResult);
    }
}
           

这里根据group by、distinct、order by等关键字做不同的归并处理

// ShardingDQLResultMerger.java
private MergedResult build(List<QueryResult> queryResults, SelectStatementContext selectStatementContext, Map<String, Integer> columnLabelIndexMap, ShardingSphereSchema schema) throws SQLException {
    if (this.isNeedProcessGroupBy(selectStatementContext)) {
        return this.getGroupByMergedResult(queryResults, selectStatementContext, columnLabelIndexMap, schema);
    } else if (this.isNeedProcessDistinctRow(selectStatementContext)) {
        this.setGroupByForDistinctRow(selectStatementContext);
        return this.getGroupByMergedResult(queryResults, selectStatementContext, columnLabelIndexMap, schema);
    } else {
        return (MergedResult)(this.isNeedProcessOrderBy(selectStatementContext) ? new OrderByStreamMergedResult(queryResults, selectStatementContext, schema) : new IteratorStreamMergedResult(queryResults));
    }
}
           
这里 logicsql 是 SELECT * FROM t_order
Actual SQL: ds_0 ::: SELECT * FROM t_order ORDER BY order_id ASC
           
shardingsphere源码分析(六)-- 归并引擎shardingsphere源码分析(六)-- 归并引擎

所以上面走的是排序归并

最后归并完的结果如下图

shardingsphere源码分析(六)-- 归并引擎shardingsphere源码分析(六)-- 归并引擎

我们修改过的avg语句,实际sql如下

Logic SQL: SELECT avg(user_id) FROM t_order_item 
SQLStatement: MySQLSelectStatement(limit=Optional.empty, lock=Optional.empty, window=Optional.empty) 
Actual SQL: ds_0 ::: SELECT avg(user_id) , COUNT(user_id) AS AVG_DERIVED_COUNT_0 , SUM(user_id) AS AVG_DERIVED_SUM_0 FROM t_order_item 
Actual SQL: ds_1 ::: SELECT avg(user_id) , COUNT(user_id) AS AVG_DERIVED_COUNT_0 , SUM(user_id) AS AVG_DERIVED_SUM_0 FROM t_order_item 
           

走的是分组归并

shardingsphere源码分析(六)-- 归并引擎shardingsphere源码分析(六)-- 归并引擎

不过由于查询字段和之前的代码不一致,运行到设值的时候报错了

我们再次修改

修改examples/example-core/example-raw-jdbc/src/main/java/org/apache/shardingsphere/example/core/jdbc/repository/OrderItemRepositoryImpl.java

public List<OrderItem> selectAll() throws SQLException {
    String sql = "SELECT * FROM t_order_item group by status";
    return getOrderItems(sql);
}
           

然后执行 mvn install

再次运行 YamlRangeConfigurationExampleMain

在查询 t_order_item 的时候,走了分组归并

然后下面这段代码是控制走流式分组归并还是内存分组归并

// ShardingDQLResultMerger.java
private MergedResult getGroupByMergedResult(List<QueryResult> queryResults, SelectStatementContext selectStatementContext, Map<String, Integer> columnLabelIndexMap, ShardingSphereSchema schema) throws SQLException {
    return (MergedResult)(selectStatementContext.isSameGroupByAndOrderByItems() ? new GroupByStreamMergedResult(columnLabelIndexMap, queryResults, selectStatementContext, schema) : new GroupByMemoryMergedResult(queryResults, selectStatementContext, schema));
}
           

我们这条sql走的是流式分组归并

shardingsphere源码分析(六)-- 归并引擎shardingsphere源码分析(六)-- 归并引擎

最后分组查出来的结果如下

shardingsphere源码分析(六)-- 归并引擎shardingsphere源码分析(六)-- 归并引擎

总结

如果查询没有带分库分表键的话,查询结果就需要归并处理,所以查询语句最好带上分库分表键。