跳到主要内容

分布式查询与数据混排

概述

分布式查询是分布式数据库的必要功能。 本文档旨在解释分布式查询及其数据流设计。

本地查询

让我们看看在单个数据库节点上正常查询的运行方式。

'        +------+       +------------+      +---------+
' | | AST | | Plan | |
' SQL--->|Parser+------>|Plan Builder+----->|Optimizer|
' | | | | | |
' +------+ +------------+ +---+-----+
' | Plan
' v
' +----------+ +-----------+
' | | Processor | |
' Data <------+DataStream|<-----------+Interpreter|
' | | | |
' +----------+ +-----------+

解析器和抽象语法树(AST)

Databend 使用第三方 SQL 解析器及其抽象语法树(AST)。 更多信息,请参见:https://github.com/ballista-compute/sqlparser-rs

PlanBuilder 和计划(Plan)

查询计划(或查询执行计划)是用于在 Databend 中访问数据的步骤序列。它由 PlanBuilder 从 AST 构建。我们也使用树来描述它(类似于 AST)。但它与 AST 有一些不同:

  • 计划是可序列化和反序列化的。
  • 计划在语法上是安全的,我们不必担心它。
  • 计划用于描述计算和数据依赖关系,与语法优先级无关
  • 我们可以使用 EXPLAIN SELECT ... 显示它
EXPLAIN SELECT number % 3 AS key, SUM(number) AS value FROM numbers(1000) WHERE number > 10 AND number < 990 GROUP BY key ORDER BY key ASC LIMIT 10;
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| explain |
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Limit: 10
Projection: (number % 3) as key:UInt8, SUM(number) as value:UInt64
Sort: (number % 3):UInt8,
AggregatorFinal: groupBy=[[(number % 3)]], aggr=[[SUM(number)]]
AggregatorPartial: groupBy=[[(number % 3)]], aggr=[[SUM(number)]]
Expression: (number % 3):UInt8, number:UInt64 (Before GroupBy)
Filter: ((number > 10) AND (number < 990))
ReadDataSource: scan partitions: [8], scan schema: [number:UInt64], statistics: [read_rows: 1000, read_bytes: 8000] |
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

优化器和计划(Plan)

对于一个查询,尤其是复杂的查询,可以使用不同的计划组合、顺序和结构来获取数据。每种不同的方式都会得到不同的处理时间。因此,我们需要在最短的时间内找到一种合理的计划组合方式,这就是优化器所做的。

解释器和处理器(Processor)

解释器将优化后的计划构建成可执行的数据流。我们通过拉取流中的数据来获取 SQL 的结果。SQL 中每个操作符的计算逻辑对应一个处理器,例如 FilterPlan -> FilterProcessor, ProjectionPlan -> ProjectionProcessor

分布式查询

在集群模式下,我们可能需要处理一些与单机模式不同的问题。

  • 在分布式模式下,要查询的表总是分布在不同的节点上
  • 对于某些场景,分布式处理总是高效的,例如带有键的 GROUP BY,JOIN
  • 对于某些场景,我们无法进行分布式处理,例如 LIMIT,不带键的 GROUP BY
  • 为了确保快速计算,我们需要协调计算和数据的位置。

让我们看看在数据库集群上正常查询的运行方式。

'                                              +------+       +------------+      +------------------+
' | | AST | | Plan | Optimizer |
' SQL--->|Parser+------>|Plan Builder+----->| |
' | | | | | ScatterOptimizer |
' +------+ +------------+ +--------+---------+
' |
' +--------------+ |
' | | |
' +--+ FlightStream | <------+ | Plan
' | | | | |
' | +--------------+ | |
' | | |
' | | |
' | | Flight RPC v
' +----------+ Processor | +--------------+ | +----------------+
' | | RemoteProcessor | | | | do_action | Interpreter |
' Data<--+DataStream|<----------------+--+ FlightStream | <------+------------------+ |
' | | | | | | | PlanRescheduler|
' +----------+ | +--------------+ | +----------------+
' | |
' | |
' | |
' | |
' | +--------------+ |
' | | | |
' +--+ FlightStream | <------+
' | |
' +--------------+

ScatterOptimizer 和 StagePlan

在 Databend 中,我们使用 ScatterOptimizer 来决定查询的分布式计算。换句话说,分布式查询是单机查询的一种优化。

在 ScatterOptimizer 中,我们遍历查询的所有计划并重写感兴趣的计划 (重写为 StagePlan { kind:StageKind, input:Self }),其中 input 是重写后的计划,kind 是一个枚举(Normal: 数据再次混排,Expansive: 数据从一个节点扩散到多个节点,Convergent: 数据从多个节点聚合到一个节点)

PlanScheduler 和 RemoteProcessor

在集群模式下,我们提取 ScatterOptimizer 优化后的计划中的所有 StagePlans,并根据 kind 将它们发送到集群中的相应节点。

例如:

EXPLAIN SELECT argMin(user, salary)  FROM (SELECT sum(number) AS salary, number%3 AS user FROM numbers_local(1000000000) GROUP BY user);
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| explain |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Projection: argMin(user, salary):UInt64 <-- 在本地节点执行
AggregatorFinal: groupBy=[[]], aggr=[[argMin(user, salary)]]
RedistributeStage[expr: 0] <-- 在集群的所有节点执行
AggregatorPartial: groupBy=[[]], aggr=[[argMin(user, salary)]]
Projection: sum(number) as salary:UInt64, (number % 3) as user:UInt8
AggregatorFinal: groupBy=[[(number % 3)]], aggr=[[sum(number)]]
RedistributeStage[expr: sipHash(_group_by_key)] <-- 在集群的所有节点执行
AggregatorPartial: groupBy=[[(number % 3)]], aggr=[[sum(number)]]
Expression: (number % 3):UInt8, number:UInt64 (Before GroupBy)
RedistributeStage[expr: blockNumber()] <-- 在本地节点执行
ReadDataSource: scan partitions: [8], scan schema: [number:UInt64], statistics: [read_rows: 1000000000, read_bytes: 8000000000] |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

Flight API DataStream

我们需要以某种方式获取发送给其他节点执行的计划的结果。FuseData 使用了第三方库 arrow-flight。更多信息:[https://github.com/apache/arrow-rs/tree/master/arrow-flight]