DB Reader¶
Bases: FrozenModel
Allows you to read data from a table with a specified database connection
and parameters, and return its content as a Spark DataFrame.
Note
DBReader can return different results depending on Read Strategies
Note
This class operates with only one source at a time. It does NOT support executing queries
against multiple sources, like SELECT ... JOIN.
Added in 0.1.0
Changed in 0.8.0
Moved onetl.core.DBReader → onetl.db.DBReader
Parameters:
- connection (BaseDBConnection) – Class which contains DB connection properties. See the DB Connections section.
- source (str) – Table/collection/etc. name to read data from.
  If the connection has schema support, you need to specify the full name of the source including the schema, e.g. schema.name.
  Changed in 0.7.0: Renamed table → source
- columns (list of str, default: None) – The list of columns to be read.
  If the RDBMS supports any kind of expressions, you can pass them too:

      columns = [
          "mycolumn",
          "another_column as alias",
          "count(*) over ()",
          "some(function) as alias2",
      ]

  Note: Some sources do not have columns.
  Note: It is recommended to pass column names explicitly to avoid selecting too many columns, and to avoid adding unexpected columns to the dataframe if the source DDL is changed.
  Deprecated since 0.10.0: The syntax DBReader(columns="col1, col2") (a string instead of a list) is not supported, and will be removed in v1.0.0.
- where (Any, default: None) – Custom where clause for the SQL query or MongoDB pipeline.
  The where syntax depends on the source. For example, SQL sources accept where as a string, but MongoDB sources accept where as a dictionary:

      # SQL database connection
      where = "column_1 > 2"

      # MongoDB connection
      where = {
          "col_1": {"$gt": 1, "$lt": 100},
          "col_2": {"$gt": 2},
          "col_3": {"$eq": "hello"},
      }

  Note: Some sources do not support data filtering.
- hwm (type[HWM] | None, default: None) – HWM class to be used as HWM value.
  The HWM value will be fetched using the hwm_column SQL query:

      hwm = DBReader.AutoDetectHWM(
          name="some_unique_hwm_name",
          expression="hwm_column",
      )

  If you want to use some SQL expression as the HWM value, you can do so as well:

      hwm = DBReader.AutoDetectHWM(
          name="some_unique_hwm_name",
          expression="cast(hwm_column_orig as date)",
      )

  Note: Some sources do not support passing expressions and can be used only with column/field names which are present in the source.
  Changed in 0.10.0: Replaces the deprecated hwm_column and hwm_expression attributes.
- hint (Any, default: None) – Hint expression used for querying the data.
  The hint syntax depends on the source. For example, SQL sources accept hint as a string, but MongoDB sources accept hint as a dictionary:

      # SQL database connection
      hint = "index(myschema.mytable mycolumn)"

      # MongoDB connection
      hint = {
          "mycolumn": 1,
      }

  Note: Some sources do not support hints.
- df_schema (StructType, default: None) – Spark DataFrame schema, used for proper type casting of the rows:

      from pyspark.sql.types import (
          DoubleType,
          IntegerType,
          StringType,
          StructField,
          StructType,
          TimestampType,
      )

      df_schema = StructType(
          [
              StructField("_id", IntegerType()),
              StructField("text_string", StringType()),
              StructField("hwm_int", IntegerType()),
              StructField("hwm_datetime", TimestampType()),
              StructField("float_value", DoubleType()),
          ],
      )

      reader = DBReader(
          connection=connection,
          source="fiddle.dummy",
          df_schema=df_schema,
      )

  Note: Some sources do not support passing a dataframe schema.
- options (dict | ReadOptions | None, default: None) – Spark read options, like partitioning mode:

      Postgres.ReadOptions(
          partitioningMode="hash",
          partitionColumn="some_column",
          numPartitions=20,
          fetchsize=1000,
      )

  Note: Some sources do not support read options.
Examples:
from onetl.db import DBReader
from onetl.connection import Postgres
postgres = Postgres(...)
# create reader
reader = DBReader(connection=postgres, source="fiddle.dummy")
# read data from table "fiddle.dummy"
df = reader.run()
from onetl.connection import Postgres
from onetl.db import DBReader
postgres = Postgres(...)
options = Postgres.ReadOptions(sessionInitStatement="select 300", fetchsize="100")
# create reader and pass some options to the underlying connection object
reader = DBReader(connection=postgres, source="fiddle.dummy", options=options)
# read data from table "fiddle.dummy"
df = reader.run()
from onetl.db import DBReader
from onetl.connection import Postgres
postgres = Postgres(...)
options = Postgres.ReadOptions(sessionInitStatement="select 300", fetchsize="100")
# create reader with specific columns and a row filter
reader = DBReader(
connection=postgres,
source="default.test",
where="d_id > 100",
hint="NOWAIT",
columns=["d_id", "d_name", "d_age"],
options=options,
)
# read data from table "default.test"
df = reader.run()
See Read Strategies for more examples
from onetl.strategy import IncrementalStrategy
...
reader = DBReader(
connection=postgres,
source="fiddle.dummy",
hwm=DBReader.AutoDetectHWM( # mandatory for IncrementalStrategy
name="some_unique_hwm_name",
expression="d_age",
),
)
# read data from table "fiddle.dummy"
# but only with new rows (`WHERE d_age > previous_hwm_value`)
with IncrementalStrategy():
df = reader.run()
run()¶
Reads data from the source table and returns it as a Spark dataframe.
Note
This method can return different results depending on Read Strategies
Warning
If hwm is used, then this method should be called inside a Read Strategies context. Conversely, if HWM is not used, this method should not be called within a strategy.
Added in 0.1.0
Returns:
- df (DataFrame) – Spark dataframe
Examples:
Read data to Spark dataframe:
df = reader.run()
has_data()¶
Returns True if there is some data in the source, False otherwise.
Note
This method can return different results depending on Read Strategies
Warning
If hwm is used, then this method should be called inside a Read Strategies context. Conversely, if HWM is not used, this method should not be called within a strategy.
Added in 0.10.0
Raises:
- RuntimeError – Current strategy is not compatible with the hwm parameter.
Examples:
reader = DBReader(...)
# handle situation when there is no data in the source
if reader.has_data():
df = reader.run()
else:
# implement your handling logic here
...
raise_if_no_data()¶
Raises a NoDataError exception if the source does not contain any data.
Note
This method can return different results depending on Read Strategies
Warning
If hwm is used, then this method should be called inside a Read Strategies context. Conversely, if HWM is not used, this method should not be called within a strategy.
Added in 0.10.0
Raises:
- RuntimeError – Current strategy is not compatible with the hwm parameter.
- NoDataError – There is no data in the source.
Examples:
reader = DBReader(...)
# ensure that there is some data in the source before reading it using Spark
reader.raise_if_no_data()
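If an empty source should be handled rather than aborting the run, the exception can be caught instead. The control flow is easiest to see with a stand-in reader; in real code the reader would be a DBReader, and NoDataError would be imported from onetl (both classes below are illustration-only stand-ins, not the onetl implementation):

```python
class NoDataError(Exception):
    """Stand-in for onetl's NoDataError (illustration only)."""


class FakeReader:
    """Stand-in for DBReader; run() returns a marker instead of a Spark dataframe."""

    def __init__(self, empty: bool):
        self.empty = empty

    def raise_if_no_data(self):
        if self.empty:
            raise NoDataError("no data in the source")

    def run(self):
        return "dataframe"


def read_or_none(reader):
    # raise_if_no_data() aborts early, so run() is reached only when data exists
    try:
        reader.raise_if_no_data()
    except NoDataError:
        return None
    return reader.run()


print(read_or_none(FakeReader(empty=False)))  # dataframe
print(read_or_none(FakeReader(empty=True)))   # None
```

The same try/except pattern applies unchanged to a real DBReader inside a strategy context.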