Kafka Slots¶
Kafka slots that could be implemented by third-party plugins
Added in 0.9.0
get_cluster_addresses(cluster)
staticmethod
¶
Retrieve a collection of known broker addresses for the specified Kafka cluster.
This can be used to obtain the broker addresses dynamically.
Added in 0.9.0
Parameters:
-
cluster(str) –The Kafka cluster name.
Returns:
-
list[str] | None–A collection of broker addresses for the specified Kafka cluster. If the hook cannot be applied, return
None.
Examples:
from onetl.connection import Kafka
from onetl.hooks import hook
@Kafka.Slots.get_cluster_addresses.bind
@hook
def get_cluster_addresses(cluster: str) -> list[str] | None:
if cluster == "kafka_cluster":
return ["192.168.1.1:9092", "192.168.1.2:9092", "192.168.1.3:9092"]
return None
get_known_clusters()
staticmethod
¶
Retrieve the collection of known Kafka clusters.
This can be used to validate if the provided Kafka cluster name is recognized in the system.
Added in 0.9.0
Returns:
-
set[str] | None–A collection of known Kafka cluster names. If the hook cannot be applied, return
None.
Examples:
from onetl.connection import Kafka
from onetl.hooks import hook
@Kafka.Slots.get_known_clusters.bind
@hook
def get_known_clusters() -> set[str] | None:
return {"kafka-cluster", "local"}
normalize_address(address, cluster)
staticmethod
¶
Normalize the given broker address for a specific Kafka cluster.
This can be used to format the broker address according to specific rules, such as adding default ports.
Added in 0.9.0
Parameters:
-
address(str) –The original broker address.
-
cluster(str) –The Kafka cluster name for which the address should be normalized.
Returns:
-
str | None–The normalized broker address. If the hook cannot be applied to the specific address, return
None.
Examples:
from onetl.connection import Kafka
from onetl.hooks import hook
@Kafka.Slots.normalize_address.bind
@hook
def normalize_address(address: str, cluster: str) -> str | None:
if cluster == "kafka-cluster" and ":" not in address:
return f"{address}:9092"
return None
normalize_cluster_name(cluster)
staticmethod
¶
Normalize the given Kafka cluster name.
This can be used to ensure that the Kafka cluster name conforms to specific naming conventions.
Added in 0.9.0
Parameters:
-
cluster(str) –The original Kafka cluster name.
Returns:
-
str | None–The normalized Kafka cluster name. If the hook cannot be applied, return
None.
Examples:
from onetl.connection import Kafka
from onetl.hooks import hook
@Kafka.Slots.normalize_cluster_name.bind
@hook
def normalize_cluster_name(cluster: str) -> str | None:
return cluster.lower()