分布式查询与数据混排
概述
分布式查询是分布式数据库的必要功能。 本文档旨在解释分布式查询及其数据流设计。
本地查询
让我们看看在单个数据库节点上正常查询的运行方式。
' +------+ +------------+ +---------+
' | | AST | | Plan | |
' SQL--->|Parser+------>|Plan Builder+----->|Optimizer|
' | | | | | |
' +------+ +------------+ +---+-----+
' | Plan
' v
' +----------+ +-----------+
' | | Processor | |
' Data <------+DataStream|<-----------+Interpreter|
' | | | |
' +----------+ +-----------+
解析器和抽象语法树(AST)
Databend 使用第三方 SQL 解析器及其抽象语法树(AST)。 更多信息,请参见:https://github.com/ballista-compute/sqlparser-rs
PlanBuilder 和计划(Plan)
查询计划(或查询执行计划)是用于在 Databend 中访问数据的步骤序列。它由 PlanBuilder 从 AST 构建。我们也使用树来描述它(类似于 AST)。但它与 AST 有一些不同:
- 计划是可序列化和反序列化的。
- 计划在语法上是安全的,我们不必担心它。
- 计划用于描述计算和数据依赖关系,与语法优先级无关
- 我们可以使用
EXPLAIN SELECT ...
显示它
EXPLAIN SELECT number % 3 AS key, SUM(number) AS value FROM numbers(1000) WHERE number > 10 AND number < 990 GROUP BY key ORDER BY key ASC LIMIT 10;

| explain |

| Limit: 10
Projection: (number % 3) as key:UInt8, SUM(number) as value:UInt64
Sort: (number % 3):UInt8,
AggregatorFinal: groupBy=[[(number % 3)]], aggr=[[SUM(number)]]
AggregatorPartial: groupBy=[[(number % 3)]], aggr=[[SUM(number)]]
Expression: (number % 3):UInt8, number:UInt64 (Before GroupBy)
Filter: ((number > 10) AND (number < 990))
ReadDataSource: scan partitions: [8], scan schema: [number:UInt64], statistics: [read_rows: 1000, read_bytes: 8000] |

优化器和计划(Plan)
对于一个查询,尤其是复杂的查询,可 以使用不同的计划组合、顺序和结构来获取数据。每种不同的方式都会得到不同的处理时间。因此,我们需要在最短的时间内找到一种合理的计划组合方式,这就是优化器所做的。
解释器和处理器(Processor)
解释器将优化后的计划构建成可执行的数据流。我们通过拉取流中的数据来获取 SQL 的结果。SQL 中每个操作符的计算逻辑对应一个处理器,例如 FilterPlan -> FilterProcessor, ProjectionPlan -> ProjectionProcessor
分布式查询
在集群模式下,我们可能需要处理一些与单机模式不同的问题。
- 在分布式模式下,要查询的表总是分布在不同的节点上
- 对于某些场景,分布式处理总是高效的,例如带有键的 GROUP BY,JOIN
- 对于某些场景,我们无法进行分布式处理,例如 LIMIT,不带键的 GROUP BY
- 为了确保快速计算,我们需要协调计算和数据的位置。
让我们看看在数据库集群上正常查询的运行方式。
' +------+ +------------+ +------------------+
' | | AST | | Plan | Optimizer |
' SQL--->|Parser+------>|Plan Builder+----->| |
' | | | | | ScatterOptimizer |
' +------+ +------------+ +--------+---------+
' |
' +--------------+ |
' | | |
' +--+ FlightStream | <------+ | Plan
' | | | | |
' | +--------------+ | |
' | | |
' | | |
' | | Flight RPC v
' +----------+ Processor | +--------------+ | +----------------+
' | | RemoteProcessor | | | | do_action | Interpreter |
' Data<--+DataStream|<----------------+--+ FlightStream | <------+------------------+ |
' | | | | | | | PlanRescheduler|
' +----------+ | +--------------+ | +----------------+
' | |
' | |
' | |
' | |
' | +--------------+ |
' | | | |
' +--+ FlightStream | <------+
' | |
' +--------------+
ScatterOptimizer 和 StagePlan
在 Databend 中,我们使用 ScatterOptimizer 来决定查询的分布式计算。 换句话说,分布式查询是单机查询的一种优化。
在 ScatterOptimizer 中,我们遍历查询的所有计划并重写感兴趣的计划 (重写为 StagePlan { kind:StageKind, input:Self })
,其中 input 是重写后的计划,kind 是一个枚举(Normal: 数据再次混排,Expansive: 数据从一个节点扩散到多个节点,Convergent: 数据从多个节点聚合到一个节点)
PlanScheduler 和 RemoteProcessor
在集群模式下,我们提取 ScatterOptimizer 优化后的计划中的所有 StagePlans,并根据 kind 将它们发送到集群中的相应节点。
例如:
EXPLAIN SELECT argMin(user, salary) FROM (SELECT sum(number) AS salary, number%3 AS user FROM numbers_local(1000000000) GROUP BY user);

| explain |

| Projection: argMin(user, salary):UInt64 <-- 在本地节点执行
AggregatorFinal: groupBy=[[]], aggr=[[argMin(user, salary)]]
RedistributeStage[expr: 0] <-- 在集群的所有节点执行
AggregatorPartial: groupBy=[[]], aggr=[[argMin(user, salary)]]
Projection: sum(number) as salary:UInt64, (number % 3) as user:UInt8
AggregatorFinal: groupBy=[[(number % 3)]], aggr=[[sum(number)]]
RedistributeStage[expr: sipHash(_group_by_key)] <-- 在集群的所有节点执行
AggregatorPartial: groupBy=[[(number % 3)]], aggr=[[sum(number)]]
Expression: (number % 3):UInt8, number:UInt64 (Before GroupBy)
RedistributeStage[expr: blockNumber()] <-- 在本地节点执行
ReadDataSource: scan partitions: [8], scan schema: [number:UInt64], statistics: [read_rows: 1000000000, read_bytes: 8000000000] |

Flight API DataStream
我们需要以某种方式获取发送给其他节点执行的计划的结果。FuseData 使用了第三方库 arrow-flight。更多信息:[https://github.com/apache/arrow-rs/tree/master/arrow-flight]