Greenplum: How Executing SQL Creates Slices & Gangs

Related concepts:

  •  Motion: In addition to the usual database operators (table scans, joins, and so on), Greenplum has an operator called motion. A motion moves tuples between segments.
  •  Slice: To achieve maximum parallelism during query execution, Greenplum divides the work of the query plan into slices. A slice is a portion of the plan that can be processed independently. The query plan generates a slice for each motion, one on each side of it.
  •  Gang: The processes that belong to the same slice but run on different segments are called a gang.

Test environment:

 Greenplum version: 6

 Cluster layout: a single master with no standby master, and two primary segments with no mirrors.

Experiment log:

```
test=# select * from test;
DEBUG1:  00000: Message type Q received by from libpq, len = 20
DEBUG3:  00000: StartTransactionCommand
DEBUG3:  00000: StartTransaction

LOG:  00000: statement: select * from test;
LOCATION:  exec_simple_query, postgres.c:1639
[OPT]: Using default search strategy

 Gather Motion 2:1  (slice1; segments: 2)  (cost=0.00..431.00 rows=1 width=8)
   ->  Seq Scan on test  (cost=0.00..431.00 rows=1 width=8)

DEBUG1:  00000: GPORCA produced plan
LOG:  00000: plan:
DETAIL:     {PLANNEDSTMT 
   :commandType 1 
   :planTree 
      {MOTION 
      :motionID 1 
      :motionType 1 
      :nMotionNodes 1 
      :nInitPlans 0 
      :lefttree 
         {SEQSCAN 
         :flow 
            {FLOW 
            :flotype 0 
            }
         :nMotionNodes 0 
         :nInitPlans 0 
         }
      }
   :rtable (
      {RTE 
      :eref 
         {ALIAS 
         :aliasname test 
         :colnames ("id" "age")
         }
      }
   )
   :utilityStmt <> 
   :subplans <> 
   }

Slice 1 on seg0
DEBUG1:  00000: Message type M received by from libpq, len = 457  (seg0 slice1 192.168.106.132:7000 pid=43071)
DEBUG3:  00000: StartTransactionCommand  (seg0 slice1 192.168.106.132:7000 pid=43071)
DEBUG3:  00000: StartTransaction  (seg0 slice1 192.168.106.132:7000 pid=43071)
DEBUG3:  00000: CommitTransactionCommand  (seg0 slice1 192.168.106.132:7000 pid=43071)
DEBUG3:  00000: CommitTransaction  (seg0 slice1 192.168.106.132:7000 pid=43071)

Slice 1 on seg1
DEBUG1:  00000: Message type M received by from libpq, len = 457  (seg1 slice1 192.168.106.133:7000 pid=43218)
DEBUG3:  00000: StartTransactionCommand  (seg1 slice1 192.168.106.133:7000 pid=43218)
DEBUG3:  00000: StartTransaction  (seg1 slice1 192.168.106.133:7000 pid=43218)
DEBUG3:  00000: CommitTransactionCommand  (seg1 slice1 192.168.106.133:7000 pid=43218)
DEBUG3:  00000: CommitTransaction  (seg1 slice1 192.168.106.133:7000 pid=43218)

master
DEBUG3:  00000: CommitTransactionCommand
DEBUG3:  00000: CommitTransaction
```

The log above has been tidied up: lines that are irrelevant to this discussion have been removed.

It is the debug output of executing the statement select * from test;. From it we can see the following:

1. The execution plan:

Gather Motion 2:1  (slice1; segments: 2)  (cost=0.00..431.00 rows=1 width=8)
   ->  Seq Scan on test  (cost=0.00..431.00 rows=1 width=8)

2. The plan tree:

The log shows the correspondence between the execution plan and the plan tree: Gather Motion -> MOTION, and Seq Scan -> SEQSCAN.

3. Slice 0: the root slice, which runs on the master.

In the GangType enum, GANGTYPE_UNALLOCATED (/* a root slice executed by the qDisp */) is the type assigned to the root slice on the master.

4. Slice 1: runs on seg0 and seg1, as the log above shows.

Gather Motion 2:1  (slice1; segments: 2)  (cost=0.00..431.00 rows=1 width=8)

Code analysis:

```c
void
PostgresMain(int argc, char *argv[], const char *dbname, const char *username)
{
	......
    for (;;)
    {
 		......
        switch (firstchar)
        {
            case 'Q':           /* simple query */
                {
                    ......
                    else
                        exec_simple_query(query_string);
                    send_ready_for_query = true;
                }
                break;
         }
     }
}
```

 

```
Log excerpt:
DEBUG1:  00000: Message type Q received by from libpq, len = 20
......
LOCATION:  exec_simple_query, postgres.c:1639
```
  • The log and the code show that this is the master's entry point for executing SQL: when the first character of the message is 'Q', a simple query has been submitted, and the SQL string is executed by exec_simple_query.
  • We skip parsing and planning here and go straight to slice initialization and the steps that follow; the focus of this article is the slices and gangs of a parallel plan.

Slice-related structures:

```c
/*
 * Slice 0 is the root slice of plan as a whole.
 * Slices 1 through nMotion are motion slices with a sending motion at
 *  the root of the slice.
 * Slices nMotion+1 and on are root slices of initPlans.
 */
typedef struct SliceTable
{
    NodeTag     type;

    int         nMotions;       /* The number Motion nodes in the entire plan */
    int         nInitPlans;     /* The number of initplan slices allocated */
    int         localSlice;     /* Index of the slice to execute. */
    List       *slices;         /* List of slices */
    int         instrument_options; /* OR of InstrumentOption flags */
    uint32      ic_instance_id;
} SliceTable;
```

As the comment explains, slices fall into three categories (a small classification sketch follows the list):

  •  the root slice, with index 0, which runs on the master
  •  slices rooted at a Motion
  •  root slices of initPlans
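
For our query, nMotions = 1 and nInitPlans = 0, so index 0 is the root slice and index 1 is a motion slice. The following is my own standalone sketch (not Greenplum code) that classifies a slice index according to the comment above:

```c
#include <stdio.h>

/* Hypothetical helper, for illustration only: map a slice index to the
 * category described in the SliceTable comment. */
static const char *
classify_slice(int i, int nMotions, int nInitPlans)
{
    (void) nInitPlans;          /* indexes above nMotions are initPlan roots */
    if (i == 0)
        return "root slice (runs on the master)";
    if (i <= nMotions)
        return "motion slice (sending motion at its root)";
    return "root slice of an initPlan";
}

int
main(void)
{
    int nMotions = 1, nInitPlans = 0;   /* values taken from the log */

    for (int i = 0; i <= nMotions + nInitPlans; i++)
        printf("slice %d: %s\n", i, classify_slice(i, nMotions, nInitPlans));
    return 0;
}
```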

The segment configuration table:

```
template1=# select * from gp_segment_configuration;
 dbid | content | role | preferred_role | mode | status | port | hostname | address |        datadir        
------+---------+------+----------------+------+--------+------+----------+---------+-----------------------
    1 |      -1 | p    | p              | n    | u      | 5432 | mdw      | mdw     | /data/master/gpseg-1
    2 |       0 | p    | p              | n    | u      | 7000 | sdw1     | sdw1    | /data1/primary/gpseg0
    3 |       1 | p    | p              | n    | u      | 7000 | sdw2     | sdw2    | /data1/primary/gpseg1
(3 rows)
```

This table shows that the master's row has content = -1, which matches MASTER_CONTENT_ID in gp_segment_config.h. The master node in fact initializes GpIdentity.segindex = MASTER_CONTENT_ID, so whenever segindex is -1 we can conclude that we are running on the master. A minimal sketch of that check follows.
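
For illustration only (my own standalone code, not the real gp_segment_config.h or GpIdentity definition):

```c
#include <stdbool.h>
#include <stdio.h>

#define MASTER_CONTENT_ID (-1)     /* matches the content = -1 row above */

/* Simplified stand-in for the real GpIdentity global. */
struct { int segindex; } GpIdentity = { MASTER_CONTENT_ID };

static bool
on_master(void)
{
    return GpIdentity.segindex == MASTER_CONTENT_ID;
}

int
main(void)
{
    printf("running on master: %s\n", on_master() ? "yes" : "no");
    return 0;
}
```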

Call stack for creating slices:

```c
void
InitSliceTable(EState *estate, int nMotions, int nSubplans)
{
    SliceTable *table;
    Slice      *slice;
    int         i,
                n;
    MemoryContext oldcontext;
    n = 1 + nMotions + nSubplans;
  
    table = makeNode(SliceTable);
    table->nMotions = nMotions;
	table->nInitPlans = nSubplans;
	......
    for (i = 0; i < n; i++)
    {
        slice = makeNode(Slice);
        slice->sliceIndex = i;
         ......
        slice->gangType = GANGTYPE_UNALLOCATED;
        ......
        table->slices = lappend(table->slices, slice);
    }
	estate->es_sliceTable = table;
	......
}
```

From the log we can see:

```
:nMotionNodes 1 
:nInitPlans 0 
```

So InitSliceTable creates two slices, with indexes 0 and 1; these correspond to slice0 and slice1. A toy version of the loop is shown below.
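
This toy version (my own simplified types, not the real Slice node) reproduces the arithmetic: n = 1 + nMotions + nSubplans = 1 + 1 + 0 = 2.

```c
#include <stdio.h>

typedef enum { GANGTYPE_UNALLOCATED } SketchGangType;

typedef struct
{
    int            sliceIndex;
    SketchGangType gangType;
} SketchSlice;

int
main(void)
{
    int nMotions = 1, nSubplans = 0;    /* from the log: nMotionNodes 1, nInitPlans 0 */
    int n = 1 + nMotions + nSubplans;
    SketchSlice slices[16];

    for (int i = 0; i < n; i++)
    {
        slices[i].sliceIndex = i;
        slices[i].gangType = GANGTYPE_UNALLOCATED;
        printf("created slice %d (gangType = GANGTYPE_UNALLOCATED)\n", i);
    }
    return 0;
}
```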

PlannedStmt-related log output:

```
DETAIL:     {PLANNEDSTMT 
   ......
   :intoClause <> 
   :copyIntoClause <> 
   :refreshClause <> 
   ......
}
```

Call stack for slice initialization:

```c
static void
FillSliceTable(EState *estate, PlannedStmt *stmt)
{
    FillSliceTable_cxt cxt;
    SliceTable *sliceTable = estate->es_sliceTable;

    if (!sliceTable)
        return;
    cxt.prefix.node = (Node *) stmt;
    cxt.estate = estate;
    cxt.currentSliceId = 0;
    if (stmt->intoClause != NULL || stmt->copyIntoClause != NULL || stmt->refreshClause)
	{
    	......
    }
    /*
     * NOTE: We depend on plan_tree_walker() to recurse into subplans of
     * SubPlan nodes.
     */
    FillSliceTable_walker((Node *) stmt->planTree, &cxt);
}
```

The log above shows that intoClause, copyIntoClause, and refreshClause are all <> (NULL), so the condition if (stmt->intoClause != NULL || stmt->copyIntoClause != NULL || stmt->refreshClause) is not satisfied. Let us focus on the function below.

```c
/* ----------------
 *   ModifyTable node -
 *      Apply rows produced by subplan(s) to result table(s),
 *      by inserting, updating, or deleting.
 * ----------------
 */
typedef struct ModifyTable
{
    ......
    CmdType     operation;      /* INSERT, UPDATE, or DELETE */
    ......
} ModifyTable;
```

As its comment indicates, this structure represents an operation that modifies a table (INSERT, UPDATE, or DELETE).

```c
static bool FillSliceTable_walker(Node *node, void *context)
{
    if (IsA(node, ModifyTable))
	{
    	......
    }
    /* A DML node is the same as a ModifyTable node, in ORCA plans. */
    if (IsA(node, DML))
    {
 	     ......
    }
    if (IsA(node, Motion))
	{
    	......
        /* Top node of subplan should have a Flow node. */
        Insist(motion->plan.lefttree && motion->plan.lefttree->flow);
        sendFlow = motion->plan.lefttree->flow;
        /* Look up the sending gang's slice table entry. */
        sendSlice = (Slice *) list_nth(sliceTable->slices, motion->motionID);
        /* Look up the receiving (parent) gang's slice table entry. */
        recvSlice = (Slice *)list_nth(sliceTable->slices, parentSliceIndex);
        /* Sending slice become a children of recv slice */
        recvSlice->children = lappend_int(recvSlice->children, sendSlice->sliceIndex);
        sendSlice->parentIndex = parentSliceIndex;
        sendSlice->rootIndex = recvSlice->rootIndex;
        /* The gang beneath a Motion will be a reader. */
        sendSlice->gangType = GANGTYPE_PRIMARY_READER;

        if (sendFlow->flotype != FLOW_SINGLETON) // the log shows :flotype 0; FLOW_SINGLETON is 1, so this branch is taken
        {
            sendSlice->gangType = GANGTYPE_PRIMARY_READER;
            /*
             * If the PLAN is generated by ORCA, We assume that they
             * distpatch on all segments.
             */
            if (stmt->planGen == PLANGEN_PLANNER) // the log shows :planGen 1; PLANGEN_PLANNER is 0, so the else branch below is taken
                FillSliceGangInfo(sendSlice, sendFlow->numsegments);
            else
                FillSliceGangInfo(sendSlice, getgpsegmentCount());
        }
        else
        {
             ......
        }
        ......
        /* recurse into children */
        cxt->currentSliceId = motion->motionID;
        result = plan_tree_walker(node, FillSliceTable_walker, cxt);
        cxt->currentSliceId = parentSliceIndex;
        return result;
    }
    if (IsA(node, SubPlan))
	{
      	......
    }
    return plan_tree_walker(node, FillSliceTable_walker, cxt);
}
```

Structures relevant to this logic:

```c
typedef enum FlowType
{
    FLOW_UNDEFINED,     /* used prior to calculation of type of derived flow */
    FLOW_SINGLETON,     /* flow has single stream */
    FLOW_REPLICATED,    /* flow is replicated across IOPs */
    FLOW_PARTITIONED,   /* flow is partitioned across IOPs */
} FlowType;

typedef enum PlanGenerator
{
    PLANGEN_PLANNER,            /* plan produced by the planner*/
    PLANGEN_OPTIMIZER,          /* plan produced by the optimizer*/
} PlanGenerator;
```

Log output relevant to this logic:

```
DETAIL:     {PLANNEDSTMT 
   :commandType 1 
   :planGen 1 
   :planTree 
      {MOTION 
      :motionID 1 
      :nMotionNodes 1 
      :nInitPlans 0 
      :lefttree 
         {SEQSCAN......}
         :flow 
         {FLOW 
            :flotype 0 
            :req_move 0 
            :locustype 0 
            :segindex 0 
            :numsegments 1 
            :hashExprs <> 
            :hashOpfamilies <> 
            :flow_before_req_move <>
         }
      }
   :rtable (
      {RTE 
      :eref 
         {ALIAS 
         :aliasname test 
         :colnames ("id" "age")
         }
      }
   )
   :utilityStmt <> 
   :subplans <> 
   }
```

FillSliceTable_walker has four branches:

  • if (IsA(node, ModifyTable))
  • if (IsA(node, DML))
  • if (IsA(node, Motion))
  • if (IsA(node, SubPlan))

How FillSliceTable_walker is called:

```c
FillSliceTable_walker((Node *) stmt->planTree, &cxt);
```

As we can see, stmt corresponds to the PLANNEDSTMT node in the log, and in the log planTree is a Motion (MOTION), so execution enters the Motion branch.

The third branch (Motion):

  • FillSliceTable_walker is called from FillSliceTable, where cxt.currentSliceId = 0; therefore, inside FillSliceTable_walker, int parentSliceIndex = cxt->currentSliceId; gives parentSliceIndex = 0.
  • The log shows that motion->motionID is 1 (:motionID 1), so sendSlice is slice1 from the log and recvSlice is slice0.

This branch does four things (a simplified sketch follows the list):

  • It takes slice0 as the recvSlice and attaches slice1 (the sendSlice) as a child of slice0.
  • It sets slice1's gang type to GANGTYPE_PRIMARY_READER.
  • It arranges for slice1 to be dispatched to all segments.
  • It calls plan_tree_walker to recurse into slice1's subtree (not covered further here).
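
Here is that bookkeeping as a standalone sketch with my own simplified types (not the real Slice structure):

```c
#include <stdio.h>

typedef struct
{
    int         sliceIndex;
    int         parentIndex;        /* -1 for the root slice */
    int         nChildren;
    int         children[8];
    int         dispatchToAllSegs;
    const char *gangType;
} SketchSlice;

int
main(void)
{
    SketchSlice slice0 = { .sliceIndex = 0, .parentIndex = -1, .gangType = "GANGTYPE_UNALLOCATED" };
    SketchSlice slice1 = { .sliceIndex = 1, .parentIndex = -1, .gangType = "GANGTYPE_UNALLOCATED" };

    /* 1. attach the sendSlice (slice1) as a child of the recvSlice (slice0) */
    slice0.children[slice0.nChildren++] = slice1.sliceIndex;
    slice1.parentIndex = slice0.sliceIndex;

    /* 2. the gang beneath the Motion is a reader */
    slice1.gangType = "GANGTYPE_PRIMARY_READER";

    /* 3. ORCA plan: dispatch the sending slice to every segment */
    slice1.dispatchToAllSegs = 1;

    /* 4. (the recursion into the subtree is omitted in this sketch) */
    printf("slice%d: parent = slice%d, gangType = %s, all segments = %d\n",
           slice1.sliceIndex, slice1.parentIndex, slice1.gangType, slice1.dispatchToAllSegs);
    return 0;
}
```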

Context variable initialization:

```c
static int
BackendStartup(Port *port)
{

    pid = fork_process();
    if (pid == 0)               /* child */
	{
		......
        MyProcPid = getpid();   /* reset MyProcPid */
        ......
	}
}
```

As we can see, MyProcPid identifies the currently forked child process.
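
A minimal standalone fork() example (not Greenplum code) shows the same pattern: the child resets its notion of its own pid right after the fork, which is what MyProcPid = getpid() does for each backend.

```c
#include <stdio.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

int
main(void)
{
    pid_t pid = fork();

    if (pid == 0)               /* child: plays the role of the forked backend */
    {
        pid_t MyProcPid = getpid();     /* "reset MyProcPid" */
        printf("child backend pid = %d\n", (int) MyProcPid);
        _exit(0);
    }

    waitpid(pid, NULL, 0);      /* parent: the postmaster */
    return 0;
}
```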

Call stack for creating the gang:

Code:

```c
void
AssignGangs(CdbDispatcherState *ds, QueryDesc *queryDesc)
{
    ......
    InventorySliceTree(ds, sliceTable->slices, rootIdx);
}
```

This function delegates the work to InventorySliceTree.

```c
void
InventorySliceTree(CdbDispatcherState *ds, List *slices, int sliceIndex)
{
    ListCell *cell;
    int childIndex;
    Slice *slice = list_nth(slices, sliceIndex);

    if (slice->gangType == GANGTYPE_UNALLOCATED)
    {
        slice->primaryGang = NULL;
        slice->primaryProcesses = getCdbProcessesForQD(true);
    }
    else
    {
        Assert(slice->segments != NIL);
        slice->primaryGang = AllocateGang(ds, slice->gangType, slice->segments);
        setupCdbProcessList(slice);
    }
    foreach(cell, slice->children)
    {
        childIndex = lfirst_int(cell);
        InventorySliceTree(ds, slices, childIndex);
    }
}
```

From the earlier analysis we know that slice0's gangType is GANGTYPE_UNALLOCATED, so slice0's primaryProcesses is set (to the QD itself) and no gang is allocated for it, while slice1 takes the else branch and gets a gang. Finally, InventorySliceTree recurses over the children: for slice0 the foreach loop visits its only child, slice1, and since slice1 has no children the recursion stops there.
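
The recursion can be sketched as follows (my own types, not the real cdb structures); for our two-slice plan the root slice keeps the QD process, its child gets a gang, and the walk then stops because slice1 has no children.

```c
#include <stdio.h>

typedef struct
{
    int index;
    int isRootSlice;            /* corresponds to GANGTYPE_UNALLOCATED */
    int nChildren;
    int children[4];
} SketchSlice;

static void
inventory(SketchSlice *slices, int idx)
{
    SketchSlice *s = &slices[idx];

    if (s->isRootSlice)
        printf("slice %d: executed by the QD process itself, no gang\n", s->index);
    else
        printf("slice %d: allocate a gang of QE processes\n", s->index);

    for (int i = 0; i < s->nChildren; i++)
        inventory(slices, s->children[i]);
}

int
main(void)
{
    SketchSlice slices[2] = {
        { .index = 0, .isRootSlice = 1, .nChildren = 1, .children = {1} },
        { .index = 1, .isRootSlice = 0, .nChildren = 0 },
    };

    inventory(slices, 0);
    return 0;
}
```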

Handling of slice0:

```c
/*
 * getCdbProcessForQD:  Manufacture a CdbProcess representing the QD,
 * as if it were a worker from the executor factory.
 *
 * NOTE: Does not support multiple (mirrored) QDs.
 */
List *
getCdbProcessesForQD(int isPrimary)
{
    CdbComponentDatabaseInfo *qdinfo;
    CdbProcess *proc;
    List       *list = NIL;

    Assert(Gp_role == GP_ROLE_DISPATCH);
    qdinfo = cdbcomponent_getComponentInfo(MASTER_CONTENT_ID);
    proc = makeNode(CdbProcess);

	......
	proc->pid = MyProcPid;
	......
    list = lappend(list, proc);
    return list;
}
```

The implementation above shows that primaryProcesses is set to the current process: this function assigns the process that executes the master's slice, which is simply the current dispatch (QD) process. (For a description of Gp_role, see my other article: [greenplum-QD&QE启动流程](https://blog.csdn.net/dusx1981/article/details/112980031).) So no gang is allocated for slice0.

Handling of slice1:

```c
/*
 * Creates a new gang by logging on a session to each segDB involved.
 *
 * elog ERROR or return a non-NULL gang.
 */
Gang *
AllocateGang(CdbDispatcherState *ds, GangType type, List *segments)
{
    MemoryContext   oldContext;
    SegmentType     segmentType;
    Gang            *newGang = NULL;
	int             i;
	......
    if (Gp_role != GP_ROLE_DISPATCH)
    {
        elog(FATAL, "dispatch process called with role %d", Gp_role);
	}

    if (type == GANGTYPE_PRIMARY_WRITER)
        segmentType = SEGMENTTYPE_EXPLICT_WRITER;
    /* for extended query like cursor, must specify a reader */
    else if (ds->isExtendedQuery)
        segmentType = SEGMENTTYPE_EXPLICT_READER;
    else
        segmentType = SEGMENTTYPE_ANY;
    
	......
    newGang = cdbgang_createGang(segments, segmentType);
    newGang->allocated = true;
    newGang->type = type;
    /*
     * Push to the head of the allocated list, later in
     * cdbdisp_destroyDispatcherState() we should recycle them from the head to
     * restore the original order of the idle gangs.
     */
    ds->allocatedGangs = lcons(newGang, ds->allocatedGangs);
	ds->largestGangSize = Max(ds->largestGangSize, newGang->size);

    if (type == GANGTYPE_PRIMARY_WRITER)
    {
        /*
         * set "whoami" for utility statement. non-utility statement will
         * overwrite it in function getCdbProcessList.
         */
        for (i = 0; i < newGang->size; i++)
            cdbconn_setQEIdentifier(newGang->db_descriptors[i], -1);
    }
    return newGang;
}
```

From the earlier logic we know that slice1's type is GANGTYPE_PRIMARY_READER; since it is not a writer gang and this is not an extended query, segmentType here ends up as SEGMENTTYPE_ANY.

Call chain for creating the libpq connections:

Code:

```c
Gang *
cdbgang_createGang_async(List *segments, SegmentType segmentType)
{
    Gang    *newGangDefinition;

    newGangDefinition = NULL;
	/* allocate and initialize a gang structure */
	......
    newGangDefinition = buildGangDefinition(segments, segmentType);
    CurrentGangCreating = newGangDefinition;
	totalSegs = getgpsegmentCount();
	size = list_length(segments);
	......
    PG_TRY();
    {
        for (i = 0; i < size; i++)
        {
             ......
            segdbDesc = newGangDefinition->db_descriptors[i];

            ret = build_gpqeid_param(gpqeid, sizeof(gpqeid),
                                     segdbDesc->isWriter,
                                     segdbDesc->identifier,
                                     segdbDesc->segment_database_info->hostSegs,
                                     totalSegs * 2);
             ......
            cdbconn_doConnectStart(segdbDesc, gpqeid, options);
  
            pollingStatus[i] = PGRES_POLLING_WRITING;
        }

        for (;;)
        {......}
 	 ......
     return newGangDefinition;
}
```

We omit the details of the network connections and the handshake here and focus on the gang-related parts:

  • buildGangDefinition creates a SegmentDatabaseDescriptor for each segment in the gang; think of it as an object representing a segment database.
  • build_gpqeid_param generates the gpqeid string, which makes the segment initialize itself in the QE role.
  • cdbconn_doConnectStart connects to the database represented by each SegmentDatabaseDescriptor, that is, to the database on each segment; from the earlier analysis we know that in this scenario every segment is connected, and each connection spawns a QE process. For the QE startup flow, see [greenplum-QD&QE启动流程](https://blog.csdn.net/dusx1981/article/details/112980031). (A simplified sketch of this loop follows the list.)
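
A rough sketch of that loop, with a hypothetical stand-in for cdbconn_doConnectStart (not the real cdbconn/libpq API): one connection is started per segment in the gang, and each connection causes one QE to be forked on that segment.

```c
#include <stdio.h>

typedef struct
{
    int         contentId;      /* segment content id: 0, 1, ... */
    const char *host;
    int         port;
} SketchSegment;

/* Hypothetical stand-in for cdbconn_doConnectStart(): just report what
 * the real code would do (start a libpq connection to the segment). */
static void
sketch_connect_start(const SketchSegment *seg)
{
    printf("connecting to seg%d at %s:%d -> one QE process will serve this slice\n",
           seg->contentId, seg->host, seg->port);
}

int
main(void)
{
    /* The two primary segments seen in the experiment log above. */
    SketchSegment gang[] = {
        { 0, "192.168.106.132", 7000 },
        { 1, "192.168.106.133", 7000 },
    };
    int size = (int) (sizeof(gang) / sizeof(gang[0]));

    for (int i = 0; i < size; i++)
        sketch_connect_start(&gang[i]);
    return 0;
}
```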

From this we can derive the network topology of the current experiment: the QD process on the master holds one libpq connection to one QE process on each of the two segments.

So a slice can be seen as the data structure that manages a gang, while a gang is the data structure that manages the distributed worker processes.
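
As a closing illustration, here is a minimal sketch (my own types) of how the pieces nest: a slice owns at most one gang, and a gang is a set of per-segment QE processes.

```c
#include <stdio.h>

typedef struct { int pid; int contentId; } SketchQE;   /* one QE process */

typedef struct
{
    int       size;
    SketchQE *qes;              /* one QE per segment in the gang */
} SketchGang;

typedef struct
{
    int         sliceIndex;
    SketchGang *primaryGang;    /* NULL for the root slice, which runs in the QD */
} SketchSlice;

int
main(void)
{
    SketchQE    qes[2]  = { { 43071, 0 }, { 43218, 1 } };  /* pids from the log */
    SketchGang  gang    = { 2, qes };
    SketchSlice slice1  = { 1, &gang };
    SketchSlice slice0  = { 0, NULL };

    printf("slice%d: no gang, runs in the QD\n", slice0.sliceIndex);
    printf("slice%d: gang of %d QEs\n", slice1.sliceIndex, slice1.primaryGang->size);
    return 0;
}
```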

About the author

Du Shixiong, senior engineer and architect. He previously worked at Baidu and Sogou, and has recently begun studying the Greenplum source code, hoping to see the study through to the end.
