Avro
Bases: ReadWriteFileFormat
Based on the Spark Avro file format.
Supports reading and writing files with the .avro extension.
Version compatibility
- Spark versions: 2.4.x - 3.5.x
- Java versions: 8 - 20
See the documentation at the link above for details.
Added in 0.9.0
Examples:
Note
You can pass any option mentioned in the official documentation.
Option names should be in camelCase!
The set of supported options depends on the Spark version.
```python
from pyspark.sql import SparkSession

from onetl.file.format import Avro

# Create Spark session with Avro package loaded
maven_packages = Avro.get_packages(spark_version="3.5.8")
spark = (
    SparkSession.builder.appName("spark-app-name")
    .config("spark.jars.packages", ",".join(maven_packages))
    .getOrCreate()
)

schema = {
    "type": "record",
    "name": "Person",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "age", "type": "int"},
    ],
}
avro = Avro(avroSchema=schema)  # or avroSchemaUrl=...
```
```python
# Create Spark session with Avro package loaded
spark = ...

from onetl.file.format import Avro

schema = {
    "type": "record",
    "name": "Person",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "age", "type": "int"},
    ],
}
avro = Avro(
    avroSchema=schema,  # or avroSchemaUrl=...
    compression="snappy",
)
```
schema_dict = Field(default=None, alias='avroSchema')
Avro schema in JSON representation.
```python
avro = Avro(
    avroSchema={
        "type": "record",
        "name": "Person",
        "fields": [
            {"name": "name", "type": "string"},
            {"name": "age", "type": "int"},
        ],
    },
)
```
Warning
Mutually exclusive with schema_url.
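Spark's underlying avroSchema option takes the schema as a JSON string, so a dict like the one above has to be serialized at some point; a minimal sketch of that step (assuming the class handles this conversion internally):

```python
import json

schema = {
    "type": "record",
    "name": "Person",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "age", "type": "int"},
    ],
}

# Spark's avroSchema option expects a JSON string, not a dict
schema_json = json.dumps(schema)
```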
schema_url = Field(default=None, alias='avroSchemaUrl')
URL of an Avro schema in JSON format. Usually points to a Schema Registry, for example:

```python
schema_registry = "http://some.schema.registry.domain"
name = "MyAwesomeSchema"
version = "latest"
schema_url = f"{schema_registry}/subjects/{name}/versions/{version}/schema"

avro = Avro(avroSchemaUrl=schema_url)
```
Warning
Mutually exclusive with schema_dict.
recordName = None
Record name in written Avro schema.
Default is topLevelRecord.
Note
Used only for writing files and by serialize_column.
recordNamespace = None
Record namespace in written Avro schema. Default is not set.
Note
Used only for writing files and by serialize_column.
compression = None
Compression codec.
By default, Spark config value spark.sql.avro.compression.codec (snappy) is used.
Note
Used only for writing files. Ignored by serialize_column.
mode = None
How to handle parsing errors:
- PERMISSIVE - set field value as null.
- FAILFAST - throw an error immediately.
Default is FAILFAST.
Note
Used only by parse_column method.
datetimeRebaseMode = None
While converting dates/timestamps from the Julian to the Proleptic Gregorian calendar, handle value ambiguity:
- EXCEPTION - fail if ancient dates/timestamps are ambiguous between the two calendars.
- CORRECTED - load dates/timestamps as-is, without rebasing.
- LEGACY - rebase ancient dates/timestamps from the Julian to the Proleptic Gregorian calendar.
By default, Spark config value spark.sql.avro.datetimeRebaseModeInRead (CORRECTED) is used.
Note
Used only for reading files and by parse_column.
positionalFieldMatching = None
If True, match Avro schema fields and DataFrame columns by position.
If False, match them by name.
Default is False.
enableStableIdentifiersForUnionType = None
Avro schema may contain union types, which are not natively supported by Spark. Each variant of a union is split into a separate DataFrame column of the respective type.
If option value is True, DataFrame column names are based on Avro variant names,
e.g. member_int, member_string.
If False, DataFrame column names are generated from the field position, e.g. member0, member1.
Default is False.
Note
Used only for reading files and by parse_column.
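The naming rules above can be sketched with a small helper; union_member_columns is purely illustrative and not part of the onetl or Spark API:

```python
def union_member_columns(variant_types, stable_identifiers):
    # Spark splits an Avro union into one DataFrame column per variant
    if stable_identifiers:
        # names based on the Avro variant type, e.g. member_int
        return [f"member_{t}" for t in variant_types]
    # names based on the variant position, e.g. member0
    return [f"member{i}" for i in range(len(variant_types))]


print(union_member_columns(["int", "string"], stable_identifiers=True))
# ['member_int', 'member_string']
print(union_member_columns(["int", "string"], stable_identifiers=False))
# ['member0', 'member1']
```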
get_packages(spark_version, scala_version=None)
classmethod
Get package names to be downloaded by Spark.
See Maven package index for all available packages.
Added in 0.9.0
Parameters:
- spark_version (str) - Spark version in format major.minor.patch.
- scala_version (str, default: None) - Scala version in format major.minor. If None, spark_version is used to determine the Scala version.
Examples:
```python
from onetl.file.format import Avro

Avro.get_packages(spark_version="3.5.8")
Avro.get_packages(spark_version="3.5.8", scala_version="2.12")
```
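The returned package names follow the Maven coordinates of the spark-avro artifact; a rough sketch of the pattern (the actual classmethod also validates versions and derives the Scala version from the Spark version when it is not given):

```python
def spark_avro_package(spark_version: str, scala_version: str = "2.12") -> str:
    # spark-avro is published to Maven Central as
    # org.apache.spark:spark-avro_<scala>:<spark>
    return f"org.apache.spark:spark-avro_{scala_version}:{spark_version}"


print(spark_avro_package("3.5.8"))  # org.apache.spark:spark-avro_2.12:3.5.8
```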
parse_column(column)
Parses an Avro binary column into a structured Spark SQL column using Spark's from_avro function, based on the schema provided to the class.
Note
Can be used only with Spark 3.x+
Warning
If schema_url is provided, requests library is used to fetch the schema from the URL.
It should be installed manually, like this:
pip install requests
Added in 0.11.0
Parameters:
- column (str | Column) - The name of the column or the column object containing Avro bytes to deserialize. The data should match the provided Avro schema.

Returns:
- Column - Column with deserialized data. Schema matches the provided Avro schema. Column name is the same as the input column.

Raises:
- ValueError - If neither avroSchema nor avroSchemaUrl is defined.
- ImportError - If schema_url is used and the requests library is not installed.
Examples:
>>> from pyspark.sql.functions import decode
>>> from onetl.file.format import Avro
>>> df.show()
+----+----------------------+----------+---------+------+-----------------------+-------------+
|key |value                 |topic     |partition|offset|timestamp              |timestampType|
+----+----------------------+----------+---------+------+-----------------------+-------------+
|[31]|[0A 41 6C 69 63 65 28]|topicAvro |0        |0     |2024-04-24 13:02:25.911|0            |
|[32]|[06 42 6F 62 32]      |topicAvro |0        |1     |2024-04-24 13:02:25.922|0            |
+----+----------------------+----------+---------+------+-----------------------+-------------+
>>> df.printSchema()
root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: integer (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)
>>> avro = Avro(
...     avroSchema={  # or avroSchemaUrl=...
...         "type": "record",
...         "name": "Person",
...         "fields": [
...             {"name": "name", "type": "string"},
...             {"name": "age", "type": "int"},
...         ],
...     }
... )
>>> parsed_df = df.select(decode("key", "UTF-8").alias("key"), avro.parse_column("value"))
>>> parsed_df.show(truncate=False)
+---+-----------+
|key|value      |
+---+-----------+
|1  |{Alice, 20}|
|2  |{Bob, 25}  |
+---+-----------+
>>> parsed_df.printSchema()
root
 |-- key: string (nullable = true)
 |-- value: struct (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- age: integer (nullable = true)
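As a sanity check on the hex dumps above: Avro's binary encoding writes an int as a zigzag varint and a string as a zigzag-varint length followed by UTF-8 bytes, so the Person records can be encoded by hand. This is a didactic sketch, not how Spark implements it:

```python
def zigzag_varint(n: int) -> bytes:
    # Avro encodes int/long values as zigzag-encoded variable-length integers
    # (this simplified version is sufficient for the non-negative values used here)
    z = (n << 1) ^ (n >> 63)
    out = bytearray()
    while True:
        byte = z & 0x7F
        z >>= 7
        if z:
            out.append(byte | 0x80)
        else:
            out.append(byte)
            return bytes(out)


def encode_person(name: str, age: int) -> bytes:
    raw = name.encode("utf-8")
    # string = length as zigzag varint + UTF-8 bytes; int = zigzag varint
    return zigzag_varint(len(raw)) + raw + zigzag_varint(age)


print(encode_person("Alice", 20).hex(" ").upper())  # 0A 41 6C 69 63 65 28
print(encode_person("Bob", 25).hex(" ").upper())    # 06 42 6F 62 32
```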
serialize_column(column)
Serializes a structured Spark SQL column into an Avro binary column using Spark's to_avro function.
Note
Can be used only with Spark 3.x+
Warning
If schema_url is provided, requests library is used to fetch the schema from the URL.
It should be installed manually, like this:
pip install requests
Added in 0.11.0
Parameters:
- column (str | Column) - The name of the column or the column object containing the data to serialize to Avro format.

Returns:
- Column - Column with binary Avro data. Column name is the same as the input column.

Raises:
- ValueError - If the Spark version is less than 3.x.
- ImportError - If schema_url is used and the requests library is not installed.
Examples:
>>> from pyspark.sql.functions import decode
>>> from onetl.file.format import Avro
>>> df.show()
+---+-----------+
|key|value      |
+---+-----------+
|1  |{Alice, 20}|
|2  |{Bob, 25}  |
+---+-----------+
>>> df.printSchema()
root
 |-- key: string (nullable = true)
 |-- value: struct (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- age: integer (nullable = true)
>>> # serializing data into Avro format
>>> avro = Avro(
...     avroSchema={  # or avroSchemaUrl=...
...         "type": "record",
...         "name": "Person",
...         "fields": [
...             {"name": "name", "type": "string"},
...             {"name": "age", "type": "int"},
...         ],
...     }
... )
>>> serialized_df = df.select("key", avro.serialize_column("value"))
>>> serialized_df.show(truncate=False)
+---+----------------------+
|key|value                 |
+---+----------------------+
|1  |[0A 41 6C 69 63 65 28]|
|2  |[06 42 6F 62 32]      |
+---+----------------------+
>>> serialized_df.printSchema()
root
 |-- key: string (nullable = true)
 |-- value: binary (nullable = true)