
Avro

Bases: ReadWriteFileFormat

Avro file format.

Based on Spark Avro file format.

Supports reading/writing files with .avro extension.

Version compatibility
  • Spark versions: 2.4.x - 3.5.x
  • Java versions: 8 - 20

See the Spark Avro documentation linked above for details.

Added in 0.9.0

Examples:

Note

You can pass any option mentioned in official documentation. Option names should be in camelCase!

The set of supported options depends on Spark version.

from pyspark.sql import SparkSession
from onetl.file.format import Avro

# Create Spark session with Avro package loaded
maven_packages = Avro.get_packages(spark_version="3.5.8")
spark = (
    SparkSession.builder.appName("spark-app-name")
    .config("spark.jars.packages", ",".join(maven_packages))
    .getOrCreate()
)

schema = {
    "type": "record",
    "name": "Person",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "age", "type": "int"},
    ],
}
avro = Avro(avroSchema=schema)  # or avroSchemaUrl=...
from onetl.file.format import Avro

# Create Spark session with Avro package loaded
spark = ...

schema = {
    "type": "record",
    "name": "Person",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "age", "type": "int"},
    ],
}
avro = Avro(
    avroSchema=schema,  # or avroSchemaUrl=...
    compression="snappy",
)

schema_dict = Field(default=None, alias='avroSchema') class-attribute instance-attribute

Avro schema in JSON format representation.

avro = Avro(
    avroSchema={
        "type": "record",
        "name": "Person",
        "fields": [
            {"name": "name", "type": "string"},
            {"name": "age", "type": "int"},
        ],
    },
)
If set, all records should match this schema.

Warning

Mutually exclusive with schema_url.

schema_url = Field(default=None, alias='avroSchemaUrl') class-attribute instance-attribute

URL to Avro schema in JSON format. Usually points to Schema Registry, like:

schema_registry = "http://some.schema.registry.domain"
name = "MyAwesomeSchema"
version = "latest"

schema_url = f"{schema_registry}/subjects/{name}/versions/{version}/schema"
avro = Avro(avroSchemaUrl=schema_url)
If set, schema is fetched before any records are parsed, so all records should match this schema.

Warning

Mutually exclusive with schema_dict.

recordName = None class-attribute instance-attribute

Record name in written Avro schema. Default is topLevelRecord.

Note

Used only for writing files and by serialize_column.

recordNamespace = None class-attribute instance-attribute

Record namespace in written Avro schema. Default is not set.

Note

Used only for writing files and by serialize_column.
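As an illustration of how these two options map onto the written schema (a sketch based on the Avro record schema layout, not Spark's actual implementation; the `Person`/`com.example` names are made up):

```python
record_name = "Person"  # recordName option; default is "topLevelRecord"
record_namespace = "com.example"  # recordNamespace option; default is unset

# Sketch of the top-level Avro schema Spark would write with these options set
written_schema = {
    "type": "record",
    "name": record_name,
    "namespace": record_namespace,
    "fields": [],  # filled from the DataFrame schema on write
}
```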

compression = None class-attribute instance-attribute

Compression codec. By default, Spark config value spark.sql.avro.compression.codec (snappy) is used.

Note

Used only for writing files. Ignored by serialize_column.

mode = None class-attribute instance-attribute

How to handle parsing errors:

  • PERMISSIVE - set fields that cannot be parsed to null.
  • FAILFAST - throw an error immediately.

Default is FAILFAST.

Note

Used only by parse_column method.
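The difference between the two modes can be sketched in plain Python (this only mimics the behavior described above, it is not onetl or Spark code):

```python
# Sketch: PERMISSIVE vs FAILFAST handling of records that fail to parse
def parse_all(payloads, parse, mode="FAILFAST"):
    results = []
    for payload in payloads:
        try:
            results.append(parse(payload))
        except ValueError:
            if mode == "FAILFAST":
                raise  # FAILFAST: throw an error immediately
            results.append(None)  # PERMISSIVE: set field value as null
    return results

# int("oops") raises ValueError, standing in for a malformed Avro payload
parse_all(["1", "oops", "3"], int, mode="PERMISSIVE")  # [1, None, 3]
```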

datetimeRebaseMode = None class-attribute instance-attribute

How to handle values that are ambiguous between the Julian and the Proleptic Gregorian calendar while converting dates/timestamps:

  • EXCEPTION - fail if ancient dates/timestamps are ambiguous between the two calendars.
  • CORRECTED - load dates/timestamps as-is, without rebasing.
  • LEGACY - rebase ancient dates/timestamps from the Julian to the Proleptic Gregorian calendar.

By default, Spark config value spark.sql.avro.datetimeRebaseModeInRead (CORRECTED) is used.

Note

Used only for reading files and by parse_column.

positionalFieldMatching = None class-attribute instance-attribute

If True, match Avro schema field and DataFrame column by position. If False, match by name.

Default is False.
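The two matching strategies can be sketched in plain Python (illustration only; the field and column names are made up):

```python
avro_fields = ["name", "age"]  # field order in the Avro schema
df_columns = ["age", "name"]   # column order in the DataFrame

# positionalFieldMatching=True: pair schema fields and columns by position
by_position = list(zip(avro_fields, df_columns))
# -> [("name", "age"), ("age", "name")]

# positionalFieldMatching=False (default): pair them by name
by_name = [(field, field) for field in avro_fields if field in df_columns]
# -> [("name", "name"), ("age", "age")]
```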

enableStableIdentifiersForUnionType = None class-attribute instance-attribute

Avro schema may contain union types, which are not supported by Spark. Different variants of a union are split into separate DataFrame columns of the respective type.

If option value is True, DataFrame column names are based on Avro variant names, e.g. member_int, member_string. If False, DataFrame column names are generated using field position, e.g. member0, member1.

Default is False.

Note

Used only for reading files and by parse_column.
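The resulting column names can be illustrated for a union of int and string (a plain-Python sketch of the naming rules above, not Spark code):

```python
union_variants = ["int", "string"]  # Avro union type ["int", "string"]

# enableStableIdentifiersForUnionType=True: names based on Avro variant names
stable_names = [f"member_{variant}" for variant in union_variants]
# -> ["member_int", "member_string"]

# enableStableIdentifiersForUnionType=False (default): names based on position
positional_names = [f"member{i}" for i in range(len(union_variants))]
# -> ["member0", "member1"]
```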

get_packages(spark_version, scala_version=None) classmethod

Get package names to be downloaded by Spark.

See Maven package index for all available packages.

Added in 0.9.0

Parameters:

  • spark_version (str) –

    Spark version in format major.minor.patch.

  • scala_version (str, default: None ) –

    Scala version in format major.minor.

    If None, spark_version is used to determine Scala version.

Examples:

from onetl.file.format import Avro

Avro.get_packages(spark_version="3.5.8")
Avro.get_packages(spark_version="3.5.8", scala_version="2.12")

parse_column(column)

Parses an Avro binary column into a structured Spark SQL column using Spark's from_avro function, based on the schema provided within the class.

Note

Can be used only with Spark 3.x+

Warning

If schema_url is provided, requests library is used to fetch the schema from the URL. It should be installed manually, like this:

pip install requests

Added in 0.11.0

Parameters:

  • column (str | Column) –

The name of the column or the column object containing Avro bytes to deserialize. The binary data should match the provided Avro schema.

Returns:

  • Column

    Column with deserialized data. Schema is matching the provided Avro schema. Column name is the same as input column.

Raises:

  • ValueError

If neither avroSchema nor avroSchemaUrl is defined.

  • ImportError

    If schema_url is used and the requests library is not installed.

Examples:

>>> from pyspark.sql.functions import decode
>>> from onetl.file.format import Avro
>>> df.show()
+----+----------------------+----------+---------+------+-----------------------+-------------+
|key |value                 |topic     |partition|offset|timestamp              |timestampType|
+----+----------------------+----------+---------+------+-----------------------+-------------+
|[31]|[0A 41 6C 69 63 65 28]|topicAvro |0        |0     |2024-04-24 13:02:25.911|0            |
|[32]|[06 42 6F 62 32]      |topicAvro |0        |1     |2024-04-24 13:02:25.922|0            |
+----+----------------------+----------+---------+------+-----------------------+-------------+
>>> df.printSchema()
root
|-- key: binary (nullable = true)
|-- value: binary (nullable = true)
|-- topic: string (nullable = true)
|-- partition: integer (nullable = true)
|-- offset: integer (nullable = true)
|-- timestamp: timestamp (nullable = true)
|-- timestampType: integer (nullable = true)
>>> avro = Avro(
...     avroSchema={  # or avroSchemaUrl=...
...         "type": "record",
...         "name": "Person",
...         "fields": [
...             {"name": "name", "type": "string"},
...             {"name": "age", "type": "int"},
...         ],
...     }
... )
>>> parsed_df = df.select(decode("key", "UTF-8").alias("key"), avro.parse_column("value"))
>>> parsed_df.show(truncate=False)
+---+-----------+
|key|value      |
+---+-----------+
|1  |{Alice, 20}|
|2  |{Bob, 25}  |
+---+-----------+
>>> parsed_df.printSchema()
root
|-- key: string (nullable = true)
|-- value: struct (nullable = true)
|    |-- name: string (nullable = true)
|    |-- age: integer (nullable = true)

serialize_column(column)

Serializes a structured Spark SQL column into an Avro binary column using Spark's to_avro function.

Note

Can be used only with Spark 3.x+

Warning

If schema_url is provided, requests library is used to fetch the schema from the URL. It should be installed manually, like this:

pip install requests

Added in 0.11.0

Parameters:

  • column (str | Column) –

    The name of the column or the column object containing the data to serialize to Avro format.

Returns:

  • Column

    Column with binary Avro data. Column name is the same as input column.

Raises:

  • ValueError

    If the Spark version is less than 3.x.

  • ImportError

    If schema_url is used and the requests library is not installed.

Examples:

>>> from pyspark.sql.functions import decode
>>> from onetl.file.format import Avro
>>> df.show()
+---+-----------+
|key|value      |
+---+-----------+
|1  |{Alice, 20}|
|2  |  {Bob, 25}|
+---+-----------+
>>> df.printSchema()
root
|-- key: string (nullable = true)
|-- value: struct (nullable = true)
|    |-- name: string (nullable = true)
|    |-- age: integer (nullable = true)
>>> # serializing data into Avro format
>>> avro = Avro(
...     avroSchema={  # or avroSchemaUrl=...
...         "type": "record",
...         "name": "Person",
...         "fields": [
...             {"name": "name", "type": "string"},
...             {"name": "age", "type": "int"},
...         ],
...     }
... )
>>> serialized_df = df.select("key", avro.serialize_column("value"))
>>> serialized_df.show(truncate=False)
+---+----------------------+
|key|value                 |
+---+----------------------+
|  1|[0A 41 6C 69 63 65 28]|
|  2|[06 42 6F 62 32]      |
+---+----------------------+
>>> serialized_df.printSchema()
root
|-- key: string (nullable = true)
|-- value: binary (nullable = true)