Reading from Oracle using Oracle.sql

Oracle.sql allows passing a custom SQL query, but does not support incremental strategies.

Warning

Please take into account Oracle types and how they are mapped to Spark types.

Warning

The statement is executed in a read-write connection, so if you call functions or procedures with DDL/DML statements inside, they can change data in your database.

Syntax support

Only queries with the following syntax are supported:

  • ✅︎ SELECT ... FROM ...
  • ✅︎ WITH alias AS (...) SELECT ...
  • ❌ SHOW ...
  • ❌ SET ...; SELECT ...; (multiple statements are not supported)

Examples

from onetl.connection import Oracle

oracle = Oracle(...)
df = oracle.sql(
    """
    SELECT
        id,
        key,
        CAST(value AS VARCHAR2(4000)) value,
        updated_at
    FROM
        some.mytable
    WHERE
        key = 'something'
    """,
    options=Oracle.SQLOptions(
        partitionColumn="id",
        numPartitions=10,
        lowerBound=0,
        upperBound=1000,
    ),
)

Recommendations

Select only required columns

Instead of passing SELECT * FROM ..., prefer listing exact column names: SELECT col1, col2, .... This reduces the amount of data transferred from Oracle to Spark.
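As a minimal sketch (reusing the hypothetical some.mytable from the example above), an explicit column list replaces the catch-all projection:

```python
# Hypothetical query: list only the columns Spark actually needs,
# instead of SELECT *, which would transfer every column over JDBC.
query = """
    SELECT
        id,
        key,
        updated_at
    FROM
        some.mytable
"""

# The query string would then be passed unchanged:
#     df = oracle.sql(query)
```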

Pay attention to where value

Instead of filtering data on the Spark side using df.filter(df.column == 'value'), pass a proper WHERE column = 'value' clause. This both reduces the amount of data sent from Oracle to Spark and may also improve query performance, especially if there are indexes or partitions for the columns used in the WHERE clause.
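A sketch comparing the two approaches (table and column names are hypothetical):

```python
# Filter pushed into Oracle: only matching rows cross the network,
# and Oracle can use indexes or partitions on `key`.
query_pushed = """
    SELECT
        id,
        key,
        updated_at
    FROM
        some.mytable
    WHERE
        key = 'something'  -- evaluated by Oracle
"""
# df = oracle.sql(query_pushed)

# Anti-pattern: every row is read into Spark first, then filtered there.
# df = oracle.sql("SELECT id, key, updated_at FROM some.mytable")
# df = df.filter(df.key == "something")
```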

Options

OracleSQLOptions

Bases: JDBCSQLOptions

fetchsize = 100000 class-attribute instance-attribute

Fetch N rows from an opened cursor per read round trip.

Tuning this option can influence performance of reading.

Warning

Default value is different from Spark.

Spark uses the driver's own default, which may differ between drivers and even between versions of the same driver. For example, the Oracle driver defaults to fetchsize=10, which is far too small for bulk reads.

Thus we've overridden the default value with 100_000, which should increase reading performance.

Changed in 0.2.0

Set explicit default value to 100_000
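As a sketch (assuming the Oracle import from the example above), the option can be tuned when constructing SQLOptions; the exact value is a hypothetical choice, not a recommendation:

```python
from onetl.connection import Oracle

# Hypothetical tuning: a larger fetchsize means fewer round trips to Oracle,
# at the cost of more memory per executor while a batch is buffered.
options = Oracle.SQLOptions(fetchsize=200_000)
```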

lower_bound = Field(default=None, alias='lowerBound') class-attribute instance-attribute

Defines the lower boundary for partitioning the query's data. Mandatory if partition_column is set.

num_partitions = Field(default=None, alias='numPartitions') class-attribute instance-attribute

Number of jobs created by Spark to read the table content in parallel.

partition_column = Field(default=None, alias='partitionColumn') class-attribute instance-attribute

Column used to partition data across multiple executors for parallel query processing.

Warning

It is highly recommended to use primary key, or column with an index to avoid performance issues.

Example of using partitionColumn="id" with partitioning_mode="range"
-- If partition_column is 'id', with numPartitions=4, lowerBound=1, and upperBound=100:
-- Executor 1 processes IDs from 1 to 25
SELECT ... FROM table WHERE id >= 1 AND id < 26
-- Executor 2 processes IDs from 26 to 50
SELECT ... FROM table WHERE id >= 26 AND id < 51
-- Executor 3 processes IDs from 51 to 75
SELECT ... FROM table WHERE id >= 51 AND id < 76
-- Executor 4 processes IDs from 76 to 100
SELECT ... FROM table WHERE id >= 76 AND id <= 100


-- General case for executor N (N = 1..numPartitions)
SELECT ... FROM table
WHERE partition_column >= (lowerBound + (N-1) * stride)
AND partition_column < (lowerBound + N * stride)
-- The last executor uses `partition_column <= upperBound` as its upper bound instead.
-- Where `stride` is calculated as `(upperBound - lowerBound) / numPartitions`.
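The split above can be sketched in plain Python. This is a simplified model of the range arithmetic used for illustration; Spark's internal rounding may differ slightly:

```python
def partition_bounds(lower: int, upper: int, num_partitions: int) -> list[tuple[int, int]]:
    """Split the inclusive range [lower, upper] into num_partitions contiguous chunks."""
    stride = (upper - lower + 1) // num_partitions
    bounds = []
    for n in range(num_partitions):
        lo = lower + n * stride
        # the last chunk absorbs the remainder so the whole range is covered
        hi = upper if n == num_partitions - 1 else lo + stride - 1
        bounds.append((lo, hi))
    return bounds

# Matches the executor ranges shown above:
# partition_bounds(1, 100, 4) -> [(1, 25), (26, 50), (51, 75), (76, 100)]
```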

query_timeout = Field(default=None, alias='queryTimeout') class-attribute instance-attribute

The number of seconds the driver will wait for a statement to execute. Zero means there is no limit.

This option depends on driver implementation, some drivers can check the timeout of each query instead of an entire JDBC batch.

session_init_statement = Field(default=None, alias='sessionInitStatement') class-attribute instance-attribute

After each database session is opened to the remote DB and before starting to read data, this option executes a custom SQL statement (or a PL/SQL block).

Use this to implement session initialization code.

Example:

sessionInitStatement = """
    BEGIN
        execute immediate
        'alter session set "_serial_direct_read"=true';
    END;
"""

upper_bound = Field(default=None, alias='upperBound') class-attribute instance-attribute

Sets the upper boundary for data partitioning. Mandatory if partition_column is set.

parse(options) classmethod

If an instance of a class inherited from SQLOptions is passed, it is returned unchanged. If a Dict object is passed, it is converted to SQLOptions.

Otherwise, an exception is raised.