
Connecting to a Kafka Streams Data Source

Tecton can use Kafka as a data source for feature materialization. Connecting to Kafka requires setting up authentication and, if using Amazon MSK, establishing Virtual Private Cloud (VPC) connectivity.

Establish network connectivity

For Amazon MSK

Because your data platform (Databricks or EMR) runs in a different Amazon VPC from the one that hosts Amazon MSK, you need to configure network access between the two VPCs.

For Confluent

See the Confluent documentation that explains the available options for establishing network connectivity.

Configure authentication

Tecton can connect to Kafka using TLS and SASL.

TLS authentication

Configuring TLS (SSL) authentication requires a keystore and, optionally, a truststore.

Set up a keystore and truststore

To set up a keystore, upload your keystore file (a .jks file) to either S3 or DBFS (Databricks only), and set the ssl_keystore_location parameter of your KafkaConfig object to the file's location.

If accessing the keystore file requires a password, follow the steps for your data platform:

For Databricks

  1. Create a secret in the scope you created in Connecting Databricks. The secret key name must begin with SECRET_.

  2. In your KafkaConfig object, set ssl_keystore_password_secret_id to the secret key name you created in the first step.

For EMR

  1. In AWS Secrets Manager, create a secret whose name has the format:

    <prefix>/SECRET_<rest of the secret name>

    where <prefix> is:

    • <deployment name>, if your deployment name begins with tecton
    • tecton-<deployment name>, otherwise

    and <deployment name> is the first part of the URL used to access the Tecton UI: https://<deployment name>.tecton.ai

    and <rest of the secret name> is a string of your choice.

  2. In your KafkaConfig object, set ssl_keystore_password_secret_id to the secret key name you created in the first step.
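The EMR naming rule above can be expressed as a small helper. This is only a sketch: the function name emr_secret_name and the example values are hypothetical, and the helper just builds the name string; it does not create anything in AWS Secrets Manager.

```python
def emr_secret_name(deployment_name: str, rest_of_name: str) -> str:
    """Return a Secrets Manager name of the form <prefix>/SECRET_<rest>.

    Hypothetical helper: it only builds the string and does not create
    the secret in AWS.
    """
    if not deployment_name or not rest_of_name:
        raise ValueError("deployment_name and rest_of_name must be non-empty")
    # Deployment names that already begin with "tecton" are used as the
    # prefix unchanged; all others are prefixed with "tecton-".
    if deployment_name.startswith("tecton"):
        prefix = deployment_name
    else:
        prefix = f"tecton-{deployment_name}"
    return f"{prefix}/SECRET_{rest_of_name}"


print(emr_secret_name("mycompany", "KAFKA_KEYSTORE_PASSWORD"))
# prints tecton-mycompany/SECRET_KAFKA_KEYSTORE_PASSWORD
```

The same rule applies to the truststore password secret described next.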

To set up a truststore, upload your truststore file (a .jks file) to either S3 or DBFS (Databricks only), and set the ssl_truststore_location parameter of your KafkaConfig object to the file's location.

If accessing the truststore file requires a password, follow the steps for your data platform:

For Databricks

  1. Create a secret in the scope you created in Connecting Databricks. The secret key name must begin with SECRET_.

  2. In your KafkaConfig object, set ssl_truststore_password_secret_id to the secret key name you created in the first step.

For EMR

  1. In AWS Secrets Manager, create a secret whose name has the format:

    <prefix>/SECRET_<rest of the secret name>

    where <prefix> is:

    • <deployment name>, if your deployment name begins with tecton
    • tecton-<deployment name>, otherwise

    and <deployment name> is the first part of the URL used to access the Tecton UI: https://<deployment name>.tecton.ai

    and <rest of the secret name> is a string of your choice.

  2. In your KafkaConfig object, set ssl_truststore_password_secret_id to the secret key name you created in the first step.

Note

In the following cases, use of a truststore is required.

  • Kafka configured with a custom Certificate Authority
  • Using Amazon MSK with Databricks

In many other cases, use of a truststore is optional.

For Databricks

You can store keystore and truststore files in any location. The following are example locations (these are set as parameters in the KafkaConfig object):

  • ssl_keystore_location: dbfs:/kafka-credentials/kafka_client_keystore.jks
  • ssl_truststore_location: dbfs:/kafka-credentials/amazon_truststore.jks

For EMR

You must store your keystore and/or truststore files in s3://tecton-<deployment name>/kafka-credentials.

Here, <deployment name> is the first part of the hostname of your Tecton Web UI: if your cluster is mycompany.tecton.ai, then <deployment name> is mycompany.

At materialization time, Tecton will download all of the files in this location to the materialization cluster.

The ssl_keystore_location and ssl_truststore_location parameters in the KafkaConfig object are as follows:

  • ssl_keystore_location: s3://tecton-<deployment name>/kafka-credentials/kafka_client_keystore.jks
  • ssl_truststore_location: s3://tecton-<deployment name>/kafka-credentials/amazon_truststore.jks
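For EMR, both locations are derived from the deployment name, so they can be generated rather than typed by hand. The sketch below assumes the example file names shown above; the helper name emr_kafka_credential_locations is hypothetical.

```python
def emr_kafka_credential_locations(
    deployment_name: str,
    keystore_file: str = "kafka_client_keystore.jks",
    truststore_file: str = "amazon_truststore.jks",
) -> dict:
    """Build KafkaConfig location values under the required EMR folder."""
    # All credential files must live in this folder so that they are
    # downloaded to the materialization cluster.
    base = f"s3://tecton-{deployment_name}/kafka-credentials"
    return {
        "ssl_keystore_location": f"{base}/{keystore_file}",
        "ssl_truststore_location": f"{base}/{truststore_file}",
    }


for name, location in emr_kafka_credential_locations("mycompany").items():
    print(name, location)
```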

You also need to add the following bootstrap script to your EMR cluster: s3://tecton.ai.public/install_scripts/setup_emr_notebook_cluster_copy_kafka_credentials.sh. Pass s3://tecton-<deployment name> as its argument; the script looks for credential files in the /kafka-credentials folder of that S3 bucket.
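If you create EMR clusters programmatically, for example with the boto3 EMR client's run_job_flow call, bootstrap actions are passed as a list of dictionaries with Name and ScriptBootstrapAction keys. The sketch below assumes that workflow; the helper name and the action's display name are hypothetical, while the script path and argument come from the instructions above.

```python
# Public Tecton script named in the instructions above.
COPY_KAFKA_CREDENTIALS_SCRIPT = (
    "s3://tecton.ai.public/install_scripts/"
    "setup_emr_notebook_cluster_copy_kafka_credentials.sh"
)


def kafka_credentials_bootstrap_action(deployment_name: str) -> dict:
    """Return an EMR bootstrap action entry that runs the copy script."""
    return {
        # Display name is hypothetical; choose any descriptive value.
        "Name": "copy-kafka-credentials",
        "ScriptBootstrapAction": {
            "Path": COPY_KAFKA_CREDENTIALS_SCRIPT,
            # The script looks in the kafka-credentials folder of this bucket.
            "Args": [f"s3://tecton-{deployment_name}"],
        },
    }


print(kafka_credentials_bootstrap_action("mycompany"))
```

The returned dictionary would be one element of the BootstrapActions list passed when the cluster is created.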


Specify your Kafka configuration in Tecton

The following KafkaConfig definition shows the parameters that need to be set to configure a Tecton connection to Kafka using TLS.

The ssl_keystore_password_secret_id, ssl_truststore_location, and ssl_truststore_password_secret_id parameters may not be required; see Set up a keystore and truststore, above.

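import os

from tecton import KafkaConfig

# Connection and TLS settings are read from environment variables.
stream_config = KafkaConfig(
    kafka_bootstrap_servers=os.environ["bootstrap_servers"],
    topics="<Kafka topic name(s)>",
    ssl_keystore_location=os.environ["ssl_keystore_location"],
    # Only needed if the keystore is password protected.
    ssl_keystore_password_secret_id=os.environ["ssl_keystore_password_secret_id"],
    # Only needed if you use a truststore (and, for the password, if it is protected).
    ssl_truststore_location=os.environ["ssl_truststore_location"],
    ssl_truststore_password_secret_id=os.environ["ssl_truststore_password_secret_id"],
    security_protocol="SSL",
    # <other parameters>
)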
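Because some of these parameters may not be required, reading every one with os.environ[...] raises KeyError when an optional variable is unset. One way around this, sketched below, is to build the keyword arguments conditionally. The helper name tls_kafka_kwargs and the broker address are hypothetical; the environment variable names match the example above.

```python
import os
from typing import Mapping, Optional

# Parameters the text above marks as possibly unnecessary.
OPTIONAL_TLS_SETTINGS = (
    "ssl_keystore_password_secret_id",
    "ssl_truststore_location",
    "ssl_truststore_password_secret_id",
)


def tls_kafka_kwargs(env: Optional[Mapping[str, str]] = None) -> dict:
    """Collect TLS-related KafkaConfig arguments from environment variables.

    Required values raise KeyError if missing; optional ones are omitted
    when unset or empty.
    """
    env = os.environ if env is None else env
    kwargs = {
        "kafka_bootstrap_servers": env["bootstrap_servers"],
        "ssl_keystore_location": env["ssl_keystore_location"],
        "security_protocol": "SSL",
    }
    for name in OPTIONAL_TLS_SETTINGS:
        value = env.get(name)
        if value:
            kwargs[name] = value
    return kwargs


example_env = {
    "bootstrap_servers": "broker-1:9094",
    "ssl_keystore_location": "dbfs:/kafka-credentials/kafka_client_keystore.jks",
}
print(tls_kafka_kwargs(example_env))
```

The result could then be unpacked into the call, for example KafkaConfig(topics="<Kafka topic name(s)>", **tls_kafka_kwargs()), alongside your other parameters.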

SASL authentication

A Tecton connection to Kafka using SASL is configured using Java Authentication and Authorization Service (JAAS) settings.

For information on JAAS and the SASL authentication mechanisms it supports, see the Confluent documentation.
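The JAAS string differs by mechanism mainly in the login module class. The sketch below assumes a username/password mechanism (PLAIN or SCRAM) and uses the login module classes that ship with the Apache Kafka client; the helper name sasl_kafka_options is hypothetical. It returns the same two option keys used by the PLAIN example that follows. Confirm which mechanism your Kafka provider expects.

```python
# JAAS login module classes shipped with the Apache Kafka client.
LOGIN_MODULES = {
    "PLAIN": "org.apache.kafka.common.security.plain.PlainLoginModule",
    "SCRAM-SHA-256": "org.apache.kafka.common.security.scram.ScramLoginModule",
    "SCRAM-SHA-512": "org.apache.kafka.common.security.scram.ScramLoginModule",
}


def sasl_kafka_options(mechanism: str, username: str, password: str) -> dict:
    """Build the kafka.sasl.* options for a username/password mechanism."""
    module = LOGIN_MODULES[mechanism]  # KeyError for unsupported mechanisms
    for value in (username, password):
        # A double quote would end the quoted JAAS value early.
        if '"' in value:
            raise ValueError("credentials must not contain double quotes")
    jaas = f'{module} required username="{username}" password="{password}";'
    return {"kafka.sasl.jaas.config": jaas, "kafka.sasl.mechanism": mechanism}


print(sasl_kafka_options("SCRAM-SHA-512", "alice", "example-password"))
```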

The following KafkaConfig definition shows the parameters that need to be set to configure a Tecton connection to Kafka using the SASL PLAIN mechanism. To use a different mechanism, modify the settings returned by the get_kafka_settings() function.

import os

from tecton import KafkaConfig

def get_sasl_username():
    return os.environ["sasl_username"]

def get_sasl_password():
    return os.environ["sasl_password"]

def get_kafka_settings():
    # JAAS configuration for the SASL PLAIN mechanism.
    return {
        "kafka.sasl.jaas.config": (
            "org.apache.kafka.common.security.plain.PlainLoginModule required "
            f'username="{get_sasl_username()}" password="{get_sasl_password()}";'
        ),
        "kafka.sasl.mechanism": "PLAIN",
    }

stream_config=KafkaConfig(
        kafka_bootstrap_servers=os.environ["bootstrap_servers"],
        topics="<Kafka topic name(s)>",
        options=get_kafka_settings(),
        security_protocol="SASL_SSL"