Skip to content

Reading from Kafka

Data can be read from Kafka to Spark using DBReader. It also supports strategy for incremental data reading.

Supported DBReader features

Dataframe schema

Unlike other DB connections, Kafka does not have concept of columns. All the topics messages have the same set of fields, see structure below:

root
|-- key: binary (nullable = true)
|-- value: binary (nullable = true)
|-- topic: string (nullable = false)
|-- partition: integer (nullable = false)
|-- offset: integer (nullable = false)
|-- timestamp: timestamp (nullable = false)
|-- timestampType: integer (nullable = false)
|-- headers: struct (nullable = true)
    |-- key: string (nullable = false)
    |-- value: binary (nullable = true)

headers field is present in the dataframe only if Kafka.ReadOptions(include_headers=True) is passed (compatibility with Kafka 1.x).

Value deserialization

To read value or key of other type than bytes (e.g. struct or integer), users have to deserialize values manually.

This could be done using following methods:

Or any other method provided by Spark or third-larty libraries which can parse BinaryType() column into useful data.

GroupIds and offsets

Regular Kafka consumers use subscrube(topic) method to notify Kafka that some new data from Kafka should be send to consumer if available. Offsets read by group are committed to Kafka, to guarantee at-least-once even if consumer failed somethere.

Spark connector for Kafka is very different. It uses assign(topic) to read data manually from a topic. It doesn't commit offsets to Kafka, as the same data can be read multiple times, e.g. task failed and lost all its memory, new task will read this data again.

Examples

Snapshot strategy, value is Avro binary data:

from onetl.connection import Kafka
from onetl.db import DBReader, DBWriter
from onetl.file.format import Avro
from pyspark.sql.functions import decode

# read all topic data from Kafka
kafka = Kafka(...)
reader = DBReader(connection=kafka, source="avro_topic")
read_df = reader.run()

# parse Avro format to Spark struct
avro = Avro(
    schema_dict={
        "type": "record",
        "name": "Person",
        "fields": [
            {"name": "name", "type": "string"},
            {"name": "age", "type": "int"},
        ],
    }
)
deserialized_df = read_df.select(
    # cast binary key to string
    decode("key", "UTF-8").alias("key"),
    avro.parse_column("value"),
)

Incremental strategy, value is JSON string:

Note

Currently Kafka connector does support only HWMs based on offset field. Other fields, like timestamp, are not yet supported.

from onetl.connection import Kafka
from onetl.db import DBReader, DBWriter
from onetl.file.format import JSON
from pyspark.sql.functions import decode

kafka = Kafka(...)

# read only new data from Kafka topic
reader = DBReader(
    connection=kafka,
    source="topic_name",
    hwm=DBReader.AutoDetectHWM(name="kafka_hwm", expression="offset"),
)

with IncrementalStrategy():
    read_df = reader.run()

# parse JSON format to Spark struct
json = JSON()
schema = StructType(
    [
        StructField("name", StringType(), nullable=True),
        StructField("age", IntegerType(), nullable=True),
    ],
)
deserialized_df = read_df.select(
    # cast binary key to string
    decode("key", "UTF-8").alias("key"),
    json.parse_column("value", json),
)

Options

KafkaReadOptions

Bases: GenericOptions

Reading options for Kafka connector.

Warning

Options:

  • assign
  • endingOffsets
  • endingOffsetsByTimestamp
  • kafka.*
  • startingOffsets
  • startingOffsetsByTimestamp
  • startingTimestamp
  • subscribe
  • subscribePattern

are populated from connection attributes, and cannot be overridden by the user in ReadOptions to avoid issues.

Added in 0.9.0

Examples:

Note

You can pass any value supported by connector, even if it is not mentioned in this documentation. Option names should be in camelCase!

The set of supported options depends on connector version.

from onetl.connection import Kafka

options = Kafka.ReadOptions(
    includeHeaders=False,
    minPartitions=50,
)

include_headers = Field(default=False, alias='includeHeaders') class-attribute instance-attribute

If True, add headers column to output DataFrame.

If False, column will not be added.

parse(options) classmethod

If a parameter inherited from the ReadOptions class was passed, then it will be returned unchanged. If a Dict object was passed it will be converted to ReadOptions.

Otherwise, an exception will be raised