data-shuffle
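Overview
Distributed query is a necessary feature of a distributed database. This document explains distributed queries and the design of their data flow.
Local query
Let's look at how a normal query runs on a single database node.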
' +------+ +------------+ +---------+
' | | AST | | Plan | |
' SQL--->|Parser+------>|Plan Builder+----->|Optimizer|
' | | | | | |
' +------+ +------------+ +---+-----+
' | Plan
' v
' +----------+ +-----------+
' | | Processor | |
' Data <------+DataStream|<-----------+Interpreter|
' | | | |
' +----------+ +-----------+
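Parser and AST
Databend uses a third-party SQL parser and its AST. For more information, see: https://github.com/ballista-compute/sqlparser-rs
PlanBuilder and Plan
A query plan (or query execution plan) is the sequence of steps Databend uses to access data. It is built from the AST by PlanBuilder. Like the AST, it is described as a tree, but it differs from the AST in several ways:
- A Plan can be serialized and deserialized.
- A Plan is syntactically safe, so syntax is no longer a concern once the plan is built.
- A Plan describes computation and data dependencies and is independent of syntactic precedence.
- We can display it with EXPLAIN SELECT ..., for example: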
EXPLAIN SELECT number % 3 AS key, SUM(number) AS value FROM numbers(1000) WHERE number > 10 AND number < 990 GROUP BY key ORDER BY key ASC LIMIT 10;
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| explain |
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Limit: 10
Projection: (number % 3) as key:UInt8, SUM(number) as value:UInt64
Sort: (number % 3):UInt8,
AggregatorFinal: groupBy=[[(number % 3)]], aggr=[[SUM(number)]]
AggregatorPartial: groupBy=[[(number % 3)]], aggr=[[SUM(number)]]
Expression: (number % 3):UInt8, number:UInt64 (Before GroupBy)
Filter: ((number > 10) AND (number < 990))
ReadDataSource: scan partitions: [8], scan schema: [number:UInt64], statistics: [read_rows: 1000, read_bytes: 8000] |
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
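Optimizer and Plan
For a query, especially a complex one, the data can be obtained with different combinations, orders, and structures of plans. Each alternative leads to a different processing time. We therefore need to find a reasonable plan combination in the shortest time, which is what the optimizer does.
Interpreter and Processor
The interpreter builds the optimized plan into an executable data stream. We obtain the result of the SQL by pulling data from that stream. The computation logic of each operator in the SQL corresponds to one processor, for example FilterPlan -> FilterProcessor and ProjectionPlan -> ProjectionProcessor.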
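The mapping from plan nodes to processors, and the pull model, can be sketched in Rust as follows. This is a minimal illustration and not Databend's actual code: the Plan enum, the build_stream function, and the use of boxed synchronous iterators in place of real data streams are all assumptions made for the example.

```rust
// Hypothetical, simplified plan tree: each node wraps its input plan.
enum Plan {
    // Produces the numbers 0..n, similar to numbers(n).
    ReadDataSource { n: u64 },
    // Keeps only the rows for which the predicate holds.
    Filter { predicate: fn(&u64) -> bool, input: Box<Plan> },
    // Transforms every row.
    Projection { expr: fn(u64) -> u64, input: Box<Plan> },
    // Stops after `n` rows.
    Limit { n: usize, input: Box<Plan> },
}

// In this sketch a "data stream" is just a boxed iterator. Nothing is
// computed until the consumer pulls from it.
type DataStream = Box<dyn Iterator<Item = u64>>;

// The interpreter walks the plan and stacks one processor (here an
// iterator adapter) on top of the stream built for the input plan.
fn build_stream(plan: Plan) -> DataStream {
    match plan {
        Plan::ReadDataSource { n } => Box::new(0..n),
        Plan::Filter { predicate, input } => Box::new(build_stream(*input).filter(predicate)),
        Plan::Projection { expr, input } => Box::new(build_stream(*input).map(expr)),
        Plan::Limit { n, input } => Box::new(build_stream(*input).take(n)),
    }
}
```

Building a Limit over a Projection over a Filter over a ReadDataSource and then calling collect() on the returned stream pulls rows through all four processors. Because the stream is lazy, the Limit processor stops the upstream ones as soon as it has enough rows.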
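Distributed query
In cluster mode, we may need to deal with some problems that do not arise in standalone mode.
- In distributed mode, the tables to be queried are always distributed across different nodes.
- For some scenarios, distributed processing is always efficient, for example GROUP BY with keys and JOIN.
- For some scenarios, distributed processing is not possible, for example LIMIT and GROUP BY without keys.
- To keep computation fast, we need to coordinate where the computation runs with where the data lives.
Let's look at how a normal query runs on a database cluster.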
' +------+ +------------+ +------------------+
' | | AST | | Plan | Optimizer |
' SQL--->|Parser+------>|Plan Builder+----->| |
' | | | | | ScatterOptimizer |
' +------+ +------------+ +--------+---------+
' |
' +--------------+ |
' | | |
' +--+ FlightStream | <------+ | Plan
' | | | | |
' | +--------------+ | |
' | | |
' | | |
' | | Flight RPC v
' +----------+ Processor | +--------------+ | +----------------+
' | | RemoteProcessor | | | | do_action | Interpreter |
' Data<--+DataStream|<----------------+--+ FlightStream | <------+------------------+ |
' | | | | | | | PlanRescheduler|
' +----------+ | +--------------+ | +----------------+
' | |
' | |
' | |
' | |
' | +--------------+ |
' | | | |
' +--+ FlightStream | <------+
' | |
' +--------------+
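ScatterOptimizer and StagePlan
In Databend, we use ScatterOptimizer to decide how a query is computed in a distributed way. In other words, a distributed query is an optimization of a standalone query.
In ScatterOptimizer, we traverse all the plans of the query and rewrite the plans of interest (rewrite as StagePlan { kind:StageKind, input:Self }), where input is the rewritten plan and kind is an enum (Normal: data is shuffled again, Expansive: data spreads from one node to multiple nodes, Convergent: data is gathered from multiple nodes onto one node).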
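The rewrite can be sketched in Rust as follows. Only the names StagePlan, StageKind, and the three kinds come from the text above. The simplified Plan enum, the scatter function, and the rules it applies are assumptions made for illustration and are not taken from Databend's source. The assumed rules are: a local data source gets an Expansive stage, a keyed aggregation gets a Normal stage, and a keyless aggregation gets a Convergent stage.

```rust
// Kinds of data movement, as named in the text above.
#[derive(Debug, Clone, Copy, PartialEq)]
enum StageKind {
    Normal,     // data is shuffled again across the nodes
    Expansive,  // data spreads from one node to many nodes
    Convergent, // data is gathered from many nodes onto one node
}

// Hypothetical, heavily simplified plan tree.
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    ReadDataSource { local: bool },
    AggregatorPartial { keys: Vec<String>, input: Box<Plan> },
    AggregatorFinal { keys: Vec<String>, input: Box<Plan> },
    // Plays the role of StagePlan { kind, input }.
    Stage { kind: StageKind, input: Box<Plan> },
}

// Walk the plan tree and wrap the "interesting" nodes in a stage.
fn scatter(plan: Plan) -> Plan {
    match plan {
        // Assumed rule: data that lives only on the local node is spread out.
        Plan::ReadDataSource { local: true } => Plan::Stage {
            kind: StageKind::Expansive,
            input: Box::new(Plan::ReadDataSource { local: true }),
        },
        Plan::AggregatorPartial { keys, input } => Plan::AggregatorPartial {
            keys,
            input: Box::new(scatter(*input)),
        },
        // Assumed rule: a final aggregation with keys can stay distributed
        // (Normal); without keys it has to converge onto a single node.
        Plan::AggregatorFinal { keys, input } => {
            let kind = if keys.is_empty() {
                StageKind::Convergent
            } else {
                StageKind::Normal
            };
            let staged = Plan::Stage { kind, input: Box::new(scatter(*input)) };
            Plan::AggregatorFinal { keys, input: Box::new(staged) }
        }
        // Non-local sources and already staged subtrees are left untouched.
        other => other,
    }
}
```

Applied to a keyless AggregatorFinal over an AggregatorPartial over a local ReadDataSource, this sketch inserts a Convergent stage below the final aggregation and an Expansive stage above the data source.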
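PlanScheduler and RemoteProcessor
In cluster mode, we extract all the StagePlans from the plan optimized by ScatterOptimizer and send them to the corresponding nodes in the cluster according to their kind.
For example: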
EXPLAIN SELECT argMin(user, salary) FROM (SELECT sum(number) AS salary, number%3 AS user FROM numbers_local(1000000000) GROUP BY user);
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| explain |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Projection: argMin(user, salary):UInt64 <-- execute in local node
AggregatorFinal: groupBy=[[]], aggr=[[argMin(user, salary)]]
RedistributeStage[expr: 0] <-- execute in all nodes of the cluster
AggregatorPartial: groupBy=[[]], aggr=[[argMin(user, salary)]]
Projection: sum(number) as salary:UInt64, (number % 3) as user:UInt8
AggregatorFinal: groupBy=[[(number % 3)]], aggr=[[sum(number)]]
RedistributeStage[expr: sipHash(_group_by_key)] <-- execute in all nodes of the cluster
AggregatorPartial: groupBy=[[(number % 3)]], aggr=[[sum(number)]]
Expression: (number % 3):UInt8, number:UInt64 (Before GroupBy)
RedistributeStage[expr: blockNumber()] <-- execute in local node
ReadDataSource: scan partitions: [8], scan schema: [number:UInt64], statistics: [read_rows: 1000000000, read_bytes: 8000000000] |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
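The keyed redistribution in the plan above (RedistributeStage[expr: sipHash(_group_by_key)] between AggregatorPartial and AggregatorFinal) can be illustrated with a small in-memory Rust sketch. It is a simplification under stated assumptions: nodes are modelled as vectors in one process, the function names are hypothetical, and std's DefaultHasher stands in for Databend's sipHash expression.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};

// Choose the node that owns a group-by key. DefaultHasher is only a
// stand-in for the sipHash(_group_by_key) expression in the plan.
fn target_node(key: u64, nodes: usize) -> usize {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    (hasher.finish() % nodes as u64) as usize
}

// AggregatorPartial: each node sums its own rows per key (number % 3).
fn aggregate_partial(rows: &[u64]) -> HashMap<u64, u64> {
    let mut acc = HashMap::new();
    for &n in rows {
        *acc.entry(n % 3).or_insert(0) += n;
    }
    acc
}

// Redistribute stage: every partial result is routed by the hash of its
// key, so all partial sums for one key arrive at the same node.
fn shuffle(partials: Vec<HashMap<u64, u64>>, nodes: usize) -> Vec<Vec<(u64, u64)>> {
    let mut inbox = vec![Vec::new(); nodes];
    for partial in partials {
        for (key, sum) in partial {
            inbox[target_node(key, nodes)].push((key, sum));
        }
    }
    inbox
}

// AggregatorFinal: merge the partial sums a node received.
fn aggregate_final(received: Vec<(u64, u64)>) -> HashMap<u64, u64> {
    let mut acc = HashMap::new();
    for (key, sum) in received {
        *acc.entry(key).or_insert(0) += sum;
    }
    acc
}
```

Because routing depends only on the key, each key is finalized on exactly one node, and the union of the per-node final results equals the result of running the aggregation on a single node.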
### Flight API DataStream
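We need some way to fetch the results of the plans that were sent to other nodes for execution. FuseData uses the third-party library arrow-flight. For more information, see: https://github.com/apache/arrow-rs/tree/master/arrow-flight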