听说它是层级数据的处理“神器”!

递归CTE是Greenplum中一个非常强大的功能,它使得Greenplum具有了处理层级数据和图数据的能力。那么,什么是递归CTE呢?递归CTE的名字中虽然包含CTE,但它们的功能,用法和实现都有很大的不同。

什么是递归CTE

CTE的全称是Common Table Expression,也被称为With语句。CTE更加侧重于SQL的易读性,可以将其看作是一种生命周期更短的临时表,在父查询中可以像引用临时表一样,引用CTE。

递归CTE则使SQL增加了递归的语义,可以基于一个初始集合和变换的规则,不断递归扩大初始集合,直到满足设定的条件位置。递归CTE非常适合处理层级关系的数据,比如行政关系省、市、区和街道是典型的嵌套关系。基于初始集合省份名称,寻找出该省所有的市、区和街道名称的集合就可以使用递归CTE方便的实现。另一个例子是基于公司层级的组织架构,如图1所示,如何查询总监王爽手下的所有员工。

图1 公司组织架构

在上面的例子中,由于公司的层级关系使用关系模型表达,从数据表中只能直接查询到直接上下级关系。如何计算间接的上下级关系,并把所有数据进行汇总,就需要使用到递归CTE。

首先,介绍一下如何声明递归CTE,递归CTE的典型模版如下所示:

WITH RECURSIVE <名称> (列) AS(
 初始非递归查询
UNION [ALL]
 递归查询
)父查询

从上述模版可以看出,递归CTE通常由两部分组成:非递归查询项和递归查询项。递归CTE连接递归查询和非递归查询有两种方式:UNION和UNION ALL。UNION ALL会将非递归项和所有递归项的结果求并,UNION会在UNION ALL结果求并的基础上,进行去重工作。

回到计算总监王爽手下的员工总数的例子,初始非递归查询可以是查询王爽自己。递归查询项是查询当前结果的所有下属员工。SQL语句如下所示:

WITH RECURSIVE myreport AS
(
 SELECT * FROM department WHERE name = '王爽' 
 UNION ALL
 SELECT d.* FROM department AS d, myreport AS md 
 WHERE d.parent_department = md.id
)
SELECT * FROM myreport;

Greenplum中的实现

前面介绍了递归CTE的概念和例子,接下来介绍一下递归CTE在Greenplum中的实现。

递归CTE的总体流程如下:

  1. 初始化WorkTable,其包含的元组对应递归CTE语句UNION[ALL]前面的非递归查询的结果。
  2. 如果WorkTable为空,则说明递归CTE执行完毕,退出。
  3. 基于WorkTable,执行UNION[ALL]后面的递归查询,并将UNION[ALL]的结果写入中间临时表。对于非UNION ALL的UNION,需要对递归查询的结果进行去重工作。
  4. 将中间临时表的数据拷贝到WorkTable,同时清空中间临时表。
  5. 返回第二步。

具体实现上,Greenplum使用WorkTableScan和RecursiveUnion两个节点完成上述递归CTE的工作。其中,WorkTableScan节点负责对递归CTE表的扫描,随着迭代的进行,每次迭代的WorkTable都会由上轮迭代产生的新元组组成。RecursiveUnion节点负责实现递归CTE的迭代逻辑。下面具体介绍一下WorkTableScan和RecursiveUnion两个节点的主要实现。

WorkTableScan节点的实现相对简单,核心是调用函数ExecWorkTableScan读取下一条元组,其步骤如下:

1. 第一次扫描时初始化RecursiveUnionState。

if (node->rustate == NULL)
{
 WorkTableScan *plan = (WorkTableScan *) node->ss.ps.plan;
 EState     *estate = node->ss.ps.state;
 ParamExecData *param;
 param = &(estate->es_param_exec_vals[plan->wtParam]);
 node->rustate = (RecursiveUnionState *) DatumGetPointer(param->value);
}

2. 之后,基于ExecScan接口读取每一条元组。 由于元组存储在tuplestore中,因此WorkTableScan实际上是调用tuplestore_gettupleslot获取元组。

tuplestorestate = node->rustate->working_table;
slot = node->ss.ss_ScanTupleSlot;
(void) tuplestore_gettupleslot(tuplestorestate, true, false, slot);

RecursiveUnion节点的执行逻辑实现在函数ExecRecursiveUnion中,其步骤如下:

1. 执行非递归项。 标记位recursing

if (!node->recursing)
{
 for (;;)
 {
 slot = ExecProcNode(outerPlan);
 if (TupIsNull(slot))
 break;
 if (plan->numCols > 0)
 {
 LookupTupleHashEntry(node->hashtable, slot, &isnew);
 MemoryContextReset(node->tempContext);
 if (!isnew)
 continue;
 }
 tuplestore_puttupleslot(node->working_table, slot);
 return slot;
 }
 node->recursing = true;
}

2. 执行递归项。 需要迭代执行下列步骤,直到满足退出条件,即递归查询不再产生新的元组为止。迭代步骤如下:

a. 基于当前WorkTable,执行递归查询获取元组

slot = ExecProcNode(innerPlan);

b. 如果元组为空,说明当前WorkTable扫描完毕。如果中间临时表为空,说明当前WorkTable没有产生新的元组,递归CTE执行完毕,退出迭代过程。

if (TupIsNull(slot))
{
 if (node->intermediate_empty)
 Break;
}

c. 刷新WorkTabe。如果元组为空,但中间临时表不为空,说明当前WorkTable产生了新的元组,递归CTE可以继续迭代下去。我们需要将存储新元组的中间临时表拷贝到WorkTable中,以便在下次迭代中使用新的WorkTable。Greenplum直接使用指针拷贝,来降低WorkTable转换的代价。转换后需要将中间临时表清空。

if (TupIsNull(slot))
{
 tuplestore_end(node->working_table);
 node->working_table = node->intermediate_table;
 node->intermediate_table = tuplestore_begin_heap(false, false, work_mem);
 node->intermediate_empty = true;
 innerPlan->chgParam = bms_add_member(innerPlan->chgParam, plan->wtParam);
 continue;
}

d. 对于UNION类型的递归CTE,需要基于哈希表进行去重操作。如果哈希表已经存在当前元组,直接跳过插入中间临时表的步骤。

if (plan->numCols > 0){LookupTupleHashEntry(node->hashtable, slot, &isnew);  MemoryContextReset(node->tempContext);  if (!isnew)  continue;}

e. 将元组存入中间临时表,并返回元组给上层节点。

node->intermediate_empty = false;
tuplestore_puttupleslot(node->intermediate_table, slot);
return slot;

以上就是对递归CTE的介绍,感兴趣的同学可以尝试使用递归CTE解决各种复杂的层次数据问题。

分享本博文:

2020 Greenplum峰会

点击了解更多信息

《Data Warehousing with Greenplum》

Greenplum官方书籍《Data Warehousing with Greenplum》。阅读它,以了解如何充分利用Greenplum的功能。

关注微信公众号

Greenplum中文社区

Greenplum官方微信群

扫码加入我们的技术讨论,请备注“网站”