Writing to Kafka
For writing data to Kafka, use DBWriter with specific options (see below).
Dataframe schema
Unlike other DB connections, Kafka does not have a concept of columns. All messages in a topic have the same set of fields, and only some of them can be written:
```
root
|-- key: binary (nullable = true)
|-- value: binary (nullable = true)
|-- headers: struct (nullable = true)
|    |-- key: string (nullable = false)
|    |-- value: binary (nullable = true)
```
The headers column can be written only if Kafka.WriteOptions(include_headers=True) is set (disabled by default for compatibility with Kafka 1.x).
The topic field should not be present in the dataframe, as it is passed to DBWriter(target=...) instead.
Other fields, such as partition, offset and timestamp, are set by Kafka itself and cannot be passed explicitly.
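Putting the schema together: each row ultimately reaches Kafka as a binary key, a binary value, and optionally headers pairing a string key with a binary value. A minimal plain-Python sketch of one such record (no Spark or onETL involved; the field names follow the schema above, the data is illustrative):

```python
import json

# A record shaped like the writer schema above: key and value are raw
# bytes; each header pairs a string key with a binary value.
record = {
    "key": b"user-42",
    "value": json.dumps({"event": "login", "ok": True}).encode("utf-8"),
    "headers": [("trace-id", b"abc123")],  # written only when include_headers=True
}
```

Note that there is no `topic` field in the record itself; the topic is supplied separately, just as `DBWriter(target=...)` does.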
Value serialization
To write a value or key of a type other than bytes (e.g. a struct or an integer), you have to serialize it manually.
This can be done using serialization methods of the file format classes, such as JSON.serialize_column:
Examples
Convert value to JSON string, and write to Kafka:
```python
from onetl.connection import Kafka
from onetl.db import DBWriter
from onetl.file.format import JSON

df = ...  # original data is here

# serialize struct data as JSON
json = JSON()
write_df = df.select(
    df.key,
    json.serialize_column(df.value),
)

# write data to Kafka
kafka = Kafka(...)
writer = DBWriter(
    connection=kafka,
    target="topic_name",
)
writer.run(write_df)
```
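Conceptually, serializing the `value` column turns each struct into a JSON string, which Kafka then stores as bytes. A library-free sketch of the same per-row transformation, assuming each struct is represented as a plain dict (the function name is illustrative, not part of onETL):

```python
import json

def serialize_value(value: dict) -> bytes:
    # Mimic serializing a struct value into a compact JSON byte string,
    # the shape Kafka expects for the binary "value" field.
    return json.dumps(value, separators=(",", ":")).encode("utf-8")

rows = [{"id": 1, "name": "alice"}, {"id": 2, "name": "bob"}]
serialized = [serialize_value(v) for v in rows]
```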
Options
KafkaWriteOptions
Bases: GenericOptions
Writing options for Kafka connector.
Warning
Options kafka.* and topic are populated from connection attributes, and cannot be overridden by the user in WriteOptions to avoid issues.
Added in 0.9.0
Examples:
Note
You can pass any value supported by the connector, even if it is not mentioned in this documentation. Option names should be in camelCase!
The set of supported options depends on the connector version.
```python
from onetl.connection import Kafka

options = Kafka.WriteOptions(
    if_exists="append",
    includeHeaders=True,
)
```
if_exists = Field(default=KafkaTopicExistBehaviorKafka.APPEND)
Behavior of writing data into existing topic.
Same as df.write.mode(...).
Possible values:
* append (default) - Adds new objects into existing topic.
* error - Raises an error if topic already exists.
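The two behaviors can be illustrated with a small sketch (illustrative names and logic, not onETL code) of how a writer might react to a topic that already holds data:

```python
def write(topics: dict, topic: str, rows: list, if_exists: str = "append") -> None:
    # Hypothetical sketch: "append" adds rows to an existing topic,
    # "error" refuses to write if the topic already exists.
    if topic in topics:
        if if_exists == "error":
            raise ValueError(f"Topic {topic!r} already exists")
    else:
        topics[topic] = []
    topics[topic].extend(rows)

topics = {"events": [b"old"]}
write(topics, "events", [b"new"])  # "append" is the default, so this succeeds
```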
include_headers = Field(default=False, alias='includeHeaders')
If True, the headers column from the dataframe can be written to Kafka (requires Kafka 2.0+).
If False and the dataframe contains a headers column, an exception is raised.
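That validation can be sketched in plain Python, assuming a row is represented as a dict with an optional headers entry (the helper name is hypothetical, not part of onETL):

```python
def check_headers(row: dict, include_headers: bool) -> list:
    # Sketch: return the headers to write, or raise if headers are
    # present in the data while the option is disabled.
    headers = row.get("headers") or []
    if headers and not include_headers:
        raise ValueError("dataframe contains 'headers' but include_headers=False")
    return headers

row = {"key": b"k", "value": b"v", "headers": [("trace-id", b"abc")]}
```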
parse(options) (classmethod)
If a KafkaWriteOptions instance is passed, it is returned unchanged. If a dict is passed, it is converted to KafkaWriteOptions.
Otherwise, an exception is raised.
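The behavior described above can be sketched with a minimal stand-in class (simplified, not the actual onETL implementation):

```python
class WriteOptionsSketch:
    """Minimal stand-in for an options class with a parse() classmethod."""

    def __init__(self, **kwargs):
        self.options = kwargs

    @classmethod
    def parse(cls, options):
        if isinstance(options, cls):
            return options         # already an options object: return unchanged
        if isinstance(options, dict):
            return cls(**options)  # dict: convert to an options object
        raise TypeError(f"Cannot parse options of type {type(options).__name__}")

opts = WriteOptionsSketch.parse({"includeHeaders": True})
```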