
MongoDB <-> Spark type mapping

Note

The results below are valid for Spark 3.5.8, and may differ on other Spark versions.

Type detection & casting

Spark DataFrames always have a schema: a list of fields, each with a corresponding Spark type. All operations on a field are performed according to that type.

MongoDB is, by design, __schemaless__, so there are two ways to handle this:

  • User provides DataFrame schema explicitly:
See example
from onetl.connection import MongoDB
from onetl.db import DBReader

from pyspark.sql.types import (
    StructType,
    StructField,
    IntegerType,
    StringType,
    TimestampType,
)

mongodb = MongoDB(...)

df_schema = StructType(
    [
        StructField("_id", StringType()),
        StructField("some", StringType()),
        StructField(
            "field",
            StructType(
                [
                    StructField("nested", IntegerType()),
                ]
            ),
        ),
    ]
)

reader = DBReader(
    connection=mongodb,
    source="some_collection",
    df_schema=df_schema,
)
df = reader.run()

# or

df = mongodb.pipeline(
    collection="some_collection",
    df_schema=df_schema,
)
  • Rely on the MongoDB connector's schema inference:

    df = mongodb.pipeline(collection="some_collection")
    

In this case the MongoDB connector reads a sample of the collection's documents and builds the DataFrame schema based on the fields and values found in that sample.

It is highly recommended to pass df_schema explicitly to avoid type conversion issues.

References

The type conversion logic described below is implemented in the MongoDB Spark connector source code.

Supported types

See official documentation

Numeric types

| MongoDB type (read) | Spark type              | MongoDB type (write) |
|---------------------|-------------------------|----------------------|
| Decimal128          | DecimalType(P=34, S=32) | Decimal128           |
| -                   | FloatType()             | Double               |
| Double              | DoubleType()            | Double               |
| -                   | ByteType()              | Int32                |
| -                   | ShortType()             | Int32                |
| Int32               | IntegerType()           | Int32                |
| Int64               | LongType()              | Int64                |
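Decimal128 keeps at most 34 significant digits, which is why the connector maps it to DecimalType(P=34). A quick sketch of what happens to extra digits, using only the Python standard library (no Spark or MongoDB involved):

```python
from decimal import Context

# Decimal128 (and Spark's DecimalType(34, ...)) holds at most
# 34 significant digits; anything beyond that is rounded away
ctx = Context(prec=34)
value = ctx.create_decimal("1.23456789012345678901234567890123456789")

print(value)  # only 34 significant digits survive
```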

Temporal types

| MongoDB type (read) | Spark type                    | MongoDB type (write)                  |
|---------------------|-------------------------------|---------------------------------------|
| -                   | DateType(), days              | Date, milliseconds                    |
| Date, milliseconds  | TimestampType(), microseconds | Date, milliseconds, precision loss [1] |
| Timestamp, seconds  | TimestampType(), microseconds | Date, milliseconds                    |
| -                   | TimestampNTZType()            | unsupported                           |
| -                   | DayTimeIntervalType()         | unsupported                           |

Warning

Note that types in MongoDB and Spark have different value ranges:

| MongoDB type | Min value           | Max value           | Spark type      | Min value                  | Max value                  |
|--------------|---------------------|---------------------|-----------------|----------------------------|----------------------------|
| Date         | -290 million years  | 290 million years   | TimestampType() | 0001-01-01 00:00:00.000000 | 9999-12-31 23:59:59.999999 |
| Timestamp    | 1970-01-01 00:00:00 | 2106-02-07 09:28:16 | TimestampType() | 0001-01-01 00:00:00.000000 | 9999-12-31 23:59:59.999999 |

So not all values can be read from MongoDB into Spark, nor can all Spark DataFrame values be written to MongoDB.
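The Timestamp upper bound comes from BSON storing seconds since the Unix epoch in an unsigned 32-bit field. A standard-library check of the resulting limit (computed in UTC):

```python
from datetime import datetime, timedelta, timezone

# BSON Timestamp keeps seconds in an unsigned 32-bit integer,
# so the last representable second is epoch + (2**32 - 1) seconds
epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
max_ts = epoch + timedelta(seconds=2**32 - 1)

print(max_ts)  # early February 2106
```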


String types

Note: fields of the deprecated MongoDB type Symbol are excluded during read.

| MongoDB type (read) | Spark type   | MongoDB type (write) |
|---------------------|--------------|----------------------|
| String              | StringType() | String               |
| Code                | StringType() | String               |
| RegExp              | StringType() | String               |

Binary types

| MongoDB type (read) | Spark type    | MongoDB type (write) |
|---------------------|---------------|----------------------|
| Boolean             | BooleanType() | Boolean              |
| Binary              | BinaryType()  | Binary               |

Struct types

| MongoDB type (read) | Spark type        | MongoDB type (write) |
|---------------------|-------------------|----------------------|
| Array[T]            | ArrayType(T)      | Array[T]             |
| Object[...]         | StructType([...]) | Object[...]          |
| -                   | MapType(...)      | Object[...]          |

Special types

| MongoDB type (read) | Spark type                                           | MongoDB type (write)              |
|---------------------|------------------------------------------------------|-----------------------------------|
| ObjectId            | StringType()                                         | String                            |
| MaxKey              | StringType()                                         | String                            |
| MinKey              | StringType()                                         | String                            |
| Null                | NullType()                                           | Null                              |
| Undefined           | NullType()                                           | Null                              |
| DBRef               | StructType([$ref: StringType(), $id: StringType()])  | Object[$ref: String, $id: String] |
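Since ObjectId values arrive in Spark as plain strings, it can be handy to sanity-check them downstream. A hypothetical helper (the function name and regex are ours, not part of onETL):

```python
import re

# a serialized ObjectId is 24 lowercase hex characters
_OBJECT_ID_RE = re.compile(r"\A[0-9a-f]{24}\Z")


def looks_like_object_id(value: str) -> bool:
    """Check whether a string has the shape of a serialized ObjectId."""
    return bool(_OBJECT_ID_RE.match(value))


print(looks_like_object_id("64e2f0a1b2c3d4e5f6a7b8c9"))  # True
print(looks_like_object_id("not-an-object-id"))  # False
```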

Explicit type cast

DBReader

Currently it is not possible to cast field types using DBReader, but this can be done using MongoDB.pipeline.

MongoDB.pipeline

You can use aggregation pipeline stages to cast field types:

from onetl.connection import MongoDB

mongodb = MongoDB(...)

df = mongodb.pipeline(
    collection="my_collection",
    pipeline=[
        {
            "$addFields": {
                # convert unsupported_field to string
                "unsupported_field_str": {
                    "$convert": {
                        "input": "$unsupported_field",
                        "to": "string",
                    },
                },
            },
        },
        # $project cannot mix computed fields with exclusions,
        # so unsupported_field is dropped in a separate stage
        {"$project": {"unsupported_field": 0}},
    ],
)

# cast field content to proper Spark type
df = df.select(
    df.id,
    df.supported_field,
    # explicit cast
    df.unsupported_field_str.cast("integer").alias("parsed_integer"),
)
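If some documents hold values that cannot be converted, the whole aggregation fails. $convert accepts onError and onNull fallbacks to avoid this; a sketch of such a stage as a plain Python dict (the field names are illustrative):

```python
# return None instead of failing the aggregation
# when a value cannot be converted or is missing
safe_convert_stage = {
    "$addFields": {
        "unsupported_field_str": {
            "$convert": {
                "input": "$unsupported_field",
                "to": "string",
                "onError": None,  # used when conversion fails
                "onNull": None,   # used when the input is missing or null
            },
        },
    },
}
```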

DBWriter

Convert the DataFrame field to string on the Spark side, and then write it to MongoDB:

from onetl.db import DBWriter

writer = DBWriter(
    connection=mongodb,
    target="my_collection",
)

df = df.select(
    df.id,
    df.unsupported_field.cast("string").alias("array_field_json"),
)

writer.run(df)

  1. The MongoDB Date type has precision up to milliseconds (23:59:59.999). Writing values with microsecond precision (23:59:59.999999) truncates the microseconds.
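The truncation can be reproduced with the standard library alone: MongoDB Date keeps whole milliseconds, so everything past the third decimal of the seconds is dropped (plain datetime arithmetic, no MongoDB involved):

```python
from datetime import datetime

dt = datetime(2023, 1, 1, 23, 59, 59, 999999)

# keep only whole milliseconds, as MongoDB Date does
truncated = dt.replace(microsecond=dt.microsecond // 1000 * 1000)

print(truncated.microsecond)  # 999000 — the sub-millisecond digits are gone
```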