XML
Bases: ReadWriteFileFormat
Based on Databricks Spark XML file format.
Supports reading/writing files with .xml extension.
Added in 0.9.5
Version compatibility
- Spark versions: 3.2.x - 3.5.x
- Java versions: 8 - 20
Examples:
Note
You can pass any option mentioned in the
official documentation.
Option names should be in camelCase!
The set of supported options depends on spark-xml version.
from onetl.file.format import XML
from pyspark.sql import SparkSession
# Create Spark session with XML package loaded
maven_packages = XML.get_packages(spark_version="3.5.8")
spark = (
SparkSession.builder.appName("spark-app-name")
.config("spark.jars.packages", ",".join(maven_packages))
.getOrCreate()
)
xml = XML(rowTag="item", mode="PERMISSIVE")
Warning
Due to a bug, written files
currently do not have the .xml extension.
# Create Spark session with XML package loaded
spark = ...
from onetl.file.format import XML
xml = XML(rowTag="item", rootTag="data", compression="gzip")
arrayElementName = None
If a DataFrame column is ArrayType, its content is written to XML
inside <arrayElementName>...</arrayElementName> tags.
Default is item.
Note
Used only for writing files.
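The layout described above can be sketched with the stdlib alone (an illustration of the description, not actual spark-xml output):

```python
import xml.etree.ElementTree as ET

# Illustrative sketch: how an ArrayType column named "tags" is laid out
# with the default arrayElementName of "item". Built with the stdlib;
# this is not how spark-xml itself serializes files.
row = ET.Element("item")           # rowTag
tags = ET.SubElement(row, "tags")  # the ArrayType column
for value in ["a", "b"]:
    ET.SubElement(tags, "item").text = value  # arrayElementName wraps each element

print(ET.tostring(row, encoding="unicode"))
# <item><tags><item>a</item><item>b</item></tags></item>
```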
attributePrefix = None
While parsing tags containing attributes, like <sometag attr="value">, attributes are stored as
DataFrame schema columns with the specified prefix, e.g. _attr.
Default is _.
Note
Used only for reading files or by parse_column function.
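The attribute-to-column renaming can be illustrated with the stdlib alone (the resulting column name is derived from the description above, not produced by spark-xml):

```python
import xml.etree.ElementTree as ET

# Illustrative only: shows how an XML attribute maps to a prefixed
# DataFrame column name. The "_" prefix mirrors the default
# attributePrefix documented above.
tag = ET.fromstring('<sometag attr="value">text</sometag>')
prefix = "_"  # default attributePrefix
columns = {prefix + name: value for name, value in tag.attrib.items()}
print(columns)  # {'_attr': 'value'}
```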
charset = None
File encoding. Default is UTF-8.
Note
Used only for reading files or by parse_column function.
columnNameOfCorruptRecord = None
Name of the DataFrame column where the corrupted row is stored when mode=PERMISSIVE.
Warning
If a DataFrame schema is provided, this column should be added to the schema explicitly:
from onetl.connection import SparkLocalFS
from onetl.file import FileDFReader
from onetl.file.format import XML
from pyspark.sql.types import StructType, StructField, TimestampType, StringType
spark = ...
connection = SparkLocalFS(spark=spark)
schema = StructType(
[
StructField("my_field", TimestampType()),
StructField("_corrupt_record", StringType()), # <-- important
]
)
xml = XML(rowTag="item", columnNameOfCorruptRecord="_corrupt_record")
reader = FileDFReader(
connection=connection,
format=xml,
df_schema=schema, # <-- important
)
df = reader.run(["/some/file.xml"])
Note
Used only for reading files or by parse_column function.
compression = None
Compression codec. By default no compression is used.
Note
Used only for writing files.
dateFormat = None
Format string used for parsing or serializing date values.
By default, ISO 8601 format is used (yyyy-MM-dd).
declaration = None
Content of the <?xml ... ?> declaration.
Default is version="1.0" encoding="UTF-8" standalone="yes".
Note
Used only for writing files.
excludeAttribute = None
If True, exclude attributes while parsing tags like <sometag attr="value">.
Default is False.
Note
Used only for reading files or by parse_column function.
ignoreNamespace = None
If True, all namespaces like <ns:tag> will be ignored and treated as just <tag>.
Default is False.
Note
Used only for reading files or by parse_column function.
ignoreSurroundingSpaces = None
If True, trim surrounding spaces while parsing values. Default is False.
Note
Used only for reading files or by parse_column function.
inferSchema = None
If True, try to infer the input schema by reading a sample of the file (see samplingRatio).
Default is False, which means all parsed columns will be StringType().
Note
Used only for reading files. Ignored by parse_column function.
mode = None
How to handle parsing errors:
- PERMISSIVE - set field values to null, move raw data to the columnNameOfCorruptRecord column.
- DROPMALFORMED - skip the malformed row.
- FAILFAST - throw an error immediately.
Default is PERMISSIVE.
Note
Used only for reading files or by parse_column function.
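The three modes can be sketched as a plain-Python simulation (an illustration of the behavior described above, not spark-xml internals):

```python
# Illustrative simulation of the three parsing modes; not spark-xml code.
def handle_malformed(rows, mode, corrupt_col="_corrupt_record"):
    result = []
    for raw, parsed in rows:  # parsed is None when the row is malformed
        if parsed is not None:
            result.append({"value": parsed, corrupt_col: None})
        elif mode == "PERMISSIVE":
            # keep the row: fields become null, raw data goes to the corrupt column
            result.append({"value": None, corrupt_col: raw})
        elif mode == "DROPMALFORMED":
            continue  # silently skip the malformed row
        elif mode == "FAILFAST":
            raise ValueError(f"Malformed row: {raw!r}")
    return result

rows = [("<item>1</item>", 1), ("<item>oops", None)]
print(handle_malformed(rows, "PERMISSIVE"))
# [{'value': 1, '_corrupt_record': None}, {'value': None, '_corrupt_record': '<item>oops'}]
print(handle_malformed(rows, "DROPMALFORMED"))
# [{'value': 1, '_corrupt_record': None}]
```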
nullValue = None
String value used to represent null. Default is the string null.
rootTag = None
XML tag that encloses the content of the entire DataFrame. Default is ROWS.
Note
Used only for writing files.
row_tag = Field(alias='rowTag')
XML tag that encloses each row in XML. Required.
rowValidationXSDPath = None
Path to an XSD file used to validate each row. If a row does not match the XSD, it is treated as an error; the exact behavior depends on the mode value.
Default is no validation.
Note
If the Spark session is created with master=yarn or master=k8s, the XSD
file should be accessible from all Spark nodes. This can be achieved by calling
spark.sparkContext.addFile("/path/to/file.xsd") and then passing
rowValidationXSDPath="file.xsd" (a relative path).
Note
Used only for reading files or by parse_column function.
samplingRatio = Field(default=None, ge=0, le=1)
For inferSchema=True, read the specified fraction of rows to infer the schema.
Default is 1.
Note
Used only for reading files. Ignored by parse_column function.
timestampFormat = None
Format string used for parsing or serializing timestamp values.
By default, ISO 8601 format is used (yyyy-MM-ddTHH:mm:ss.SSSZ).
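For intuition only: the Java-style pattern yyyy-MM-dd used in these options corresponds roughly to Python's %Y-%m-%d, which can be checked with the stdlib (this mapping is an illustration, not part of onetl or spark-xml):

```python
from datetime import datetime

# Java/Spark pattern "yyyy-MM-dd" roughly corresponds to Python "%Y-%m-%d";
# shown only to illustrate what the default dateFormat matches.
parsed = datetime.strptime("2024-01-31", "%Y-%m-%d")
print(parsed.date())  # 2024-01-31
```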
valueTag = None
Value used to replace missing values while parsing attributes like <sometag someattr>.
Default is _VALUE.
Note
Used only for reading files or by parse_column function.
wildcardColName = None
Name of a column (or columns) that should be preserved as a raw XML string, not parsed.
Warning
If a DataFrame schema is provided, this column should be added to the schema explicitly. See the columnNameOfCorruptRecord example.
Note
Used only for reading files or by parse_column function.
get_packages(spark_version, scala_version=None, package_version=None)
classmethod
Get package names to be downloaded by Spark.
Note
For Spark 4.x this is not required anymore.
Added in 0.9.5
Parameters:
- spark_version (str) - Spark version in format major.minor.patch.
- scala_version (str, default: None) - Scala version in format major.minor. If None, spark_version is used to determine the Scala version.
- package_version (str, default: None) - Package version in format major.minor.patch. Default is 0.18.0. See the Maven index for the list of available versions.
Warning
Versions 0.13 and below are not supported.
Note
It is not guaranteed that custom package versions are supported. Tests are performed only for the default version.
Examples:
from onetl.file.format import XML
XML.get_packages(spark_version="3.5.8")
XML.get_packages(spark_version="3.5.8", scala_version="2.12")
XML.get_packages(
spark_version="3.5.8",
scala_version="2.12",
package_version="0.18.0",
)
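For orientation, spark-xml artifacts follow the standard Maven coordinate pattern, so the returned package name can be sketched as follows (an illustrative sketch, not the actual onetl implementation):

```python
# Illustrative sketch of the Maven coordinate used by spark-xml packages;
# the real logic lives in XML.get_packages.
def spark_xml_coordinate(scala_version="2.12", package_version="0.18.0"):
    return f"com.databricks:spark-xml_{scala_version}:{package_version}"

print(spark_xml_coordinate())  # com.databricks:spark-xml_2.12:0.18.0
```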
parse_column(column, schema)
Parses an XML string column into a structured Spark SQL column using the from_xml function
provided by the Databricks Spark XML library
based on the provided schema.
Note
This method assumes that the spark-xml package is installed; see get_packages.
Note
This method parses each DataFrame row individually. Therefore, for a specific column,
each row must contain exactly one occurrence of the rowTag specified.
If your XML data includes a root tag that encapsulates multiple row tags, you can adjust the schema
to use an ArrayType to keep all child elements under the single root.
<books>
<book><title>Book One</title><author>Author A</author></book>
<book><title>Book Two</title><author>Author B</author></book>
</books>
ArrayType:
from pyspark.sql.types import StructType, StructField, ArrayType, StringType
from onetl.file.format import XML
# each DataFrame row has exactly one <books> tag
xml = XML(rowTag="books")
# each <books> tag has multiple <book> tags, so ArrayType is used for this field
schema = StructType(
[
StructField(
"book",
ArrayType(
StructType(
[
StructField("title", StringType(), nullable=True),
StructField("author", StringType(), nullable=True),
],
),
),
nullable=True,
),
],
)
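The nesting above (one root per row, multiple child elements mapped to an ArrayType field) can be verified with a quick stdlib parse of the sample document (illustration only, not onetl code):

```python
import xml.etree.ElementTree as ET

sample = """
<books>
  <book><title>Book One</title><author>Author A</author></book>
  <book><title>Book Two</title><author>Author B</author></book>
</books>
"""

# One <books> root per row, multiple <book> children -> ArrayType field
root = ET.fromstring(sample.strip())
books = [
    {"title": b.findtext("title"), "author": b.findtext("author")}
    for b in root.findall("book")
]
print(len(books))  # 2
print(books[0])    # {'title': 'Book One', 'author': 'Author A'}
```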
Added in 0.11.0
Parameters:
-
column(str | Column) –The name of the column or the column object containing XML strings/bytes to parse.
-
schema(StructType) –The schema to apply when parsing the XML data. This defines the structure of the output DataFrame column.
Returns:
-
Column–Column with deserialized data, with the same structure as the provided schema. Column name is the same as input column.
Examples:
>>> from pyspark.sql.types import StructType, StructField, IntegerType, StringType
>>> from onetl.file.format import XML
>>> df.show()
+--+------------------------------------------------+
|id|value                                           |
+--+------------------------------------------------+
|1 |<person><name>Alice</name><age>20</age></person>|
|2 |<person><name>Bob</name><age>25</age></person>  |
+--+------------------------------------------------+
>>> df.printSchema()
root
|-- id: integer (nullable = true)
|-- value: string (nullable = true)
>>> xml = XML(rowTag="person")
>>> xml_schema = StructType(
... [
... StructField("name", StringType(), nullable=True),
... StructField("age", IntegerType(), nullable=True),
... ],
... )
>>> parsed_df = df.select("id", xml.parse_column("value", xml_schema))
>>> parsed_df.show()
+--+-----------+
|id|value      |
+--+-----------+
|1 |{Alice, 20}|
|2 |{Bob, 25}  |
+--+-----------+
>>> parsed_df.printSchema()
root
|-- id: integer (nullable = true)
|-- value: struct (nullable = true)
| |-- name: string (nullable = true)
| |-- age: integer (nullable = true)