Reading from MSSQL using MSSQL.sql¶
MSSQL.sql allows passing a custom SQL query, but does not support incremental strategies.
Warning
Please take into account MSSQL types
Warning
The statement is executed in a read-write connection, so if you call functions/procedures with DDL/DML statements inside, they can change data in your database.
Syntax support¶
Only queries with the following syntax are supported:
- ✅︎ SELECT ... FROM ...
- ❌ WITH alias AS (...) SELECT ...
- ❌ SET ...; SELECT ...; - multiple statements are not supported
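One reason multi-statement input cannot work is that Spark's JDBC source executes the query as the body of a derived table (subquery). A minimal sketch of that wrapping, with an illustrative alias name (Spark generates its own):

```python
# Sketch of how a JDBC data source wraps the user query as a subquery.
# The alias "spark_subquery" is illustrative, not Spark's actual generated name.
def wrap_query(user_query: str) -> str:
    return f"SELECT * FROM ({user_query}) AS spark_subquery"

print(wrap_query("SELECT id FROM some.mytable"))
# → SELECT * FROM (SELECT id FROM some.mytable) AS spark_subquery

# A statement list like "SET ...; SELECT ...;" is not a valid derived-table
# body, so it cannot be wrapped this way.
```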
Examples¶
```python
from onetl.connection import MSSQL

mssql = MSSQL(...)

df = mssql.sql(
    """
    SELECT
        id,
        key,
        CAST(value AS text) value,
        updated_at
    FROM
        some.mytable
    WHERE
        key = 'something'
    """,
    options=MSSQL.SQLOptions(
        partitionColumn="id",
        numPartitions=10,
        lowerBound=0,
        upperBound=1000,
    ),
)
```
Recommendations¶
Select only required columns¶
Instead of passing SELECT * FROM ..., prefer listing exact column names: SELECT col1, col2, ....
This reduces the amount of data passed from MSSQL to Spark.
Pay attention to where value¶
Instead of filtering data on the Spark side using df.filter(df.column == 'value'), pass a proper WHERE column = 'value' clause in the query.
This both reduces the amount of data sent from MSSQL to Spark, and may also improve query performance, especially if there are indexes or partitions for the columns used in the WHERE clause.
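Both recommendations come down to the same principle: rows and columns filtered out in SQL never cross the wire. A sketch of the effect, with sqlite3 standing in for MSSQL (table and column names are made up):

```python
import sqlite3

# In-memory SQLite stands in for MSSQL; the principle is identical:
# filtering in SQL means fewer rows are transferred to the client.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE mytable (id INTEGER, key TEXT, value TEXT)")
conn.executemany(
    "INSERT INTO mytable VALUES (?, ?, ?)",
    [(i, "something" if i % 10 == 0 else "other", str(i)) for i in range(1000)],
)

# Client-side filter: all 1000 rows are transferred, then filtered locally.
all_rows = conn.execute("SELECT id, key, value FROM mytable").fetchall()
filtered_later = [row for row in all_rows if row[1] == "something"]

# Server-side filter: only the 100 matching rows are transferred.
filtered_in_sql = conn.execute(
    "SELECT id, key, value FROM mytable WHERE key = 'something'"
).fetchall()

print(len(all_rows), len(filtered_later), len(filtered_in_sql))
# → 1000 100 100
```

Both approaches yield the same result set, but the second moves ten times less data; with a real MSSQL instance the database can also use an index on the filtered column.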
Options¶
MSSQLSQLOptions¶
Bases: JDBCSQLOptions
fetchsize = 100000¶
Fetch N rows from an opened cursor per one read round.
Tuning this option can influence reading performance.
Warning
The default value differs from Spark's.
Spark uses the driver's own default, which may differ between drivers,
and even between versions of the same driver. For example, Oracle's driver has a
default fetchsize=10, which is absolutely not usable.
Thus we've overridden the default value with 100_000, which should increase reading performance.
Changed in 0.2.0
Set explicit default value to 100_000
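fetchsize has a direct analogue in Python's DB-API: reading a cursor in fixed-size batches via fetchmany. A sketch with sqlite3 (which has no JDBC fetchsize; the batch size here only illustrates the access pattern of N rows per read round):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (id INTEGER)")
conn.executemany("INSERT INTO t VALUES (?)", [(i,) for i in range(250)])

fetchsize = 100  # rows fetched per read round, analogous to JDBC fetchsize
cursor = conn.execute("SELECT id FROM t")

batches = []
while True:
    batch = cursor.fetchmany(fetchsize)
    if not batch:
        break
    batches.append(len(batch))

print(batches)  # → [100, 100, 50]
```

A larger batch size means fewer round trips for the same total number of rows, which is why raising the too-small driver defaults usually helps.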
lower_bound = Field(default=None, alias='lowerBound')¶
Defines the lower boundary for partitioning the query's data. Mandatory if partition_column is set.
num_partitions = Field(default=None, alias='numPartitions')¶
Number of jobs created by Spark to read the table content in parallel.
partition_column = Field(default=None, alias='partitionColumn')¶
Column used to partition data across multiple executors for parallel query processing.
Warning
It is highly recommended to use a primary key, or a column with an index, to avoid performance issues.
Example of using partitionColumn="id" with partitioning_mode="range"
-- If partition_column is 'id', with numPartitions=4, lowerBound=1, and upperBound=100:
-- Executor 1 processes IDs from 1 to 25
SELECT ... FROM table WHERE id >= 1 AND id < 26
-- Executor 2 processes IDs from 26 to 50
SELECT ... FROM table WHERE id >= 26 AND id < 51
-- Executor 3 processes IDs from 51 to 75
SELECT ... FROM table WHERE id >= 51 AND id < 76
-- Executor 4 processes IDs from 76 to 100
SELECT ... FROM table WHERE id >= 76 AND id <= 100
-- General case for executor N (N = 1..numPartitions),
-- where `stride` is calculated as `(upperBound - lowerBound) / numPartitions`:
SELECT ... FROM table
WHERE partition_column >= lowerBound + (N-1) * stride
  AND partition_column <  lowerBound + N * stride
-- (the last executor uses `<= upperBound` as its upper boundary instead)
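The boundary arithmetic above can be checked with a small pure-Python sketch (the function name make_partition_predicates is made up for illustration, not part of onetl or Spark):

```python
def make_partition_predicates(column, lower_bound, upper_bound, num_partitions):
    """Build one WHERE predicate per partition, mirroring the ranges above."""
    stride = (upper_bound - lower_bound) / num_partitions
    predicates = []
    for n in range(1, num_partitions + 1):
        low = lower_bound + (n - 1) * stride
        if n < num_partitions:
            high = lower_bound + n * stride
            predicates.append(f"{column} >= {low:g} AND {column} < {high:g}")
        else:
            # last partition is inclusive of upperBound
            predicates.append(f"{column} >= {low:g} AND {column} <= {upper_bound:g}")
    return predicates

for predicate in make_partition_predicates("id", 0, 1000, 4):
    print(predicate)
# id >= 0 AND id < 250
# id >= 250 AND id < 500
# id >= 500 AND id < 750
# id >= 750 AND id <= 1000
```

Every row with lowerBound <= id <= upperBound falls into exactly one partition, and the ranges do not overlap.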
query_timeout = Field(default=None, alias='queryTimeout')¶
The number of seconds the driver will wait for a statement to execute. Zero means there is no limit.
This option depends on the driver implementation: some drivers check the timeout of each query instead of the entire JDBC batch.
session_init_statement = Field(default=None, alias='sessionInitStatement')¶
After each database session is opened to the remote DB and before starting to read data, this option executes a custom SQL statement (or a PL/SQL block).
Use this to implement session initialization code.
Example:

```python
sessionInitStatement = """
BEGIN
  execute immediate
  'alter session set "_serial_direct_read"=true';
END;
"""
```
upper_bound = Field(default=None, alias='upperBound')¶
Defines the upper boundary for partitioning the query's data. Mandatory if partition_column is set.
parse(options) classmethod¶
If a parameter inherited from the ReadOptions class was passed, it will be returned unchanged. If a dict object was passed, it will be converted to ReadOptions.
Otherwise, an exception will be raised.
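The dispatch described above can be sketched in plain Python (the Options class here is a made-up stand-in, not onetl's real implementation):

```python
# Stand-in sketch of parse() dispatch; Options is a hypothetical placeholder
# for the real options class.
class Options:
    def __init__(self, **kwargs):
        self.__dict__.update(kwargs)

    @classmethod
    def parse(cls, options):
        if isinstance(options, cls):
            return options          # already an options instance: returned unchanged
        if isinstance(options, dict):
            return cls(**options)   # dict: converted to the options class
        raise TypeError(f"Cannot parse {type(options).__name__} as options")

same = Options(fetchsize=5000)
assert Options.parse(same) is same
assert Options.parse({"fetchsize": 5000}).fetchsize == 5000
```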