Reading from Greenplum using DBReader¶
Data can be read from Greenplum to Spark using DBReader. It also supports strategy for incremental data reading.
Warning
Please take into account Greenplum types.
Note
Unlike JDBC connectors, Greenplum connector for Spark does not support
executing custom SQL queries using .sql method. Connector can be used to only read data from a table or view.
Supported DBReader features¶
- ✅︎
columns(see note below) - ✅︎
where(see note below) - ✅︎
hwm(see note below), supported strategies: - ❌
hint(is not supported by Greenplum) - ❌
df_schema - ✅︎
options(see Greenplum.ReadOptions)
Warning
In case of Greenplum connector, DBReader does not generate raw SELECT query. Instead it relies on Spark SQL syntax
which in some cases (using column projection and predicate pushdown) can be converted to Greenplum SQL.
So columns, where and hwm.expression should be specified in Spark SQL syntax,
not Greenplum SQL.
This is OK:
DBReader(
columns=[
"some_column",
# this cast is executed on Spark side
"CAST(another_column AS STRING)",
],
# this predicate is parsed by Spark, and can be pushed down to Greenplum
where="some_column LIKE 'val1%'",
)
This is will fail:
DBReader(
columns=[
"some_column",
# Spark does not have `text` type
"CAST(another_column AS text)",
],
# Spark does not support ~ syntax for regexp matching
where="some_column ~ 'val1.*'",
)
Examples¶
Snapshot strategy:
from onetl.connection import Greenplum
from onetl.db import DBReader
greenplum = Greenplum(...)
reader = DBReader(
connection=greenplum,
source="schema.table",
columns=["id", "key", "CAST(value AS string) value", "updated_dt"],
where="key = 'something'",
)
df = reader.run()
Incremental strategy:
from onetl.connection import Greenplum
from onetl.db import DBReader
from onetl.strategy import IncrementalStrategy
greenplum = Greenplum(...)
reader = DBReader(
connection=greenplum,
source="schema.table",
columns=["id", "key", "CAST(value AS string) value", "updated_dt"],
where="key = 'something'",
hwm=DBReader.AutoDetectHWM(name="greenplum_hwm", expression="updated_dt"),
)
with IncrementalStrategy():
df = reader.run()
Interaction schema¶
High-level schema is described in Greenplum prerequisites. You can find detailed interaction schema below.
Spark <-> Greenplum interaction during DBReader.run()
---
title: Greenplum master <-> Spark driver
---
sequenceDiagram
box "Spark"
participant A as "Spark driver"
participant B as "Spark executor1"
participant C as "Spark executor2"
participant D as "Spark executorN"
end
box "Greenplum"
participant E as "Greenplum master"
participant F as "Greenplum segment1"
participant G as "Greenplum segment2"
participant H as "Greenplum segmentN"
end
note over A,H: == Greenplum.check() ==
activate A
activate E
A ->> E: CONNECT
A -->> E : CHECK IF TABLE EXISTS gp_table
E -->> A : TABLE EXISTS
A ->> E : SHOW SCHEMA FOR gp_table
E -->> A : (id bigint, col1 int, col2 text, ...)
note over A,H: == DBReader.run() ==
A ->> B: START EXECUTOR FOR df(id bigint, col1 int, col2 text, ...) PARTITION 1
A ->> C: START EXECUTOR FOR df(id bigint, col1 int, col2 text, ...) PARTITION 2
A ->> D: START EXECUTOR FOR df(id bigint, col1 int, col2 text, ...) PARTITION N
note right of A : This is done in parallel,<br/>executors are independent<br/>|<br/>|<br/>|<br/>V
B ->> E: CREATE WRITABLE EXTERNAL TABLE spark_executor1 (id bigint, col1 int, col2 text, ...)<br/>USING address=executor1_host:executor1_port <br/>INSERT INTO EXTERNAL TABLE spark_executor1 FROM gp_table WHERE gp_segment_id = 1
note right of E : Each white vertical line here is a opened connection to master.<br/>Usually, **N+1** connections are created from Spark to Greenplum master
activate E
E -->> F: SELECT DATA FROM gp_table_data_on_segment1 TO spark_executor1
note right of F : No direct requests between Greenplum segments & Spark driver.<br/>Data transfer is always initiated by Greenplum segments.
C ->> E: CREATE WRITABLE EXTERNAL TABLE spark_executor2 (id bigint, col1 int, col2 text, ...)<br/>USING address=executor2_host:executor2_port <br/>INSERT INTO EXTERNAL TABLE spark_executor2 FROM gp_table WHERE gp_segment_id = 2
activate E
E -->> G: SELECT DATA FROM gp_table_data_on_segment2 TO spark_executor2
D ->> E: CREATE WRITABLE EXTERNAL TABLE spark_executorN (id bigint, col1 int, col2 text, ...)<br/>USING address=executorN_host:executorN_port <br/>INSERT INTO EXTERNAL TABLE spark_executorN FROM gp_table WHERE gp_segment_id = N
activate E
E -->> H: SELECT DATA FROM gp_table_data_on_segmentN TO spark_executorN
F -xB: INITIALIZE CONNECTION TO Spark executor1<br/>PUSH DATA TO Spark executor1
note left of B : Circle is an open GPFDIST port,<br/>listened by executor
G -xC: INITIALIZE CONNECTION TO Spark executor2<br/>PUSH DATA TO Spark executor2
H -xD: INITIALIZE CONNECTION TO Spark executorN<br/>PUSH DATA TO Spark executorN
note over A,H: == Spark.stop() ==
B -->> E : DROP TABLE spark_executor1
deactivate E
C -->> E : DROP TABLE spark_executor2
deactivate E
D -->> E : DROP TABLE spark_executorN
deactivate E
B -->> A: DONE
C -->> A: DONE
D -->> A: DONE
A -->> E : CLOSE CONNECTION
deactivate E
deactivate A
Recommendations¶
Select only required columns¶
Instead of passing "*" in DBReader(columns=[...]) prefer passing exact column names. This reduces the amount of data passed from Greenplum to Spark.
Pay attention to where value¶
Instead of filtering data on Spark side using df.filter(df.column == 'value') pass proper DBReader(where="column = 'value'") clause. This both reduces the amount of data send from Greenplum to Spark, and may also improve performance of the query. Especially if there are indexes or partitions for columns used in where clause.
Read data in parallel¶
DBReader in case of Greenplum connector requires view or table to have a column which is used by Spark for parallel reads.
Choosing proper column allows each Spark executor to read only part of data stored in the specified segment, avoiding moving large amounts of data between segments, which improves reading performance.
Using gp_segment_id¶
By default, DBReader will use gp_segment_id column for parallel data reading. Each DataFrame partition will contain data of a specific Greenplum segment.
This allows each Spark executor read only data from specific Greenplum segment, avoiding moving large amounts of data between segments.
If view is used, it is recommended to include gp_segment_id column to this view:
Reading from view with gp_segment_id column
from onetl.connection import Greenplum
from onetl.db import DBReader
greenplum = Greenplum(...)
greenplum.execute(
"""
CREATE VIEW schema.view_with_gp_segment_id AS
SELECT
id,
some_column,
another_column,
gp_segment_id -- IMPORTANT
FROM schema.some_table
""",
)
reader = DBReader(
connection=greenplum,
source="schema.view_with_gp_segment_id",
)
df = reader.run()
Using custom partition_column¶
Sometimes table or view is lack of gp_segment_id column, but there is some column
with value range correlated with Greenplum segment distribution.
In this case, custom column can be used instead:
Reading from view with custom partition_column
from onetl.connection import Greenplum
from onetl.db import DBReader
greenplum = Greenplum(...)
greenplum.execute(
"""
CREATE VIEW schema.view_with_partition_column AS
SELECT
id,
some_column,
part_column -- correlated to greenplum segment ID
FROM schema.some_table
""",
)
reader = DBReader(
connection=greenplum,
source="schema.view_with_partition_column",
options=Greenplum.ReadOptions(
# parallelize data using specified column
partitionColumn="part_column",
# create 10 Spark tasks, each will read only part of table data
partitions=10,
),
)
df = reader.run()
Reading DISTRIBUTED REPLICATED tables¶
Replicated tables do not have gp_segment_id column at all, so you need to set partition_column to some column name of type integer/bigint/smallint.
Parallel JOIN execution¶
In case of using views which require some data motion between Greenplum segments, like JOIN queries, another approach should be used.
Each Spark executor N will run the same query, so each of N query will start its own JOIN process, leading to really heavy load on Greenplum segments.
This should be avoided.
Instead is recommended to run JOIN query on Greenplum side, save the result to an intermediate table, and then read this table using DBReader:
Reading from view using intermediate table
from onetl.connection import Greenplum
from onetl.db import DBReader
greenplum = Greenplum(...)
greenplum.execute(
"""
CREATE UNLOGGED TABLE schema.intermediate_table AS
SELECT
id,
tbl1.col1,
tbl1.data,
tbl2.another_data
FROM
schema.table1 as tbl1
JOIN
schema.table2 as tbl2
ON
tbl1.col1 = tbl2.col2
WHERE ...
""",
)
reader = DBReader(
connection=greenplum,
source="schema.intermediate_table",
)
df = reader.run()
# write dataframe somethere
greenplum.execute(
"""
DROP TABLE schema.intermediate_table
""",
)
Warning
NEVER do that:
df1 = DBReader(connection=greenplum, target="public.table1", ...).run()
df2 = DBReader(connection=greenplum, target="public.table2", ...).run()
joined_df = df1.join(df2, on="col")
This will lead to sending all the data from both table1 and table2 to Spark executor memory, and then JOIN
will be performed on Spark side, not inside Greenplum. This is VERY inefficient.
TEMPORARY tables notice¶
Someone could think that writing data from view or result of JOIN to TEMPORARY table, and then passing it to DBReader, is an efficient way to read data from Greenplum. This is because temp tables are not generating WAL files, and are automatically deleted after finishing the transaction.
That will NOT work. Each Spark executor establishes its own connection to Greenplum. And each connection starts its own transaction which means that every executor will read empty temporary table.
You should use UNLOGGED tables to write data to intermediate table without generating WAL logs.
Options¶
GreenplumReadOptions
¶
Bases: GenericOptions
VMware's Greenplum Spark connector reading options.
Warning
Some options, like url, dbtable, server.*, pool.*,
etc are populated from connection attributes,
and cannot be overridden by the user in ReadOptions to avoid issues.
Examples:
Note
You can pass any value
supported by connector,
even if it is not mentioned in this documentation. Option names should be in camelCase!
The set of supported options depends on connector version.
from onetl.connection import Greenplum
options = Greenplum.ReadOptions(
partitionColumn="reg_id",
partitions=10,
)
num_partitions = Field(alias='partitions')
class-attribute
instance-attribute
¶
Number of jobs created by Spark to read the table content in parallel.
See documentation for partition_column for more details
Warning
By default connector uses number of segments in the Greenplum cluster. You should not change this option, unless you know what you're doing
Warning
Both options partition_column and num_partitions should have a value,
or both should be None
partition_column = Field(alias='partitionColumn')
class-attribute
instance-attribute
¶
Column used to parallelize reading from a table.
Warning
You should not change this option, unless you know what you're doing.
It's preferable to use default values to read data parallel by number of segments in Greenplum cluster.
Possible values:
-
None(default): Spark generates N jobs (where N == number of segments in Greenplum cluster), each job is reading only data from a specific segment (filtering data bygp_segment_idcolumn).This is very effective way to fetch the data from a cluster.
-
table column Allocate each executor a range of values from a specific column.
Spark generates for each executor an SQL query:
Executor 1:
Executor 2:SELECT ... FROM table WHERE (partition_column >= lowerBound OR partition_column IS NULL) AND partition_column < (lower_bound + stride)...SELECT ... FROM table WHERE partition_column >= (lower_bound + stride) AND partition_column < (lower_bound + 2 * stride)Executor N:
WhereSELECT ... FROM table WHERE partition_column >= (lower_bound + (N-1) * stride) AND partition_column <= upper_boundstride=(upper_bound - lower_bound) / num_partitions,lower_bound=MIN(partition_column),upper_bound=MAX(partition_column).Note
Column type must be numeric. Other types are not supported.
Note
num_partitions is used just to calculate the partition stride, NOT for filtering the rows in table. So all rows in the table will be returned (unlike Incremental Read Strategies).
Note
All queries are executed in parallel. To execute them sequentially, use Batch Read Strategies.
Warning
Both options partition_column and num_partitions should have a value,
or both should be None
Examples:
Read data in 10 parallel jobs by range of values in id_column column:
Greenplum.ReadOptions(
partitionColumn="id_column",
partitions=10,
)