Kafka SSLProtocol
KafkaSSLProtocol
Bases: KafkaProtocol, GenericOptions
Connect to Kafka using SSL or SASL_SSL security protocols.
For more details see:
Added in 0.9.0
Examples:
Pass a PEM server certificate (truststore), either read from a file located on the Spark driver host or given directly as a string:
from pathlib import Path

from onetl.connection import Kafka

# Read an existing file located on the Spark driver host, and pass the certificate as a string
protocol = Kafka.SSLProtocol(
    truststore_type="PEM",
    truststore_certificates=Path("/path/to/server.crt").read_text(),
)

# Or pass the certificate content as a string directly
protocol = Kafka.SSLProtocol(
    truststore_type="PEM",
    truststore_certificates="-----BEGIN CERTIFICATE...\n...END CERTIFICATE-----",
)
Pass a PEM user key and certificates, either read from files located on the Spark driver host or given directly as strings:
from pathlib import Path

from onetl.connection import Kafka

# Just read existing files located on host, and pass key and certificates as strings
protocol = Kafka.SSLProtocol(
    keystore_type="PEM",
    keystore_certificate_chain=Path("/path/to/user.crt").read_text(),
    keystore_key=Path("/path/to/user.key").read_text(),
    truststore_type="PEM",
    truststore_certificates=Path("/path/to/server.crt").read_text(),
)
# Or pass key and certificates content as strings directly
protocol = Kafka.SSLProtocol(
    keystore_type="PEM",
    keystore_certificate_chain="-----BEGIN CERTIFICATE...\n...END CERTIFICATE-----",
    keystore_key="-----BEGIN PRIVATE KEY...\n...END PRIVATE KEY-----",
    truststore_type="PEM",
    truststore_certificates="-----BEGIN CERTIFICATE...\n...END CERTIFICATE-----",
)
protocol = Kafka.SSLProtocol.parse(
    {
        # Just the same options as above, but using Kafka config naming with dots
        "ssl.keystore.type": "PEM",
        "ssl.keystore.certificate.chain": "-----BEGIN CERTIFICATE...\n...END CERTIFICATE-----",
        "ssl.keystore.key": "-----BEGIN PRIVATE KEY...\n...END PRIVATE KEY-----",
        "ssl.truststore.type": "PEM",
        "ssl.truststore.certificates": "-----BEGIN CERTIFICATE...\n...END CERTIFICATE-----",
        # Any option starting with "ssl." is passed to the Kafka client as-is
        "ssl.protocol": "TLSv1.3",
    }
)
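Kafka names each SSL setting "ssl." followed by a dotted name, and the keyword arguments used in the earlier examples follow the same words with underscores (for instance, keystore_certificate_chain corresponds to Kafka's ssl.keystore.certificate.chain). The sketch below illustrates that correspondence under this assumption. The helper to_kafka_ssl_options is hypothetical and is not how onETL builds its options internally.

```python
from __future__ import annotations


def to_kafka_ssl_options(extra: dict[str, str] | None = None, **fields: str | None) -> dict[str, str]:
    """Hypothetical helper: map snake_case option names to Kafka "ssl.*" config keys."""
    options: dict[str, str] = {}
    for name, value in fields.items():
        if value is None:
            continue  # unset options are simply omitted
        # e.g. keystore_certificate_chain -> ssl.keystore.certificate.chain
        options["ssl." + name.replace("_", ".")] = value

    # Keys already in Kafka naming (e.g. "ssl.protocol") are copied unchanged;
    # in this sketch, keys not starting with "ssl." are ignored
    for key, value in (extra or {}).items():
        if key.startswith("ssl."):
            options[key] = value
    return options
```

For example, to_kafka_ssl_options({"ssl.protocol": "TLSv1.3"}, keystore_type="PEM") yields {"ssl.keystore.type": "PEM", "ssl.protocol": "TLSv1.3"}.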
Not recommended
These options are error-prone and have several drawbacks (for example, the files must be deployed to every host Spark could run on), so using them is not recommended.
Passing PEM certificates as files:
- ENCRYPT the user.key file with password "some password" using the PKCS#8 scheme.
- Save the encrypted key to file /path/to/user/encrypted_key_with_certificate_chain.pem.
- Then append the user certificate to the end of this file.
- Deploy this file (and the server certificate too) to EVERY host Spark could run on (both driver and executors).
- Then pass the file locations and the password for key decryption to the options below:
protocol = Kafka.SSLProtocol(
    keystore_type="PEM",
    keystore_location="/path/to/user/encrypted_key_with_certificate_chain.pem",
    key_password="some password",
    truststore_type="PEM",
    truststore_location="/path/to/server.crt",
)
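The encryption and concatenation steps of the PEM-file approach can be scripted. The sketch below assumes the third-party cryptography package is installed and that user.key holds an unencrypted PEM private key; the function name build_encrypted_keystore_pem is hypothetical. It re-serializes the key as password-encrypted PKCS#8 (the library picks the cipher) and appends the user certificate, producing the single file that keystore_location points to. Deploying the file to every host is not covered.

```python
from pathlib import Path

from cryptography.hazmat.primitives import serialization


def build_encrypted_keystore_pem(key_path: Path, cert_path: Path, output_path: Path, password: str) -> None:
    """Hypothetical helper: write a PKCS#8-encrypted key followed by the user certificate."""
    # Load the unencrypted private key (e.g. user.key)
    private_key = serialization.load_pem_private_key(key_path.read_bytes(), password=None)

    # Re-serialize it as PKCS#8 PEM, encrypted with the given password
    encrypted_key = private_key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.PKCS8,
        encryption_algorithm=serialization.BestAvailableEncryption(password.encode("utf-8")),
    )

    # Encrypted key first, then the user certificate appended to the end of the same file
    output_path.write_bytes(encrypted_key + cert_path.read_bytes())
```

For example, calling it with Path("/path/to/user.key"), Path("/path/to/user.crt"), Path("/path/to/user/encrypted_key_with_certificate_chain.pem") and "some password" would produce the file passed as keystore_location in the example above.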
Passing JKS (Java KeyStore) files:
- Add the user key and certificate to a JKS keystore.
- Add the server certificate to a JKS truststore.
- This should be done on EVERY host Spark could run on (both driver and executors).
- Pass the keystore and truststore paths to the options below, as well as the passwords for accessing these stores:
protocol = Kafka.SSLProtocol(
    keystore_type="JKS",
    keystore_location="/usr/lib/jvm/default/lib/security/keystore.jks",
    keystore_password="changeit",
    truststore_type="JKS",
    truststore_location="/usr/lib/jvm/default/lib/security/truststore.jks",
    truststore_password="changeit",
)
cleanup(kafka)
This method is called while closing the Kafka connection.
Implement it to clean up resources like temporary files.
Parameters:
- kafka (Kafka) – Connection instance
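The cleanup hook pairs naturally with a protocol that materializes secrets as temporary files. The standalone sketch below shows that lifecycle; the class TempPemTruststoreSketch and its attributes are hypothetical, it does not inherit from onETL's KafkaProtocol, and the kafka argument is accepted only to mirror the signatures documented here.

```python
from __future__ import annotations

import os
import tempfile
from typing import Any


class TempPemTruststoreSketch:
    """Hypothetical protocol-like class that writes a PEM truststore to a temp file."""

    def __init__(self, truststore_certificates: str) -> None:
        self.truststore_certificates = truststore_certificates
        self._temp_path: str | None = None

    def get_options(self, kafka: Any) -> dict[str, str]:
        # Write the certificate to a file once (mkstemp creates it readable by the owner only)
        if self._temp_path is None:
            fd, self._temp_path = tempfile.mkstemp(suffix=".pem")
            with os.fdopen(fd, "w") as f:
                f.write(self.truststore_certificates)
        return {
            "ssl.truststore.type": "PEM",
            "ssl.truststore.location": self._temp_path,
        }

    def cleanup(self, kafka: Any) -> None:
        # Called when the connection is closed: remove the temporary file, if any
        if self._temp_path is not None:
            os.remove(self._temp_path)
            self._temp_path = None
```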
get_options(kafka)
Get options for the Kafka connection.
Parameters:
- kafka (Kafka) – Connection instance
Returns:
- dict – Kafka client options
parse(options)
classmethod
If an instance of this class is passed, it is returned unchanged. If a dict is passed, it is converted to an instance of this class.
Otherwise, an exception is raised.
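The dispatch described above can be sketched as follows. SSLOptionsSketch is a hypothetical dataclass standing in for the real options class; unlike the real class, this sketch accepts only Python field names and not dotted Kafka names, and TypeError is an assumed choice of exception.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any


@dataclass
class SSLOptionsSketch:
    """Hypothetical stand-in for an options class; not onETL's implementation."""

    keystore_type: str | None = None
    truststore_type: str | None = None

    @classmethod
    def parse(cls, options: SSLOptionsSketch | dict[str, Any]) -> SSLOptionsSketch:
        # An existing instance is returned unchanged
        if isinstance(options, cls):
            return options
        # A dict is converted into a new instance
        if isinstance(options, dict):
            return cls(**options)
        # Anything else is rejected
        raise TypeError(f"Expected {cls.__name__} or dict, got {type(options).__name__}")
```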