JSON
Bases: ReadOnlyFileFormat
Based on Spark JSON file format.
Supports reading (but NOT writing) files with .json extension with content like:
[
{"key": "value1"},
{"key": "value2"}
]
Added in 0.9.0
Examples:
Note
You can pass any option mentioned in the official Spark JSON data source documentation.
Option names should be in camelCase!
The set of supported options depends on Spark version.
Reading files:
from onetl.file.format import JSON
json = JSON(encoding="UTF-8")
Warning
Writing files is not supported. Use JSONLine instead.
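The array-of-objects payload shown above can be sanity-checked with plain stdlib json. This is only an analogy to illustrate the file layout the format reads, not the Spark implementation:

```python
import io
import json

# The .json payload layout from the example above: a top-level array of objects.
content = '[{"key": "value1"}, {"key": "value2"}]'

rows = json.load(io.StringIO(content))
assert rows == [{"key": "value1"}, {"key": "value2"}]
```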
allowBackslashEscapingAnyCharacter = None
If True, prefix \ can escape any character.
Default False.
Note
Used only for reading files and parse_column method.
allowComments = None
If True, add support for C/C++/Java style comments (//, /* */).
Default False, meaning that JSON files should not contain comments.
Note
Used only for reading files and parse_column method.
allowNonNumericNumbers = None
If True, allow numbers to contain non-numeric values, like:
- scientific notation (e.g. 12e10)
- positive infinity floating point value (Infinity, +Infinity, +INF)
- negative infinity floating point value (-Infinity, -INF)
- Not-a-Number floating point value (NaN)
Default True.
Note
Used only for reading files and parse_column method.
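As an aside, Python's stdlib json parser accepts the same tokens by default, so it can serve as a quick illustration of what allowNonNumericNumbers=True permits (an analogy only, not Spark's parser):

```python
import json
import math

# Stdlib json accepts NaN/Infinity tokens and scientific notation by default,
# analogous to allowNonNumericNumbers=True:
data = json.loads('{"a": NaN, "b": Infinity, "c": -Infinity, "d": 12e10}')

assert math.isnan(data["a"])
assert data["b"] == float("inf")
assert data["c"] == float("-inf")
assert data["d"] == 12e10
```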
allowNumericLeadingZeros = None
If True, allow leading zeros in numbers (e.g. 00012).
Default False.
Note
Used only for reading files and parse_column method.
allowSingleQuotes = None
If True, allow JSON object field names to be wrapped with single quotes (').
Default True.
Note
Used only for reading files and parse_column method.
allowUnquotedControlChars = None
If True, allow unquoted control characters (ASCII values 0-31) in strings without escaping them with \.
Default False.
Note
Used only for reading files and parse_column method.
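Python's stdlib json has a similar switch (strict=False), which can illustrate the difference this option makes. This is an analogy under stdlib semantics, not Spark's parser:

```python
import json

# A raw (unescaped) newline inside a string value:
payload = '{"a": "line1\nline2"}'

# Strict JSON rejects unescaped control characters (ASCII 0-31)...
try:
    json.loads(payload)
    rejected = False
except json.JSONDecodeError:
    rejected = True
assert rejected

# ...while strict=False accepts them, similar to allowUnquotedControlChars=True:
assert json.loads(payload, strict=False) == {"a": "line1\nline2"}
```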
allowUnquotedFieldNames = None
If True, allow JSON object field names without quotes (JavaScript-style).
Default False.
Note
Used only for reading files and parse_column method.
columnNameOfCorruptRecord = Field(default=None, min_length=1)
Name of column to put corrupt records in.
Default is _corrupt_record.
Warning
If DataFrame schema is provided, this column should be added to schema explicitly:
from onetl.connection import SparkLocalFS
from onetl.file import FileDFReader
from onetl.file.format import JSON
from pyspark.sql.types import StructType, StructField, TimestampType, StringType
spark = ...
schema = StructType(
[
StructField("my_field", TimestampType()),
StructField("_corrupt_record", StringType()), # <-- important
]
)
json = JSON(mode="PERMISSIVE", columnNameOfCorruptRecord="_corrupt_record")
reader = FileDFReader(
connection=connection,
format=json,
df_schema=schema,  # <-- important
)
df = reader.run(["/some/file.json"])
Note
Used only for reading files and parse_column method.
dateFormat = Field(default=None, min_length=1)
String format for DateType() representation.
Default is yyyy-MM-dd.
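Note that dateFormat uses Java DateTimeFormatter pattern syntax, not Python's. As a rough illustration, the default pattern corresponds to strptime's "%Y-%m-%d" (an analogy only, the two syntaxes are not interchangeable in general):

```python
from datetime import datetime

# Spark's default dateFormat "yyyy-MM-dd" (Java DateTimeFormatter syntax)
# matches dates that Python's strptime parses with "%Y-%m-%d":
parsed = datetime.strptime("2024-04-24", "%Y-%m-%d").date()
assert str(parsed) == "2024-04-24"
```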
dropFieldIfAllNull = None
If True and the inferred column is always null or an empty array, exclude it from the DataFrame schema.
Default False.
Note
Used only for reading files. Ignored by parse_column method.
encoding = None
Encoding of the JSON file.
Default UTF-8.
lineSep = None
Character used to separate lines in the JSON file.
Defaults:
- for reading: detected automatically (\r\n, \r or \n)
- for writing: \n
Note
Used only for reading and writing files.
Ignored by parse_column and serialize_column methods, as they handle each DataFrame row separately.
locale = Field(default=None, min_length=1)
Locale name used to parse dates and timestamps.
Default is en-US.
Note
Used only for reading files and parse_column method.
mode = None
How to handle parsing errors:
- PERMISSIVE - set field value as null, move raw data to the columnNameOfCorruptRecord column.
- DROPMALFORMED - skip the malformed row.
- FAILFAST - throw an error immediately.
Default is PERMISSIVE.
Note
Used only for reading files and parse_column method.
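The three modes can be sketched with a toy line-by-line parser in plain Python. This is an illustration of the semantics only, not how Spark implements them; the helper name parse_rows is hypothetical:

```python
import json

def parse_rows(lines, mode="PERMISSIVE", corrupt_col="_corrupt_record"):
    """Toy illustration of Spark's parsing modes (not the real implementation)."""
    rows = []
    for line in lines:
        try:
            rows.append(json.loads(line))
        except json.JSONDecodeError:
            if mode == "FAILFAST":
                raise  # fail on the first malformed row
            if mode == "DROPMALFORMED":
                continue  # silently skip the malformed row
            rows.append({corrupt_col: line})  # PERMISSIVE: keep raw data aside
    return rows

data = ['{"a": 1}', "{broken"]
assert parse_rows(data) == [{"a": 1}, {"_corrupt_record": "{broken"}]
assert parse_rows(data, mode="DROPMALFORMED") == [{"a": 1}]

try:
    parse_rows(data, mode="FAILFAST")
    failed = False
except json.JSONDecodeError:
    failed = True
assert failed
```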
prefersDecimal = None
If True, infer all floating-point values as Decimal.
Default False.
Note
Used only for reading files. Ignored by parse_column method.
primitivesAsString = None
If True, infer all primitive types (string, integer, float, boolean) as strings.
Default False.
Note
Used only for reading files. Ignored by parse_column method.
samplingRatio = Field(default=None, ge=0, le=1)
While inferring schema, read the specified fraction of file rows.
Default 1.
Note
Used only for reading files. Ignored by parse_column method.
timestampFormat = Field(default=None, min_length=1)
String format for TimestampType() representation.
Default is yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX].
timestampNTZFormat = Field(default=None, min_length=1)
String format for TimestampNTZType() representation.
Default is yyyy-MM-dd'T'HH:mm:ss[.SSS].
Note
Added in Spark 3.2.0
timezone = Field(default=None, min_length=1, alias='timeZone')
Allows overriding the timezone used for parsing or serializing date and timestamp values.
By default, spark.sql.session.timeZone is used.
parse_column(column, schema)
Parses a JSON string column to a structured Spark SQL column using Spark's from_json function, based on the provided schema.
Added in 0.11.0
Parameters:
- column (str | Column) – The name of the column or the column object containing JSON strings/bytes to parse.
- schema (StructType | ArrayType | MapType) – The schema to apply when parsing the JSON data. This defines the structure of the output DataFrame column.
Returns:
- Column – Column with deserialized data, with the same structure as the provided schema. Column name is the same as the input column.
Examples:
>>> from pyspark.sql.types import StructType, StructField, IntegerType, StringType
>>> from pyspark.sql.functions import decode
>>> from onetl.file.format import JSON
>>> df.show()
+----+--------------------+----------+---------+------+-----------------------+-------------+
|key |value |topic |partition|offset|timestamp |timestampType|
+----+--------------------+----------+---------+------+-----------------------+-------------+
|[31]|[7B 22 6E 61 6D 6...|topicJSON |0 |0 |2024-04-24 16:51:11.739|0 |
|[32]|[7B 22 6E 61 6D 6...|topicJSON |0 |1 |2024-04-24 16:51:11.749|0 |
+----+--------------------+----------+---------+------+-----------------------+-------------+
>>> df.printSchema()
root
|-- key: binary (nullable = true)
|-- value: binary (nullable = true)
|-- topic: string (nullable = true)
|-- partition: integer (nullable = true)
|-- offset: integer (nullable = true)
|-- timestamp: timestamp (nullable = true)
|-- timestampType: integer (nullable = true)
>>> json = JSON()
>>> json_schema = StructType(
... [
... StructField("name", StringType(), nullable=True),
... StructField("age", IntegerType(), nullable=True),
... ],
... )
>>> parsed_df = df.select(decode("key", "UTF-8").alias("key"), json.parse_column("value", json_schema))
>>> parsed_df.show()
+---+-----------+
|key|value |
+---+-----------+
|1 |{Alice, 20}|
|2 | {Bob, 25}|
+---+-----------+
>>> parsed_df.printSchema()
root
|-- key: string (nullable = true)
|-- value: struct (nullable = true)
| |-- name: string (nullable = true)
| |-- age: integer (nullable = true)
serialize_column(column)
Serializes a structured Spark SQL column into a JSON string column using Spark's to_json function.
Added in 0.11.0
Parameters:
- column (str | Column) – The name of the column or the column object containing the data to serialize to JSON format.
Returns:
- Column – Column with string JSON data. Column name is the same as the input column.
Examples:
>>> from pyspark.sql.functions import decode
>>> from onetl.file.format import JSON
>>> df.show()
+---+-----------+
|key|value |
+---+-----------+
|1 |{Alice, 20}|
|2 | {Bob, 25}|
+---+-----------+
>>> df.printSchema()
root
|-- key: string (nullable = true)
|-- value: struct (nullable = true)
| |-- name: string (nullable = true)
| |-- age: integer (nullable = true)
>>> # serializing data into JSON format
>>> json = JSON()
>>> serialized_df = df.select("key", json.serialize_column("value"))
>>> serialized_df.show(truncate=False)
+---+-------------------------+
|key|value |
+---+-------------------------+
| 1|{"name":"Alice","age":20}|
| 2|{"name":"Bob","age":25} |
+---+-------------------------+
>>> serialized_df.printSchema()
root
|-- key: string (nullable = true)
|-- value: string (nullable = true)
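The parse_column/serialize_column round trip mirrors ordinary JSON encode/decode. A stdlib sketch of the same round trip (an analogy; Spark's from_json/to_json operate on columns, not single values):

```python
import json

row = {"name": "Alice", "age": 20}

# to_json-style compact serialization (no spaces), matching the output above:
serialized = json.dumps(row, separators=(",", ":"))
assert serialized == '{"name":"Alice","age":20}'

# from_json-style deserialization restores the original structure:
assert json.loads(serialized) == row
```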