
CSV

Bases: ReadWriteFileFormat

CSV file format.

Based on Spark CSV file format.

Supports reading/writing files with .csv extension with content like:

example.csv
"some","value"
"another","value"

Added in 0.9.0

Examples:

Note

You can pass any option mentioned in the official documentation. Option names should be in camelCase!

The set of supported options depends on the Spark version.

Reading files:

from onetl.file.format import CSV

csv = CSV(header=True, inferSchema=True, mode="PERMISSIVE")

Writing files:

from onetl.file.format import CSV

csv = CSV(header=True, compression="gzip")

charToEscapeQuoteEscaping = Field(default=None, max_length=1) class-attribute instance-attribute

If a CSV field value contains the [escape][onetl.file.format.csv.CSV.escape] character, it should be escaped as well. For example, with escape="\", the lines:

"some \" quoted value",other
"some \\ backslashed value",another
will be parsed as:

[
    ('some " quoted value', "other"),
    ("some \ backslashed value", "another"),
]
And vice versa when writing CSV rows to a file.

Default is the same as [escape][onetl.file.format.csv.CSV.escape].
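For intuition, the same escaping behavior can be sketched with Python's stdlib csv module, which also supports an escape character inside quoted fields (an illustrative sketch only; Spark uses the univocity parser, not this module):

```python
import csv

# Two lines using backslash as the escape character, as with escape="\"
lines = [
    '"some \\" quoted value",other',
    '"some \\\\ backslashed value",another',
]

# On reading, escapechar removes the special meaning of the next character:
# \" becomes a literal quote, \\ becomes a literal backslash
rows = list(csv.reader(lines, quotechar='"', escapechar="\\"))

assert rows[0] == ['some " quoted value', "other"]
assert rows[1] == ["some \\ backslashed value", "another"]
```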

columnNameOfCorruptRecord = Field(default=None, min_length=1) class-attribute instance-attribute

Name of the column to store corrupt records in. Default is _corrupt_record.

Warning

If a DataFrame schema is provided, this column should be added to the schema explicitly:

from onetl.connection import SparkLocalFS
from onetl.file import FileDFReader
from onetl.file.format import CSV

from pyspark.sql.types import StructType, StructField, TimestampType, StringType

spark = ...

schema = StructType(
    [
        StructField("my_field", TimestampType()),
        StructField("_corrupt_record", StringType()),  # <-- important
    ]
)

csv = CSV(mode="PERMISSIVE", columnNameOfCorruptRecord="_corrupt_record")

reader = FileDFReader(
    connection=connection,
    format=csv,
    df_schema=schema,  # <-- important
)
df = reader.run(["/some/file.csv"])

Note

Used only for reading files and parse_column method.

comment = Field(default=None, max_length=1) class-attribute instance-attribute

If set, all lines starting with this character (e.g. #) are considered comments and skipped. Default is not set, meaning CSV lines cannot contain comments.

Note

Used only for reading files and parse_column method.
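Spark skips comment lines natively when this option is set; the effect can be sketched in plain Python by filtering lines before parsing (illustrative only, not how Spark implements it):

```python
import csv
import io

data = '# this line is skipped\n"some","value"\n"another","value"\n'

# Drop lines starting with the comment character before parsing,
# mimicking comment="#"
lines = [line for line in io.StringIO(data) if not line.startswith("#")]
rows = list(csv.reader(lines))

assert rows == [["some", "value"], ["another", "value"]]
```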

compression = None class-attribute instance-attribute

Compression codec of the CSV file. Default is None (no compression).

Note

Used only for writing files. Ignored by parse_column method.

dateFormat = Field(default=None, min_length=1) class-attribute instance-attribute

String format for DateType() representation. Default is yyyy-MM-dd.

delimiter = Field(default=',', alias=(avoid_alias('sep'))) class-attribute instance-attribute

Character used to separate fields in CSV row.
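For example, parsing semicolon-separated rows (sketched here with the stdlib csv module; with onetl you would pass CSV(delimiter=";")):

```python
import csv

# Semicolon-delimited rows, as produced with delimiter=";"
rows = list(csv.reader(["Alice;20", "Bob;25"], delimiter=";"))

assert rows == [["Alice", "20"], ["Bob", "25"]]
```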

emptyValue = None class-attribute instance-attribute

Value used for empty string fields.

Defaults:

  • empty string for reading.
  • "" for writing.

enforceSchema = None class-attribute instance-attribute

If True, the inferred or user-provided schema takes priority over CSV file headers. This means all input files should have the same structure.

If False, CSV headers are used as the primary source of information about column names and their positions.

Default True.

Note

Used only for reading files. Ignored by parse_column function.

escapeQuotes = None class-attribute instance-attribute

If True, escape quotes within a CSV field:

any,field with \"quote,123,

If False, wrap fields containing quote symbols in quotes:

any,"field with ""quote ",123,

Default True.

Note

Used only for writing files.

header = None class-attribute instance-attribute

If True, the first row of the file is considered a header. Default False.

ignoreLeadingWhiteSpace = None class-attribute instance-attribute

If True, trim leading whitespace in field values.

Defaults:

  • True for reading.
  • False for writing.

ignoreTrailingWhiteSpace = None class-attribute instance-attribute

If True, trim trailing whitespace in field values.

Defaults:

  • True for reading.
  • False for writing.

inferSchema = None class-attribute instance-attribute

If True, try to infer the input schema by reading a sample of the file (see samplingRatio). Default is False, meaning all parsed columns will be StringType().

Note

Used only for reading files, and only if the user hasn't provided an explicit DataFrame schema. Ignored by parse_column function.

locale = Field(default=None, min_length=1) class-attribute instance-attribute

Locale name used to parse dates and timestamps. Default is en-US.

Note

Used only for reading files and parse_column method.

maxCharsPerColumn = None class-attribute instance-attribute

Maximum number of characters to read per column. Default is -1, which means no limit.

Note

Used only for reading files and parse_column method.

mode = None class-attribute instance-attribute

How to handle parsing errors:

  • PERMISSIVE - set field value as null, move raw data to columnNameOfCorruptRecord column.
  • DROPMALFORMED - skip the malformed row.
  • FAILFAST - throw an error immediately.

Default is PERMISSIVE.

Note

Used only for reading files and parse_column method.

multiLine = None class-attribute instance-attribute

If True, fields may contain line separators. If False, the input is expected to have one record per line.

Default is False.

Note

Used only for reading files. Ignored by parse_column method, as it expects that each DataFrame row will contain exactly one CSV line.

nanValue = Field(default=None) class-attribute instance-attribute

If set, this string will be considered as Not-A-Number (NaN) value for FloatType() and DoubleType(). Default is NaN.

Note

Used only for reading files and parse_column method.

negativeInf = Field(default=None, min_length=1) class-attribute instance-attribute

If set, this string will be considered as negative infinity value for FloatType() and DoubleType(). Default is -Inf.

Note

Used only for reading files and parse_column method.

nullValue = None class-attribute instance-attribute

If set, this value will be converted to null. Default is empty string.
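The substitution Spark performs can be sketched in plain Python: after parsing, values equal to nullValue become null. Here "N/A" is a hypothetical nullValue setting; in Spark the replacement happens inside the parser itself:

```python
import csv

null_value = "N/A"  # hypothetical nullValue setting
parsed = list(csv.reader(["1,N/A,foo"]))

# Replace matching field values with None, as nullValue does with null
rows = [
    [None if field == null_value else field for field in row]
    for row in parsed
]

assert rows == [["1", None, "foo"]]
```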

positiveInf = Field(default=None, min_length=1) class-attribute instance-attribute

If set, this string will be considered as positive infinity value for FloatType() and DoubleType(). Default is Inf.

Note

Used only for reading files and parse_column method.

preferDate = None class-attribute instance-attribute

If True and inferSchema=True, columns whose values match dateFormat are inferred as DateType(). Columns matching both timestampFormat and dateFormat are inferred as TimestampType().

If False, date and timestamp columns will be inferred as StringType().

Default True.

Note

Used only for reading files. Ignored by parse_column function.

quote = Field(default='"', max_length=1) class-attribute instance-attribute

Character used to quote field values in a CSV row.

An empty string is treated as the \u0000 (NUL) character.

quoteAll = None class-attribute instance-attribute

If True, all fields are quoted:

"some","field with \"quote","123",""

If False, only fields containing quote symbols are quoted:

any,"field with \"quote",123,

Default False.

Note

Used only for writing files.
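The stdlib csv writer shows the same distinction through its quoting modes (a sketch for intuition; note that Python doubles embedded quotes by default, while the Spark examples above use backslash escaping):

```python
import csv
import io

row = ["some", 'field with "quote', "123", ""]

# QUOTE_ALL: every field is quoted, like quoteAll=True
buf_all = io.StringIO()
csv.writer(buf_all, quoting=csv.QUOTE_ALL).writerow(row)

# QUOTE_MINIMAL: only fields that need quoting are quoted, like quoteAll=False
buf_min = io.StringIO()
csv.writer(buf_min, quoting=csv.QUOTE_MINIMAL).writerow(row)

assert buf_all.getvalue().strip() == '"some","field with ""quote","123",""'
assert buf_min.getvalue().strip() == 'some,"field with ""quote",123,'
```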

samplingRatio = Field(default=None, ge=0, le=1) class-attribute instance-attribute

When inferSchema=True, read the specified fraction of rows to infer the schema. Default is 1 (read all rows).

Note

Used only for reading files. Ignored by parse_column function.

timestampFormat = Field(default=None, min_length=1) class-attribute instance-attribute

String format for TimestampType() representation. Default is yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX].

timestampNTZFormat = Field(default=None, min_length=1) class-attribute instance-attribute

String format for TimestampNTZType() representation. Default is yyyy-MM-dd'T'HH:mm:ss[.SSS].

Note

Added in Spark 3.2.0

unescapedQuoteHandling = None class-attribute instance-attribute

Defines how to handle unescaped quotes within a CSV field:

  • STOP_AT_CLOSING_QUOTE - collect all characters until closing quote.
  • BACK_TO_DELIMITER - collect all characters until delimiter or line end.
  • STOP_AT_DELIMITER - collect all characters until delimiter or line end. If quotes are not closed, this may produce incorrect results (e.g. including delimiter inside field value).
  • SKIP_VALUE - skip field and consider it as nullValue.
  • RAISE_ERROR - raise an error immediately.

Default STOP_AT_DELIMITER.

Note

Used only for reading files and parse_column method.

parse_column(column, schema)

Parses a CSV string column to a structured Spark SQL column using Spark's from_csv function, based on the provided schema.

Note

Can be used only with Spark 3.x+

Added in 0.11.0

Parameters:

  • column (str | Column) –

    The name of the column or the column object containing CSV strings/bytes to parse.

  • schema (StructType) –

    The schema to apply when parsing the CSV data. This defines the structure of the output DataFrame column.

Returns:

  • Column

    Column with deserialized data, with the same structure as the provided schema. Column name is the same as input column.

Examples:

>>> from pyspark.sql.types import StructType, StructField, IntegerType, StringType
>>> from onetl.file.format import CSV
>>> df.show()
+--+--------+
|id|value   |
+--+--------+
|1 |Alice;20|
|2 |Bob;25  |
+--+--------+
>>> df.printSchema()
root
|-- id: integer (nullable = true)
|-- value: string (nullable = true)
>>> csv = CSV(delimiter=";")
>>> csv_schema = StructType(
...     [
...         StructField("name", StringType(), nullable=True),
...         StructField("age", IntegerType(), nullable=True),
...     ],
... )
>>> parsed_df = df.select("id", csv.parse_column("value", csv_schema))
>>> parsed_df.show()
+--+-----------+
|id|value      |
+--+-----------+
|1 |{Alice, 20}|
|2 |  {Bob, 25}|
+--+-----------+
>>> parsed_df.printSchema()
root
|-- id: integer (nullable = true)
|-- value: struct (nullable = true)
|    |-- name: string (nullable = true)
|    |-- age: integer (nullable = true)

serialize_column(column)

Serializes a structured Spark SQL column into a CSV string column using Spark's to_csv function.

Note

Can be used only with Spark 3.x+

Added in 0.11.0

Parameters:

  • column (str | Column) –

    The name of the column or the Column object containing the data to serialize to CSV.

Returns:

  • Column

    Column with string CSV data. Column name is the same as input column.

Examples:

>>> from pyspark.sql.functions import decode
>>> from onetl.file.format import CSV
>>> df.show()
+--+-----------+
|id|value      |
+--+-----------+
|1 |{Alice, 20}|
|2 |  {Bob, 25}|
+--+-----------+
>>> df.printSchema()
root
|-- id: integer (nullable = true)
|-- value: struct (nullable = true)
|    |-- name: string (nullable = true)
|    |-- age: integer (nullable = true)
>>> # serializing data into CSV format
>>> csv = CSV(delimiter=";")
>>> serialized_df = df.select("id", csv.serialize_column("value"))
>>> serialized_df.show(truncate=False)
+--+--------+
|id|value   |
+--+--------+
|1 |Alice;20|
|2 |Bob;25  |
+--+--------+
>>> serialized_df.printSchema()
root
|-- id: integer (nullable = true)
|-- value: string (nullable = true)