Greenplum外部数据写入工具Greenplum-Spark Connector介绍

1. 前序

Greenplum是一款优秀的mpp数据库产品,官方推荐了几种将外部数据写入Greenplum方式,包含:通用的Jdbc,gpload以及Pivotal Greenplum-Spark Connector等。

  • Jdbc:Jdbc方式,写大数据量会很慢。
  • gpload:适合写大数据量数据,能并行写入。但其缺点是需要安装客户端,包括gpfdist等依赖,安装起来很麻烦。需要了解可以参考gpload。
  • Greenplum-Spark Connector:基于Spark并行处理,并行写入Greenplum,并提供了并行读取的接口。也是接下来该文重点介绍的部分。

2. Greenplum-Spark Connector读数据架构

一个Spark application,是由Driver和Executor节点构成。当Spark application使用Greenplum-Spark Connector加载Greenplum数据时,其Driver端会通过JDBC的方式请求Greenplum的master节点获取相关的元数据信息。Connector将会根据这些元数据信息去决定Spark的Executor去怎样去并行的读取该表的数据。

Greenplum数据库存储数据是按segment组织的,Greenplum-Spark Connector在加载Greenplum数据时,需要指定Greenplum表的一个字段作为Spark的partition字段,Connector会使用这个字段的值来计算,该Greenplum表的某个segment该被哪一个或多个Spark partition读取。

其读取过程如下:

  1. Spark Driver通过Jdbc的方式连接Greenplum master,并读取指定表的相关元数据信息。然后根据指定的分区字段以及分区个数去决定segment怎么分配。
  2. Spark Executor端会通过Jdbc的方式连接Greenplum master,创建Greenplum外部表。
  3. 然后Spark Executor通过Http方式连接Greenplum的数据节点,获取指定的segment的数据。该获取数据的操作在Spark Executor并行执行。

3. Greenplum-Spark Connector写数据流程

  1. GSC在Spark Executor端通过Jetty启动一个Http服务,将该服务封装为支持Greenplum的gpfdist协议。
  2. GSC在Spark Executor端通过Jdbc方式连接Greenplum master,创建Greenplum外部表,该外部表文件地址指向该Executor所启动的gpfdist协议地址。SQL示例如下:
CREATE READABLE EXTERNAL TABLE
"public"."spark_9dc823a6fa48df60_3d9d854163f8f07a_1_42" (LIKE "public"."rank_a1")
LOCATION ('gpfdist://10.0.8.145:44772/spark_9dc823a6fa48df60_3d9d854163f8f07a_1_42')
FORMAT 'CSV'
(DELIMITER AS '|'
 NULL AS '')
ENCODING 'UTF-8'

3. GSC在Spark Executor端通过Jdbc方式连接Greenplum master,然后执行insert语句至真实的表中,数据来源于这张外部表。SQL示例如下:

INSERT INTO "public"."rank_a1"
SELECT *
FROM "public"."spark_9dc823a6fa48df60_3d9d854163f8f07a_1_42"

至于这张外部表的数据,是否落地当前Executor服务器,不清楚。猜测不会落地,而是直接通过Http直接传递给了Greenplum对应的Segment。

4. GSC监听onApplicationEnd事件,在Spark application结束后,删除创建的外部表。

4. Greenplum-Spark Connector使用

  1. 下载GSC Jar包。 下载地址:Pivotal Network。 可直接下载最新版本的GSC即1.6.2,支持Greenplum5.0之后的版本。greenplum-spark_-.jar,如:
greenplum-spark_2.11-1.6.2.jar

2. maven中引入:

        <dependency>
            <groupId>io.pivotal.greenplum.spark</groupId>
            <artifactId>greenplum-spark_2.11</artifactId>
            <version>1.6.2</version>
        </dependency>

3. Spark提交引入:

  • spark-shell或spark-submit时候,通过–jars加入greenplum-spark_2.11-1.6.2.jar。
  • 将greenplum-spark_2.11-1.6.2.jar与Spark application包打成 uber jar 提交。

5. Greenplum-Spark Connector参数

6. 从Greenplum读取数据

  1. DataFrameReader.load()方式:
val gscReadOptionMap = Map(
      "url" -> "jdbc:postgresql://gpdb-master:5432/testdb",
      "user" -> "bill",
      "password" -> "changeme",
      "dbschema" -> "myschema",
      "dbtable" -> "table1",
      "partitionColumn" -> "id"
)

val gpdf = spark.read.format("greenplum")
      .options(gscReadOptionMap)
      .load()

2. spark.read.greenplum()方式:

val url = "jdbc:postgresql://gpmaster.domain:15432/tutorial"
val tblname = "avgdelay"
val jprops = new Properties()
jprops.put("user", "user2")
jprops.put("password", "changeme")
jprops.put("partitionColumn", "airlineid")
val gpdf = spark.read.greenplum(url, tblname, jprops)

然而,这种方式必然需要引入一个隐式转换,官网也没介绍。

7. 写数据至Greenplum

7.1. 写数据示例:

val gscWriteOptionMap = Map(
      "url" -> "jdbc:postgresql://gpdb-master:5432/testdb",
      "user" -> "bill",
      "password" -> "changeme",
      "dbschema" -> "myschema",
      "dbtable" -> "table2",
)

dfToWrite.write.format("greenplum")
      .options(gscWriteOptionMap)
      .save()

