0.10.0 (2023-12-18)
Breaking Changes
- Upgrade etl-entities from v1 to v2 (#172).
This implies that HWM classes now have a different internal structure than they used to.
Before:
from etl_entities.old_hwm import IntHWM as OldIntHWM
from etl_entities.source import Column, Table
from etl_entities.process import Process
hwm = OldIntHWM(
    process=Process(name="myprocess", task="abc", dag="cde", host="myhost"),
    source=Table(name="schema.table", instance="postgres://host:5432/db"),
    column=Column(name="col1"),
    value=123,
)
After:
from etl_entities.hwm import ColumnIntHWM
hwm = ColumnIntHWM(
    name="some_unique_name",
    description="any value you want",
    source="schema.table",
    expression="col1",
    value=123,
)
Breaking change: if you used HWM classes from the etl_entities module, you should rewrite your code to make it compatible with the new version.
More details
- HWM classes used by previous onETL versions were moved from etl_entities to the etl_entities.old_hwm submodule. They are kept there for compatibility reasons, but are planned to be removed in the etl-entities v3 release.
- New HWM classes have a flat structure instead of a nested one.
- New HWM classes have a mandatory name attribute (it was known as qualified_name before).
- Type aliases used while serializing and deserializing HWM objects to dict representation were changed too: int → column_int.
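The alias change can be sketched as a plain lookup table. This is a hypothetical helper, not part of etl_entities; the date, datetime and file_list entries are taken from the YAMLHWMStore migration script in these notes, and other HWM types are not covered.

```python
# Hypothetical mapping of old serialized HWM type aliases (etl-entities v1)
# to the new ones (etl-entities v2), as used by the migration script.
OLD_TO_NEW_TYPE = {
    "int": "column_int",
    "date": "column_date",
    "datetime": "column_datetime",
    "file_list": "file_list",  # the file HWM alias is unchanged
}


def convert_type_alias(old_type: str) -> str:
    """Return the new type alias for an old one, failing loudly on unknown types."""
    try:
        return OLD_TO_NEW_TYPE[old_type]
    except KeyError:
        raise ValueError(f"Unknown HWM type: {old_type!r}") from None


print(convert_type_alias("int"))  # column_int
```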
To make migration simpler, you can use the new as_new_hwm() method:
old_hwm = OldIntHWM(...)
new_hwm = old_hwm.as_new_hwm()
It automatically converts all fields from the old structure to the new one, including qualified_name → name.
- Breaking changes:
  - Methods BaseHWMStore.get() and BaseHWMStore.save() were renamed to get_hwm() and set_hwm().
  - They can now be used only with new HWM classes from etl_entities.hwm; old HWM classes are not supported.
  If you used them in your code, please update it accordingly.
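To illustrate the renamed methods, here is a toy in-memory store. It only mirrors the names get_hwm(name) and set_hwm(hwm); ToyHWM and ToyHWMStore are hypothetical, and returning None for an unknown name is an assumption of this sketch rather than a documented contract.

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass(frozen=True)
class ToyHWM:
    """Hypothetical stand-in for a new-style HWM: flat, with a mandatory name."""

    name: str
    value: int


class ToyHWMStore:
    """Toy in-memory store mirroring the renamed methods (not etl_entities code)."""

    def __init__(self) -> None:
        self._items: Dict[str, ToyHWM] = {}

    def get_hwm(self, name: str) -> Optional[ToyHWM]:
        # Formerly get(); in this sketch, returns None if nothing was saved yet
        return self._items.get(name)

    def set_hwm(self, hwm: ToyHWM) -> None:
        # Formerly save(); HWMs are keyed by their mandatory name
        self._items[hwm.name] = hwm


store = ToyHWMStore()
store.set_hwm(ToyHWM(name="some_unique_name", value=123))
print(store.get_hwm("some_unique_name"))  # ToyHWM(name='some_unique_name', value=123)
```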
- YAMLHWMStore CANNOT read files created by older onETL versions (0.9.x or older).
Update procedure
# pip install onetl==0.9.5
# Get qualified_name for HWM
# Option 1. HWM is built manually
from etl_entities import IntHWM, FileListHWM
from etl_entities.source import Column, Table, RemoteFolder
from etl_entities.process import Process
# for column HWM
old_column_hwm = IntHWM(
    process=Process(name="myprocess", task="abc", dag="cde", host="myhost"),
    source=Table(name="schema.table", instance="postgres://host:5432/db"),
    column=Column(name="col1"),
)
qualified_name = old_column_hwm.qualified_name
# "col1#schema.table@postgres://host:5432/db#cde.abc.myprocess@myhost"
# for file HWM
old_file_hwm = FileListHWM(
    process=Process(name="myprocess", task="abc", dag="cde", host="myhost"),
    source=RemoteFolder(name="/absolute/path", instance="ftp://ftp.server:21"),
)
qualified_name = old_file_hwm.qualified_name
# "file_list#/absolute/path@ftp://ftp.server:21#cde.abc.myprocess@myhost"
# Option 2. HWM is generated automatically (by DBReader/FileDownloader)
# See onETL logs and search for string like qualified_name = '...'
qualified_name = "col1#schema.table@postgres://host:5432/db#cde.abc.myprocess@myhost"
# Get .yml file path by qualified_name
import os
from pathlib import PurePosixPath
from onetl.hwm.store import YAMLHWMStore
# here you should pass the same arguments as used on production, if any
yaml_hwm_store = YAMLHWMStore()
hwm_path = yaml_hwm_store.get_file_path(qualified_name)
print(hwm_path)
# for column HWM
# LocalPosixPath('/home/maxim/.local/share/onETL/yml_hwm_store/col1__schema.table__postgres_host_5432_db__cde.abc.myprocess__myhost.yml')
# for file HWM
# LocalPosixPath('/home/maxim/.local/share/onETL/yml_hwm_store/file_list__absolute_path__ftp_ftp.server_21__cde.abc.myprocess__myhost.yml')
# Read raw .yml file content
from yaml import safe_load, dump
raw_old_hwm_items = safe_load(hwm_path.read_text())
print(raw_old_hwm_items)
# for column HWM
# [
# {
# "column": { "name": "col1", "partition": {} },
# "modified_time": "2023-12-18T10:39:47.377378",
# "process": { "dag": "cde", "host": "myhost", "name": "myprocess", "task": "abc" },
# "source": { "instance": "postgres://host:5432/db", "name": "schema.table" },
# "type": "int",
# "value": "123",
# },
# ]
# for file HWM
# [
# {
# "modified_time": "2023-12-18T11:15:36.478462",
# "process": { "dag": "cde", "host": "myhost", "name": "myprocess", "task": "abc" },
# "source": { "instance": "ftp://ftp.server:21", "name": "/absolute/path" },
# "type": "file_list",
# "value": ["file1.txt", "file2.txt"],
# },
# ]
# Convert file content to new structure, compatible with onETL 0.10.x
raw_new_hwm_items = []
for old_hwm in raw_old_hwm_items:
    new_hwm = {"name": qualified_name, "modified_time": old_hwm["modified_time"]}
    # only column HWMs have an expression
    if "column" in old_hwm:
        new_hwm["expression"] = old_hwm["column"]["name"]
    # both column and file HWMs have an entity (table name or folder path)
    new_hwm["entity"] = old_hwm["source"]["name"]
    old_hwm.pop("process", None)
    if old_hwm["type"] == "int":
        new_hwm["type"] = "column_int"
        new_hwm["value"] = old_hwm["value"]
    elif old_hwm["type"] == "date":
        new_hwm["type"] = "column_date"
        new_hwm["value"] = old_hwm["value"]
    elif old_hwm["type"] == "datetime":
        new_hwm["type"] = "column_datetime"
        new_hwm["value"] = old_hwm["value"]
    elif old_hwm["type"] == "file_list":
        new_hwm["type"] = "file_list"
        # file names become absolute paths
        new_hwm["value"] = [
            os.fspath(PurePosixPath(old_hwm["source"]["name"]).joinpath(path))
            for path in old_hwm["value"]
        ]
    else:
        raise ValueError(f"Unknown HWM type: {old_hwm['type']!r}")
    raw_new_hwm_items.append(new_hwm)
print(raw_new_hwm_items)
# for column HWM
# [
# {
# "name": "col1#schema.table@postgres://host:5432/db#cde.abc.myprocess@myhost",
# "modified_time": "2023-12-18T10:39:47.377378",
# "expression": "col1",
# "entity": "schema.table",
# "type": "column_int",
# "value": "123",
# },
# ]
# for file HWM
# [
# {
# "name": "file_list#/absolute/path@ftp://ftp.server:21#cde.abc.myprocess@myhost",
# "modified_time": "2023-12-18T11:15:36.478462",
# "entity": "/absolute/path",
# "type": "file_list",
# "value": ["/absolute/path/file1.txt", "/absolute/path/file2.txt"],
# },
# ]
# Save file with new content
with open(hwm_path, "w") as file:
    dump(raw_new_hwm_items, file)
# Stop Python interpreter and update onETL
# pip install onetl==0.10.0
# Check that new .yml file can be read
from onetl.hwm.store import YAMLHWMStore
qualified_name = ...
# here you should pass the same arguments as used on production, if any
yaml_hwm_store = YAMLHWMStore()
yaml_hwm_store.get_hwm(qualified_name)
# for column HWM
# ColumnIntHWM(
# name='col1#schema.table@postgres://host:5432/db#cde.abc.myprocess@myhost',
# description='',
# entity='schema.table',
# value=123,
# expression='col1',
# modified_time=datetime.datetime(2023, 12, 18, 10, 39, 47, 377378),
# )
# for file HWM
# FileListHWM(
# name='file_list#/absolute/path@ftp://ftp.server:21#cde.abc.myprocess@myhost',
# description='',
# entity=AbsolutePath('/absolute/path'),
# value=frozenset({AbsolutePath('/absolute/path/file1.txt'), AbsolutePath('/absolute/path/file2.txt')}),
# expression=None,
# modified_time=datetime.datetime(2023, 12, 18, 11, 15, 36, 478462)
# )
# That's all!
However, most users use other HWM store implementations, which do not have such issues.
- Several classes and functions were moved from onetl to etl_entities:
Before:
from onetl.hwm.store import (
    detect_hwm_store,
    BaseHWMStore,
    HWMStoreClassRegistry,
    register_hwm_store_class,
    HWMStoreManager,
    MemoryHWMStore,
)
After:
from etl_entities.hwm_store import (
    detect_hwm_store,
    BaseHWMStore,
    HWMStoreClassRegistry,
    register_hwm_store_class,
    HWMStoreManager,
    MemoryHWMStore,
)
They can still be imported from the old module, but this is deprecated and will be removed in the v1.0.0 release.
- Change the way of passing HWM to DBReader and FileDownloader classes.
New HWM classes have a mandatory name attribute, which should be passed explicitly instead of being generated automatically under the hood.
Automatic name generation using the old DBReader.hwm_column / FileDownloader.hwm_type
syntax is still supported, but will be removed in the v1.0.0 release. (#179)
- Performance of reading with Incremental and Batch strategies has been drastically improved. (#182)
Before and after in detail
DBReader.run() + incremental/batch strategy behavior in versions 0.9.x and older:
- Get table schema by making query SELECT * FROM table WHERE 1=0 (if DBReader.columns has *).
- Expand * to real column names from table, add hwm_column there, remove duplicates (as some RDBMS do not allow that).
- Create dataframe from query like SELECT hwm_expression AS hwm_column, ...other table columns... FROM table WHERE hwm_expression > prev_hwm.value.
- Determine HWM class using dataframe schema: df.schema[hwm_column].dataType.
- Determine max HWM column value using Spark: df.select(max(hwm_column)).collect().
- Use max(hwm_column) as next HWM value, and save it to HWM Store.
- Return dataframe to user.
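The column-expansion and query steps above can be approximated as follows. This is a simplified sketch, not onETL code: old_style_columns and old_style_query are hypothetical, and hwm_expression is assumed to equal hwm_column, so the AS alias is omitted. Note that column names are interpolated without any quoting.

```python
from typing import List


def old_style_columns(table_columns: List[str], hwm_column: str) -> List[str]:
    """Expand '*' into real column names, add the HWM column, drop duplicates."""
    result: List[str] = []
    for name in [*table_columns, hwm_column]:
        if name not in result:  # some RDBMS reject duplicated columns
            result.append(name)
    return result


def old_style_query(table: str, columns: List[str], hwm_column: str, prev_value: int) -> str:
    # Names go into SQL as-is, without escaping; only a lower bound is applied
    return f"SELECT {', '.join(columns)} FROM {table} WHERE {hwm_column} > {prev_value}"


cols = old_style_columns(["id", "CamelColumn", "spaced column"], "id")
print(old_style_query("schema.table", cols, "id", 123))
# SELECT id, CamelColumn, spaced column FROM schema.table WHERE id > 123
```

On PostgreSQL, for example, the unquoted CamelColumn is folded to camelcolumn and spaced column is a syntax error, which is the kind of failure described in the list that follows.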
This was far from ideal:
- Dataframe content (all rows or just changed ones) was loaded from the source to Spark only to get min/max values of a specific column.
- The step of fetching the table schema and then substituting column names into the next query caused some unexpected errors.
  For example, the source may contain columns with mixed name case, like "CamelColumn" or "spaced column". Column names were not escaped during query generation, leading to queries that cannot be executed by the database.
  So users had to explicitly pass column names to DBReader, wrapping columns with mixed naming in double quotes:
  reader = DBReader(
      connection=...,
      source=...,
      columns=[  # passing '*' here leads to wrong SQL query generation
          "normal_column",
          '"CamelColumn"',
          '"spaced column"',
          ...,
      ],
  )
- Using DBReader with IncrementalStrategy could lead to reading rows that were already read before.
  The dataframe was created from a query with a WHERE clause like hwm.expression > prev_hwm.value, not hwm.expression > prev_hwm.value AND hwm.expression <= current_hwm.value.
  So if new rows appeared in the source after the HWM value was determined, they could be read when accessing the dataframe content (because Spark dataframes are lazy), leading to inconsistencies between the HWM value and the dataframe content.
  This may lead to issues when DBReader.run() reads some data and updates the HWM value, and the next call of DBReader.run() reads rows that were already read in the previous run.
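The double-read problem can be reproduced without Spark by modelling a lazy dataframe as a function that is evaluated only on demand. In this sketch the list table, lazy_select and the integer HWM values are all hypothetical stand-ins.

```python
from typing import Callable, List

# Hypothetical in-memory "table": values of the HWM column
table: List[int] = [1, 2, 3]
prev_hwm = 0


def lazy_select(predicate: Callable[[int], bool]) -> Callable[[], List[int]]:
    """Mimic a lazy dataframe: rows are fetched only when the result is called."""
    return lambda: [row for row in table if predicate(row)]


# 0.9.x style: WHERE hwm > prev_hwm only
old_df = lazy_select(lambda row: row > prev_hwm)
current_hwm = max(old_df())  # HWM value computed and saved: 3

# 0.10.x style: WHERE hwm > prev_hwm AND hwm <= current_hwm
new_df = lazy_select(lambda row: prev_hwm < row <= current_hwm)

table.append(4)  # a new row arrives before the dataframe content is actually read

print(old_df())  # [1, 2, 3, 4]: row 4 is read now and again by the next run, since 4 > 3
print(new_df())  # [1, 2, 3]: content matches the saved HWM value
```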
DBReader.run() + incremental/batch strategy behavior in versions 0.10.x and newer:
- Detect type of HWM expression: SELECT hwm.expression FROM table WHERE 1=0.
- Determine corresponding Spark type df.schema[0], and then determine the matching HWM class (if DBReader.AutoDetectHWM is used).
- Get min/max values by querying the source: SELECT MAX(hwm.expression) FROM table WHERE hwm.expression >= prev_hwm.value.
- Use max(hwm.expression) as next HWM value, and save it to HWM Store.
- Create dataframe from query SELECT ... table columns ... FROM table WHERE hwm.expression > prev_hwm.value AND hwm.expression <= current_hwm.value, baking the new HWM value into the query.
- Return dataframe to user.
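The three queries sent to the source in 0.10.x can be sketched as plain string templates. new_style_queries is a hypothetical helper; values are interpolated naively for illustration, whereas real code has to render literals in the target database's syntax. Here current_value is passed in directly, while in the flow above it is the result of the MAX query.

```python
from typing import Dict


def new_style_queries(
    table: str, columns: str, hwm_expression: str, prev_value: int, current_value: int
) -> Dict[str, str]:
    """Hypothetical sketch of the queries issued by DBReader.run() in 0.10.x."""
    return {
        # 1. Detect the HWM expression type without fetching any rows
        "detect_type": f"SELECT {hwm_expression} FROM {table} WHERE 1=0",
        # 2. Let the source compute the new HWM value
        "get_max": f"SELECT MAX({hwm_expression}) FROM {table} WHERE {hwm_expression} >= {prev_value}",
        # 3. Read data bounded on both sides; columns are passed as-is
        "read": (
            f"SELECT {columns} FROM {table} "
            f"WHERE {hwm_expression} > {prev_value} AND {hwm_expression} <= {current_value}"
        ),
    }


queries = new_style_queries("schema.table", "*", "col1", 100, 123)
print(queries["read"])  # SELECT * FROM schema.table WHERE col1 > 100 AND col1 <= 123
```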
Improvements:
- Allow the source to calculate min/max instead of loading everything into Spark. This should be faster on large amounts of data (up to x2), because we do not transfer all the data from the source to Spark. It can be even faster if the source has indexes for the HWM column.
- The columns list is passed to the source as-is, without any resolving on the DBReader side. So you can pass DBReader(columns=["*"]) to read tables with mixed column naming.
- Restrict dataframe content to always match HWM values, which means the same row is never read twice.
Breaking change: the HWM column is no longer implicitly added to the dataframe. It used to be a part of the SELECT clause, but now it is mentioned only in the WHERE clause.
So if your code relied on the HWM column being present in the dataframe, you have to rewrite it and include that column in DBReader.columns explicitly.
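One way to do such a rewrite is to add the HWM column to the column list yourself. with_hwm_column and the updated_at column are hypothetical, and the membership check is a plain string comparison, so quoted or aliased expressions are not detected.

```python
from typing import List


def with_hwm_column(columns: List[str], hwm_column: str) -> List[str]:
    """Hypothetical helper: make the HWM column explicit in the columns list.

    Since 0.10.0 the HWM column is only used in the WHERE clause, so code that
    reads it from the dataframe must request it; '*' already includes it.
    """
    if "*" in columns or hwm_column in columns:
        return list(columns)
    return [*columns, hwm_column]


print(with_hwm_column(["col1", "col2"], "updated_at"))  # ['col1', 'col2', 'updated_at']
print(with_hwm_column(["*"], "updated_at"))  # ['*']
```

The result can then be passed as DBReader(columns=...).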
But most users just use columns=["*"] anyway, so they won't see any changes.
FileDownloader.run() now updates HWM in the HWM Store not after each file is successfully downloaded, but after all files have been handled.
This is because:
- FileDownloader can be used with DownloadOptions(workers=N), which could lead to a race condition: one thread can save one HWM value to the HWM store while another thread saves a different value.
- FileDownloader can download hundreds and thousands of files, and issuing a request to the HWM Store for each file could potentially DDoS it. (#189)
There is an exception handler which tries to save HWM to the HWM store if the download process was interrupted. But if it was interrupted by force, e.g. by sending a SIGKILL signal,
HWM will not be saved to the HWM store, so some already downloaded files may be downloaded again next time.
But an unexpected process kill may have other negative impacts, like some files being downloaded only partially, so this is expected behavior.
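The new update pattern can be sketched with a thread pool: workers only download, the calling thread collects results, and the HWM store is written once in a finally block. CountingStore and download_one are hypothetical stand-ins, not onETL classes, and merging new paths into the previously saved set is an assumption of this sketch.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import List, Set


class CountingStore:
    """Toy HWM store that counts writes (not an onETL or etl_entities class)."""

    def __init__(self) -> None:
        self.saved: Set[str] = set()
        self.writes = 0

    def set_hwm(self, value: Set[str]) -> None:
        self.saved = set(value)
        self.writes += 1


def download_one(path: str) -> str:
    # Hypothetical stand-in for downloading a single file; returns its remote path
    return path


def download_all(files: List[str], store: CountingStore, workers: int = 4) -> Set[str]:
    handled: Set[str] = set()
    try:
        with ThreadPoolExecutor(max_workers=workers) as pool:
            # Results are collected in the calling thread, so there is a single
            # writer of the HWM value and no race between worker threads.
            for path in pool.map(download_one, files):
                handled.add(path)
    finally:
        # One store write after all files are handled, also attempted on exceptions.
        # A finally block cannot run if the process is killed with SIGKILL.
        store.set_hwm(store.saved | handled)
    return handled


store = CountingStore()
download_all([f"/remote/file{i}.txt" for i in range(100)], store)
print(store.writes, len(store.saved))  # 1 100
```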
Features
- Add Python 3.12 compatibility. (#167)
- Excel file format can now be used with Spark 3.5.0. (#187)
- SnapshotBatchStrategy and IncrementalBatchStrategy do not raise exceptions if the source does not contain any data. Instead, they stop at the first iteration and return an empty dataframe. (#188)
- Cache the result of connection.check() in high-level classes like DBReader, FileDownloader and so on. This makes logs less verbose. (#190)
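These notes do not describe how the check() result is cached. A minimal sketch, assuming the cache is keyed by connection object: a weakref.WeakSet remembers connections already checked, so several high-level objects sharing one connection trigger a single check. ToyConnection and check_once are hypothetical.

```python
import weakref


class ToyConnection:
    """Hypothetical connection whose check() is expensive and verbose."""

    def __init__(self) -> None:
        self.check_calls = 0

    def check(self) -> "ToyConnection":
        self.check_calls += 1  # a real connection would log and query the server here
        return self


# Weak references, so caching never keeps a connection object alive
_checked = weakref.WeakSet()


def check_once(connection: ToyConnection) -> None:
    """Call connection.check() only the first time a given object is seen."""
    if connection not in _checked:
        connection.check()
        _checked.add(connection)


conn = ToyConnection()
for _ in range(3):  # e.g. several readers/downloaders sharing one connection
    check_once(conn)
print(conn.check_calls)  # 1
```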
Bug Fixes
- Fix @slot and @hook decorators returning methods with missing arguments in signature (Pylance, VS Code). (#183)
- Kafka connector documentation said that it supports reading topic data incrementally by passing group.id or groupIdPrefix. Actually, this is not true, because Spark does not send information to Kafka about which messages were consumed. So currently users can only read the whole topic; incremental reads are not supported.
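The @slot/@hook fix is not described in detail here. As a general illustration of this class of bug: a decorator that returns a bare *args/**kwargs wrapper hides the original parameters from runtime introspection, and functools.wraps restores them through __wrapped__. For static tools like Pylance, annotating the decorator as returning the same callable type (the F TypeVar below) is what carries the signature through. Both decorators here are hypothetical.

```python
import functools
import inspect
from typing import Any, Callable, TypeVar

F = TypeVar("F", bound=Callable[..., Any])


def naive_decorator(method: F) -> F:
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        return method(*args, **kwargs)

    return wrapper  # type: ignore[return-value]


def wrapping_decorator(method: F) -> F:
    @functools.wraps(method)  # copies __name__, __doc__ and sets __wrapped__
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        return method(*args, **kwargs)

    return wrapper  # type: ignore[return-value]


class Example:
    @naive_decorator
    def naive(self, path: str, limit: int = 10) -> None: ...

    @wrapping_decorator
    def wrapped(self, path: str, limit: int = 10) -> None: ...


print(list(inspect.signature(Example.naive).parameters))  # ['args', 'kwargs']
print(list(inspect.signature(Example.wrapped).parameters))  # ['self', 'path', 'limit']
```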