Writing to Kafka

For writing data to Kafka, use DBWriter with specific options (see below).

Dataframe schema

Unlike other DB connections, Kafka does not have a concept of columns. All messages in a topic have the same set of fields, and only some of them can be written:

root
|-- key: binary (nullable = true)
|-- value: binary (nullable = true)
|-- headers: struct (nullable = true)
    |-- key: string (nullable = false)
    |-- value: binary (nullable = true)

The headers column can be written only with Kafka.WriteOptions(include_headers=True). It is disabled by default for compatibility with Kafka 1.x.

The topic field should not be present in the dataframe, because the topic name is passed via DBWriter(target=...).

Other fields, such as partition, offset and timestamp, are set by Kafka and cannot be passed explicitly.
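
The rules above can be summarized as a small pre-flight check. The following is a minimal sketch in plain Python and is not part of onETL: the function name `check_kafka_write_columns`, the constant names and the error messages are all hypothetical, and the function only inspects a list of column names (for example `df.columns`).

```python
# Hypothetical helper, not part of onETL: checks dataframe column names
# against the rules described in this section.
ALWAYS_ALLOWED = {"key", "value"}
# Fields named in the text as set by Kafka; the text says "such as",
# so this set may be incomplete (assumption).
SET_BY_KAFKA = {"partition", "offset", "timestamp"}


def check_kafka_write_columns(columns, include_headers=False):
    """Raise ValueError if the given column names cannot be written as-is."""
    names = set(columns)
    if "topic" in names:
        raise ValueError("'topic' must be passed via DBWriter(target=...), not as a column")
    if "headers" in names and not include_headers:
        raise ValueError("'headers' requires Kafka.WriteOptions(include_headers=True)")
    reserved = names & SET_BY_KAFKA
    if reserved:
        raise ValueError(f"columns set by Kafka cannot be passed: {sorted(reserved)}")
    unknown = names - ALWAYS_ALLOWED - {"headers"}
    if unknown:
        raise ValueError(f"unsupported columns: {sorted(unknown)}")
```

A call such as `check_kafka_write_columns(df.columns, include_headers=True)` before `writer.run(df)` would surface schema problems early, although the connector performs its own validation regardless.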

Value serialization

To write a key or value of a type other than bytes (e.g. struct or integer), users have to serialize the data manually.

This can be done using the serialize_column method of a file format class, such as JSON.serialize_column:

Examples

Convert value to JSON string, and write to Kafka:

from onetl.connection import Kafka
from onetl.db import DBWriter
from onetl.file.format import JSON

df = ...  # original data is here

# serialize struct data as JSON
json = JSON()
write_df = df.select(
    df.key,
    json.serialize_column(df.value),
)

# write data to Kafka
kafka = Kafka(...)

writer = DBWriter(
    connection=kafka,
    target="topic_name",
)
writer.run(write_df)
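
To illustrate what manual serialization means for a single record, here is a plain Python sketch that turns an integer key and a struct-like value into bytes. It uses only the standard library and does not involve Spark or onETL. The 8-byte big-endian key encoding and the function names `serialize_key` and `serialize_value` are assumptions chosen for illustration, not something onETL prescribes.

```python
import json


def serialize_key(key: int) -> bytes:
    # Assumption: encode the key as an 8-byte big-endian signed integer.
    return key.to_bytes(8, byteorder="big", signed=True)


def serialize_value(value: dict) -> bytes:
    # Compact UTF-8 JSON; sort_keys makes the output deterministic.
    return json.dumps(value, sort_keys=True, separators=(",", ":")).encode("utf-8")


key_bytes = serialize_key(42)
value_bytes = serialize_value({"id": 42, "name": "alice"})
```

Whatever encoding is chosen, consumers of the topic have to apply the matching deserialization, so it is worth agreeing on the format before writing.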

Options

KafkaWriteOptions

Bases: GenericOptions

Writing options for Kafka connector.

Warning

Options:

  • kafka.*
  • topic

are populated from connection attributes, and cannot be overridden by the user in WriteOptions to avoid issues.

Added in 0.9.0

Examples:

Note

You can pass any option supported by the connector, even if it is not mentioned in this documentation. Option names should be in camelCase!

The set of supported options depends on connector version.

from onetl.connection import Kafka

options = Kafka.WriteOptions(
    if_exists="append",
    includeHeaders=True,
)

if_exists = Field(default=(KafkaTopicExistBehaviorKafka.APPEND)) class-attribute instance-attribute

Behavior of writing data into an existing topic.

Same as df.write.mode(...).

Possible values:

  • append (default) - Adds new objects into the existing topic.
  • error - Raises an error if the topic already exists.

include_headers = Field(default=False, alias='includeHeaders') class-attribute instance-attribute

If True, the headers column from the dataframe can be written to Kafka (requires Kafka 2.0+).

If False and the dataframe contains a headers column, an exception will be raised.

parse(options) classmethod

If an instance of the options class (here, WriteOptions) is passed, it is returned unchanged. If a dict is passed, it is converted to an instance of the options class.

Otherwise, an exception is raised.
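
The parse behavior described above can be sketched with a standalone class. This is not onETL's implementation: `SketchOptions` is a hypothetical dataclass that stands in for an options class, and the exception type used here is an assumption.

```python
from dataclasses import dataclass


@dataclass
class SketchOptions:
    # Hypothetical stand-in for an options class; not part of onETL.
    if_exists: str = "append"
    include_headers: bool = False

    @classmethod
    def parse(cls, options):
        if isinstance(options, cls):
            # Already an options object: returned unchanged.
            return options
        if isinstance(options, dict):
            # A dict is converted into an options object.
            return cls(**options)
        # Assumption: the exception type raised by onETL may differ.
        raise TypeError(f"{type(options).__name__} is not a {cls.__name__} or dict")
```

With this sketch, `SketchOptions.parse({"if_exists": "error"})` yields an options object, while passing an existing `SketchOptions` instance returns that same object.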