Reading from Iceberg using DBReader

DBReader supports strategies for incremental data reading, but does not support custom queries such as JOIN.

Supported DBReader features

Warning

Values of columns, where and hwm.expression should be written using Spark SQL syntax.

Examples

Snapshot strategy:

from onetl.connection import Iceberg
from onetl.db import DBReader

iceberg = Iceberg(catalog_name="my_catalog", ...)

reader = DBReader(
    connection=iceberg,
    source="my_schema.table",  # catalog is already defined in connection
    columns=["id", "key", "value", "updated_dt"],
    where="key = 'something'",
)
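Conceptually, the snapshot read above selects the listed columns from the whole table and applies the where filter, with the connection's catalog prepended to source. The helper below is hypothetical (it is not part of onETL) and only renders that read as a roughly equivalent Spark SQL query string; onETL may build the actual Spark read differently.

```python
from typing import Optional, Sequence


def build_snapshot_query(
    catalog_name: str,
    source: str,
    columns: Sequence[str],
    where: Optional[str] = None,
) -> str:
    """Hypothetical helper: render a snapshot read as a Spark SQL SELECT.

    Assumes `source` is "schema.table" and the catalog comes from the
    connection, as in the DBReader example above.
    """
    table = f"{catalog_name}.{source}"
    projection = ", ".join(columns) if columns else "*"
    query = f"SELECT {projection} FROM {table}"
    if where:
        query += f" WHERE {where}"
    return query


print(
    build_snapshot_query(
        catalog_name="my_catalog",
        source="my_schema.table",
        columns=["id", "key", "value", "updated_dt"],
        where="key = 'something'",
    )
)
# SELECT id, key, value, updated_dt FROM my_catalog.my_schema.table WHERE key = 'something'
```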
df = reader.run()

Incremental strategy:

from onetl.connection import Iceberg
from onetl.db import DBReader
from onetl.strategy import IncrementalStrategy

iceberg = Iceberg(catalog_name="my_catalog", ...)

reader = DBReader(
    connection=iceberg,
    source="my_schema.table",  # catalog is already defined in connection
    columns=["id", "key", "value", "updated_dt"],
    where="key = 'something'",
    hwm=DBReader.AutoDetectHWM(name="iceberg_hwm", expression="updated_dt"),
)

with IncrementalStrategy():
    df = reader.run()
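With IncrementalStrategy, the first run reads all rows matching where and remembers the maximum value of hwm.expression under the HWM name; later runs read only rows whose expression value is strictly greater than the remembered one. The pure-Python sketch below is a toy model of that behaviour over an in-memory list of rows, not onETL code: the function name, the sample data and passing the HWM value around by hand are all assumptions, and it ignores how onETL stores HWM values or queries Spark.

```python
from typing import Any, Callable, Dict, List, Optional, Tuple

Row = Dict[str, Any]


def incremental_read(
    rows: List[Row],
    where: Callable[[Row], bool],
    hwm_column: str,
    previous_hwm: Optional[Any],
) -> Tuple[List[Row], Optional[Any]]:
    """Toy model of an incremental read (hypothetical, not onETL code).

    Returns rows matching `where` whose `hwm_column` is strictly greater than
    `previous_hwm` (all matching rows on the first run, when it is None),
    plus the HWM value to use on the next run.
    """
    selected = [
        row
        for row in rows
        if where(row) and (previous_hwm is None or row[hwm_column] > previous_hwm)
    ]
    if not selected:
        # Nothing new: keep the old HWM so the next run starts from the same point.
        return selected, previous_hwm
    return selected, max(row[hwm_column] for row in selected)


def matches_key(row: Row) -> bool:
    # Stand-in for where="key = 'something'"
    return row["key"] == "something"


# Hypothetical table contents; ISO date strings compare chronologically.
table = [
    {"id": 1, "key": "something", "updated_dt": "2024-01-01"},
    {"id": 2, "key": "other", "updated_dt": "2024-01-02"},
    {"id": 3, "key": "something", "updated_dt": "2024-01-03"},
]

batch, hwm = incremental_read(table, matches_key, "updated_dt", previous_hwm=None)
print([r["id"] for r in batch], hwm)  # [1, 3] 2024-01-03

table.append({"id": 4, "key": "something", "updated_dt": "2024-01-05"})
batch, hwm = incremental_read(table, matches_key, "updated_dt", previous_hwm=hwm)
print([r["id"] for r in batch], hwm)  # [4] 2024-01-05
```

Row 3 is not read again on the second run because its updated_dt equals the stored HWM rather than exceeding it.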

Recommendations

Select only required columns

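Instead of passing "*" in DBReader(columns=[...]), prefer passing exact column names. Iceberg data files are usually stored in columnar formats such as Parquet or ORC, so Spark can read only the requested columns from them, which can drastically reduce the amount of data read.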
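A toy model of this effect in plain Python (not Iceberg or Spark code): the table is stored column by column, as in a columnar file, and the function counts how many characters a projection has to touch, as a rough stand-in for bytes read. The column names, including the wide payload column, and the row count are hypothetical.

```python
from typing import Dict, List, Sequence

# Hypothetical table stored column by column, as in a columnar file format.
ROWS = 1_000
table: Dict[str, List[object]] = {
    "id": list(range(ROWS)),
    "key": ["something"] * ROWS,
    "value": ["x" * 100] * ROWS,
    "updated_dt": ["2024-01-01"] * ROWS,
    "payload": ["y" * 1_000] * ROWS,  # wide column the reader does not need
}


def chars_read(columns: Sequence[str]) -> int:
    """Characters touched when projecting `columns` ("*" means all columns)."""
    selected = list(table) if list(columns) == ["*"] else list(columns)
    # Only the selected columns are scanned; the others are skipped entirely.
    return sum(len(str(v)) for name in selected for v in table[name])


print(chars_read(["*"]))  # 1121890
print(chars_read(["id", "key", "value", "updated_dt"]))  # 121890
```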