Postgres <-> Spark type mapping
Note
The results below are valid for Spark 3.5.8, and may differ on other Spark versions.
Type detection & casting
Spark DataFrames always have a schema: a list of columns with their corresponding Spark types. All operations on a column are performed using that column's type.
Reading from Postgres
This is how the Postgres connector performs this:
- For each column in the query result (`SELECT column1, column2, ... FROM table ...`), get the column name and Postgres type.
- Find the corresponding `Postgres type (read)` → `Spark type` combination (see below) for each DataFrame column [1]. If no combination is found, raise an exception.
- Create a DataFrame from the query with specific column names and Spark types.
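The read steps can be modelled in plain Python. This is a simplified sketch, not the connector's real code: `READ_MAPPING` contains only a few rows of the tables below, Spark types are represented as strings, and `infer_spark_schema` is a hypothetical name.

```python
# Hypothetical model of the read path, NOT the real connector code.
# Only a few rows of the "Postgres type (read) -> Spark type" tables are included.
READ_MAPPING = {
    "smallint": "ShortType()",
    "integer": "IntegerType()",
    "bigint": "LongType()",
    "real": "FloatType()",
    "double precision": "DoubleType()",
    "boolean": "BooleanType()",
    "date": "DateType()",
    "text": "StringType()",
    "bytea": "BinaryType()",
}


def infer_spark_schema(columns):
    """Map (column name, Postgres type) pairs of a query result to (name, Spark type) pairs."""
    schema = []
    for name, pg_type in columns:
        spark_type = READ_MAPPING.get(pg_type)
        if spark_type is None:
            # no "Postgres type (read) -> Spark type" combination found
            raise TypeError(f"Unsupported Postgres type {pg_type!r} for column {name!r}")
        schema.append((name, spark_type))
    return schema


print(infer_spark_schema([("id", "bigint"), ("name", "text")]))
# [('id', 'LongType()'), ('name', 'StringType()')]
```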
Writing to an existing Postgres table
This is how the Postgres connector performs this:
- Get the names of the columns in the DataFrame [1].
- Perform a `SELECT * FROM table LIMIT 0` query.
- Take only the columns present in the DataFrame (by name, case-insensitive) [2]. For each found column, get its Postgres type.
- Find the corresponding `Spark type` → `Postgres type (write)` combination (see below) for each DataFrame column. If no combination is found, raise an exception.
- If `Postgres type (write)` matches `Postgres type (read)`, no additional casts are performed, and the DataFrame column is written to Postgres as is.
- If `Postgres type (write)` does not match `Postgres type (read)`, the DataFrame column is cast to the target column type on the Postgres side. For example, you can write a column with text data to an `int` column, if the column contains valid integer values within the supported value range and precision [3].
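The write steps can be sketched in plain Python as well. This is a hypothetical model, not the connector's code: `plan_write` and its arguments are illustrative names, types are strings, and `write_mapping` stands in for the `Spark type` → `Postgres type (write)` tables. It shows case-insensitive matching by name, skipping table columns that are absent from the DataFrame, and deciding whether Postgres has to cast the value.

```python
# Hypothetical model of the write path, NOT the real connector code.
def plan_write(df_columns, table_columns, write_mapping):
    """Decide how each DataFrame column is written to an existing table.

    df_columns: {DataFrame column name: Spark type}
    table_columns: {table column name: Postgres type (read)}, e.g. from SELECT * ... LIMIT 0
    write_mapping: {Spark type: Postgres type (write)}
    Returns a list of (DataFrame column, Postgres type (write), needs_cast) tuples.
    """
    # match columns by name, case-insensitive
    df_by_name = {name.lower(): (name, spark_type) for name, spark_type in df_columns.items()}
    plan = []
    for table_column, read_type in table_columns.items():
        match = df_by_name.get(table_column.lower())
        if match is None:
            # absent from the DataFrame: Postgres fills it (DEFAULT / GENERATED)
            continue
        df_column, spark_type = match
        write_type = write_mapping.get(spark_type)
        if write_type is None:
            raise TypeError(f"No Postgres type for {spark_type} (column {df_column!r})")
        # a type mismatch means Postgres casts the value to the column type on insert
        plan.append((df_column, write_type, write_type != read_type))
    return plan


plan = plan_write(
    df_columns={"ID": "LongType()", "value": "StringType()"},
    table_columns={"id": "bigint", "value": "integer", "created_at": "timestamp(6)"},
    write_mapping={"LongType()": "bigint", "StringType()": "text"},
)
print(plan)  # [('ID', 'bigint', False), ('value', 'text', True)]
```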
Creating a new table using Spark
Warning
ABSOLUTELY NOT RECOMMENDED!
This is how the Postgres connector performs this:
- Find the corresponding `Spark type` → `Postgres type (create)` combination (see below) for each DataFrame column. If no combination is found, raise an exception.
- Generate DDL for creating a table in Postgres, like `CREATE TABLE (col1 ...)`, and run it.
- Write the DataFrame to the created table as is.
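A rough plain-Python sketch of the DDL generation step, assuming a hypothetical `build_create_table_ddl` helper and a `CREATE_MAPPING` dict with only a few rows of the `Spark type` → `Postgres type (create)` tables. The DDL really generated by Spark may differ in details such as identifier quoting; the point is that only column names and types end up in it.

```python
# Hypothetical model of the table-creation path, NOT the real connector code.
# Only a few rows of the "Spark type -> Postgres type (create)" tables are included.
CREATE_MAPPING = {
    "ShortType()": "smallint",
    "IntegerType()": "integer",
    "LongType()": "bigint",
    "DoubleType()": "double precision",
    "StringType()": "text",
    "TimestampType()": "timestamp(6)",
}


def build_create_table_ddl(table, columns):
    """Build a CREATE TABLE statement from (column name, Spark type) pairs."""
    column_defs = []
    for name, spark_type in columns:
        pg_type = CREATE_MAPPING.get(spark_type)
        if pg_type is None:
            raise TypeError(f"No Postgres type for {spark_type} (column {name!r})")
        column_defs.append(f"{name} {pg_type}")
    # no PARTITION BY, indexes, constraints and so on
    return f"CREATE TABLE {table} ({', '.join(column_defs)})"


print(build_create_table_ddl("public.table", [("id", "LongType()"), ("business_dt", "TimestampType()")]))
# CREATE TABLE public.table (id bigint, business_dt timestamp(6))
```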
But the Postgres connector supports only a limited number of types and almost no custom clauses (like `PARTITION BY`, `INDEX`, etc).
So instead of relying on Spark to create tables, like this:
writer = DBWriter(
connection=postgres,
target="public.table",
options=Postgres.WriteOptions(
if_exists="append",
createTableOptions="PARTITION BY RANGE (id)",
),
)
writer.run(df)
Always prefer creating the table with the desired DDL BEFORE WRITING DATA:
postgres.execute(
"""
CREATE TABLE public.table (
id bigint,
business_dt timestamp(6),
value json
)
PARTITION BY RANGE (id)
""",
)
writer = DBWriter(
connection=postgres,
target="public.table",
options=Postgres.WriteOptions(if_exists="append"),
)
writer.run(df)
See Postgres CREATE TABLE documentation.
Supported types
References
Here you can find source code with type conversions:
Numeric types
| Postgres type (read) | Spark type | Postgres type (write) | Postgres type (create) |
|---|---|---|---|
| decimal | DecimalType(P=38, S=18) | decimal(P=38, S=18) | decimal (unbounded) |
| decimal(P=0..38) | DecimalType(P=0..38, S=0) | decimal(P=0..38, S=0) | |
| decimal(P=0..38, S=0..38) | DecimalType(P=0..38, S=0..38) | decimal(P=0..38, S=0..38) | |
| decimal(P=39.., S=0..) | unsupported [4] | | |
| decimal(P=.., S=..-1) | unsupported [5] | | |
| real | FloatType() | real | real |
| double precision | DoubleType() | double precision | double precision |
| smallint | ShortType() | smallint | smallint |
| - | ByteType() | smallint | smallint |
| integer | IntegerType() | integer | integer |
| bigint | LongType() | bigint | bigint |
| money, int4range, int8range, numrange, int2vector | StringType() [1] | text | text |
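A hypothetical helper that applies the decimal rows of the table above to a Postgres type name: precision of at most 38, non-negative scale, and an unbounded `decimal` read as `DecimalType(38, 18)`. `spark_decimal_for` is an illustrative name, not an onETL or Spark API; `numeric` is accepted too, since Postgres treats it as the same type as `decimal`.

```python
import re

SPARK_MAX_PRECISION = 38  # DecimalType(P, S) supports at most P=38 (128 bit)


def spark_decimal_for(pg_type):
    """Return (P, S) of the Spark DecimalType a Postgres decimal column is read as.

    Follows the decimal rows of the table above; raises ValueError for unsupported types.
    """
    match = re.fullmatch(r"(?:decimal|numeric)(?:\((\d+)(?:,\s*(-?\d+))?\))?", pg_type.strip().lower())
    if match is None:
        raise ValueError(f"Not a decimal type: {pg_type!r}")
    precision, scale = match.groups()
    if precision is None:
        # unbounded decimal
        return (38, 18)
    precision = int(precision)
    scale = int(scale) if scale is not None else 0
    if precision > SPARK_MAX_PRECISION:
        raise ValueError(f"Precision {precision} > {SPARK_MAX_PRECISION} is not supported by Spark")
    if scale < 0:
        raise ValueError(f"Negative scale {scale} is not supported by Spark")
    return (precision, scale)


print(spark_decimal_for("decimal(20, 5)"))  # (20, 5)
```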
Temporal types
| Postgres type (read) | Spark type | Postgres type (write) | Postgres type (create) |
|---|---|---|---|
| date | DateType() | date | date |
| time, time(0..6), time with time zone, time(0..6) with time zone | TimestampType(), with time format quirks [6] | timestamp(6) | timestamp(6) |
| timestamp, timestamp(0..6), timestamp with time zone, timestamp(0..6) with time zone | TimestampType() | timestamp(6) | timestamp(6) |
| - | TimestampNTZType() | timestamp(6) | timestamp(6) |
| interval of any precision | StringType() [1] | text | text |
| - | DayTimeIntervalType() | unsupported | unsupported |
| - | YearMonthIntervalType() | unsupported | unsupported |
| daterange, tsrange, tstzrange | StringType() [1] | text | text |
Warning
Note that types in Postgres and Spark have different value ranges:
| Postgres type | Min value | Max value | Spark type | Min value | Max value |
|---|---|---|---|---|---|
| date | -4713-01-01 | 5874897-01-01 | DateType() | 0001-01-01 | 9999-12-31 |
| timestamp | -4713-01-01 00:00:00.000000 | 294276-12-31 23:59:59.999999 | TimestampType() | 0001-01-01 00:00:00.000000 | 9999-12-31 23:59:59.999999 |
| time | 00:00:00.000000 | 24:00:00.000000 | | | |
So not all values can be read from Postgres to Spark.
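To check whether a particular value is affected, a hypothetical plain-Python helper can compare a Postgres date in its ISO text form with Spark's `DateType` range from the table above. It assumes Postgres' ISO output style, where dates before year 1 get a ` BC` suffix; `fits_spark_date` is an illustrative name.

```python
import re

# Spark DateType range from the table above: 0001-01-01 .. 9999-12-31
SPARK_MIN_YEAR = 1
SPARK_MAX_YEAR = 9999


def fits_spark_date(pg_date):
    """Check whether a Postgres date in ISO text form fits Spark's DateType range.

    Accepts values like '2024-05-01', '12345-01-01' or '0044-03-15 BC'.
    """
    match = re.fullmatch(r"(\d+)-(\d{2})-(\d{2})( BC)?", pg_date.strip())
    if match is None:
        raise ValueError(f"Unexpected date format: {pg_date!r}")
    if match.group(4):
        # every BC date is earlier than 0001-01-01
        return False
    year = int(match.group(1))
    return SPARK_MIN_YEAR <= year <= SPARK_MAX_YEAR


print(fits_spark_date("2024-05-01"), fits_spark_date("12345-01-01"))  # True False
```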
References:
String types
| Postgres type (read) | Spark type | Postgres type (write) | Postgres type (create) |
|---|---|---|---|
| character, character(N), character varying, character varying(N), text, json, jsonb, xml | StringType() | text | text |
| CREATE TYPE ... AS ENUM, tsvector, tsquery | StringType() [1] | text | text |
| - | CharType() | unsupported | unsupported |
| - | VarcharType() | unsupported | unsupported |
Binary types
| Postgres type (read) | Spark type | Postgres type (write) | Postgres type (create) |
|---|---|---|---|
| boolean | BooleanType() | boolean | boolean |
| bit, bit(N=1) | BooleanType() | bool, cannot insert data [3] | bool |
| bit(N=2..) | BinaryType() | bytea, cannot insert data [3] | bytea |
| bit varying, bit varying(N) | StringType() [1] | text | text |
| bytea | BinaryType() | bytea | bytea |
Struct types
| Postgres type (read) | Spark type | Postgres type (write) | Postgres type (create) |
|---|---|---|---|
| T[] | ArrayType(T) | T[] | T[] |
| T[][] | unsupported | | |
| CREATE TYPE sometype (...) | StringType() [1] | text | text |
| - | StructType(), MapType() | unsupported | unsupported |
Network types
| Postgres type (read) | Spark type | Postgres type (write) | Postgres type (create) |
|---|---|---|---|
| cidr, inet, macaddr, macaddr8 | StringType() [1] | text | text |
Geo types
| Postgres type (read) | Spark type | Postgres type (write) | Postgres type (create) |
|---|---|---|---|
| circle, box, line, lseg, path, point, polygon | StringType() [1] | text | text |
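Since values of these types arrive in Spark as plain strings, any structure has to be parsed on the Spark side, for example inside a UDF. A minimal sketch for `point` only, assuming Postgres' text output format `(x,y)`; `parse_pg_point` is a hypothetical helper, not part of onETL.

```python
def parse_pg_point(text):
    """Parse Postgres' text representation of a point, e.g. '(1.5,-2)', into an (x, y) tuple."""
    x, y = text.strip().strip("()").split(",")
    return float(x), float(y)


print(parse_pg_point("(1.5,-2)"))  # (1.5, -2.0)
```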
Explicit type cast
DBReader
It is possible to explicitly cast a column of an unsupported type using `DBReader(columns=...)` syntax.
For example, you can use `CAST(column AS text)` to convert data to its string representation on the Postgres side, so it will be read as Spark's `StringType()`.
It is also possible to use the `to_json` Postgres function to convert a column of any type to its JSON string representation, and then parse this column on the Spark side using the `JSON.parse_column` method:
from pyspark.sql.types import IntegerType, StringType, StructField, StructType
from onetl.connection import Postgres
from onetl.db import DBReader
from onetl.file.format import JSON
postgres = Postgres(...)
reader = DBReader(
    connection=postgres,
    source="schema.table",  # placeholder table name
    columns=[
        "id",
        "supported_column",
        # cast to text on the Postgres side, read as StringType()
        "CAST(unsupported_column AS text) unsupported_column_str",
        # or serialize to a JSON string on the Postgres side
        "to_json(unsupported_column) array_column_json",
    ],
)
df = reader.run()
# expected structure of the JSON value
json_schema = StructType(
    [
        StructField("id", IntegerType(), nullable=True),
        StructField("name", StringType(), nullable=True),
        ...,
    ]
)
df = df.select(
    df.id,
    df.supported_column,
    # explicit cast of the string representation on the Spark side
    df.unsupported_column_str.cast("integer").alias("parsed_integer"),
    # parse the JSON string into a Spark column with json_schema
    JSON().parse_column("array_column_json", json_schema).alias("parsed_json"),
)
DBWriter
It is always possible to convert data on the Spark side to a string, and then write it to a text column in a Postgres table.
Using JSON.serialize_column
You can use the JSON.serialize_column method for data serialization:
from onetl.file.format import JSON
from pyspark.sql.functions import col
from onetl.connection import Postgres
from onetl.db import DBWriter
postgres = Postgres(...)
postgres.execute(
"""
CREATE TABLE schema.target_table (
id int,
supported_column timestamp,
array_column_json jsonb -- any column type, actually
)
""",
)
write_df = df.select(
df.id,
df.supported_column,
JSON().serialize_column(df.unsupported_column).alias("array_column_json"),
)
writer = DBWriter(
connection=postgres,
target="schema.target_table",
)
writer.run(write_df)
Then you can parse this column on the Postgres side (for example, by creating a view):
SELECT
id,
supported_column,
array_column_json->0 AS array_item_0
FROM
schema.target_table
To avoid casting the value on every table read, you can use a `GENERATED ALWAYS AS (...) STORED` column, but this requires twice the space (for the original and the parsed value).
Manual conversion to string
The Postgres connector also supports converting a text value directly to the target column type, if the value has the proper format.
For example, you can write data like `[123, 345)` to an `int8range` column, because Postgres allows the cast `'[123, 345)'::int8range`:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from onetl.connection import Postgres
from onetl.db import DBWriter
postgres = Postgres(...)
postgres.execute(
    """
    CREATE TABLE schema.target_table (
        id int,
        range_column int8range -- any column type, actually
    )
    """,
)
@udf(returnType=StringType())
def array_to_range(value):
    """Convert a [start, end] pair to Postgres' range format, e.g. '[123,345)'"""
    if value is None:
        return None
    start, end = value
    return f"[{start},{end})"
write_df = df.select(
    df.id,
    array_to_range(df.range_column).alias("range_column"),
)
writer = DBWriter(
    connection=postgres,
    target="schema.target_table",
)
writer.run(write_df)
This can be tricky to implement and may slow down the write process. But it does not require extra space on the Postgres side, and avoids an explicit value cast on every table read.
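The formatting logic inside such a UDF can be kept in a plain Python function and tested without Spark. A sketch with a hypothetical helper `to_pg_range`, following Postgres' range literal syntax: `[`/`]` mark inclusive bounds, `(`/`)` exclusive ones, and an omitted bound means the range is unbounded on that side.

```python
def to_pg_range(start, end, bounds="[)"):
    """Format start/end as a Postgres range literal, e.g. '[123,345)'.

    None stands for an unbounded side, which Postgres writes as an empty bound.
    """
    if bounds not in ("[)", "[]", "(]", "()"):
        raise ValueError(f"Invalid bounds: {bounds!r}")
    lower = "" if start is None else str(start)
    upper = "" if end is None else str(end)
    return f"{bounds[0]}{lower},{upper}{bounds[1]}"


print(to_pg_range(123, 345))  # [123,345)
print(to_pg_range(1, None))  # [1,)
```

Such a function could then be wrapped with `udf(returnType=StringType())`, the same way as `array_to_range`.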
[1] All Postgres types that don't have a corresponding Java type are converted to `String`.
[2] This allows writing data to tables with `DEFAULT` and `GENERATED` columns: if the DataFrame has no such column, it will be populated by Postgres.
[3] This is true only if either the DataFrame column is a `StringType()`, or the target column is of `text` type. Other types cannot be silently converted, like `bytea -> bit(N)`; this requires explicit casting, see Manual conversion to string.
[4] Postgres supports decimal types with unlimited precision. But Spark's `DecimalType(P, S)` supports a maximum of `P=38` (128 bit). It is impossible to read, write or operate on values of larger precision; this leads to an exception.
[5] Postgres supports decimal types with negative scale, like `decimal(38, -10)`. Spark doesn't.
[6] The `time` type is handled the same as `timestamp` with the date `1970-01-01`. So instead of reading `23:59:59` from Postgres, Spark actually reads `1970-01-01 23:59:59`, and vice versa.
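To illustrate footnote [6], here is a plain-Python model of the `time` quirk. It does not call Spark, and the function names are hypothetical: on read, a `time` value is combined with the date `1970-01-01`; in the other direction, only the time-of-day part is kept.

```python
from datetime import date, datetime, time

EPOCH_DATE = date(1970, 1, 1)


def pg_time_to_spark(value):
    """Model of reading: Postgres time 23:59:59 arrives as timestamp 1970-01-01 23:59:59."""
    return datetime.combine(EPOCH_DATE, value)


def spark_to_pg_time(value):
    """Model of writing to a time column: only the time-of-day part is kept."""
    return value.time()


print(pg_time_to_spark(time(23, 59, 59)))  # 1970-01-01 23:59:59
```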