
YAML HWM Store

Bases: BaseHWMStore, FrozenModel

YAML local store for HWM values. Used as the default HWM store. Supports hooks.

Parameters:

  • path (Path | str) –

    Folder where HWM value files will be stored.

    Default:

    • ~/.local/share/onETL/yml_hwm_store on Linux
    • C:\Documents and Settings\<User>\Application Data\oneTools\onETL\yml_hwm_store on Windows
    • ~/Library/Application Support/onETL/yml_hwm_store on macOS
  • encoding (str, default: utf-8 ) –

    Encoding of files with HWM values.

Examples:

Default parameters

from onetl.connection import Hive, Postgres
from onetl.db import DBReader, DBWriter
from onetl.strategy import IncrementalStrategy
from onetl.hwm.store import YAMLHWMStore

postgres = Postgres(...)
hive = Hive(...)

reader = DBReader(
    connection=postgres,
    source="public.mydata",
    columns=["id", "data"],
    hwm=DBReader.AutoDetectHWM(name="some_unique_name", expression="id"),
)

writer = DBWriter(connection=hive, target="db.newtable")

with YAMLHWMStore():
    with IncrementalStrategy():
        df = reader.run()
        writer.run(df)

# will create the file
# "~/.local/share/onETL/id__public.mydata__postgres_postgres.domain.com_5432__myprocess__myhostname.yml"
# with encoding="utf-8" and save the serialized HWM values to it
With all options

with YAMLHWMStore(path="/my/store", encoding="utf-8"):
    with IncrementalStrategy():
        df = reader.run()
        writer.run(df)

# will create the file
# "/my/store/id__public.mydata__postgres_postgres.domain.com_5432__myprocess__myhostname.yml"
# with encoding="utf-8" and save the serialized HWM values to it
File content example:

- column:
      name: id
      partition: {}
  modified_time: '2023-02-11T17:10:49.659019'
  process:
      dag: ''
      host: myhostname
      name: myprocess
      task: ''
  source:
      db: public
      instance: postgres://postgres.domain.com:5432/target_database
      name: mydata
  type: int
  value: '1500'
- column:
      name: id
      partition: {}
  modified_time: '2023-02-11T16:00:31.962150'
  process:
      dag: ''
      host: myhostname
      name: myprocess
      task: ''
  source:
      db: public
      instance: postgres://postgres.domain.com:5432/target_database
      name: mydata
  type: int
  value: '1000'
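
When inspecting such a file by hand, it can help to pick the most recent record. The sketch below is not part of onETL: it assumes the file has already been parsed by a YAML loader into a list of mappings (reproduced here as a literal, trimmed to the relevant keys), and `latest_record` is a hypothetical helper. The source does not say how the store itself chooses among records.

```python
from datetime import datetime

# Assumed result of parsing the file above with a YAML loader:
# a list of mappings, trimmed here to the keys used below.
records = [
    {
        "column": {"name": "id", "partition": {}},
        "modified_time": "2023-02-11T17:10:49.659019",
        "type": "int",
        "value": "1500",
    },
    {
        "column": {"name": "id", "partition": {}},
        "modified_time": "2023-02-11T16:00:31.962150",
        "type": "int",
        "value": "1000",
    },
]


def latest_record(items):
    """Hypothetical helper: return the record with the newest modified_time, or None."""
    if not items:
        return None
    return max(items, key=lambda item: datetime.fromisoformat(item["modified_time"]))


latest = latest_record(records)
latest_value = int(latest["value"])  # values are stored as strings in the file
```

Note that `value` is a quoted string in the file, so it has to be converted according to `type` before it can be compared numerically.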

get_hwm(name)

Get HWM by name from the YAML store. Returns None if not found.

set_hwm(hwm)

Save HWM to the YAML store. Returns path to the file where HWM is stored.
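
The contract of these two methods (a lookup that returns None when nothing is stored, and a save that returns the file path) can be illustrated with a toy stand-in. This is a minimal sketch, not the onETL implementation: `ToyFileHWMStore` is hypothetical, it represents an HWM as a plain dict, writes one file per HWM name, and uses JSON instead of YAML only to stay within the standard library.

```python
import json
import tempfile
from pathlib import Path
from typing import Optional


class ToyFileHWMStore:
    """Hypothetical stand-in that mimics the get_hwm/set_hwm contract only."""

    def __init__(self, path, encoding="utf-8"):
        self.path = Path(path)
        self.encoding = encoding
        self.path.mkdir(parents=True, exist_ok=True)

    def _file_for(self, name: str) -> Path:
        # Assumption: one file per HWM name.
        return self.path / f"{name}.json"

    def get_hwm(self, name: str) -> Optional[dict]:
        """Return the stored HWM, or None if no file exists for this name."""
        file = self._file_for(name)
        if not file.exists():
            return None
        return json.loads(file.read_text(encoding=self.encoding))

    def set_hwm(self, hwm: dict) -> Path:
        """Save the HWM and return the path of the file it was written to."""
        file = self._file_for(hwm["name"])
        file.write_text(json.dumps(hwm), encoding=self.encoding)
        return file


with tempfile.TemporaryDirectory() as tmp:
    store = ToyFileHWMStore(tmp)
    missing = store.get_hwm("some_unique_name")  # nothing saved yet
    saved_path = store.set_hwm({"name": "some_unique_name", "value": 1500})
    saved_name = saved_path.name
    loaded = store.get_hwm("some_unique_name")
```

Callers should handle the None case explicitly, since a missing HWM typically means the first run of an incremental process.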