
Postgres <-> Spark type mapping

Note

The results below are valid for Spark 3.5.8, and may differ on other Spark versions.

Type detection & casting

Spark DataFrames always have a schema: a list of columns with corresponding Spark types. All operations on a column are performed using its column type.

Reading from Postgres

This is how the Postgres connector performs reading:

  • For each column in the query result (SELECT column1, column2, ... FROM table ...), get the column name and Postgres type.
  • Find the corresponding Postgres type (read) → Spark type combination (see below) for each DataFrame column [1]. If no combination is found, an exception is raised.
  • Create a DataFrame from the query with the resolved column names and Spark types.
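
The lookup in the second step can be sketched in plain Python. The mapping below is a hypothetical excerpt, not the connector's actual code; the full mapping tables are listed under "Supported types".

```python
# Simplified sketch of the read-side type lookup.
# The dict is an illustrative excerpt of the Postgres type (read) -> Spark type tables.
PG_TO_SPARK = {
    "integer": "IntegerType()",
    "bigint": "LongType()",
    "text": "StringType()",
    "real": "FloatType()",
    "date": "DateType()",
}


def spark_type_for(pg_type: str) -> str:
    """Return the Spark type for a Postgres column type, or raise if unsupported."""
    try:
        return PG_TO_SPARK[pg_type.lower()]
    except KeyError:
        raise ValueError(f"Unsupported Postgres type: {pg_type!r}")


print(spark_type_for("bigint"))  # LongType()
```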

Writing to an existing Postgres table

This is how the Postgres connector performs writing:

  • Get the names of the DataFrame columns [1].
  • Perform a SELECT * FROM table LIMIT 0 query.
  • Take only the columns present in the DataFrame (matched by name, case-insensitive) [2]. For each matched column, get the Postgres type.
  • Find the corresponding Spark type → Postgres type (write) combination (see below) for each DataFrame column. If no combination is found, an exception is raised.
  • If the Postgres type (write) matches the Postgres type (read), no additional casts are performed, and the DataFrame column is written to Postgres as is.
  • If the Postgres type (write) does not match the Postgres type (read), the DataFrame column is cast to the target column type on the Postgres side. For example, you can write a column with text data to an int column, if the column contains valid integer values within the supported value range and precision [3].
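
The case-insensitive column matching from the third step can be sketched as follows (the column names here are illustrative):

```python
def match_columns(df_columns: list[str], table_columns: list[str]) -> list[str]:
    """Return the table columns that are present in the DataFrame, matched case-insensitively."""
    df_names = {name.lower() for name in df_columns}
    return [col for col in table_columns if col.lower() in df_names]


# Table columns missing from the DataFrame (e.g. DEFAULT or GENERATED ones) are skipped,
# so Postgres can populate them itself:
print(match_columns(["ID", "value"], ["id", "value", "created_at"]))
# ['id', 'value']
```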

Create new table using Spark

Warning

ABSOLUTELY NOT RECOMMENDED!

This is how the Postgres connector performs this:

  • Find the corresponding Spark type → Postgres type (create) combination (see below) for each DataFrame column. If no combination is found, an exception is raised.
  • Generate a DDL statement for creating the table in Postgres, like CREATE TABLE (col1 ...), and run it.
  • Write the DataFrame to the created table as is.
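
The DDL-generation step can be sketched as below. The mapping dict is a hypothetical excerpt; the real connector uses the "Postgres type (create)" column from the tables under "Supported types".

```python
# Illustrative excerpt of the Spark type -> Postgres type (create) mapping.
SPARK_TO_PG_CREATE = {
    "LongType()": "bigint",
    "StringType()": "text",
    "TimestampType()": "timestamp(6)",
}


def generate_ddl(table: str, schema: list[tuple[str, str]]) -> str:
    """Build a CREATE TABLE statement from (column name, Spark type) pairs."""
    columns = ", ".join(
        f"{name} {SPARK_TO_PG_CREATE[spark_type]}" for name, spark_type in schema
    )
    return f"CREATE TABLE {table} ({columns})"


print(generate_ddl("public.table", [("id", "LongType()"), ("name", "StringType()")]))
# CREATE TABLE public.table (id bigint, name text)
```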

But the Postgres connector supports only a limited number of types and almost no custom clauses (like PARTITION BY, INDEX, etc.). So instead of relying on Spark to create tables:

writer = DBWriter(
    connection=postgres,
    target="public.table",
    options=Postgres.WriteOptions(
        if_exists="append",
        createTableOptions="PARTITION BY RANGE (id)",
    ),
)
writer.run(df)

Always prefer creating the table with the desired DDL BEFORE WRITING DATA:

postgres.execute(
    """
    CREATE TABLE public.table (
        id bigint,
        business_dt timestamp(6),
        value json
    )
    PARTITION BY RANGE (id)
    """,
)

writer = DBWriter(
    connection=postgres,
    target="public.table",
    options=Postgres.WriteOptions(if_exists="append"),
)
writer.run(df)

See Postgres CREATE TABLE documentation.

Supported types

References

See List of Postgres types.

The connector source code contains the complete set of type conversions.

Numeric types

| Postgres type (read) | Spark type | Postgres type (write) | Postgres type (create) |
|---|---|---|---|
| decimal | DecimalType(P=38, S=18) | decimal(P=38, S=18) | decimal (unbounded) |
| decimal(P=0..38) | DecimalType(P=0..38, S=0) | decimal(P=0..38, S=0) | decimal(P=0..38, S=0) |
| decimal(P=0..38, S=0..38) | DecimalType(P=0..38, S=0..38) | decimal(P=0..38, S=0..38) | decimal(P=0..38, S=0..38) |
| decimal(P=39.., S=0..) | unsupported [4] | | |
| decimal(P=.., S=..-1) | unsupported [5] | | |
| real | FloatType() | real | real |
| double precision | DoubleType() | double precision | double precision |
| smallint | ShortType() | smallint | smallint |
| - | ByteType() | | |
| integer | IntegerType() | integer | integer |
| bigint | LongType() | bigint | bigint |
| money, int4range, int8range, numrange, int2vector | StringType() [1] | text | text |
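
Spark's 38-digit precision cap (footnote 4) can be checked ahead of time in plain Python. This is a simplified sketch that only counts significant digits; it ignores scale handling:

```python
from decimal import Decimal


def fits_spark_decimal(value: Decimal, precision: int = 38) -> bool:
    """Check whether a decimal value's significant digits fit Spark's DecimalType precision."""
    sign, digits, exponent = value.as_tuple()
    return len(digits) <= precision


print(fits_spark_decimal(Decimal("12345.6789")))  # True
print(fits_spark_decimal(Decimal("9" * 39)))      # False: 39 digits > 38
```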

Temporal types

| Postgres type (read) | Spark type | Postgres type (write) | Postgres type (create) |
|---|---|---|---|
| date | DateType() | date | date |
| time, time(0..6), time with time zone, time(0..6) with time zone | TimestampType(), with time format quirks [6] | timestamp(6) | timestamp(6) |
| timestamp, timestamp(0..6), timestamp with time zone, timestamp(0..6) with time zone | TimestampType() | timestamp(6) | timestamp(6) |
| - | TimestampNTZType() | timestamp(6) | timestamp(6) |
| interval of any precision | StringType() [1] | text | text |
| - | DayTimeIntervalType() | unsupported | unsupported |
| - | YearMonthIntervalType() | unsupported | unsupported |
| daterange, tsrange, tstzrange | StringType() [1] | text | text |

Warning

Note that types in Postgres and Spark have different value ranges:

| Postgres type | Min value | Max value | Spark type | Min value | Max value |
|---|---|---|---|---|---|
| date | -4713-01-01 | 5874897-01-01 | DateType() | 0001-01-01 | 9999-12-31 |
| timestamp | -4713-01-01 00:00:00.000000 | 294276-12-31 23:59:59.999999 | TimestampType() | 0001-01-01 00:00:00.000000 | 9999-12-31 23:59:59.999999 |
| time | 00:00:00.000000 | 24:00:00.000000 | TimestampType() | 0001-01-01 00:00:00.000000 | 9999-12-31 23:59:59.999999 |

So not all values can be read from Postgres into Spark.
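
These bounds can be checked before reading, as sketched below. Conveniently, Python's own datetime shares Spark's year range of 0001..9999, so any value constructible in Python fits:

```python
from datetime import datetime

# Spark's TimestampType() bounds, which are narrower than Postgres' timestamp range:
SPARK_TS_MIN = datetime(1, 1, 1, 0, 0, 0)
SPARK_TS_MAX = datetime(9999, 12, 31, 23, 59, 59, 999999)


def fits_spark_timestamp(value: datetime) -> bool:
    """Check whether a timestamp value is representable in Spark's TimestampType()."""
    return SPARK_TS_MIN <= value <= SPARK_TS_MAX


print(fits_spark_timestamp(datetime(2024, 1, 1)))  # True
```

A Postgres value like 294276-12-31 cannot even be constructed as a Python datetime, so such out-of-range values must be filtered on the Postgres side (e.g. in a WHERE clause).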


String types

| Postgres type (read) | Spark type | Postgres type (write) | Postgres type (create) |
|---|---|---|---|
| character, character(N), character varying, character varying(N), text, json, jsonb, xml | StringType() | text | text |
| CREATE TYPE ... AS ENUM, tsvector, tsquery | StringType() [1] | text | text |
| - | CharType() | unsupported | unsupported |
| - | VarcharType() | unsupported | unsupported |

Binary types

| Postgres type (read) | Spark type | Postgres type (write) | Postgres type (create) |
|---|---|---|---|
| boolean | BooleanType() | boolean | boolean |
| bit, bit(N=1) | BooleanType() | bool, cannot insert data [3] | bool |
| bit(N=2..) | ByteType() | bytea, cannot insert data [3] | bytea |
| bit varying, bit varying(N) | StringType() [1] | text | text |
| bytea | BinaryType() | bytea | bytea |

Struct types

| Postgres type (read) | Spark type | Postgres type (write) | Postgres type (create) |
|---|---|---|---|
| T[] | ArrayType(T) | T[] | T[] |
| T[][] | unsupported | | |
| CREATE TYPE sometype (...) | StringType() [1] | text | text |
| - | StructType(), MapType() | unsupported | unsupported |

Network types

| Postgres type (read) | Spark type | Postgres type (write) | Postgres type (create) |
|---|---|---|---|
| cidr, inet, macaddr, macaddr8 | StringType() [1] | text | text |

Geo types

| Postgres type (read) | Spark type | Postgres type (write) | Postgres type (create) |
|---|---|---|---|
| circle, box, line, lseg, path, point, polygon | StringType() [1] | text | text |

Explicit type cast

DBReader

It is possible to explicitly cast a column of an unsupported type using the DBReader(columns=...) syntax.

For example, you can use CAST(column AS text) to convert data to its string representation on the Postgres side, so it will be read as Spark's StringType().

It is also possible to use the to_json Postgres function to convert a column of any type to its string representation, and then parse this column on the Spark side using the JSON.parse_column method:

from pyspark.sql.types import IntegerType, StringType, StructField, StructType

from onetl.connection import Postgres
from onetl.db import DBReader
from onetl.file.format import JSON

postgres = Postgres(...)

reader = DBReader(
    connection=postgres,
    source="schema.table",
    columns=[
        "id",
        "supported_column",
        "CAST(unsupported_column AS text) unsupported_column_str",
        # or
        "to_json(unsupported_column) array_column_json",
    ],
)
df = reader.run()

json_schema = StructType(
    [
        StructField("id", IntegerType(), nullable=True),
        StructField("name", StringType(), nullable=True),
        ...,
    ]
)
df = df.select(
    df.id,
    df.supported_column,
    # explicit cast
    df.unsupported_column_str.cast("integer").alias("parsed_integer"),
    JSON().parse_column("array_column_json", json_schema).alias("json_string"),
)

DBWriter

It is always possible to convert data on the Spark side to a string, and then write it to a text column in a Postgres table.

Using JSON.serialize_column

You can use the JSON.serialize_column method for data serialization:

from onetl.connection import Postgres
from onetl.db import DBWriter
from onetl.file.format import JSON

postgres = Postgres(...)

postgres.execute(
    """
    CREATE TABLE schema.target_table (
        id int,
        supported_column timestamp,
        array_column_json jsonb -- any column type, actually
    )
    """,
)

write_df = df.select(
    df.id,
    df.supported_column,
    JSON().serialize_column(df.unsupported_column).alias("array_column_json"),
)

writer = DBWriter(
    connection=postgres,
    target="schema.target_table",
)
writer.run(write_df)

Then you can parse this column on the Postgres side (for example, by creating a view):

SELECT
    id,
    supported_column,
    array_column_json->0 AS array_item_0
FROM
    schema.target_table

To avoid casting the value on every table read, you can use a GENERATED ALWAYS AS (...) STORED column, but this requires twice the space (for the original and the parsed value).

Manual conversion to string

The Postgres connector also supports converting a text value directly to the target column type, if the value has a proper format.

For example, you can write data like [123,345) to an int8range column, because Postgres allows the cast '[123,345)'::int8range:

from pyspark.sql.types import StringType
from pyspark.sql.functions import udf

from onetl.connection import Postgres
from onetl.db import DBWriter

postgres = Postgres(...)

postgres.execute(
    """
    CREATE TABLE schema.target_table (
        id int,
        range_column int8range -- any column type, actually
    )
    """,
)


@udf(returnType=StringType())
def array_to_range(value: tuple):
    """This UDF allows to convert tuple[start, end] to Postgres' range format"""
    start, end = value
    return f"[{start},{end})"


write_df = df.select(
    df.id,
    array_to_range(df.range_column).alias("range_column"),
)

writer = DBWriter(
    connection=postgres,
    target="schema.target_table",
)
writer.run(write_df)

This can be tricky to implement and may slow down the write process. But it does not require extra space on the Postgres side, and avoids an explicit value cast on every table read.


  1. All Postgres types that don't have a corresponding Java type are converted to String.

  2. This allows writing data to tables with DEFAULT and GENERATED columns: if the DataFrame has no such column, it will be populated by Postgres.

  3. This is true only if either the DataFrame column is a StringType(), or the target column is of text type.

    But other types cannot be silently converted, like bytea -> bit(N). This requires explicit casting, see [Manual conversion to string].

  4. Postgres supports decimal types with unlimited precision.

    But Spark's DecimalType(P, S) supports a maximum of P=38 (128 bit). It is impossible to read, write or operate on values of larger precision; this leads to an exception.

  5. Postgres supports decimal types with negative scale, like decimal(38, -10). Spark doesn't.

  6. The time type is handled as a timestamp with date 1970-01-01. So instead of reading a value like 23:59:59 from Postgres, it is actually read as 1970-01-01 23:59:59, and vice versa.
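
The quirk from footnote 6 can be reproduced with plain Python datetimes: the time value simply gets the epoch date attached.

```python
from datetime import date, datetime, time

# A Postgres time value like 23:59:59 ...
pg_time = time(23, 59, 59)

# ... is represented in Spark's TimestampType() with the date part 1970-01-01:
spark_value = datetime.combine(date(1970, 1, 1), pg_time)
print(spark_value)  # 1970-01-01 23:59:59

# When writing back, only the time part is kept:
assert spark_value.time() == pg_time
```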