DB Reader

Bases: FrozenModel

Allows you to read data from a table using the specified database connection and parameters, and return its content as a Spark DataFrame.

Note

DBReader can return different results depending on Read Strategies

Note

This class operates with only one source at a time. It does NOT support executing queries against multiple sources, like SELECT ... JOIN.

Added in 0.1.0

Changed in 0.8.0

Moved onetl.core.DBReader to onetl.db.DBReader

Parameters:

  • connection (BaseDBConnection) –

    Class which contains DB connection properties. See DB Connections section

  • source (str) –

    Table/collection/etc name to read data from.

    If connection has schema support, you need to specify the full name of the source including the schema, e.g. schema.name.

    Changed in 0.7.0

Renamed table to source

  • columns (list of str, default: None ) –

    The list of columns to be read.

If the RDBMS supports any kind of expressions, you can pass them too.

    columns = [
        "mycolumn",
        "another_column as alias",
        "count(*) over ()",
        "some(function) as alias2",
    ]
    

    Note

Some sources do not have columns.

    Note

    It is recommended to pass column names explicitly to avoid selecting too many columns, and to avoid adding unexpected columns to dataframe if source DDL is changed.

    Deprecated since 0.10.0

The syntax DBReader(columns="col1, col2") (a string instead of a list) is deprecated and will be removed in v1.0.0

  • where (Any, default: None ) –

    Custom where for SQL query or MongoDB pipeline.

The where syntax depends on the source. For example, SQL sources accept where as a string, but MongoDB sources accept where as a dictionary.

    # SQL database connection
    where = "column_1 > 2"
    
    # MongoDB connection
    where = {
        "col_1": {"$gt": 1, "$lt": 100},
        "col_2": {"$gt": 2},
        "col_3": {"$eq": "hello"},
    }
    

    Note

Some sources do not support data filtering.

  • hwm (type[HWM] | None, default: None ) –

    HWM class to be used as HWM value.

    hwm = DBReader.AutoDetectHWM(
        name="some_unique_hwm_name",
        expression="hwm_column",
    )
    
The HWM value will be fetched from the hwm_column column.

If you want to use an SQL expression as the HWM value, you can pass it as well:

    hwm = DBReader.AutoDetectHWM(
        name="some_unique_hwm_name",
        expression="cast(hwm_column_orig as date)",
    )
    

    Note

Some sources do not support expressions and can only be used with column/field names that are present in the source.

    Changed in 0.10.0

    Replaces deprecated hwm_column and hwm_expression attributes

  • hint (Any, default: None ) –

    Hint expression used for querying the data.

The hint syntax depends on the source. For example, SQL sources accept hint as a string, but MongoDB sources accept hint as a dictionary.

    # SQL database connection
    hint = "index(myschema.mytable mycolumn)"
    
    # MongoDB connection
    hint = {
        "mycolumn": 1,
    }
    

    Note

Some sources do not support hints.

  • df_schema (StructType, default: None ) –

    Spark DataFrame schema, used for proper type casting of the rows.

    from pyspark.sql.types import (
        DoubleType,
        IntegerType,
        StringType,
        StructField,
        StructType,
        TimestampType,
    )
    
    df_schema = StructType(
        [
            StructField("_id", IntegerType()),
            StructField("text_string", StringType()),
            StructField("hwm_int", IntegerType()),
            StructField("hwm_datetime", TimestampType()),
            StructField("float_value", DoubleType()),
        ],
    )
    
    reader = DBReader(
        connection=connection,
        source="fiddle.dummy",
        df_schema=df_schema,
    )
    

    Note

Some sources do not support passing a dataframe schema.

  • options (dict | ReadOptions | None, default: None ) –

    Spark read options, like partitioning mode.

    Postgres.ReadOptions(
        partitioningMode="hash",
        partitionColumn="some_column",
        numPartitions=20,
        fetchsize=1000,
    )
    

    Note

Some sources do not support read options.
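To illustrate what the MongoDB-style where dictionary shown above expresses, here is a small plain-Python sketch (not part of onetl, and not how onetl evaluates filters internally) that applies the same $gt/$lt/$eq operators to rows:

```python
# Minimal sketch: evaluate a MongoDB-style "where" dictionary
# against plain Python dict rows. All conditions are AND-ed,
# mirroring how MongoDB combines operators on multiple fields.
OPERATORS = {
    "$gt": lambda value, bound: value > bound,
    "$lt": lambda value, bound: value < bound,
    "$eq": lambda value, bound: value == bound,
}


def matches(row: dict, where: dict) -> bool:
    # every per-column condition must hold for the row to pass
    return all(
        OPERATORS[op](row[column], bound)
        for column, conditions in where.items()
        for op, bound in conditions.items()
    )


where = {
    "col_1": {"$gt": 1, "$lt": 100},
    "col_2": {"$gt": 2},
}

rows = [
    {"col_1": 50, "col_2": 3},   # satisfies both conditions
    {"col_1": 200, "col_2": 3},  # fails: col_1 is not < 100
]
filtered = [row for row in rows if matches(row, where)]
print(filtered)  # [{'col_1': 50, 'col_2': 3}]
```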

Examples:

from onetl.db import DBReader
from onetl.connection import Postgres

postgres = Postgres(...)

# create reader
reader = DBReader(connection=postgres, source="fiddle.dummy")

# read data from table "fiddle.dummy"
df = reader.run()

from onetl.connection import Postgres
from onetl.db import DBReader

postgres = Postgres(...)
options = Postgres.ReadOptions(sessionInitStatement="select 300", fetchsize="100")

# create reader and pass some options to the underlying connection object
reader = DBReader(connection=postgres, source="fiddle.dummy", options=options)

# read data from table "fiddle.dummy"
df = reader.run()

from onetl.db import DBReader
from onetl.connection import Postgres

postgres = Postgres(...)
options = Postgres.ReadOptions(sessionInitStatement="select 300", fetchsize="100")

# create reader with specific columns, rows filter
reader = DBReader(
    connection=postgres,
    source="default.test",
    where="d_id > 100",
    hint="NOWAIT",
    columns=["d_id", "d_name", "d_age"],
    options=options,
)

# read data from table "default.test"
df = reader.run()

See Read Strategies for more examples

from onetl.strategy import IncrementalStrategy

...

reader = DBReader(
    connection=postgres,
    source="fiddle.dummy",
    hwm=DBReader.AutoDetectHWM(  # mandatory for IncrementalStrategy
        name="some_unique_hwm_name",
        expression="d_age",
    ),
)

# read data from table "fiddle.dummy"
# but only with new rows (`WHERE d_age > previous_hwm_value`)
with IncrementalStrategy():
    df = reader.run()

run()

Reads data from the source table and returns it as a Spark DataFrame.

Note

This method can return different results depending on Read Strategies

Warning

If hwm is used, this method should be called inside a Read Strategies context. Vice versa: if hwm is not used, this method should not be called within a strategy.

Added in 0.1.0

Returns:

  • df ( DataFrame ) –

    Spark dataframe

Examples:

Read data to Spark dataframe:

df = reader.run()

has_data()

Returns True if there is some data in the source, False otherwise.

Note

This method can return different results depending on Read Strategies

Warning

If hwm is used, this method should be called inside a Read Strategies context. Vice versa: if hwm is not used, this method should not be called within a strategy.

Added in 0.10.0

Raises:

  • RuntimeError

    Current strategy is not compatible with HWM parameter.

Examples:

reader = DBReader(...)

# handle situation when there is no data in the source
if reader.has_data():
    df = reader.run()
else:
    # implement your handling logic here
    ...

raise_if_no_data()

Raises NoDataError if the source does not contain any data.

Note

This method can behave differently depending on Read Strategies

Warning

If hwm is used, this method should be called inside a Read Strategies context. Vice versa: if hwm is not used, this method should not be called within a strategy.

Added in 0.10.0

Raises:

  • RuntimeError

    Current strategy is not compatible with HWM parameter.

  • NoDataError

    There is no data in source.

Examples:

reader = DBReader(...)

# ensure that there is some data in the source before reading it using Spark
reader.raise_if_no_data()
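Building on the example above, a typical pattern is to catch NoDataError when the absence of data should not abort the whole job. This sketch assumes NoDataError is importable from onetl.exception, as in recent onetl versions:

```python
from onetl.db import DBReader
from onetl.exception import NoDataError

reader = DBReader(...)

try:
    reader.raise_if_no_data()
except NoDataError:
    # implement your handling logic here, e.g. skip this source
    ...
else:
    df = reader.run()
```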