Writing to Greenplum using DBWriter¶
For writing data to Greenplum, use DBWriter with GreenplumWriteOptions.
Warning
Please take into account Greenplum types.
Warning
It is always recommended to create the table explicitly using Greenplum.execute instead of relying on Spark's table DDL generation.
This is because Spark's DDL generator can create columns with types different from what is expected.
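Following that recommendation, here is a minimal sketch of creating the target table explicitly before writing. The table and column names are hypothetical examples, and the small DDL-building helper is just for illustration; the only onetl API assumed is Greenplum.execute, as mentioned above.

```python
# Sketch: build the DDL with explicit Greenplum types and run it via
# Greenplum.execute, instead of relying on Spark's DDL generation.
# Table/column names below are hypothetical.

def create_table_ddl(table: str, columns: dict, distributed_by: str) -> str:
    # Render a CREATE TABLE statement with explicit column types
    # and an explicit distribution key.
    cols = ", ".join(f"{name} {gp_type}" for name, gp_type in columns.items())
    return (
        f"CREATE TABLE IF NOT EXISTS {table} ({cols}) "
        f"DISTRIBUTED BY ({distributed_by})"
    )

ddl = create_table_ddl(
    "schema.table",
    {"id": "bigint", "col1": "int", "col2": "text"},
    distributed_by="id",
)

# greenplum = Greenplum(...)  # real connection required
# greenplum.execute(ddl)
```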
Examples¶
from onetl.connection import Greenplum
from onetl.db import DBWriter

greenplum = Greenplum(...)

df = ...  # data is here

writer = DBWriter(
    connection=greenplum,
    target="schema.table",
    options=Greenplum.WriteOptions(
        if_exists="append",
        # by default, distribution is random
        distributedBy="id",
        # partitionBy is not supported
    ),
)

writer.run(df)
Interaction schema¶
High-level schema is described in Greenplum prerequisites. You can find detailed interaction schema below.
Spark <-> Greenplum interaction during DBWriter.run()
---
title: Greenplum master <-> Spark driver
---
sequenceDiagram
box Spark
participant A as Spark driver
participant B as Spark executor1
participant C as Spark executor2
participant D as Spark executorN
end
box Greenplum
participant E as Greenplum master
participant F as Greenplum segment1
participant G as Greenplum segment2
participant H as Greenplum segmentN
end
note over A,H: == Greenplum.check() ==
A ->> E: CONNECT
activate E
activate A
A -->> E : CHECK IF TABLE EXISTS gp_table
E -->> A : TABLE EXISTS
A ->> E : SHOW SCHEMA FOR gp_table
E -->> A : (id bigint, col1 int, col2 text, ...)
note over A,H: == DBWriter.run() ==
A ->> B: START EXECUTOR FOR df(id bigint, col1 int, col2 text, ...) PARTITION 1
activate B
A ->> C: START EXECUTOR FOR df(id bigint, col1 int, col2 text, ...) PARTITION 2
activate C
A ->> D: START EXECUTOR FOR df(id bigint, col1 int, col2 text, ...) PARTITION N
activate D
note right of A : This is done in parallel,<br/>executors are independent<br/>|<br/>|<br/>|<br/>V
B ->> E: CREATE READABLE EXTERNAL TABLE spark_executor1 (id bigint, col1 int, col2 text, ...)<br/>USING address=executor1_host:executor1_port<br/>INSERT INTO gp_table FROM EXTERNAL TABLE spark_executor1
activate E
note right of E : Each white vertical line here is an open connection to the master.<br/>Usually, **N+1** connections are created from Spark to Greenplum master
E -->> F: INSERT DATA FROM spark_executor1 TO gp_table_data_on_segment1
activate F
note right of F : No direct requests between Greenplum segments & Spark.<br/>Data transfer is always initiated by Greenplum segments.
C ->> E: CREATE READABLE EXTERNAL TABLE spark_executor2 (id bigint, col1 int, col2 text, ...)<br/>USING address=executor2_host:executor2_port<br/>INSERT INTO gp_table FROM EXTERNAL TABLE spark_executor2
activate E
E -->> G: INSERT DATA FROM spark_executor2 TO gp_table_data_on_segment2
activate G
D ->> E: CREATE READABLE EXTERNAL TABLE spark_executorN (id bigint, col1 int, col2 text, ...)<br/>USING address=executorN_host:executorN_port<br/>INSERT INTO gp_table FROM EXTERNAL TABLE spark_executorN
activate E
E -->> H: INSERT DATA FROM spark_executorN TO gp_table_data_on_segmentN
activate H
activate H
F -xB: INITIALIZE CONNECTION TO Spark executor1<br/>PULL DATA FROM Spark executor1
deactivate F
note left of B : Circle is an open GPFDIST port,<br/>listened on by the executor
G -xC: INITIALIZE CONNECTION TO Spark executor2<br/>PULL DATA FROM Spark executor2
deactivate G
H -xD: INITIALIZE CONNECTION TO Spark executorN<br/>PULL DATA FROM Spark executorN
deactivate H
deactivate H
note over A,H: == Spark.stop() ==
B -->> E : DROP TABLE spark_executor1
deactivate E
C -->> E : DROP TABLE spark_executor2
deactivate E
D -->> E : DROP TABLE spark_executorN
deactivate E
B -->> A: DONE
deactivate B
C -->> A: DONE
deactivate C
D -->> A: DONE
deactivate D
A -->> E: CLOSE CONNECTION
deactivate E
deactivate A
Options¶
GreenplumWriteOptions¶
Bases: GenericOptions
VMware's Greenplum Spark connector writing options.
Warning
Some options, like url, dbtable, server.*, pool.*, etc.,
are populated from connection attributes and cannot be overridden
by the user in WriteOptions, to avoid issues.
Examples:
Note
You can pass any value supported by the connector,
even if it is not mentioned in this documentation. Option names should be in camelCase!
The set of supported options depends on the connector version.
from onetl.connection import Greenplum

options = Greenplum.WriteOptions(
    if_exists="append",
    truncate="false",
    distributedBy="mycolumn",
)
if_exists = Field(default=GreenplumTableExistBehavior.APPEND, alias=avoid_alias("mode"))¶
Behavior of writing data into an existing table.

Possible values:

- append (default)
  Adds new rows into the existing table.

  - Table does not exist: table is created using options provided by user (distributedBy and others).
  - Table exists: data is appended to the table. Table has the same DDL as before writing data.

  Warning
  This mode does not check whether the table already contains rows from the dataframe, so duplicated rows can be created.
  Also, Spark does not support passing custom options to the insert statement, like ON CONFLICT, so don't try to implement deduplication using unique indexes or constraints. Instead, write to a staging table and perform deduplication using the execute method.

- replace_entire_table
  Table is dropped and then created.

  - Table does not exist: table is created using options provided by user (distributedBy and others).
  - Table exists: table content is replaced with dataframe content. After writing is completed, the target table either keeps the same DDL as before writing data (truncate=True) or is recreated (truncate=False).

- ignore
  Ignores the write operation if the table already exists.

  - Table does not exist: table is created using options provided by user (distributedBy and others).
  - Table exists: the write operation is ignored, and no data is written to the table.

- error
  Raises an error if the table already exists.

  - Table does not exist: table is created using options provided by user (distributedBy and others).
  - Table exists: an error is raised, and no data is written to the table.
Changed in 0.9.0
Renamed mode → if_exists
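The staging-table deduplication approach recommended for append mode can be sketched as follows. The table and column names are hypothetical, and the query below is just one possible dedup statement (Greenplum, being PostgreSQL-based, supports DISTINCT ON and NOT EXISTS); adapt it to your own keys and tables.

```python
# Sketch: append into a staging table first, then move only new rows
# into the target with Greenplum.execute. Names below are hypothetical.

def dedup_insert_sql(target: str, staging: str, key: str) -> str:
    # Insert rows from staging that are not already present in target,
    # keeping one row per key value.
    return (
        f"INSERT INTO {target} "
        f"SELECT DISTINCT ON ({key}) * FROM {staging} s "
        f"WHERE NOT EXISTS (SELECT 1 FROM {target} t WHERE t.{key} = s.{key})"
    )

sql = dedup_insert_sql("schema.table", "schema.table_stg", "id")

# writer = DBWriter(
#     connection=greenplum,
#     target="schema.table_stg",
#     options=Greenplum.WriteOptions(if_exists="replace_entire_table"),
# )
# writer.run(df)
# greenplum.execute(sql)
# greenplum.execute("DROP TABLE schema.table_stg")
```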