Connecting to a Kafka Streams Data Source
Tecton can use Kafka as a data source for feature materialization. Connecting to Kafka requires setting up authentication and, if using Amazon MSK, establishing Virtual Private Cloud (VPC) connectivity.
Establish network connectivity
For Amazon MSK
Because your data platform (Databricks or EMR) resides on a different Amazon VPC than the VPC where Amazon MSK resides, you will need to configure access between the two VPCs.
For Confluent
See the Confluent documentation that explains the available options for establishing network connectivity.
Configure authentication
Tecton can connect to Kafka using TLS and SASL.
TLS authentication
Configuring authentication with SSL requires configuring a keystore, and optionally, a truststore.
Set up a keystore and truststore
To set up a keystore, upload your keystore file (a .jks file) to either S3 or DBFS (Databricks only) and set the ssl_keystore_location parameter of a KafkaConfig object.
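For example, the keystore file could be uploaded with the AWS or Databricks CLI. The bucket, paths, and file names below are illustrative placeholders; substitute your own:

```shell
# Upload the keystore to S3 (EMR or Databricks):
aws s3 cp kafka_client_keystore.jks s3://tecton-mycompany/kafka-credentials/kafka_client_keystore.jks

# Or, on Databricks, upload to DBFS instead:
databricks fs cp kafka_client_keystore.jks dbfs:/kafka-credentials/kafka_client_keystore.jks
```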
If accessing the keystore file requires a password, follow the steps below, based on your data platform.

For Databricks:

1. Create a secret using the scope you created in Connecting Databricks. The secret key name must begin with SECRET_.
2. In your KafkaConfig object, set ssl_keystore_password_secret_id to the secret key name you created in the first step.
For EMR:

1. In AWS Secrets Manager, create a secret key having the format <prefix>/SECRET_<rest of the secret name>, where:
   - <prefix> is <deployment name> if your deployment name begins with tecton, and tecton-<deployment name> otherwise.
   - <deployment name> is the first part of the URL used to access the Tecton UI (https://<deployment name>.tecton.ai).
   - <rest of the secret name> is a string of your choice.
2. In your KafkaConfig object, set ssl_keystore_password_secret_id to the secret key name you created in the first step.
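The naming rule above can be sketched as a small Python helper. The function name is hypothetical; only the <prefix>/SECRET_<rest of the secret name> format comes from the steps above:

```python
def secret_name(deployment_name: str, rest: str) -> str:
    """Build the AWS Secrets Manager key <prefix>/SECRET_<rest>.

    <prefix> is the deployment name itself when it already begins
    with "tecton", and "tecton-<deployment name>" otherwise.
    """
    if deployment_name.startswith("tecton"):
        prefix = deployment_name
    else:
        prefix = f"tecton-{deployment_name}"
    return f"{prefix}/SECRET_{rest}"
```

For a deployment named mycompany, `secret_name("mycompany", "kafka_keystore_password")` yields `tecton-mycompany/SECRET_kafka_keystore_password`.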
To set up a truststore, upload your truststore file (a .jks file) to either S3 or DBFS (Databricks only) and set the ssl_truststore_location parameter of a KafkaConfig object.
If accessing the truststore file requires a password, follow the steps below, based on your data platform.

For Databricks:

1. Create a secret using the scope you created in Connecting Databricks. The secret key name must begin with SECRET_.
2. In your KafkaConfig object, set ssl_truststore_password_secret_id to the secret key name you created in the first step.
For EMR:

1. In AWS Secrets Manager, create a secret key having the format <prefix>/SECRET_<rest of the secret name>, where:
   - <prefix> is <deployment name> if your deployment name begins with tecton, and tecton-<deployment name> otherwise.
   - <deployment name> is the first part of the URL used to access the Tecton UI (https://<deployment name>.tecton.ai).
   - <rest of the secret name> is a string of your choice.
2. In your KafkaConfig object, set ssl_truststore_password_secret_id to the secret key name you created in the first step.
Note
In the following cases, use of a truststore is required.
- Kafka configured with a custom Certificate Authority
- Using Amazon MSK with Databricks
In many other cases, use of a truststore is optional.
For Databricks, you can store keystore and truststore files in any location. The following are example locations (these are set as parameters in the KafkaConfig object):

- ssl_keystore_location: dbfs:/kafka-credentials/kafka_client_keystore.jks
- ssl_truststore_location: dbfs:/kafka-credentials/amazon_truststore.jks
For EMR, you must store your keystore and/or truststore files in the location s3://tecton-<deployment name>/kafka-credentials, where <deployment name> is the first part of the URL used to access the Tecton UI. For example, if the URL is https://mycompany.tecton.ai, then <deployment name> is mycompany.
At materialization time, Tecton will download all of the files in this location to the materialization cluster.
The ssl_keystore_location and ssl_truststore_location parameters in the KafkaConfig object are as follows:

- ssl_keystore_location: s3://tecton-<deployment name>/kafka-credentials/kafka_client_keystore.jks
- ssl_truststore_location: s3://tecton-<deployment name>/kafka-credentials/amazon_truststore.jks
You will also need to add the following bootstrap script to your cluster:
s3://tecton.ai.public/install_scripts/setup_emr_notebook_cluster_copy_kafka_credentials.sh
. Pass the following argument as input: s3://tecton-<deployment name>
. The script will look in the /kafka-credentials
folder inside that S3 bucket.
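When creating the EMR cluster with the AWS CLI, the bootstrap action could be attached roughly as follows. The bucket name is an illustrative placeholder, and the other required cluster options (release label, instance configuration, and so on) are omitted:

```shell
# Attach the Tecton bootstrap script when creating the EMR cluster
# (other required cluster options omitted; bucket name is a placeholder).
aws emr create-cluster \
    --name tecton-notebook-cluster \
    --bootstrap-actions Path=s3://tecton.ai.public/install_scripts/setup_emr_notebook_cluster_copy_kafka_credentials.sh,Args=[s3://tecton-mycompany]
```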
Resources
The following resources are helpful in understanding how to configure TLS:

- Encrypt and Authenticate with TLS in the Confluent documentation.
- Mutual TLS Authentication in the Amazon documentation.
Specify your Kafka configuration in Tecton
The following KafkaConfig definition shows the parameters that need to be set to configure a Tecton connection to Kafka using TLS. ssl_keystore_password_secret_id, ssl_truststore_location, and ssl_truststore_password_secret_id may not be required; see Set up a keystore and truststore, above.
```python
import os

stream_config = KafkaConfig(
    kafka_bootstrap_servers=os.environ["bootstrap_servers"],
    topics="<Kafka topic name(s)>",
    ssl_keystore_location=os.environ["ssl_keystore_location"],
    ssl_keystore_password_secret_id=os.environ["ssl_keystore_password_secret_id"],
    ssl_truststore_location=os.environ["ssl_truststore_location"],
    ssl_truststore_password_secret_id=os.environ["ssl_truststore_password_secret_id"],
    security_protocol="SSL",
    # <other parameters>
)
```
SASL authentication
A Tecton connection to Kafka using SASL is configured using Java Authentication and Authorization Service (JAAS) settings.
For information on JAAS and its supported SASL authentication mechanisms, see this page in the Confluent documentation.
The following KafkaConfig definition shows the parameters that need to be set to configure a Tecton connection to Kafka using the SASL PLAIN mechanism. To use a different connection mechanism, modify the settings returned by the get_kafka_settings() function.
```python
import os


def get_sasl_username():
    return os.environ["sasl_username"]


def get_sasl_password():
    return os.environ["sasl_password"]


def get_kafka_settings():
    return {
        "kafka.sasl.jaas.config": (
            'org.apache.kafka.common.security.plain.PlainLoginModule required '
            f'username="{get_sasl_username()}" password="{get_sasl_password()}";'
        ),
        "kafka.sasl.mechanism": "PLAIN",
    }


stream_config = KafkaConfig(
    kafka_bootstrap_servers=os.environ["bootstrap_servers"],
    topics="<Kafka topic name(s)>",
    options=get_kafka_settings(),
    security_protocol="SASL_SSL",
)
```
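For example, to use SCRAM-SHA-512 instead of PLAIN (one of the SASL mechanisms Kafka supports), only the JAAS login module and the mechanism name change. The helper below is an illustrative sketch, not part of the Tecton API:

```python
import os


def get_scram_kafka_settings():
    # Same shape as get_kafka_settings() above, but using Kafka's
    # ScramLoginModule and the SCRAM-SHA-512 mechanism instead of PLAIN.
    return {
        "kafka.sasl.jaas.config": (
            'org.apache.kafka.common.security.scram.ScramLoginModule required '
            f'username="{os.environ["sasl_username"]}" '
            f'password="{os.environ["sasl_password"]}";'
        ),
        "kafka.sasl.mechanism": "SCRAM-SHA-512",
    }
```

The resulting dictionary would be passed to KafkaConfig via the options parameter, with security_protocol still set to "SASL_SSL".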