Reading from MongoDB using MongoDB.pipeline¶
MongoDB.pipeline allows passing a custom aggregation pipeline, but does not support incremental strategies.
Warning
Please take into account MongoDB types
Recommendations¶
Pay attention to pipeline value¶
Instead of filtering data on the Spark side using df.filter(df.column == 'value'), pass a proper MongoDB.pipeline(..., pipeline={"$match": {"column": {"$eq": "value"}}}) value.
This both reduces the amount of data sent from MongoDB to Spark and may also improve query performance, especially if there are indexes for the columns used in the pipeline value.
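As a sketch of the difference (the connection object and column names below are illustrative assumptions, not part of the API reference that follows), the same filter can live either on the Spark side or inside the pipeline value:

```python
# Hypothetical comparison of where the filter is applied.
# "connection" and "column" are illustrative names.

# Spark-side filtering: MongoDB sends ALL documents, Spark drops rows afterwards.
#   df = connection.pipeline(collection="collection_name")
#   df = df.filter(df.column == "value")

# Pipeline filtering: MongoDB applies $match before sending any data,
# and can use an index on "column" if one exists.
match_stage = {"$match": {"column": {"$eq": "value"}}}
#   df = connection.pipeline(collection="collection_name", pipeline=match_stage)
```

Both approaches return the same rows; the second one simply moves the work to the database, where it is usually cheaper.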
References¶
pipeline(collection, pipeline=None, df_schema=None, options=None)
¶
Execute a pipeline for a specific collection, and return DataFrame.
Almost like Aggregation pipeline syntax in MongoDB:
db.collection_name.aggregate([{"$match": ...}, {"$group": ...}])
Note
This method does not support Read Strategies, use DBReader instead
Added in 0.7.0
Parameters:
- collection (str) – Collection name.
- pipeline (dict | list[dict], default: None) – Pipeline containing a database query. See Aggregation pipeline syntax.
- df_schema (StructType, default: None) – Schema describing the resulting DataFrame.
- options (PipelineOptions | dict, default: None) – Additional pipeline options, see MongoDB.PipelineOptions.
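Since pipeline accepts either a single dict or a list of dicts, a multi-stage aggregation (like the db.collection_name.aggregate([...]) call above) maps directly onto a Python list of stage dicts. A minimal sketch; the field names are illustrative assumptions:

```python
# Multi-stage pipeline: stages run in order, mirroring
# db.collection_name.aggregate([{"$match": ...}, {"$group": ...}]).
# Field names here are illustrative, not from a real schema.
pipeline = [
    # filter documents first, so later stages see less data
    {"$match": {"some_int": {"$gt": 999}}},
    # then aggregate the remaining documents per group
    {"$group": {"_id": "$some_string", "max": {"$max": "$some_int"}}},
]
#   df = connection.pipeline(collection="collection_name", pipeline=pipeline)
```

Keeping $match as the first stage lets MongoDB use indexes before any grouping happens.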
Examples:
Get document with a specific field value:
df = connection.pipeline(
collection="collection_name",
pipeline={"$match": {"field": {"$eq": 1}}},
)
Calculate min and max values of a column:
df = connection.pipeline(
collection="collection_name",
pipeline={
"$group": {
"_id": 1,
"min": {"$min": "$column_int"},
"max": {"$max": "$column_int"},
}
},
)
Run a pipeline with an explicit df_schema for the resulting DataFrame:
from pyspark.sql.types import (
DoubleType,
IntegerType,
StringType,
StructField,
StructType,
TimestampType,
)
df_schema = StructType(
[
StructField("_id", StringType()),
StructField("some_string", StringType()),
StructField("some_int", IntegerType()),
StructField("some_datetime", TimestampType()),
StructField("some_float", DoubleType()),
],
)
df = connection.pipeline(
collection="collection_name",
df_schema=df_schema,
pipeline={"$match": {"some_int": {"$gt": 999}}},
)
Pass additional pipeline options:
df = connection.pipeline(
collection="collection_name",
pipeline={"$match": {"field": {"$eq": 1}}},
options=MongoDB.PipelineOptions(hint={"field": 1}),
)
MongoDBPipelineOptions¶
Bases: GenericOptions
Aggregation pipeline options for MongoDB connector.
The only difference from MongoDB.ReadOptions is that the latter does not allow passing the hint parameter.
Warning
Options uri, database, collection, pipeline are populated from connection attributes,
and cannot be overridden by the user in PipelineOptions to avoid issues.
Added in 0.7.0
Examples:
Note
You can pass any value supported by the connector, even if it is not mentioned in this documentation. Option names should be in camelCase!
The set of supported options depends on the connector version.
from onetl.connection import MongoDB
options = MongoDB.PipelineOptions(
hint={"some_field": 1},
)