Incremental Batch Strategy
Bases: BatchHWMStrategy
Incremental batch strategy for DB Reader.
Note
Cannot be used with File Downloader
Same as IncrementalStrategy, but reads data from the source in sequential batches (1..N) like:
1:  SELECT id, data
    FROM public.mydata
    WHERE id > 1000 AND id <= 1100; -- previous HWM value is 1000, step is 100
2:  WHERE id > 1100 AND id <= 1200; -- + step
3:  WHERE id > 1200 AND id <= 1300; -- + step
N:  WHERE id > 1300 AND id <= 1400; -- until stop
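The batch boundaries above follow from simple arithmetic on the previous HWM value, step and stop. The following is a minimal sketch in plain Python, not onETL's actual implementation; `batch_ranges` is a hypothetical helper introduced only for illustration:

```python
from typing import Iterator, Tuple


def batch_ranges(hwm: int, step: int, stop: int) -> Iterator[Tuple[int, int]]:
    """Yield (lower, upper) bounds for WHERE id > lower AND id <= upper."""
    if step <= 0:
        raise ValueError("step must be positive")
    lower = hwm
    while lower < stop:
        upper = min(lower + step, stop)  # the last batch never goes past stop
        yield lower, upper
        lower = upper


# Print the WHERE clauses for previous HWM = 1000, step = 100, stop = 1400
for number, (lower, upper) in enumerate(batch_ranges(hwm=1000, step=100, stop=1400), start=1):
    print(f"{number}: WHERE id > {lower} AND id <= {upper};")
```

Each lower bound is exclusive and each upper bound is inclusive, so consecutive batches neither overlap nor skip values.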
Warning
Unlike SnapshotBatchStrategy, it saves the current HWM value into the HWM Store after each batch.
So if code inside the context manager raises an exception, like:
    with IncrementalBatchStrategy(step=100) as batches:
        for _ in batches:
            df = reader.run()  # something went wrong here
            writer.run(df)  # or here
            # or here...
the HWM value saved after the last successful batch stays in the HWM Store. This allows the reading process to resume from the last successful batch.
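The resume behaviour can be illustrated without any database. The sketch below is an assumption-laden simulation, not onETL code: a plain dict stands in for the HWM Store, and `run_batches` and `fail_on_batch` are hypothetical names used only to trigger a failure on a chosen batch.

```python
from typing import Dict, List, Optional, Tuple


def run_batches(
    store: Dict[str, int],
    step: int,
    stop: int,
    fail_on_batch: Optional[int] = None,
) -> List[Tuple[int, int]]:
    """Process batches from store["hwm"] up to stop, saving the HWM after each one."""
    processed = []
    batch_number = 0
    while store["hwm"] < stop:
        batch_number += 1
        lower = store["hwm"]
        upper = min(lower + step, stop)
        if batch_number == fail_on_batch:
            # Simulates reader.run() or writer.run(df) raising an exception
            raise RuntimeError(f"batch {batch_number} failed")
        processed.append((lower, upper))
        store["hwm"] = upper  # saved only after the batch succeeded
    return processed


store = {"hwm": 1000}  # hypothetical in-memory stand-in for the HWM Store

try:
    run_batches(store, step=100, stop=1400, fail_on_batch=3)
except RuntimeError:
    pass  # the first two batches are already recorded in the store

hwm_after_failure = store["hwm"]

# A second run starts from the stored value instead of from 1000
resumed = run_batches(store, step=100, stop=1400)
print(hwm_after_failure, resumed, store["hwm"])
```

Because the stored value only moves after a batch completes, the failed batch is retried in full on the next run and earlier batches are not read again.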
Warning
Not every DB connection supports batch strategies. For example, the Kafka connection does not support them. Make sure the connection you use is compatible with IncrementalBatchStrategy.
Added in 0.1.0
Parameters:
- step (Any) – Step size used for generating batch SQL queries like:
    SELECT id, data
    FROM public.mydata
    WHERE id > 1000 AND id <= 1100; -- 1000 is previous HWM value, step is 100
  Note
  Step defines the range of values fetched by each batch. This is not a number of rows; the row count depends on the table content and the value distribution across the rows.
  Note
  The step value will be added to the HWM, so it should have a proper type. For example, for a TIMESTAMP column the step type should be datetime.timedelta, not int.
- stop (Any, default: None) – If passed, the value will be used for generating WHERE clauses with the hwm.expression filter, as a stop value for the last batch.
  If not set, the value is determined by a separate query:
    SELECT MAX(id) AS stop
    FROM public.mydata
    WHERE id > 1000; -- 1000 is previous HWM value (if any)
  Note
  stop should be the same type as the hwm.expression value, e.g. datetime.datetime for a TIMESTAMP column, datetime.date for DATE, and so on.
- offset (Any, default: None) – If passed, the offset value will be used to read rows which appeared in the source after the previous read.
  For example, the previous incremental run returned rows:
    898
    899
    900
    1000
  Current HWM value is 1000.
  But since then a few more rows appeared in the source:
    898
    899
    900
    901 # new
    902 # new
    ...
    999 # new
    1000
  and you need to read them too.
  So you can set offset=100, and the first batch of the next incremental run will look like:
    SELECT id, data
    FROM public.mydata
    WHERE id > 900 AND id <= 1000; -- 900 = 1000 - 100 = HWM - offset
  and return rows from 901 (not 900) to 1000 (duplicate).
  Warning
  This can lead to reading duplicated values from the table. You probably need an additional deduplication step to handle them.
  Note
  The offset value will be subtracted from the HWM, so it should have a proper type. For example, for a TIMESTAMP column the offset type should be datetime.timedelta, not int.
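Since offset moves the first lower bound back, rows at or below the previous HWM can be read a second time. The sketch below shows the arithmetic and one possible deduplication step in plain Python. `first_batch_bounds` and `deduplicate` are hypothetical helpers, the rows are made-up dictionaries, and stop is ignored for brevity; a real pipeline would more likely deduplicate in the DataFrame or in the target table.

```python
from typing import Dict, Iterable, List, Tuple


def first_batch_bounds(hwm: int, step: int, offset: int = 0) -> Tuple[int, int]:
    """Bounds of the first batch: lower is HWM - offset, upper is lower + step."""
    lower = hwm - offset
    return lower, lower + step


def deduplicate(rows: Iterable[Dict], key: str = "id") -> List[Dict]:
    """Keep the first row seen for each key value, preserving order."""
    seen = set()
    unique = []
    for row in rows:
        if row[key] not in seen:
            seen.add(row[key])
            unique.append(row)
    return unique


lower, upper = first_batch_bounds(hwm=1000, step=100, offset=100)
print(f"WHERE id > {lower} AND id <= {upper};")

previous_run = [{"id": 900}, {"id": 1000}]
next_run = [{"id": 901}, {"id": 999}, {"id": 1000}]  # id=1000 is read again
combined = deduplicate(previous_run + next_run)
print([row["id"] for row in combined])
```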
Examples:
from onetl.db import DBReader, DBWriter
from onetl.strategy import IncrementalBatchStrategy
# postgres and hive are connection objects created beforehand
reader = DBReader(
    connection=postgres,
    source="public.mydata",
    columns=["id", "data"],
    hwm=DBReader.AutoDetectHWM(name="some_hwm_name", expression="id"),
)
writer = DBWriter(connection=hive, target="db.newtable")
with IncrementalBatchStrategy(step=100) as batches:
    for _ in batches:
        df = reader.run()
        writer.run(df)
-- previous HWM value was 1000
-- each batch (1..N) will perform a query which returns some part of input data
1:  SELECT id, data
    FROM public.mydata
    WHERE id > 1000 AND id <= 1100; -- from HWM to HWM+step (EXCLUDING first row)
2:  WHERE id > 1100 AND id <= 1200; -- + step
3:  WHERE id > 1200 AND id <= 1300; -- + step
N:  WHERE id > 1300 AND id <= 1400; -- until max value of HWM column
...
with IncrementalBatchStrategy(step=100, stop=2000) as batches:
    for _ in batches:
        df = reader.run()
        writer.run(df)
-- previous HWM value was 1000
-- each batch (1..N) will perform a query which returns some part of input data
1:  SELECT id, data
    FROM public.mydata
    WHERE id > 1000 AND id <= 1100; -- from HWM to HWM+step (EXCLUDING first row)
2:  WHERE id > 1100 AND id <= 1200; -- + step
...
N:  WHERE id > 1900 AND id <= 2000; -- until stop
...
with IncrementalBatchStrategy(step=100, offset=100) as batches:
    for _ in batches:
        df = reader.run()
        writer.run(df)
-- previous HWM value was 1000
-- each batch (1..N) will perform a query which returns some part of input data
1:  SELECT id, data
    FROM public.mydata
    WHERE id > 900 AND id <= 1000; -- from HWM-offset to HWM-offset+step (EXCLUDING first row)
2:  WHERE id > 1000 AND id <= 1100; -- + step
3:  WHERE id > 1100 AND id <= 1200; -- + step
...
N:  WHERE id > 1300 AND id <= 1400; -- until max value of HWM column
...
with IncrementalBatchStrategy(
    step=100,
    stop=2000,
    offset=100,
) as batches:
    for _ in batches:
        df = reader.run()
        writer.run(df)
-- previous HWM value was 1000
-- each batch (1..N) will perform a query which returns some part of input data
1:  SELECT id, data
    FROM public.mydata
    WHERE id > 900 AND id <= 1000; -- from HWM-offset to HWM-offset+step (EXCLUDING first row)
2:  WHERE id > 1000 AND id <= 1100; -- + step
3:  WHERE id > 1100 AND id <= 1200; -- + step
...
N:  WHERE id > 1900 AND id <= 2000; -- until stop
The hwm.expression column and stop can be a date or datetime, not only an integer; step and offset are then datetime.timedelta values:
from datetime import date, timedelta
from onetl.db import DBReader, DBWriter
from onetl.strategy import IncrementalBatchStrategy
# postgres and hive are connection objects created beforehand
reader = DBReader(
    connection=postgres,
    source="public.mydata",
    columns=["business_dt", "data"],
    hwm=DBReader.AutoDetectHWM(name="some_hwm_name", expression="business_dt"),
)
writer = DBWriter(connection=hive, target="db.newtable")
with IncrementalBatchStrategy(
    step=timedelta(days=5),
    stop=date(2021, 1, 31),  # date() takes year, month, day, not a string
    offset=timedelta(days=1),
) as batches:
    for _ in batches:
        df = reader.run()
        writer.run(df)
-- previous HWM value was '2021-01-10'
-- each batch (1..N) will perform a query which returns some part of input data
1:  SELECT business_dt, data
    FROM public.mydata
    WHERE business_dt > CAST('2021-01-09' AS DATE) -- from HWM-offset (EXCLUDING first row)
    AND business_dt <= CAST('2021-01-14' AS DATE); -- to HWM-offset+step
2:  WHERE business_dt > CAST('2021-01-14' AS DATE) -- + step
    AND business_dt <= CAST('2021-01-19' AS DATE);
3:  WHERE business_dt > CAST('2021-01-19' AS DATE) -- + step
    AND business_dt <= CAST('2021-01-24' AS DATE);
...
N:  WHERE business_dt > CAST('2021-01-29' AS DATE)
    AND business_dt <= CAST('2021-01-31' AS DATE); -- until stop
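The date boundaries in this listing can be reproduced with the standard library alone. This is a sketch of the arithmetic under the same inputs (previous HWM 2021-01-10, step of 5 days, offset of 1 day, stop 2021-01-31), not a description of onETL internals; the variable names are illustrative.

```python
from datetime import date, timedelta

hwm = date(2021, 1, 10)  # previous HWM value
step = timedelta(days=5)
stop = date(2021, 1, 31)
offset = timedelta(days=1)

bounds = []
lower = hwm - offset  # the first batch starts at HWM - offset
while lower < stop:
    upper = min(lower + step, stop)  # the last batch is cut at stop
    bounds.append((lower.isoformat(), upper.isoformat()))
    lower = upper

for lower_bound, upper_bound in bounds:
    print(f"business_dt > '{lower_bound}' AND business_dt <= '{upper_bound}'")

# An int step cannot be added to a date, which is why timedelta is required
int_step_rejected = False
try:
    hwm + 5
except TypeError:
    int_step_rejected = True
print(int_step_rejected)
```

The final batch spans only two days because the upper bound is limited by stop rather than by a full step.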