XML

Bases: ReadWriteFileFormat

XML file format. Supports hooks.

Based on Databricks Spark XML file format.

Supports reading/writing files with .xml extension.

Added in 0.9.5

Version compatibility
  • Spark versions: 3.2.x - 3.5.x
  • Java versions: 8 - 20

See official documentation.

Examples:

Note

You can pass any option mentioned in official documentation. Option names should be in camelCase!

The set of supported options depends on spark-xml version.

from onetl.file.format import XML
from pyspark.sql import SparkSession

# Create Spark session with XML package loaded
maven_packages = XML.get_packages(spark_version="3.5.8")
spark = (
    SparkSession.builder.appName("spark-app-name")
    .config("spark.jars.packages", ",".join(maven_packages))
    .getOrCreate()
)

xml = XML(rowTag="item", mode="PERMISSIVE")

Warning

Due to a bug, written files currently do not have the .xml extension.

# Create Spark session with XML package loaded
spark = ...

from onetl.file.format import XML

xml = XML(rowTag="item", rootTag="data", compression="gzip")

arrayElementName = None class-attribute instance-attribute

If a DataFrame column is of ArrayType, its content is written to XML wrapped in <arrayElementName>...</arrayElementName> tags. Default is item.

Note

Used only for writing files.

attributePrefix = None class-attribute instance-attribute

While parsing tags containing attributes, like <sometag attr="value">, attributes are stored as DataFrame schema columns with the specified prefix, e.g. _attr. Default is _.

Note

Used only for reading files or by parse_column function.
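
As a rough illustration of how an attribute prefix flattens tag attributes into column names, here is a pure-Python sketch (not spark-xml internals; the `_VALUE` key mirrors the default valueTag):

```python
import xml.etree.ElementTree as ET

# Pure-Python sketch (NOT spark-xml internals): attributes of a tag become
# flat keys with the configured prefix; the element text goes under the
# default valueTag name "_VALUE".
def flatten_attributes(xml_string: str, prefix: str = "_") -> dict:
    element = ET.fromstring(xml_string)
    row = {prefix + name: value for name, value in element.attrib.items()}
    if element.text and element.text.strip():
        row["_VALUE"] = element.text.strip()
    return row

flatten_attributes('<sometag attr="value">text</sometag>')
# {'_attr': 'value', '_VALUE': 'text'}
```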

charset = None class-attribute instance-attribute

File encoding. Default is UTF-8.

Note

Used only for reading files or by parse_column function.

columnNameOfCorruptRecord = None class-attribute instance-attribute

Name of the DataFrame column where the corrupted row is stored when mode=PERMISSIVE.

Warning

If DataFrame schema is provided, this column should be added to schema explicitly:

from onetl.connection import SparkLocalFS
from onetl.file import FileDFReader
from onetl.file.format import XML

from pyspark.sql.types import StructType, StructField, TimestampType, StringType

spark = ...
connection = SparkLocalFS(spark=spark)
schema = StructType(
    [
        StructField("my_field", TimestampType()),
        StructField("_corrupt_record", StringType()),  # <-- important
    ]
)
xml = XML(rowTag="item", columnNameOfCorruptRecord="_corrupt_record")

reader = FileDFReader(
    connection=connection,
    format=xml,
    df_schema=schema,  # <-- pass schema explicitly
)
df = reader.run(["/some/file.xml"])

Note

Used only for reading files or by parse_column function.

compression = None class-attribute instance-attribute

Compression codec. By default no compression is used.

Note

Used only for writing files.

dateFormat = None class-attribute instance-attribute

Format string used for parsing or serializing date values. By default, ISO 8601 format is used (yyyy-MM-dd).

declaration = None class-attribute instance-attribute

Content of <?XML ... ?> declaration. Default is version="1.0" encoding="UTF-8" standalone="yes".

Note

Used only for writing files.
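
With the default declaration, a written file would begin with the following first line (shown here for illustration):

```xml
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
```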

excludeAttribute = None class-attribute instance-attribute

If True, exclude attributes while parsing tags like <sometag attr="value">. Default is False.

Note

Used only for reading files or by parse_column function.

ignoreNamespace = None class-attribute instance-attribute

If True, all namespaces like <ns:tag> will be ignored and treated as just <tag>. Default False.

Note

Used only for reading files or by parse_column function.

ignoreSurroundingSpaces = None class-attribute instance-attribute

If True, trim surrounding whitespace while parsing values. Default is False.

Note

Used only for reading files or by parse_column function.

inferSchema = None class-attribute instance-attribute

If True, try to infer the input schema by reading a sample of the file (see samplingRatio). Default is False, meaning all parsed columns are StringType().

Note

Used only for reading files. Ignored by parse_column function.
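
As a loose analogy (a pure-Python sketch, not Spark's actual inference algorithm), schema inference examines a sample of the parsed string values and picks a narrower type only when every sampled value fits it:

```python
import random

# Pure-Python sketch (NOT Spark's algorithm): report "integer" only when
# every sampled value looks like an integer, otherwise fall back to "string".
def infer_column_type(values, sampling_ratio=1.0, seed=42):
    rng = random.Random(seed)
    sample = [v for v in values if rng.random() <= sampling_ratio] or list(values)
    if all(v.lstrip("-").isdigit() for v in sample):
        return "integer"
    return "string"

infer_column_type(["1", "2", "3"])    # 'integer'
infer_column_type(["1", "two", "3"])  # 'string'
```

A smaller sampling_ratio speeds inference up but risks picking a type that later rows violate, which is why malformed-row handling (mode) still matters.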

mode = None class-attribute instance-attribute

How to handle parsing errors:

  • PERMISSIVE - set field value as null, move raw data to columnNameOfCorruptRecord column.
  • DROPMALFORMED - skip the malformed row.
  • FAILFAST - throw an error immediately.

Default is PERMISSIVE.

Note

Used only for reading files or by parse_column function.
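
The three modes can be sketched in plain Python (a conceptual illustration, not Spark's implementation; `_corrupt_record` mirrors the default columnNameOfCorruptRecord):

```python
import xml.etree.ElementTree as ET

# Conceptual sketch (NOT Spark's implementation) of the three parse modes.
def parse_rows(raw_rows, mode="PERMISSIVE", corrupt_col="_corrupt_record"):
    parsed = []
    for raw in raw_rows:
        try:
            element = ET.fromstring(raw)
            parsed.append({"value": element.text, corrupt_col: None})
        except ET.ParseError:
            if mode == "FAILFAST":
                raise  # fail on the first malformed row
            if mode == "DROPMALFORMED":
                continue  # silently skip the malformed row
            # PERMISSIVE: keep the row, move raw data to the corrupt column
            parsed.append({"value": None, corrupt_col: raw})
    return parsed

rows = ["<item>ok</item>", "<item>broken"]
parse_rows(rows)                        # keeps both rows, second one corrupted
parse_rows(rows, mode="DROPMALFORMED")  # keeps only the first row
```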

nullValue = None class-attribute instance-attribute

String value used to represent null. Default is the string null.

rootTag = None class-attribute instance-attribute

XML tag that encloses the content of the entire DataFrame. Default is ROWS.

Note

Used only for writing files.

row_tag = Field(alias='rowTag') class-attribute instance-attribute

XML tag that encloses each row in XML. Required.

rowValidationXSDPath = None class-attribute instance-attribute

Path to an XSD file used to validate each row. If a row does not match the XSD, it is treated as a parsing error; the resulting behavior depends on the mode value.

Default is no validation.

Note

If the Spark session is created with master=yarn or master=k8s, the XSD file should be accessible from all Spark nodes. This can be achieved by calling:

spark.sparkContext.addFile("/path/to/file.xsd")

and then passing rowValidationXSDPath="file.xsd" (a relative path).

Note

Used only for reading files or by parse_column function.

samplingRatio = Field(default=None, ge=0, le=1) class-attribute instance-attribute

For inferSchema=True, read the specified fraction of rows to infer the schema. Default 1.

Note

Used only for reading files. Ignored by parse_column function.

timestampFormat = None class-attribute instance-attribute

Format string used for parsing or serializing timestamp values. By default, ISO 8601 format is used (yyyy-MM-ddTHH:mm:ss.SSSZ).

valueTag = None class-attribute instance-attribute

Value used to replace missing values while parsing attributes like <sometag someattr>. Default is _VALUE.

Note

Used only for reading files or by parse_column function.

wildcardColName = None class-attribute instance-attribute

Name of column or columns which should be preserved as raw XML string, and not parsed.

Warning

If DataFrame schema is provided, this column should be added to schema explicitly. See columnNameOfCorruptRecord example.

Note

Used only for reading files or by parse_column function.

get_packages(spark_version, scala_version=None, package_version=None) classmethod

Get package names to be downloaded by Spark. Supports hooks.

Note

For Spark 4.x this is not required anymore.

Added in 0.9.5

Parameters:

  • spark_version (str) –

    Spark version in format major.minor.patch.

  • scala_version (str, default: None ) –

    Scala version in format major.minor.

    If None, spark_version is used to determine Scala version.

  • package_version (str, default: None ) –

    Package version in format major.minor.patch. Default is 0.18.0.

    See Maven index for list of available versions.

    Warning

Versions 0.13 and below are not supported.

    Note

    It is not guaranteed that custom package versions are supported. Tests are performed only for default version.

Examples:

from onetl.file.format import XML

XML.get_packages(spark_version="3.5.8")
XML.get_packages(spark_version="3.5.8", scala_version="2.12")
XML.get_packages(
    spark_version="3.5.8",
    scala_version="2.12",
    package_version="0.18.0",
)

parse_column(column, schema)

Parses an XML string column into a structured Spark SQL column using the from_xml function provided by the Databricks Spark XML library based on the provided schema.

Note

This method assumes that the spark-xml package is installed: get_packages.

Note

This method parses each DataFrame row individually. Therefore, for a specific column, each row must contain exactly one occurrence of the rowTag specified.

If your XML data includes a root tag that encapsulates multiple row tags, you can adjust the schema to use an ArrayType to keep all child elements under the single root.

<books>
    <book><title>Book One</title><author>Author A</author></book>
    <book><title>Book Two</title><author>Author B</author></book>
</books>
And the corresponding schema in Spark using an ArrayType:

from pyspark.sql.types import StructType, StructField, ArrayType, StringType
from onetl.file.format import XML

# each DataFrame row has exactly one <books> tag
xml = XML(rowTag="books")
# each <books> tag has multiple <book> tags, so ArrayType is used for this field
schema = StructType(
    [
        StructField(
            "book",
            ArrayType(
                StructType(
                    [
                        StructField("title", StringType(), nullable=True),
                        StructField("author", StringType(), nullable=True),
                    ],
                ),
            ),
            nullable=True,
        ),
    ],
)

Added in 0.11.0

Parameters:

  • column (str | Column) –

    The name of the column or the column object containing XML strings/bytes to parse.

  • schema (StructType) –

    The schema to apply when parsing the XML data. This defines the structure of the output DataFrame column.

Returns:

  • Column

    Column with deserialized data, with the same structure as the provided schema. Column name is the same as input column.

Examples:

>>> from pyspark.sql.types import StructType, StructField, IntegerType, StringType
>>> from onetl.file.format import XML
>>> df.show()
+--+------------------------------------------------+
|id|value                                           |
+--+------------------------------------------------+
|1 |<person><name>Alice</name><age>20</age></person>|
|2 |<person><name>Bob</name><age>25</age></person>  |
+--+------------------------------------------------+
>>> df.printSchema()
root
|-- id: integer (nullable = true)
|-- value: string (nullable = true)
>>> xml = XML(rowTag="person")
>>> xml_schema = StructType(
...     [
...         StructField("name", StringType(), nullable=True),
...         StructField("age", IntegerType(), nullable=True),
...     ],
... )
>>> parsed_df = df.select("id", xml.parse_column("value", xml_schema))
>>> parsed_df.show()
+--+-----------+
|id|value      |
+--+-----------+
|1 |{Alice, 20}|
|2 |  {Bob, 25}|
+--+-----------+
>>> parsed_df.printSchema()
root
|-- id: integer (nullable = true)
|-- value: struct (nullable = true)
|    |-- name: string (nullable = true)
|    |-- age: integer (nullable = true)