0.13.0 (2025-02-24)
🎉 3 years since first release 0.1.0 🎉
Breaking Changes
- Add Python 3.13 support. (#298)
- Change the logic of `FileConnection.walk` and `FileConnection.list_dir`. (#327)
  Previously `limits.stops_at(path) == True` was treated as "return the current file and stop", which could lead to exceeding a limit. Now it means "stop immediately", as shown in the sketch below.
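  A minimal sketch of the new behaviour (assuming `list_dir` accepts a `limits=...` argument, as `FileDownloader` does; host, credentials and paths are placeholders):

  ```python
  from onetl.connection import SFTP
  from onetl.file.limit import MaxFilesCount

  sftp = SFTP(host="sftp.domain.com", user="someuser", password="***")

  # onETL 0.13.x stops listing as soon as the limit is reached, so at most
  # 10 entries are returned; previously the entry which triggered the limit
  # could still be included in the result, exceeding the limit by one
  files = sftp.list_dir("/remote/path", limits=[MaxFilesCount(10)])
  ```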
- Change the default value of `FileDFWriter.Options(if_exists=...)` from `error` to `append`, to make it consistent with other `.Options()` classes within onETL. (#343)
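  Pipelines relying on the old behaviour can restore the previous default explicitly; a minimal sketch in the style of the other examples here:

  ```python
  from onetl.file import FileDFWriter

  writer = FileDFWriter(
      ...,
      options=FileDFWriter.Options(if_exists="error"),  # restore the pre-0.13 default
  )
  ```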
Features
- Add support for the `FileModifiedTimeHWM` HWM class (see etl-entities 2.5.0):

  ```python
  from etl_entities.hwm import FileModifiedTimeHWM

  from onetl.file import FileDownloader
  from onetl.strategy import IncrementalStrategy

  downloader = FileDownloader(
      ...,
      hwm=FileModifiedTimeHWM(name="somename"),
  )

  with IncrementalStrategy():
      downloader.run()
  ```
- Introduce `FileSizeRange(min=..., max=...)` filter class. (#325)
  Now users can set up `FileDownloader`/`FileMover` to download/move only files within a specific size range:

  ```python
  from onetl.file import FileDownloader
  from onetl.file.filter import FileSizeRange

  downloader = FileDownloader(
      ...,
      filters=[FileSizeRange(min="10KiB", max="1GiB")],
  )
  ```
- Introduce `TotalFilesSize(...)` limit class. (#326)
  Now users can set up `FileDownloader`/`FileMover` to stop downloading/moving files after reaching a certain amount of data:

  ```python
  from onetl.file import FileDownloader
  from onetl.file.limit import TotalFilesSize

  downloader = FileDownloader(
      ...,
      limits=[TotalFilesSize("1GiB")],
  )
  ```
- Implement `FileModifiedTime(since=..., until=...)` file filter. (#330)
  Now users can set up `FileDownloader`/`FileMover` to download/move only files with a specific file modification time:

  ```python
  from datetime import datetime, timedelta

  from onetl.file import FileDownloader
  from onetl.file.filter import FileModifiedTime

  downloader = FileDownloader(
      ...,
      filters=[FileModifiedTime(until=datetime.now() - timedelta(hours=1))],
  )
  ```
- Add `SparkS3.get_exclude_packages()` and `Kafka.get_exclude_packages()` methods. (#341)
  Using them allows skipping the download of dependencies which are not required by the specific connector, or which are already a part of Spark/PySpark:

  ```python
  from pyspark.sql import SparkSession

  from onetl.connection import SparkS3, Kafka

  maven_packages = [
      *SparkS3.get_packages(spark_version="3.5.4"),
      *Kafka.get_packages(spark_version="3.5.4"),
  ]
  exclude_packages = SparkS3.get_exclude_packages() + Kafka.get_exclude_packages()

  spark = (
      SparkSession.builder.appName("spark_app_onetl_demo")
      .config("spark.jars.packages", ",".join(maven_packages))
      .config("spark.jars.excludes", ",".join(exclude_packages))
      .getOrCreate()
  )
  ```
Improvements
- All DB connections opened by `JDBC.fetch(...)`, `JDBC.execute(...)` or `JDBC.check()` are now closed immediately after the statement is executed. (#334)
  Previously a Spark session with `master=local[3]` actually opened up to 5 connections to the target DB: one for `JDBC.check()`, one for the Spark driver's interaction with the DB (e.g. creating tables), and one per Spark executor. Now at most 4 connections are opened, as `JDBC.check()` no longer holds an open connection.
  This is important for RDBMS like Postgres or Greenplum, where the number of connections is strictly limited and the limit is usually quite low.
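  For illustration, a sketch of the affected calls (connection parameters and the `spark` session are placeholders); each call now closes its DB connection as soon as the statement finishes:

  ```python
  from onetl.connection import Postgres

  postgres = Postgres(
      host="postgres.domain.com",
      user="someuser",
      password="***",
      database="target_database",
      spark=spark,  # an existing SparkSession
  )

  postgres.check()                               # connection closed right after the check
  df = postgres.fetch("SELECT version()")        # connection closed after the result is fetched
  postgres.execute("TRUNCATE TABLE some_table")  # connection closed after the statement
  ```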
- Set up `ApplicationName` (client info) for Clickhouse, MongoDB, MSSQL, MySQL and Oracle. (#339, #248)
  Also update the `ApplicationName` format for Greenplum, Postgres, Kafka and SparkS3. Now all connectors use the same `ApplicationName` format:

  ```
  ${spark.applicationId} ${spark.appName} onETL/${onetl.version} Spark/${spark.version}
  ```

  The only connections not sending `ApplicationName` are Teradata and the FileConnection implementations.
- Now `DB.check()` tests connection availability not only on the Spark driver, but also from one of the Spark executors. (#346)
  This allows failing immediately if the Spark driver host has network access to the target DB, but the Spark executors do not.

  Note:
  Now `Greenplum.check()` requires the same user grants as `DBReader(connection=greenplum)`:

  ```sql
  -- yes, "writable" for reading data from GP, it's not a mistake
  ALTER USER username CREATEEXTTABLE(type = 'writable', protocol = 'gpfdist');

  -- for both reading and writing to GP
  -- ALTER USER username CREATEEXTTABLE(type = 'readable', protocol = 'gpfdist') CREATEEXTTABLE(type = 'writable', protocol = 'gpfdist');
  ```

  Please ask your Greenplum administrators to provide these grants.
Bug Fixes
- Avoid suppressing Hive Metastore errors while using `DBWriter`. (#329)
  Previously this was implemented as:

  ```python
  try:
      spark.sql(f"SELECT * FROM {table}")
      table_exists = True
  except Exception:
      table_exists = False
  ```

  If Hive Metastore was overloaded and responded with an exception, the table was considered non-existing, resulting in a full table override instead of an append or overriding only a subset of partitions.
- Fix using onETL to write data to PostgreSQL or Greenplum instances behind pgbouncer with `pool_mode=transaction`. (#336)
  Previously `Postgres.check()` opened a read-only transaction, pgbouncer changed the entire connection from read-write to read-only, and `DBWriter.run(df)` was then executed on a read-only connection, producing errors like:

  ```
  org.postgresql.util.PSQLException: ERROR: cannot execute INSERT in a read-only transaction
  org.postgresql.util.PSQLException: ERROR: cannot execute TRUNCATE TABLE in a read-only transaction
  ```

  Added a workaround by passing `readOnly=True` to the JDBC params of read-only connections, so pgbouncer can properly distinguish read-only connections from read-write ones.
  After upgrading to onETL 0.13.x or higher, the same error may still appear if pgbouncer still holds read-only connections and returns them to `DBWriter`. To fix this, the read-only connection can be converted to read-write manually:

  ```python
  postgres.execute("BEGIN READ WRITE;")  # <-- add this line
  DBWriter(...).run(df)
  ```

  After all connections in the pgbouncer pool have been converted from read-only to read-write and the error is gone, this additional line can be removed.
- Fix `MSSQL.fetch(...)` and `MySQL.fetch(...)` opening a read-write connection instead of a read-only one. (#337)
  Now this is fixed:
  - `MSSQL.fetch(...)` establishes a connection with `ApplicationIntent=ReadOnly`.
  - `MySQL.fetch(...)` executes a `SET SESSION TRANSACTION READ ONLY` statement.
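  For example (a sketch; connection parameters and the `spark` session are placeholders), queries run via `fetch` now use a read-only connection, while `execute` keeps using a read-write one:

  ```python
  from onetl.connection import MSSQL

  mssql = MSSQL(
      host="mssql.domain.com",
      user="someuser",
      password="***",
      database="target_database",
      spark=spark,  # an existing SparkSession
  )

  # since onETL 0.13 this runs over a connection with ApplicationIntent=ReadOnly
  df = mssql.fetch("SELECT TOP 10 * FROM some_table")

  # DDL/DML statements still use a read-write connection
  mssql.execute("TRUNCATE TABLE some_table")
  ```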
- Fixed passing multiple filters to `FileDownloader` and `FileMover`. (#338)
  It was caused by sorting the filters list in an internal logging method, while `FileFilter` subclasses are not sortable.
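  For example (a sketch using existing onETL filter classes; the path is a placeholder), combining several filters now works as expected:

  ```python
  from onetl.file import FileDownloader
  from onetl.file.filter import ExcludeDir, Glob

  downloader = FileDownloader(
      ...,
      filters=[Glob("*.csv"), ExcludeDir("/remote/path/tmp")],
  )
  ```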
- Fix a false warning about a lot of parallel connections to Greenplum. (#342)
  Creating a Spark session with `.master("local[5]")` may open up to 6 connections to Greenplum (= number of Spark executors + 1 for the driver), but onETL instead used the number of CPU cores on the host as the number of parallel connections.
  This led to a false warning that the number of Greenplum connections is too high, which should actually be raised only if the number of executors is higher than 30.
- Fix MongoDB trying to use the current database name as `authSource`. (#347)
  Now the connector's default value is used, which is the `admin` database. Previous onETL versions can be fixed by:

  ```python
  from onetl.connection import MongoDB

  mongodb = MongoDB(
      ...,
      database="mydb",
      extra={
          "authSource": "admin",
      },
  )
  ```
Dependencies
- Update DB connectors/drivers to latest versions: (#345)
  - Clickhouse 0.6.5 → 0.7.2
  - MongoDB 10.4.0 → 10.4.1
  - MySQL 9.0.0 → 9.2.0
  - Oracle 23.5.0.24.07 → 23.7.0.25.01
  - Postgres 42.7.4 → 42.7.5
Doc only Changes
- Split large code examples to tabs. (#344)