Reading from Greenplum using DBReader

Data can be read from Greenplum to Spark using DBReader. It also supports incremental read strategies.

Warning

Please take into account Greenplum types.

Note

Unlike JDBC connectors, the Greenplum connector for Spark does not support executing custom SQL queries using the .sql method. The connector can only read data from a table or view.

Supported DBReader features

Warning

With the Greenplum connector, DBReader does not generate a raw SELECT query. Instead, it relies on Spark SQL syntax, which in some cases (column projection and predicate pushdown) can be converted to Greenplum SQL.

Therefore columns, where, and hwm.expression must be specified in Spark SQL syntax, not Greenplum SQL.

This is OK:

DBReader(
    columns=[
        "some_column",
        # this cast is executed on Spark side
        "CAST(another_column AS STRING)",
    ],
    # this predicate is parsed by Spark, and can be pushed down to Greenplum
    where="some_column LIKE 'val1%'",
)

This will fail:

DBReader(
    columns=[
        "some_column",
        # Spark does not have `text` type
        "CAST(another_column AS text)",
    ],
    # Spark does not support ~ syntax for regexp matching
    where="some_column ~ 'val1.*'",
)

Examples

Snapshot strategy:

from onetl.connection import Greenplum
from onetl.db import DBReader

greenplum = Greenplum(...)

reader = DBReader(
    connection=greenplum,
    source="schema.table",
    columns=["id", "key", "CAST(value AS string) value", "updated_dt"],
    where="key = 'something'",
)
df = reader.run()

Incremental strategy:

from onetl.connection import Greenplum
from onetl.db import DBReader
from onetl.strategy import IncrementalStrategy

greenplum = Greenplum(...)

reader = DBReader(
    connection=greenplum,
    source="schema.table",
    columns=["id", "key", "CAST(value AS string) value", "updated_dt"],
    where="key = 'something'",
    hwm=DBReader.AutoDetectHWM(name="greenplum_hwm", expression="updated_dt"),
)

with IncrementalStrategy():
    df = reader.run()
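
The idea behind an incremental read can be sketched in plain Python. This is only an illustration, not onETL's implementation: `incremental_read` and the in-memory `table` are hypothetical, and the sketch assumes the strategy keeps the maximum value of the HWM expression seen so far and, on later runs, returns only rows whose value is greater than it.

```python
from datetime import date
from typing import List, Optional, Tuple


def incremental_read(
    rows: List[dict],
    previous_hwm: Optional[date],
) -> Tuple[List[dict], Optional[date]]:
    """Return rows newer than previous_hwm and the updated high-water mark."""
    # On the first run there is no stored value, so every row is returned.
    new_rows = [
        row for row in rows
        if previous_hwm is None or row["updated_dt"] > previous_hwm
    ]
    # If nothing new arrived, the stored value stays unchanged.
    new_hwm = max((row["updated_dt"] for row in new_rows), default=previous_hwm)
    return new_rows, new_hwm


# Hypothetical table content, standing in for schema.table
table = [
    {"id": 1, "updated_dt": date(2024, 1, 1)},
    {"id": 2, "updated_dt": date(2024, 1, 2)},
]

first_run, hwm = incremental_read(table, previous_hwm=None)

# A new row appears between runs
table.append({"id": 3, "updated_dt": date(2024, 1, 3)})
second_run, hwm_after_second = incremental_read(table, previous_hwm=hwm)
```

In this sketch the first run returns both rows, and the second run returns only the row added afterwards.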

Interaction schema

The high-level schema is described in Greenplum prerequisites. A detailed interaction schema is shown below.

Spark <-> Greenplum interaction during DBReader.run()
---
title: Greenplum master <-> Spark driver
---

sequenceDiagram
    box "Spark"
    participant A as "Spark driver"
    participant B as "Spark executor1"
    participant C as "Spark executor2"
    participant D as "Spark executorN"
    end

    box "Greenplum"
    participant E as "Greenplum master"
    participant F as "Greenplum segment1"
    participant G as "Greenplum segment2"
    participant H as "Greenplum segmentN"
    end

    note over A,H: == Greenplum.check() ==

    activate A
    activate E
    A ->> E: CONNECT

    A -->> E : CHECK IF TABLE EXISTS gp_table
    E -->> A : TABLE EXISTS
    A ->> E : SHOW SCHEMA FOR gp_table
    E -->> A : (id bigint, col1 int, col2 text, ...)

    note over A,H: == DBReader.run() ==

    A ->> B: START EXECUTOR FOR df(id bigint, col1 int, col2 text, ...) PARTITION 1
    A ->> C: START EXECUTOR FOR df(id bigint, col1 int, col2 text, ...) PARTITION 2
    A ->> D: START EXECUTOR FOR df(id bigint, col1 int, col2 text, ...) PARTITION N

    note right of A : This is done in parallel,<br/>executors are independent<br/>|<br/>|<br/>|<br/>V
    B ->> E: CREATE WRITABLE EXTERNAL TABLE spark_executor1 (id bigint, col1 int, col2 text, ...)<br/>USING address=executor1_host:executor1_port <br/>INSERT INTO EXTERNAL TABLE spark_executor1 FROM gp_table WHERE gp_segment_id = 1
    note right of E : Each white vertical line here is an open connection to master.<br/>Usually, **N+1** connections are created from Spark to Greenplum master
    activate E
    E -->> F: SELECT DATA FROM gp_table_data_on_segment1 TO spark_executor1
    note right of F : No direct requests between Greenplum segments & Spark driver.<br/>Data transfer is always initiated by Greenplum segments.


    C ->> E: CREATE WRITABLE EXTERNAL TABLE spark_executor2 (id bigint, col1 int, col2 text, ...)<br/>USING address=executor2_host:executor2_port <br/>INSERT INTO EXTERNAL TABLE spark_executor2 FROM gp_table WHERE gp_segment_id = 2
    activate E
    E -->> G: SELECT DATA FROM gp_table_data_on_segment2 TO spark_executor2

    D ->> E: CREATE WRITABLE EXTERNAL TABLE spark_executorN (id bigint, col1 int, col2 text, ...)<br/>USING address=executorN_host:executorN_port <br/>INSERT INTO EXTERNAL TABLE spark_executorN FROM gp_table WHERE gp_segment_id = N
    activate E
    E -->> H: SELECT DATA FROM gp_table_data_on_segmentN TO spark_executorN

    F -xB: INITIALIZE CONNECTION TO Spark executor1<br/>PUSH DATA TO Spark executor1
    note left of B : Circle is an open GPFDIST port,<br/>listened by executor

    G -xC: INITIALIZE CONNECTION TO Spark executor2<br/>PUSH DATA TO Spark executor2
    H -xD: INITIALIZE CONNECTION TO Spark executorN<br/>PUSH DATA TO Spark executorN

    note over A,H: == Spark.stop() ==

    B -->> E : DROP TABLE spark_executor1
    deactivate E
    C -->> E : DROP TABLE spark_executor2
    deactivate E
    D -->> E : DROP TABLE spark_executorN
    deactivate E

    B -->> A: DONE
    C -->> A: DONE
    D -->> A: DONE

    A -->> E : CLOSE CONNECTION
    deactivate E
    deactivate A

Recommendations

Select only required columns

Instead of passing "*" in DBReader(columns=[...]) prefer passing exact column names. This reduces the amount of data passed from Greenplum to Spark.

Pay attention to where value

Instead of filtering data on the Spark side using df.filter(df.column == 'value'), pass a proper DBReader(where="column = 'value'") clause. This reduces the amount of data sent from Greenplum to Spark, and may also improve query performance, especially if there are indexes or partitions for the columns used in the where clause.

Read data in parallel

With the Greenplum connector, DBReader requires the table or view to have a column that Spark can use for parallel reads.

Choosing a proper column lets each Spark executor read only the part of the data stored in a specific segment. This avoids moving large amounts of data between segments and improves read performance.

Using gp_segment_id

By default, DBReader will use gp_segment_id column for parallel data reading. Each DataFrame partition will contain data of a specific Greenplum segment.

This allows each Spark executor to read data only from a specific Greenplum segment, avoiding moving large amounts of data between segments.

If a view is used, it is recommended to include the gp_segment_id column in this view:

Reading from view with gp_segment_id column
from onetl.connection import Greenplum
from onetl.db import DBReader

greenplum = Greenplum(...)

greenplum.execute(
    """
    CREATE VIEW schema.view_with_gp_segment_id AS
    SELECT
        id,
        some_column,
        another_column,
        gp_segment_id  -- IMPORTANT
    FROM schema.some_table
    """,
)

reader = DBReader(
    connection=greenplum,
    source="schema.view_with_gp_segment_id",
)
df = reader.run()
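
The default split by gp_segment_id can be pictured with a small plain-Python model. This is a sketch under assumptions, not connector code: `rows`, `num_segments` and `read_partition` are hypothetical, and the segment numbering is illustrative only.

```python
# Hypothetical rows as (gp_segment_id, id) pairs. Every row of a distributed
# table is stored on exactly one segment.
rows = [(0, 10), (1, 11), (2, 12), (0, 13), (1, 14), (2, 15)]
num_segments = 3


def read_partition(segment_id: int) -> list:
    """Emulate a read filtered by gp_segment_id = <segment_id> for one Spark task."""
    return [row_id for seg, row_id in rows if seg == segment_id]


# One DataFrame partition per segment: no overlap, and nothing is left out.
partitions = {
    segment_id: read_partition(segment_id)
    for segment_id in range(num_segments)
}

for segment_id, ids in partitions.items():
    print(segment_id, ids)
```

Because each task filters by a single segment value, the partitions are disjoint and together cover the whole table, which is why a view without this column cannot be split this way.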

Using custom partition_column

Sometimes a table or view lacks the gp_segment_id column, but has some column whose value range is correlated with the Greenplum segment distribution.

In this case, custom column can be used instead:

Reading from view with custom partition_column
from onetl.connection import Greenplum
from onetl.db import DBReader

greenplum = Greenplum(...)

greenplum.execute(
    """
    CREATE VIEW schema.view_with_partition_column AS
    SELECT
        id,
        some_column,
        part_column  -- correlated to greenplum segment ID
    FROM schema.some_table
    """,
)

reader = DBReader(
    connection=greenplum,
    source="schema.view_with_partition_column",
    options=Greenplum.ReadOptions(
        # parallelize data using specified column
        partitionColumn="part_column",
        # create 10 Spark tasks, each will read only part of table data
        partitions=10,
    ),
)
df = reader.run()

Reading DISTRIBUTED REPLICATED tables

Replicated tables do not have a gp_segment_id column at all, so you need to set partition_column to the name of a column of type integer, bigint, or smallint.

Parallel JOIN execution

Views that require data motion between Greenplum segments, such as JOIN queries, need a different approach.

Each of the N Spark executors runs the same query, so each of the N queries starts its own JOIN process, leading to a very heavy load on Greenplum segments.

This should be avoided.

Instead, it is recommended to run the JOIN query on the Greenplum side, save the result to an intermediate table, and then read this table using DBReader:

Reading from view using intermediate table
from onetl.connection import Greenplum
from onetl.db import DBReader

greenplum = Greenplum(...)

greenplum.execute(
    """
    CREATE UNLOGGED TABLE schema.intermediate_table AS
    SELECT
        id,
        tbl1.col1,
        tbl1.data,
        tbl2.another_data
    FROM
        schema.table1 as tbl1
    JOIN
        schema.table2 as tbl2
    ON
        tbl1.col1 = tbl2.col2
    WHERE ...
    """,
)

reader = DBReader(
    connection=greenplum,
    source="schema.intermediate_table",
)
df = reader.run()

# write dataframe somewhere

greenplum.execute(
    """
    DROP TABLE schema.intermediate_table
    """,
)

Warning

NEVER do that:

df1 = DBReader(connection=greenplum, source="public.table1", ...).run()
df2 = DBReader(connection=greenplum, source="public.table2", ...).run()

joined_df = df1.join(df2, on="col")

This sends all the data from both table1 and table2 to Spark executor memory, and the JOIN is then performed on the Spark side, not inside Greenplum. This is VERY inefficient.

TEMPORARY tables notice

It may seem that writing data from a view or the result of a JOIN to a TEMPORARY table, and then passing it to DBReader, is an efficient way to read data from Greenplum, because temporary tables do not generate WAL files and are automatically deleted after the transaction finishes.

That will NOT work. Each Spark executor establishes its own connection to Greenplum, and each connection starts its own transaction, which means that every executor will read an empty temporary table.

Use UNLOGGED tables to write data to an intermediate table without generating WAL files.
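
The session-scoped nature of temporary tables can be reproduced with Python's built-in sqlite3 module. SQLite is used here only as a stand-in, and the table names are hypothetical. Greenplum differs in details (SQLite reports the table as missing to the second session), but the point is the same: data written to a temporary table in one session is unreachable from another.

```python
import os
import sqlite3
import tempfile

db_path = os.path.join(tempfile.mkdtemp(), "demo.db")

# Two independent sessions, standing in for two separate connections.
session_a = sqlite3.connect(db_path)
session_b = sqlite3.connect(db_path)

# A regular table is visible to every session once committed.
session_a.execute("CREATE TABLE intermediate_table (id INTEGER)")
session_a.execute("INSERT INTO intermediate_table VALUES (1)")

# A temporary table lives only inside the session that created it.
session_a.execute("CREATE TEMPORARY TABLE temp_result (id INTEGER)")
session_a.execute("INSERT INTO temp_result VALUES (1)")
session_a.commit()

regular_rows = session_b.execute("SELECT id FROM intermediate_table").fetchall()

try:
    session_b.execute("SELECT id FROM temp_result").fetchall()
    temp_visible = True
except sqlite3.OperationalError:
    # The second session cannot see the first session's temporary table.
    temp_visible = False

session_a.close()
session_b.close()
```

Here the second session reads the regular table without problems, while the temporary table created by the first session is not available to it.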

Options

GreenplumReadOptions

Bases: GenericOptions

VMware's Greenplum Spark connector reading options.

Warning

Some options, like url, dbtable, server.*, pool.*, etc., are populated from connection attributes, and cannot be overridden by the user in ReadOptions to avoid issues.

Examples:

Note

You can pass any value supported by connector, even if it is not mentioned in this documentation. Option names should be in camelCase!

The set of supported options depends on connector version.

from onetl.connection import Greenplum

options = Greenplum.ReadOptions(
    partitionColumn="reg_id",
    partitions=10,
)

num_partitions = Field(alias='partitions') class-attribute instance-attribute

Number of jobs created by Spark to read the table content in parallel.

See documentation for partition_column for more details.

Warning

By default, the connector uses the number of segments in the Greenplum cluster. You should not change this option unless you know what you're doing.

Warning

Both options partition_column and num_partitions should have a value, or both should be None.

partition_column = Field(alias='partitionColumn') class-attribute instance-attribute

Column used to parallelize reading from a table.

Warning

You should not change this option, unless you know what you're doing.

It's preferable to use default values to read data in parallel by the number of segments in the Greenplum cluster.

Possible values:

  • None (default): Spark generates N jobs (where N == number of segments in Greenplum cluster), each job is reading only data from a specific segment (filtering data by gp_segment_id column).

    This is a very effective way to fetch data from a cluster.

  • table column: allocate each executor a range of values from a specific column.

    Spark generates an SQL query for each executor:

    Executor 1:

    SELECT ... FROM table
    WHERE (partition_column >= lower_bound
            OR partition_column IS NULL)
    AND partition_column < (lower_bound + stride)
    
    Executor 2:

    SELECT ... FROM table
    WHERE partition_column >= (lower_bound + stride)
    AND partition_column < (lower_bound + 2 * stride)
    
    ...

    Executor N:

    SELECT ... FROM table
    WHERE partition_column >= (lower_bound + (N-1) * stride)
    AND partition_column <= upper_bound
    
    Where stride=(upper_bound - lower_bound) / num_partitions, lower_bound=MIN(partition_column), upper_bound=MAX(partition_column).

    Note

    Column type must be numeric. Other types are not supported.

    Note

    num_partitions is used just to calculate the partition stride, NOT for filtering the rows in table. So all rows in the table will be returned (unlike Incremental Read Strategies).

    Note

    All queries are executed in parallel. To execute them sequentially, use Batch Read Strategies.
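
The range formulas above can be checked with a short plain-Python sketch. `partition_predicates` is a hypothetical helper that only mirrors those formulas; the queries actually produced by the connector may be shaped differently, and here lower_bound and upper_bound are passed in by hand instead of being computed with MIN and MAX.

```python
from typing import List


def partition_predicates(
    column: str,
    lower_bound: int,
    upper_bound: int,
    num_partitions: int,
) -> List[str]:
    """Build one WHERE predicate per partition, following the formulas above."""
    if num_partitions < 1:
        raise ValueError("num_partitions must be >= 1")

    stride = (upper_bound - lower_bound) / num_partitions
    predicates = []
    for i in range(num_partitions):
        start = lower_bound + i * stride
        lower = f"{column} >= {start}"
        if i == 0:
            # The first partition also picks up rows with NULL in the column.
            lower = f"({lower} OR {column} IS NULL)"
        if i == num_partitions - 1:
            # The last partition is closed on the right to include upper_bound.
            upper = f"{column} <= {upper_bound}"
        else:
            upper = f"{column} < {start + stride}"
        predicates.append(f"{lower} AND {upper}")
    return predicates


for predicate in partition_predicates("id_column", 0, 100, 4):
    print(predicate)
```

Every non-NULL value between lower_bound and upper_bound matches exactly one predicate, which is why num_partitions changes only how the rows are split, not which rows are returned.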

Warning

Both options partition_column and num_partitions should have a value, or both should be None.

Examples:

Read data in 10 parallel jobs by range of values in id_column column:

Greenplum.ReadOptions(
    partitionColumn="id_column",
    partitions=10,
)