Greenplum <-> Spark type mapping¶
Note
The results below are valid for Spark 3.2.4, and may differ on other Spark versions.
Type detection & casting¶
Spark DataFrames always have a schema, which is a list of columns with corresponding Spark types. All operations on a column are performed using the column's type.
Reading from Greenplum¶
This is how Greenplum connector performs this:
- Execute query `SELECT * FROM table LIMIT 0` [1].
- For each column in query result get column name and Greenplum type.
- Find corresponding `Greenplum type (read)` → `Spark type` combination (see below) for each DataFrame column. If no combination is found, raise exception.
- Use Spark column projection and predicate pushdown features to build a final query.
- Create DataFrame from generated query with inferred schema.
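The type lookup in the steps above can be pictured with a small sketch. This is not the connector's actual code: `READ_TYPE_MAPPING` and `infer_spark_schema` are hypothetical names, the mapping holds only a few rows from the tables below, and Spark types are written as plain strings so the snippet runs without Spark.

```python
# Hypothetical illustration; the real connector is implemented differently.
# A few "Greenplum type (read)" -> "Spark type" rows from the tables below.
READ_TYPE_MAPPING = {
    "smallint": "ShortType()",
    "integer": "IntegerType()",
    "bigint": "LongType()",
    "real": "FloatType()",
    "double precision": "DoubleType()",
    "date": "DateType()",
    "timestamp": "TimestampType()",
    "text": "StringType()",
    "boolean": "BooleanType()",
}


def infer_spark_schema(table_columns):
    """Map (column name, Greenplum type) pairs to (column name, Spark type) pairs."""
    schema = []
    for name, greenplum_type in table_columns:
        if greenplum_type not in READ_TYPE_MAPPING:
            # No combination found: a single such column makes the read fail
            raise TypeError(
                f"Unsupported Greenplum type {greenplum_type!r} for column {name!r}"
            )
        schema.append((name, READ_TYPE_MAPPING[greenplum_type]))
    return schema


# Column names and types as they would come from `SELECT * FROM table LIMIT 0`
columns = [("id", "bigint"), ("business_dt", "timestamp")]
print(infer_spark_schema(columns))
```

Because the lookup runs over every column of the table, a single column of an unsupported type (for example `json`) is enough to raise the exception.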
Writing to some existing Greenplum table¶
This is how Greenplum connector performs this:
- Get names of columns in DataFrame.
- Perform `SELECT * FROM table LIMIT 0` query.
- For each column in query result get column name and Greenplum type.
- Match table columns with DataFrame columns (by name, case insensitive). If some column is present only in target table, but not in DataFrame (like `DEFAULT` or `SERIAL` column), and vice versa, raise an exception. See Explicit type cast.
- Find corresponding `Spark type` → `Greenplum type (write)` combination (see below) for each DataFrame column. If no combination is found, raise exception.
- If `Greenplum type (write)` matches `Greenplum type (read)`, no additional casts will be performed, DataFrame column will be written to Greenplum as is.
- If `Greenplum type (write)` does not match `Greenplum type (read)`, DataFrame column will be cast to target column type on Greenplum side. For example, you can write column with text data to column of `json` type (which Greenplum connector currently does not support).
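The column matching step can be sketched as follows. This is a minimal illustration under the rules listed above, not the connector's code; `match_columns` is a hypothetical helper name.

```python
# Hypothetical illustration of the name-matching step; not the connector's code.
def match_columns(df_columns, table_columns):
    """Return {DataFrame column: table column}, matching names case-insensitively."""
    df_by_key = {name.lower(): name for name in df_columns}
    table_by_key = {name.lower(): name for name in table_columns}

    only_in_df = sorted(df_by_key.keys() - table_by_key.keys())
    only_in_table = sorted(table_by_key.keys() - df_by_key.keys())
    if only_in_df or only_in_table:
        # A column present on one side only is an error, in either direction
        raise ValueError(
            f"Columns only in DataFrame: {only_in_df}, only in table: {only_in_table}"
        )
    return {df_by_key[key]: table_by_key[key] for key in df_by_key}


print(match_columns(["ID", "Value"], ["id", "value"]))
```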
Create new table using Spark¶
Warning
ABSOLUTELY NOT RECOMMENDED!
This is how Greenplum connector performs this:
- Find corresponding `Spark type` → `Greenplum type (create)` combination (see below) for each DataFrame column. If no combination is found, raise exception.
- Generate DDL for creating table in Greenplum, like `CREATE TABLE (col1 ...)`, and run it.
- Write DataFrame to created table as is.
More details can be found here.
But Greenplum connector supports only a limited number of types and almost no custom clauses (like `PARTITION BY`).
So instead of relying on Spark to create tables:
```python
writer = DBWriter(
    connection=greenplum,
    target="public.table",
    options=Greenplum.WriteOptions(
        if_exists="append",
        # by default distribution is random
        distributedBy="id",
        # partitionBy is not supported
    ),
)
writer.run(df)
```
Always prefer creating table with desired DDL BEFORE WRITING DATA:
```python
greenplum.execute(
    """
    CREATE TABLE public.table (
        id int,
        business_dt timestamp(6),
        value json
    )
    PARTITION BY RANGE (business_dt)
    DISTRIBUTED BY (id)
    """,
)

writer = DBWriter(
    connection=greenplum,
    target="public.table",
    options=Greenplum.WriteOptions(if_exists="append"),
)
writer.run(df)
```
See Greenplum CREATE TABLE documentation.
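To see why automatic table creation is so limited, the DDL generation step described above can be sketched like this. It is only an illustration: `CREATE_TYPE_MAPPING` and `generate_ddl` are hypothetical names, the mapping repeats a few `Spark type` → `Greenplum type (create)` rows from the tables below, and the DDL actually produced by Spark may look different.

```python
# Hypothetical illustration; the DDL Spark actually generates may differ.
# A few "Spark type" -> "Greenplum type (create)" rows from the tables below.
CREATE_TYPE_MAPPING = {
    "ShortType()": "smallint",
    "IntegerType()": "integer",
    "LongType()": "bigint",
    "FloatType()": "real",
    "DoubleType()": "double precision",
    "DateType()": "date",
    "TimestampType()": "timestamp",
    "StringType()": "text",
    "BooleanType()": "boolean",
    "BinaryType()": "bytea",
}


def generate_ddl(table, schema):
    """Build a plain CREATE TABLE statement from (column name, Spark type) pairs."""
    columns = []
    for name, spark_type in schema:
        if spark_type not in CREATE_TYPE_MAPPING:
            raise TypeError(f"Unsupported Spark type {spark_type!r} for column {name!r}")
        columns.append(f"{name} {CREATE_TYPE_MAPPING[spark_type]}")
    return f"CREATE TABLE {table} ({', '.join(columns)})"


print(generate_ddl("public.table", [("id", "LongType()"), ("value", "StringType()")]))
```

A string column can only become `text` this way, so types like `json` and clauses like `PARTITION BY` are reachable only through hand-written DDL.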
Supported types¶
See:
Numeric types¶
| Greenplum type (read) | Spark type | Greenplum type (write) | Greenplum type (create) |
|---|---|---|---|
| `decimal` | `DecimalType(P=38, S=18)` | `decimal(P=38, S=18)` | `decimal` (unbounded) |
| `decimal(P=0..38)` | `DecimalType(P=0..38, S=0)` | `decimal(P=0..38, S=0)` | `decimal` (unbounded) |
| `decimal(P=0..38, S=0..38)` | `DecimalType(P=0..38, S=0..38)` | `decimal(P=0..38, S=0..38)` | `decimal` (unbounded) |
| `decimal(P=39.., S=0..)` | unsupported [2] | | |
| `real` | `FloatType()` | `real` | `real` |
| `double precision` | `DoubleType()` | `double precision` | `double precision` |
| - | `ByteType()` | unsupported | unsupported |
| `smallint` | `ShortType()` | `smallint` | `smallint` |
| `integer` | `IntegerType()` | `integer` | `integer` |
| `bigint` | `LongType()` | `bigint` | `bigint` |
| `money`<br>`int4range`<br>`int8range`<br>`numrange`<br>`int2vector` | unsupported | | |
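The precision limit in the table above can be checked on the Python side before writing. The sketch below uses only the standard `decimal` module; `required_precision` and `fits_spark_decimal` are hypothetical helper names, not part of onETL or Spark.

```python
from decimal import Decimal

SPARK_MAX_PRECISION = 38  # maximum P supported by Spark's DecimalType(P, S)


def required_precision(value: Decimal) -> int:
    """Return the smallest P of decimal(P, S) able to hold a finite value exactly."""
    if not value.is_finite():
        raise ValueError("NaN and Infinity have no precision")
    _sign, digits, exponent = value.as_tuple()
    if exponent >= 0:
        # Integer value: stored digits plus trailing zeros implied by the exponent
        return len(digits) + exponent
    # Fractional value: at least as many digits as the scale (e.g. 0.001 needs P=3)
    return max(len(digits), -exponent)


def fits_spark_decimal(value: Decimal) -> bool:
    """Check whether the value fits into the precision range Spark supports."""
    return required_precision(value) <= SPARK_MAX_PRECISION


print(fits_spark_decimal(Decimal("1" * 38)))
print(fits_spark_decimal(Decimal("1" * 39)))
```

The first call reports that a 38-digit integer fits, the second that a 39-digit one does not.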
Temporal types¶
| Greenplum type (read) | Spark type | Greenplum type (write) | Greenplum type (create) |
|---|---|---|---|
| `date` | `DateType()` | `date` | `date` |
| `time`<br>`time(0..6)`<br>`time with time zone`<br>`time(0..6) with time zone` | `TimestampType()`, time format quirks [3] | `timestamp` | `timestamp` |
| `timestamp`<br>`timestamp(0..6)`<br>`timestamp with time zone`<br>`timestamp(0..6) with time zone` | `TimestampType()` | `timestamp` | `timestamp` |
| `interval` of any precision<br>`daterange`<br>`tsrange`<br>`tstzrange` | unsupported | | |
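The time format quirk mentioned in the table can be illustrated with plain Python values. This is only a sketch of the conversion, assuming the date part is always `1970-01-01`; both function names are hypothetical.

```python
from datetime import date, datetime, time

# Date part that a Greenplum `time` value gets when it is read as a timestamp
EPOCH_DATE = date(1970, 1, 1)


def time_as_read_by_spark(value: time) -> datetime:
    """Show how a `time` value looks after being read into a TimestampType() column."""
    return datetime.combine(EPOCH_DATE, value)


def timestamp_to_time(value: datetime) -> time:
    """Drop the date part to get the original time of day back."""
    return value.time()


as_timestamp = time_as_read_by_spark(time(23, 59, 59))
print(as_timestamp)
print(timestamp_to_time(as_timestamp))
```

On a real DataFrame, the time of day can be extracted from such a column with Spark's `date_format` function and a pattern like `HH:mm:ss`.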
Warning
Note that types in Greenplum and Spark have different value ranges:
| Greenplum type | Min value | Max value | Spark type | Min value | Max value |
|---|---|---|---|---|---|
| `date` | `-4713-01-01` | `5874897-01-01` | `DateType()` | `0001-01-01` | `9999-12-31` |
| `timestamp` | `-4713-01-01 00:00:00.000000` | `294276-12-31 23:59:59.999999` | `TimestampType()` | `0001-01-01 00:00:00.000000` | `9999-12-31 23:59:59.999999` |
| `time` | `00:00:00.000000` | `24:00:00.000000` | `TimestampType()` | `0001-01-01 00:00:00.000000` | `9999-12-31 23:59:59.999999` |
So not all values can be read from Greenplum into Spark.
References:
String types¶
| Greenplum type (read) | Spark type | Greenplum type (write) | Greenplum type (create) |
|---|---|---|---|
| `character`<br>`character(N)`<br>`character varying`<br>`character varying(N)`<br>`text`<br>`xml`<br>`CREATE TYPE ... AS ENUM` | `StringType()` | `text` | `text` |
| `json`<br>`jsonb` | unsupported | | |
Binary types¶
| Greenplum type (read) | Spark type | Greenplum type (write) | Greenplum type (create) |
|---|---|---|---|
| `boolean` | `BooleanType()` | `boolean` | `boolean` |
| `bit`<br>`bit(N)`<br>`bit varying`<br>`bit varying(N)` | unsupported | | |
| `bytea` | unsupported [4] | | |
| - | `BinaryType()` | `bytea` | `bytea` |
Struct types¶
| Greenplum type (read) | Spark type | Greenplum type (write) | Greenplum type (create) |
|---|---|---|---|
| `T[]` | unsupported | | |
| - | `ArrayType()` | unsupported | |
| `CREATE TYPE sometype (...)` | `StringType()` | `text` | `text` |
| - | `StructType()`<br>`MapType()` | unsupported | |
Unsupported types¶
Columns of these types cannot be read/written by Spark:
- `cidr`
- `inet`
- `macaddr`
- `macaddr8`
- `circle`
- `box`
- `line`
- `lseg`
- `path`
- `point`
- `polygon`
- `tsvector`
- `tsquery`
- `uuid`

There is a way to avoid this: just cast unsupported types to `text`. But the way this can be done is not straightforward.
Explicit type cast¶
DBReader¶
Direct casting of Greenplum types is not supported by DBReader due to the connector’s implementation specifics.
```python
reader = DBReader(
    connection=greenplum,
    # will fail
    columns=["CAST(unsupported_column AS text)"],
)
```
But there is a workaround: create a view that casts the unsupported column to `text` (or any other supported type). For example, you can use the `to_json` Postgres function to convert a column of any type to its string representation, and then parse this column on the Spark side using the `JSON.parse_column` method.
```python
from pyspark.sql.types import ArrayType, IntegerType

from onetl.connection import Greenplum
from onetl.db import DBReader
from onetl.file.format import JSON

greenplum = Greenplum(...)

greenplum.execute(
    """
    CREATE VIEW schema.view_with_json_column AS
    SELECT
        id,
        supported_column,
        to_json(array_column) array_column_as_json,
        gp_segment_id -- ! important !
    FROM
        schema.table_with_unsupported_columns
    """,
)

# create dataframe using this view
reader = DBReader(
    connection=greenplum,
    source="schema.view_with_json_column",
)
df = reader.run()

# Define the schema for the JSON data
json_scheme = ArrayType(IntegerType())

df = df.select(
    df.id,
    df.supported_column,
    JSON().parse_column(df.array_column_as_json, json_scheme).alias("array_column"),
)
```
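When a table has several unsupported columns, writing such a view by hand gets repetitive. The helper below is a hypothetical sketch (the name `build_json_view_ddl` and its parameters are not part of onETL) that only builds the SQL string in the same shape as the view in the example; the result could then be passed to `greenplum.execute(...)`.

```python
# Hypothetical helper; it only builds an SQL string and does not touch the database.
def build_json_view_ddl(view, table, supported_columns, unsupported_columns):
    """Build a CREATE VIEW statement that wraps unsupported columns into to_json()."""
    select_items = list(supported_columns)
    select_items += [
        f"to_json({name}) AS {name}_as_json" for name in unsupported_columns
    ]
    # gp_segment_id is kept in the view, as in the example in the text
    select_items.append("gp_segment_id")
    columns_sql = ",\n    ".join(select_items)
    return f"CREATE VIEW {view} AS\nSELECT\n    {columns_sql}\nFROM\n    {table}"


print(
    build_json_view_ddl(
        view="schema.view_with_json_column",
        table="schema.table_with_unsupported_columns",
        supported_columns=["id", "supported_column"],
        unsupported_columns=["array_column"],
    )
)
```

Column and table names are inserted as is, so the helper should only be used with trusted identifiers.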
DBWriter¶
To write data to a column of `text` or `json` type in some Greenplum table, use the `JSON.serialize_column` method.
```python
from onetl.connection import Greenplum
from onetl.db import DBWriter
from onetl.file.format import JSON

greenplum = Greenplum(...)

greenplum.execute(
    """
    CREATE TABLE schema.target_table (
        id int,
        supported_column timestamp,
        array_column_as_json jsonb -- or text
    )
    DISTRIBUTED BY (id)
    """,
)

write_df = df.select(
    df.id,
    df.supported_column,
    # alias must match the target column name, because columns are matched by name
    JSON().serialize_column(df.array_column).alias("array_column_as_json"),
)

writer = DBWriter(
    connection=greenplum,
    target="schema.target_table",
)
writer.run(write_df)
```
Then you can parse this column on Greenplum side:
```sql
SELECT
    id,
    supported_column,
    -- access first item of an array
    array_column_as_json->0
FROM
    schema.target_table
```
1. Yes, all columns of a table, not just selected ones. This means that if source table contains columns with unsupported type, the entire table cannot be read.
2. Greenplum supports decimal types with unlimited precision. But Spark's `DecimalType(P, S)` supports maximum `P=38` (128 bit). It is impossible to read, write or operate with values of larger precision, this leads to an exception.
3. `time` type is the same as `timestamp` with date `1970-01-01`. So instead of reading data from Postgres like `23:59:59`, it is actually read as `1970-01-01 23:59:59`, and vice versa.
4. Yes, that's weird.