Kafka SSLProtocol

KafkaSSLProtocol

Bases: KafkaProtocol, GenericOptions

Connect to Kafka using SSL or SASL_SSL security protocols.

For more details see:

Added in 0.9.0

Examples:

Pass PEM certificate as a file located on the Spark driver host:

from pathlib import Path

# Read the existing file on the host and pass the certificate as a string
protocol = Kafka.SSLProtocol(
    truststore_type="PEM",
    truststore_certificates=Path("/path/to/server.crt").read_text(),
)

Pass PEM certificate as a raw string:

protocol = Kafka.SSLProtocol(
    truststore_type="PEM",
    truststore_certificates="-----BEGIN CERTIFICATE...\n...END CERTIFICATE-----",
)

Pass PEM key and certificates as files located on Spark driver host:

from pathlib import Path

# Just read existing files located on host, and pass key and certificates as strings
protocol = Kafka.SSLProtocol(
    keystore_type="PEM",
    keystore_certificate_chain=Path("/path/to/user.crt").read_text(),
    keystore_key=Path("/path/to/user.key").read_text(),
    truststore_type="PEM",
    truststore_certificates=Path("/path/to/server.crt").read_text(),
)

Pass PEM key and certificates as raw strings:

protocol = Kafka.SSLProtocol(
    keystore_type="PEM",
    keystore_certificate_chain="-----BEGIN CERTIFICATE...\n...END CERTIFICATE-----",
    keystore_key="-----BEGIN PRIVATE KEY...\n...END PRIVATE KEY-----",
    truststore_type="PEM",
    truststore_certificates="-----BEGIN CERTIFICATE...\n...END CERTIFICATE-----",
)

Pass the same options as a dict using Kafka config naming:

protocol = Kafka.SSLProtocol.parse(
    {
        # Just the same options as above, but using Kafka config naming with dots
        "ssl.keystore.type": "PEM",
        "ssl.keystore.certificate_chain": "-----BEGIN CERTIFICATE...\n...END CERTIFICATE-----",
        "ssl.keystore.key": "-----BEGIN PRIVATE KEY...\n...END PRIVATE KEY-----",
        "ssl.truststore.type": "PEM",
        "ssl.truststore.certificates": "-----BEGIN CERTIFICATE...\n...END CERTIFICATE-----",
        # Any option starting with "ssl." is passed to Kafka client as-is
        "ssl.protocol": "TLSv1.3",
    }
)
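
The dotted names in the example above appear to line up with the keyword arguments used in the earlier examples. The sketch below illustrates that naming pattern only. The helper `to_keyword_name` is hypothetical and not part of the library; it assumes nothing beyond what the examples show (drop the `ssl.` prefix, replace dots with underscores).

```python
def to_keyword_name(kafka_option: str) -> str:
    """Map a dotted Kafka option name to the keyword-argument style used above.

    Hypothetical helper for illustration only; not part of the library.
    """
    prefix = "ssl."
    if not kafka_option.startswith(prefix):
        raise ValueError(f"Expected an option starting with {prefix!r}, got {kafka_option!r}")
    # Drop the "ssl." prefix and turn the remaining dots into underscores
    return kafka_option[len(prefix):].replace(".", "_")


dotted_options = {
    "ssl.keystore.type": "PEM",
    "ssl.truststore.type": "PEM",
    "ssl.key.password": "some password",
}
keyword_options = {to_keyword_name(name): value for name, value in dotted_options.items()}
print(keyword_options)
```
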

Not recommended

These options are error-prone and have several drawbacks, so it is not recommended to use them.

Passing PEM certificates as files:

  • ENCRYPT the user.key file with the password "some password" using the PKCS#8 scheme.
  • Save the encrypted key to the file /path/to/user/encrypted_key_with_certificate_chain.pem.
  • Append the user certificate to the end of this file.
  • Deploy this file (and the server certificate) to EVERY host where Spark could run (both driver and executors).
  • Pass the file locations and the key decryption password to the options below.

protocol = Kafka.SSLProtocol(
    keystore_type="PEM",
    keystore_location="/path/to/user/encrypted_key_with_certificate_chain.pem",
    key_password="some password",
    truststore_type="PEM",
    truststore_location="/path/to/server.crt",
)
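
The "append the user certificate to the encrypted key" step from the list above can be sketched with the standard library alone. This is a minimal illustration, not the library's tooling: `build_keystore_pem` is a hypothetical helper, the PEM contents are elided placeholders, and the PKCS#8 encryption step itself is not shown (it is assumed to have been done beforehand with an external tool).

```python
from pathlib import Path
import tempfile


def build_keystore_pem(encrypted_key_pem: str, certificate_pem: str) -> str:
    """Return the encrypted key followed by the user certificate.

    Hypothetical helper for illustration only.
    """
    parts = [encrypted_key_pem.strip(), certificate_pem.strip()]
    return "\n".join(parts) + "\n"


# Elided placeholder contents; real files would hold complete PEM blocks
encrypted_key = "-----BEGIN ENCRYPTED PRIVATE KEY...\n...END ENCRYPTED PRIVATE KEY-----"
certificate = "-----BEGIN CERTIFICATE...\n...END CERTIFICATE-----"

# A temporary directory stands in for the real deployment location
with tempfile.TemporaryDirectory() as tmp_dir:
    keystore_path = Path(tmp_dir) / "encrypted_key_with_certificate_chain.pem"
    keystore_path.write_text(build_keystore_pem(encrypted_key, certificate))
    combined = keystore_path.read_text()
```

The resulting file would still have to be copied to every driver and executor host, which is one of the drawbacks of this approach.
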

Passing JKS (Java Key Store) location:

protocol = Kafka.SSLProtocol(
    keystore_type="JKS",
    keystore_location="/usr/lib/jvm/default/lib/security/keystore.jks",
    keystore_password="changeit",
    truststore_type="JKS",
    truststore_location="/usr/lib/jvm/default/lib/security/truststore.jks",
    truststore_password="changeit",
)

cleanup(kafka)

This method is called while closing Kafka connection.

Implement it to clean up resources like temporary files.

Parameters:

  • kafka (Kafka) –

    Connection instance

get_options(kafka)

Get options for Kafka connection

Parameters:

  • kafka (Kafka) –

    Connection instance

Returns:

  • dict (dict) –

    Kafka client options
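
How the cleanup and get_options hooks described above could work together is sketched below, assuming a protocol that writes a certificate to a temporary file. `TempFileProtocolSketch` is a hypothetical stand-in class, not the library's implementation, and the option keys it returns are an assumption based on standard Kafka client setting names.

```python
import os
import tempfile


class TempFileProtocolSketch:
    """Hypothetical stand-in for a protocol class; not the library's implementation."""

    def __init__(self, certificate_pem: str):
        self.certificate_pem = certificate_pem
        self._temp_path = None

    def get_options(self, kafka) -> dict:
        # Write the certificate to a temporary file and point the client at it
        fd, self._temp_path = tempfile.mkstemp(suffix=".crt")
        with os.fdopen(fd, "w") as file:
            file.write(self.certificate_pem)
        return {
            "ssl.truststore.type": "PEM",
            "ssl.truststore.location": self._temp_path,
        }

    def cleanup(self, kafka) -> None:
        # Remove the temporary file created by get_options, if any
        if self._temp_path and os.path.exists(self._temp_path):
            os.remove(self._temp_path)
        self._temp_path = None


protocol = TempFileProtocolSketch("-----BEGIN CERTIFICATE...\n...END CERTIFICATE-----")
options = protocol.get_options(kafka=None)  # a real call would receive the Kafka connection
temp_path = options["ssl.truststore.location"]
protocol.cleanup(kafka=None)  # called when the connection is closed
```

Making cleanup safe to call more than once keeps connection teardown simple even when get_options was never invoked.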

parse(options) classmethod

If an instance of a class inherited from ReadOptions is passed, it is returned unchanged. If a dict is passed, it is converted to ReadOptions.

Otherwise, an exception is raised.
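
The dispatch described above (instance returned unchanged, dict converted, anything else rejected) can be sketched as follows. `DemoOptions` is a hypothetical class used only for illustration, and the choice of `TypeError` is an assumption, since the text does not name the exception type.

```python
from dataclasses import dataclass


@dataclass
class DemoOptions:
    """Hypothetical options class used only to illustrate the dispatch."""

    truststore_type: str = "PEM"

    @classmethod
    def parse(cls, options):
        if isinstance(options, cls):
            # Already an options object: return it unchanged
            return options
        if isinstance(options, dict):
            # Plain dict: convert it into an options object
            return cls(**options)
        # Assumption: the real exception type may differ
        raise TypeError(f"{type(options).__name__} is not a {cls.__name__} instance or a dict")


existing = DemoOptions(truststore_type="JKS")
same = DemoOptions.parse(existing)
converted = DemoOptions.parse({"truststore_type": "PEM"})
```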