CSV
Bases: ReadWriteFileFormat
Based on Spark CSV file format.
Supports reading/writing files with .csv extension with content like:
"some","value"
"another","value"
Added in 0.9.0
Examples:
Note
You can pass any option mentioned in the official Spark CSV documentation.
Option names should be in camelCase!
The set of supported options depends on the Spark version.
from onetl.file.format import CSV
csv = CSV(header=True, inferSchema=True, mode="PERMISSIVE")
from onetl.file.format import CSV
csv = CSV(header=True, compression="gzip")
charToEscapeQuoteEscaping = Field(default=None, max_length=1)
If a CSV field value contains the escape character, it should be escaped as well.
For example, with escape="\", lines:
"some \" quoted value",other
"some \\ backslashed value",another
are parsed as:
[
    ('some " quoted value', "other"),
    ("some \\ backslashed value", "another"),
]
Default is the same as escape.
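The same escaping convention can be sketched with Python's stdlib csv module (a rough analogy, not Spark or onetl): escapechar plays the role of escape, and disabling doublequote mimics backslash-style escaping.

```python
import csv
import io

# Sample data from the lines above: quotes and backslashes escaped with "\".
raw = '"some \\" quoted value",other\n"some \\\\ backslashed value",another\n'

# doublequote=False + escapechar="\\" roughly mirrors escape="\" in Spark CSV.
reader = csv.reader(io.StringIO(raw), quotechar='"', escapechar="\\", doublequote=False)
rows = list(reader)

print(rows[0][0])  # some " quoted value
print(rows[1][0])  # some \ backslashed value
```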
columnNameOfCorruptRecord = Field(default=None, min_length=1)
Name of column to put corrupt records in.
Default is _corrupt_record.
Warning
If a DataFrame schema is provided, this column should be added to the schema explicitly:
from onetl.connection import SparkLocalFS
from onetl.file import FileDFReader
from onetl.file.format import CSV
from pyspark.sql.types import StructType, StructField, TimestampType, StringType
spark = ...
connection = SparkLocalFS(spark=spark)
schema = StructType(
[
StructField("my_field", TimestampType()),
StructField("_corrupt_record", StringType()), # <-- important
]
)
csv = CSV(mode="PERMISSIVE", columnNameOfCorruptRecord="_corrupt_record")
reader = FileDFReader(
connection=connection,
format=csv,
df_schema=schema,  # <-- important
)
df = reader.run(["/some/file.csv"])
Note
Used only for reading files and parse_column method.
comment = Field(default=None, max_length=1)
If set, all lines starting with the specified character (e.g. #) are considered comments and skipped.
Default is not set, meaning that CSV lines should not contain comments.
Note
Used only for reading files and parse_column method.
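Comment skipping happens before parsing; a plain-Python equivalent (using the stdlib csv module, not onetl) is simply filtering the lines first:

```python
import csv
import io

raw = "# header comment\n1,Alice\n# another comment\n2,Bob\n"

# Like CSV(comment="#"): drop lines starting with the comment character.
lines = (line for line in io.StringIO(raw) if not line.startswith("#"))
rows = list(csv.reader(lines))
print(rows)  # [['1', 'Alice'], ['2', 'Bob']]
```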
compression = None
Compression codec of the CSV file.
Default is None (no compression).
Note
Used only for writing files. Ignored by parse_column method.
dateFormat = Field(default=None, min_length=1)
String format for DateType() representation.
Default is yyyy-MM-dd.
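Note that dateFormat uses Java DateTimeFormatter patterns, not Python strptime codes. As a rough illustration, the default yyyy-MM-dd corresponds to %Y-%m-%d in Python:

```python
from datetime import datetime

# Java pattern "yyyy-MM-dd" roughly corresponds to strptime's "%Y-%m-%d".
parsed = datetime.strptime("2024-01-31", "%Y-%m-%d").date()
print(parsed)  # 2024-01-31
```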
delimiter = Field(default=',', alias=(avoid_alias('sep')))
Character used to separate fields in CSV row.
emptyValue = None
Value used for empty string fields.
Defaults:
- empty string for reading
- "" for writing
enforceSchema = None
If True, inferred or user-provided schema has higher priority than CSV file headers.
This means that all input files should have the same structure.
If False, CSV headers are used as a primary source of information about column names and their position.
Default True.
Note
Used only for reading files. Ignored by parse_column function.
escapeQuotes = None
If True, escape quotes within a CSV field value:
any,field with \"quote,123,
If False, wrap fields containing quote symbols with quotes:
any,"field with ""quote",123,
Default True.
Note
Used only for writing files.
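The two writing styles can be imitated with the stdlib csv writer (an analogy, not Spark itself): doublequote=True doubles embedded quotes, while doublequote=False with an escapechar escapes them:

```python
import csv
import io

row = ["any", 'field with "quote', "123"]

# Quote doubling (similar to escapeQuotes=False above):
doubled = io.StringIO()
csv.writer(doubled, doublequote=True).writerow(row)
print(doubled.getvalue())  # any,"field with ""quote",123

# Backslash escaping (similar to escapeQuotes=True above):
escaped = io.StringIO()
csv.writer(escaped, doublequote=False, escapechar="\\").writerow(row)
print(escaped.getvalue())  # any,"field with \"quote",123
```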
header = None
If True, the first row of the file is considered a header.
Default False.
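With header=True, Spark takes column names from the first row; the stdlib csv.DictReader behaves analogously:

```python
import csv
import io

raw = "id,name\n1,Alice\n2,Bob\n"

# The first row becomes the field names, like header=True.
rows = list(csv.DictReader(io.StringIO(raw)))
print(rows[0]["name"])  # Alice
```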
ignoreLeadingWhiteSpace = None
If True, trim leading whitespaces in field values.
Defaults:
- True for reading
- False for writing
ignoreTrailingWhiteSpace = None
If True, trim trailing whitespaces in field values.
Defaults:
- True for reading
- False for writing
inferSchema = None
If True, try to infer the input schema by reading a sample of the file (see samplingRatio).
Default False which means that all parsed columns will be StringType().
Note
Used only for reading files, and only if the user has not provided an explicit DataFrame schema. Ignored by the parse_column function.
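Schema inference conceptually amounts to sampling parsed string values and picking the narrowest type they all fit. A toy sketch of the idea (not Spark's actual implementation):

```python
def infer_type(values):
    """Toy type inference: integer if every sampled value parses as int, else string."""
    try:
        for value in values:
            int(value)
        return "integer"
    except ValueError:
        return "string"

print(infer_type(["1", "2", "3"]))  # integer
print(infer_type(["1", "Alice"]))   # string
```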
locale = Field(default=None, min_length=1)
Locale name used to parse dates and timestamps.
Default is en-US
Note
Used only for reading files and parse_column method.
maxCharsPerColumn = None
Maximum number of characters to read per column.
Default is -1, which means no limit.
Note
Used only for reading files and parse_column method.
mode = None
How to handle parsing errors:
- PERMISSIVE - set field value as null, move raw data to the columnNameOfCorruptRecord column.
- DROPMALFORMED - skip the malformed row.
- FAILFAST - throw an error immediately.
Default is PERMISSIVE.
Note
Used only for reading files and parse_column method.
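The three modes can be sketched in plain Python over rows with a wrong column count (a simplified model of the behavior, not Spark's real implementation):

```python
import csv
import io

def read_rows(raw, n_cols, mode="PERMISSIVE"):
    # Simplified model: a row is "malformed" when its column count is wrong.
    result = []
    for row in csv.reader(io.StringIO(raw)):
        if len(row) == n_cols:
            result.append(row)
        elif mode == "PERMISSIVE":
            result.append([None] * n_cols)  # fields become null; raw data would go to _corrupt_record
        elif mode == "DROPMALFORMED":
            continue  # skip the malformed row
        elif mode == "FAILFAST":
            raise ValueError(f"Malformed row: {row!r}")
    return result

raw = "1,Alice\nbroken\n2,Bob\n"
print(read_rows(raw, 2, "PERMISSIVE"))     # [['1', 'Alice'], [None, None], ['2', 'Bob']]
print(read_rows(raw, 2, "DROPMALFORMED"))  # [['1', 'Alice'], ['2', 'Bob']]
```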
multiLine = None
If True, fields may contain line separators.
If False, each line of the input is expected to contain exactly one record.
Default is False.
Note
Used only for reading files. Ignored by parse_column method, as it expects that each DataFrame row will contain exactly one CSV line.
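An embedded line separator inside a quoted field looks like this; the stdlib csv reader handles it the same way multiLine=True does in Spark:

```python
import csv
import io

raw = '1,"first line\nsecond line"\n2,plain\n'

# The quoted field spans two physical lines but is one logical record.
rows = list(csv.reader(io.StringIO(raw)))
print(len(rows))   # 2
print(rows[0][1])  # first line
                   # second line
```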
nanValue = Field(default=None)
If set, this string will be considered as Not-A-Number (NaN) value for FloatType() and DoubleType().
Default is NaN.
Note
Used only for reading files and parse_column method.
negativeInf = Field(default=None, min_length=1)
If set, this string will be considered as negative infinity value for FloatType() and DoubleType().
Default is -Inf.
Note
Used only for reading files and parse_column method.
nullValue = None
If set, this value will be converted to null.
Default is empty string.
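A plain-Python sketch of the substitution: after parsing, any field equal to the configured sentinel becomes None (here "NULL" is a hypothetical sentinel, as one might set with CSV(nullValue="NULL")):

```python
import csv
import io

NULL_VALUE = "NULL"  # hypothetical sentinel, like CSV(nullValue="NULL")
raw = "1,NULL\n2,Bob\n"

rows = [
    [None if field == NULL_VALUE else field for field in row]
    for row in csv.reader(io.StringIO(raw))
]
print(rows)  # [['1', None], ['2', 'Bob']]
```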
positiveInf = Field(default=None, min_length=1)
If set, this string will be considered as positive infinity value for FloatType() and DoubleType().
Default is Inf.
Note
Used only for reading files and parse_column method.
preferDate = None
If True and inferSchema=True, columns matching dateFormat are inferred as DateType(). Columns matching both timestampFormat and dateFormat are inferred as TimestampType().
If False, date and timestamp columns are inferred as StringType().
Default True.
Note
Used only for reading files. Ignored by parse_column function.
quote = Field(default='"', max_length=1)
Character used to quote field values in a CSV row.
Empty string is considered as \u0000 (NUL) character.
quoteAll = None
If True, all fields are quoted:
"some","field with \"quote","123",""
If False, only fields containing quote symbols are quoted:
any,"field with \"quote",123,
Default False.
Note
Used only for writing files.
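The stdlib csv writer offers the same switch via its quoting parameter; QUOTE_ALL corresponds to quoteAll=True, QUOTE_MINIMAL to the default behavior:

```python
import csv
import io

row = ["some", "field", "123", ""]

# Quote everything, like quoteAll=True:
all_quoted = io.StringIO()
csv.writer(all_quoted, quoting=csv.QUOTE_ALL).writerow(row)
print(all_quoted.getvalue())  # "some","field","123",""

# Quote only when necessary, like quoteAll=False:
minimal = io.StringIO()
csv.writer(minimal, quoting=csv.QUOTE_MINIMAL).writerow(row)
print(minimal.getvalue())  # some,field,123,
```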
samplingRatio = Field(default=None, ge=0, le=1)
For inferSchema=True, read the specified fraction of rows to infer the schema.
Default 1.
Note
Used only for reading files. Ignored by parse_column function.
timestampFormat = Field(default=None, min_length=1)
String format for TimestampType() representation.
Default is yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX].
timestampNTZFormat = Field(default=None, min_length=1)
String format for TimestampNTZType() representation.
Default is yyyy-MM-dd'T'HH:mm:ss[.SSS].
Note
Added in Spark 3.2.0
unescapedQuoteHandling = None
Define how to handle unescaped quotes within CSV field.
- STOP_AT_CLOSING_QUOTE - collect all characters until the closing quote.
- BACK_TO_DELIMITER - collect all characters until a delimiter or line end.
- STOP_AT_DELIMITER - collect all characters until a delimiter or line end. If quotes are not closed, this may produce incorrect results (e.g. including a delimiter inside a field value).
- SKIP_VALUE - skip the field and consider it as nullValue.
- RAISE_ERROR - raise an error immediately.
Default STOP_AT_DELIMITER.
Note
Used only for reading files and parse_column method.
parse_column(column, schema)
Parses a CSV string column to a structured Spark SQL column using Spark's from_csv function, based on the provided schema.
Note
Can be used only with Spark 3.x+
Added in 0.11.0
Parameters:
- column (str | Column) – The name of the column or the Column object containing CSV strings/bytes to parse.
- schema (StructType) – The schema to apply when parsing the CSV data. This defines the structure of the output DataFrame column.
Returns:
- Column – Column with deserialized data, with the same structure as the provided schema. The column name is the same as the input column.
Examples:
>>> from pyspark.sql.types import StructType, StructField, IntegerType, StringType
>>> from onetl.file.format import CSV
>>> df.show()
+--+--------+
|id|value |
+--+--------+
|1 |Alice;20|
|2 |Bob;25 |
+--+--------+
>>> df.printSchema()
root
|-- id: integer (nullable = true)
|-- value: string (nullable = true)
>>> csv = CSV(delimiter=";")
>>> csv_schema = StructType(
... [
... StructField("name", StringType(), nullable=True),
... StructField("age", IntegerType(), nullable=True),
... ],
... )
>>> parsed_df = df.select("id", csv.parse_column("value", csv_schema))
>>> parsed_df.show()
+--+-----------+
|id|value |
+--+-----------+
|1 |{Alice, 20}|
|2 | {Bob, 25}|
+--+-----------+
>>> parsed_df.printSchema()
root
|-- id: integer (nullable = true)
|-- value: struct (nullable = true)
| |-- name: string (nullable = true)
| |-- age: integer (nullable = true)
serialize_column(column)
Serializes a structured Spark SQL column into a CSV string column using Spark's to_csv function.
Note
Can be used only with Spark 3.x+
Added in 0.11.0
Parameters:
- column (str | Column) – The name of the column or the Column object containing the data to serialize to CSV.
Returns:
- Column – Column with string CSV data. The column name is the same as the input column.
Examples:
>>> from onetl.file.format import CSV
>>> df.show()
+--+-----------+
|id|value |
+--+-----------+
|1 |{Alice, 20}|
|2 | {Bob, 25}|
+--+-----------+
>>> df.printSchema()
root
|-- id: integer (nullable = true)
|-- value: struct (nullable = true)
| |-- name: string (nullable = true)
| |-- age: integer (nullable = true)
>>> # serializing data into CSV format
>>> csv = CSV(delimiter=";")
>>> serialized_df = df.select("id", csv.serialize_column("value"))
>>> serialized_df.show(truncate=False)
+--+--------+
|id|value |
+--+--------+
|1 |Alice;20|
|2 |Bob;25 |
+--+--------+
>>> serialized_df.printSchema()
root
|-- id: integer (nullable = true)
|-- value: string (nullable = true)