
MySQL <-> Spark type mapping

Note

The results below are valid for Spark 3.5.8, and may differ on other Spark versions.

Type detection & casting

Spark's DataFrames always have a schema, which is a list of columns with corresponding Spark types. All operations on a column are performed using the column type.

Reading from MySQL

This is how the MySQL connector handles this:

  • For each column in the query result (SELECT column1, column2, ... FROM table ...), get the column name and MySQL type.
  • Find the corresponding MySQL type (read) → Spark type combination (see below) for each DataFrame column. If no combination is found, raise an exception.
  • Create a DataFrame from the query with the resulting column names and Spark types.
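The lookup in these steps can be sketched as a plain dictionary (a simplified illustration with a handful of example type names, not the connector's actual code):

```python
# Simplified sketch of the read-path type lookup (illustration only,
# not the actual MySQL connector implementation).
READ_TYPE_MAP = {
    "bigint": "LongType()",
    "int": "IntegerType()",
    "double": "DoubleType()",
    "longtext": "StringType()",
}


def spark_type_for(mysql_type: str) -> str:
    # If no combination is found, raise an exception (as the connector does)
    try:
        return READ_TYPE_MAP[mysql_type]
    except KeyError:
        raise TypeError(f"Unsupported MySQL type: {mysql_type!r}")


print(spark_type_for("bigint"))  # LongType()
```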

Writing to some existing MySQL table

This is how the MySQL connector handles this:

  • Get the names of columns in the DataFrame. 1
  • Perform a SELECT * FROM table LIMIT 0 query.
  • Take only the columns present in the DataFrame (matched by name, case-insensitive). For each matched column, get its MySQL type.
  • Find the corresponding Spark type → MySQL type (write) combination (see below) for each DataFrame column. If no combination is found, raise an exception.
  • If the MySQL type (write) matches the MySQL type (read), no additional casts are performed; the DataFrame column is written to MySQL as-is.
  • If the MySQL type (write) does not match the MySQL type (read), the DataFrame column is cast to the target column type on the MySQL side. For example, you can write a column with text data to an int column, as long as the column contains valid integer values within the supported value range and precision.
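The case-insensitive column matching step can be sketched like this (a toy illustration with made-up column names, not the connector's implementation):

```python
# Sketch of the matching step: take only table columns that are also
# present in the DataFrame, compared case-insensitively.
table_columns = {  # from SELECT * FROM table LIMIT 0
    "ID": "bigint",
    "Key": "longtext",
    "Value": "timestamp(6)",
    "extra": "int",  # not in the DataFrame, so it is skipped
}
df_columns = ["id", "key", "value"]

df_names_lower = {c.lower() for c in df_columns}
matched = {
    col: mysql_type
    for col, mysql_type in table_columns.items()
    if col.lower() in df_names_lower
}
print(matched)  # {'ID': 'bigint', 'Key': 'longtext', 'Value': 'timestamp(6)'}
```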

Create new table using Spark

Warning

ABSOLUTELY NOT RECOMMENDED!

This is how the MySQL connector handles this:

  • Find the corresponding Spark type → MySQL type (create) combination (see below) for each DataFrame column. If no combination is found, raise an exception.
  • Generate DDL for creating the table in MySQL, like CREATE TABLE (col1 ...), and run it.
  • Write the DataFrame to the created table as-is.

But in some cases this may lead to using the wrong column type. For example, Spark creates a column of type timestamp, which corresponds to MySQL type timestamp(0) (precision up to seconds) instead of the more precise timestamp(6) (precision up to microseconds). This may lead to accidental precision loss, or sometimes data cannot be written to the created table at all.
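A quick illustration of that precision loss using plain Python datetimes (truncation is shown for simplicity; actual rounding behavior depends on the MySQL server):

```python
from datetime import datetime

# A microsecond-precision value from a Spark DataFrame...
value = datetime(2024, 1, 1, 23, 59, 59, 999999)

# ...stored in a timestamp(0) column keeps only second precision.
stored = value.replace(microsecond=0)

print(value.isoformat())   # 2024-01-01T23:59:59.999999
print(stored.isoformat())  # 2024-01-01T23:59:59
```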

So instead of relying on Spark to create tables like this:

writer = DBWriter(
    connection=mysql,
    target="myschema.target_tbl",
    options=MySQL.WriteOptions(
        if_exists="append",
        createTableOptions="ENGINE = InnoDB",
    ),
)
writer.run(df)

Always prefer creating tables with specific types BEFORE WRITING DATA:

mysql.execute(
    """
    CREATE TABLE schema.table (
        id bigint,
        key text,
        value timestamp(6) -- specific type and precision
    )
    ENGINE = InnoDB
    """,
)

writer = DBWriter(
    connection=mysql,
    target="myschema.target_tbl",
    options=MySQL.WriteOptions(if_exists="append"),
)
writer.run(df)


Supported types

See official documentation

Numeric types

| MySQL type (read)                 | Spark type                    | MySQL type (write)        | MySQL type (create)       |
|-----------------------------------|-------------------------------|---------------------------|---------------------------|
| decimal                           | DecimalType(P=10, S=0)        | decimal(P=10, S=0)        | decimal(P=10, S=0)        |
| decimal(P=0..38)                  | DecimalType(P=0..38, S=0)     | decimal(P=0..38, S=0)     | decimal(P=0..38, S=0)     |
| decimal(P=0..38, S=0..30)         | DecimalType(P=0..38, S=0..30) | decimal(P=0..38, S=0..30) | decimal(P=0..38, S=0..30) |
| decimal(P=39..65, S=...)          | unsupported 2                 |                           |                           |
| float, double                     | DoubleType()                  | double                    | double                    |
| tinyint, smallint, mediumint, int | IntegerType()                 | int                       | int                       |
| bigint                            | LongType()                    | bigint                    | bigint                    |

Temporal types

| MySQL type (read)                                                                  | Spark type                                               | MySQL type (write)         | MySQL type (create)                     |
|------------------------------------------------------------------------------------|----------------------------------------------------------|----------------------------|-----------------------------------------|
| year, date                                                                         | DateType()                                               | date                       | date                                    |
| datetime, timestamp, datetime(0), timestamp(0) (seconds)                           | TimestampType(), microseconds                            | timestamp(6), microseconds | timestamp(0), seconds                   |
| datetime(3), timestamp(3) (milliseconds), datetime(6), timestamp(6) (microseconds) | TimestampType(), microseconds                            | timestamp(6), microseconds | timestamp(0), seconds, precision loss 3 |
| time, time(0) (seconds)                                                            | TimestampType(), microseconds, with time format quirks 4 | timestamp(6), microseconds | timestamp(0), seconds                   |
| time(3) (milliseconds), time(6) (microseconds)                                     | TimestampType(), microseconds, with time format quirks 4 | timestamp(6), microseconds | timestamp(0), seconds, precision loss 3 |

Warning

Note that types in MySQL and Spark have different value ranges:

| MySQL type | Min value (MySQL)          | Max value (MySQL)          | Spark type      | Min value (Spark)          | Max value (Spark)          |
|------------|----------------------------|----------------------------|-----------------|----------------------------|----------------------------|
| year       | 1901                       | 2155                       | DateType()      | 0001-01-01                 | 9999-12-31                 |
| date       | 1000-01-01                 | 9999-12-31                 | DateType()      | 0001-01-01                 | 9999-12-31                 |
| datetime   | 1000-01-01 00:00:00.000000 | 9999-12-31 23:59:59.499999 | TimestampType() | 0001-01-01 00:00:00.000000 | 9999-12-31 23:59:59.999999 |
| timestamp  | 1970-01-01 00:00:01.000000 | 9999-12-31 23:59:59.499999 | TimestampType() | 0001-01-01 00:00:00.000000 | 9999-12-31 23:59:59.999999 |
| time       | -838:59:59.000000          | 838:59:59.000000           | TimestampType() | 0001-01-01 00:00:00.000000 | 9999-12-31 23:59:59.999999 |

So Spark can read all values from MySQL, but not all values in a Spark DataFrame can be written to MySQL.
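For example, a simple range check for the date case (pure Python, using the bounds from the table above):

```python
from datetime import date

# Spark's DateType() range is wider than MySQL's date range, so a value
# that is perfectly valid in a DataFrame may be rejected on write.
MYSQL_DATE_MIN = date(1000, 1, 1)
MYSQL_DATE_MAX = date(9999, 12, 31)


def fits_mysql_date(value: date) -> bool:
    return MYSQL_DATE_MIN <= value <= MYSQL_DATE_MAX


print(fits_mysql_date(date(1, 1, 1)))     # False: valid in Spark, not in MySQL
print(fits_mysql_date(date(2024, 6, 1)))  # True
```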


String types

| MySQL type (read)                                                                                             | Spark type   | MySQL type (write) | MySQL type (create) |
|---------------------------------------------------------------------------------------------------------------|--------------|--------------------|---------------------|
| char, char(N), varchar(N), mediumtext, text, longtext, json, enum("val1", "val2", ...), set("val1", "val2", ...) | StringType() | longtext           | longtext            |

Binary types

| MySQL type (read)                                           | Spark type   | MySQL type (write) | MySQL type (create) |
|-------------------------------------------------------------|--------------|--------------------|---------------------|
| binary, binary(N), varbinary(N), mediumblob, blob, longblob | BinaryType() | blob               | blob                |

Geometry types

| MySQL type (read)                                                                                    | Spark type   | MySQL type (write) | MySQL type (create) |
|------------------------------------------------------------------------------------------------------|--------------|--------------------|---------------------|
| point, linestring, polygon, geometry, multipoint, multilinestring, multipolygon, geometrycollection  | BinaryType() | blob               | blob                |

Explicit type cast

DBReader

It is possible to explicitly cast a column's type using the DBReader(columns=...) syntax.

For example, you can use CAST(column AS text) to convert data to its string representation on the MySQL side, so it is read as Spark's StringType().

It is also possible to use the JSON_OBJECT MySQL function and parse JSON columns with the JSON.parse_column method.

from pyspark.sql.types import IntegerType, StructType, StructField

from onetl.connection import MySQL
from onetl.db import DBReader
from onetl.file.format import JSON

mysql = MySQL(...)

reader = DBReader(
    connection=mysql,
    columns=[
        "id",
        "supported_column",
        "CAST(unsupported_column AS text) unsupported_column_str",
        # or
        "JSON_OBJECT('key', value_column) json_column",
    ],
)
df = reader.run()

json_scheme = StructType([StructField("key", IntegerType())])

df = df.select(
    df.id,
    df.supported_column,
    # explicit cast
    df.unsupported_column_str.cast("integer").alias("parsed_integer"),
    JSON().parse_column("json_column", json_scheme).alias("struct_column"),
)

DBWriter

To write JSON data to a json or text column in a MySQL table, use the JSON.serialize_column method.

from onetl.connection import MySQL
from onetl.db import DBWriter
from onetl.file.format import JSON

mysql = MySQL(...)

mysql.execute(
    """
    CREATE TABLE schema.target_tbl (
        id bigint,
        array_column_json json -- any string type, actually
    )
    ENGINE = InnoDB
    """,
)

df = df.select(
    df.id,
    JSON().serialize_column(df.array_column).alias("array_column_json"),
)

writer = DBWriter(
    connection=mysql,
    target="schema.target_tbl",
    options=MySQL.WriteOptions(if_exists="append"),
)
writer.run(df)

Then you can parse this column on the MySQL side - for example, by creating a view:

SELECT
    id,
    array_column_json->"$[0]" AS array_item
FROM target_tbl

Or by using a GENERATED column:

CREATE TABLE schema.target_table (
    id bigint,
    supported_column timestamp,
    array_column_json json, -- any string type, actually
    -- virtual column
    array_item_0 json GENERATED ALWAYS AS (array_column_json->"$[0]") VIRTUAL
    -- or stored column
    -- array_item_0 json GENERATED ALWAYS AS (array_column_json->"$[0]") STORED
)

A VIRTUAL column value is calculated on every table read. A STORED column value is calculated during insert, but this requires additional storage space.


  1. This allows writing data to tables with DEFAULT and GENERATED columns - if the DataFrame has no such column, it will be populated by MySQL. 

  2. MySQL supports decimal types with precision P up to 65.

    But Spark's DecimalType(P, S) supports a maximum of P=38. It is impossible to read, write, or operate on values of larger precision; attempting to do so raises an exception. 

  3. The MySQL dialect generates DDL with MySQL type timestamp, which is an alias for timestamp(0) with precision up to seconds (23:59:59). Inserting data with microsecond precision (23:59:59.999999) will silently discard the microseconds. 

  4. The time type is treated the same as timestamp with the date 1970-01-01. So instead of reading a value like 23:59:59 from MySQL, Spark actually reads 1970-01-01 23:59:59, and vice versa on write. 
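Footnote 4 in code form - a sketch of how a MySQL time value surfaces in Spark, using plain Python datetimes (illustration only, not connector code):

```python
from datetime import date, datetime, time

# A MySQL time value like 23:59:59...
mysql_time = time(23, 59, 59)

# ...is read by Spark as a timestamp anchored to the date 1970-01-01.
spark_value = datetime.combine(date(1970, 1, 1), mysql_time)

print(spark_value.isoformat())  # 1970-01-01T23:59:59
```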