在通过GSC写到Greenplum表时,如果表已经存在或表中已经存在数据,可通过DataFrameWriter.mode(SaveMode savemode)方式指定其输出模式。相关模式行为如下:

7.2. GSC自动建表:

  1. 创建的Greenplum表将不会有distribution列,如下为GSC生成的建表语句:
CREATE TABLE "public"."rank_a1" 
("id" INTEGER NOT NULL, "rank" TEXT, "year" INTEGER NOT NULL, "gender" INTEGER NOT NULL, "count" INTEGER NOT NULL);

2. 创建的Greenplum表的字段名将会使用Spark DataFrame中的字段名。

3. 在GSC自动建表时,将会为字段名加上双引号,这将使Greenplum区分大小写。

4. 当Spark DataFrame的字段不为nullable时,GSC自动建表的字段将是 NOT NULL。

5. 将会对应的Spark DataFrame字段类型映射为Greenplum的字段类型。参考,字段类型映射表

7.3. 提前手动建表:

  1. 将Spark DataFrame的字段名的数据写至Greenplum表的对应的字段中。值得注意的是,GSC在做映射的时候,是严格区分大小写的。
  2. 写至Greenplum的字段的数据类型,与对应的Spark DataFrame一致,具体参见字段类型映射
  3. 如果Spark数据中某列包含空数据,需确保对应的Greenplum表的列没有被指定为NOT NULL。
  4. Greenplum表中建表时其字段顺序可以与Spark DataFrame中不一致。但Greenplum表中不能出现不存在在Spark DataFrame中的字段。如下例子:
// Greenplum 中的字段
CREATE TABLE public.rank_a1 (
    id int4 NOT NULL,
    "rank" text NULL,
    "year" int4 NOT NULL,
    gender int4 NOT NULL,
    count int4 NOT NULL
)
DISTRIBUTED BY (id);

// Spark DataFrame中的字段
var df = Seq((2, "a|b", 2, 2, 2),(3, "a|b", 3, 3, 3)).toDF("id", "rank", "year", "gender")

// 在写数据至public.rank_a1表时,将会报错如下
Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: The number of columns doesn't match.
Old column names (5): _1, _2, _3, _4, _5
New column names (4): id, rank, year, gender
    at scala.Predef$.require(Predef.scala:224)
    at org.apache.spark.sql.Dataset.toDF(Dataset.scala:435)
    at org.apache.spark.sql.DatasetHolder.toDF(DatasetHolder.scala:44)
    at com.lt.spark.greenplum.GreenplumWrite$.main(GreenplumWrite.scala:14)
    at com.lt.spark.greenplum.GreenplumWrite.main(GreenplumWrite.scala)

5. 确保指定的用户对于该表有读写的权限,自动建表,需要有建表的权限。

8. Troubleshooting

8.1. 端口相关问题

8.2. Greenplum连接数问题

当连接Greenplum的连接数接近Greenplum数据库配置的最大连接数(max_connections)时。Spark application将会抛出 connection limit exceeded 错误。

排查过程:

  1. 查询Greenplum数据的最大连接数:
postgres=# show max_connections;
 max_connections
-----------------
 250
(1 row)

2. 查询当前连接Greenplum数据库的连接数:

postgres=# SELECT count(*) FROM pg_stat_activity;

3. 查询指定的用户连接Greenplum数据的连接数:

postgres=# SELECT count(*) FROM pg_stat_activity WHERE datname='tutorial';
postgres=# SELECT count(*) FROM pg_stat_activity WHERE usename='user1';

4. 查询Greenplum数据库空闲和活动的连接数:

postgres=# SELECT count(*) FROM pg_stat_activity WHERE current_query='<IDLE>';
postgres=# SELECT count(*) FROM pg_stat_activity WHERE current_query!='<IDLE>';

5. 查询连接Greenplum数据库名,用户名,客户端地址,客户端ip,当前查询语句:

postgres=# SELECT datname, usename, client_addr, client_port, current_query FROM pg_stat_activity;

如果确认是Spark application使用连接数过多,则配置JDBC Connection Pooling相关参数,减少连接数。

8.3. Greenplum Database Data Length Errors

在使用Greenplum 4.x或5.x的时候,可能会报出“data line too long”错误。这是因为在Greenplum数据库中参数项“gp_max_csv_line_length”默认值是1M。需要登陆Greenplum master修改这个参数值,示例如下,通过gpconfig修改该参数的值为5M:

gpadmin@gpmaster$ gpconfig -c gp_max_csv_line_length -v 5242880
gpadmin@gpmaster$ gpstop -u

9. 类型映射表

9.1. Greenplum to Spark

9.2. Spark to Greenplum

10. 参考

  1. Greenplum-Spark Connector官方文档
  2. Greenplum建表语句文档
  3. Greenplum参数配置官方文档

本文作者

秣码一盏   

本人专注大数据、微服务领域,大数据平台建设,全网统一:秣码一盏,欢迎关注交流。

分享本博文:

2020 Greenplum峰会

点击了解更多信息

《Data Warehousing with Greenplum》

Greenplum官方书籍《Data Warehousing with Greenplum》。阅读它,以了解如何充分利用Greenplum的功能。

关注微信公众号

Greenplum中文社区

Greenplum官方微信群

扫码加入我们的技术讨论,请备注“网站”