Using Third Party Dependencies in Spark UDFs
Overview
You can use third party Python packages in UDFs used by Transformations by declaring them in extra_pip_dependencies
within the cluster config object, that is either tecton.EMRClusterConfig or tecton.DatabricksClusterConfig.
Limitations
The installed packages will be available to use from Spark UDFs but not from the transformation code itself.
For example, if you set extra_pip_dependencies=["tensorflow"], you can use tensorflow in a UDF like this:
@transformation(mode="pyspark")
def test_transformation(transformation_input):
    from pyspark.sql import functions as F
    from pyspark.sql.types import LongType
    # tensorflow can be used within my_tensorflow_udf,
    # but not outside of it
    def my_tensorflow_udf(x):
        import tensorflow as tf
        return int(tf.math.log1p(float(x)).numpy())
    my_tensorflow_udf = F.udf(my_tensorflow_udf, LongType())
    ...
Complete example: using third party python package with EMR
In this example we're using an imported function log1p from Tensorflow in a UDF applied within a PySpark transformation.
from datetime import datetime, batch_feature_view
from tecton import MaterializationConfig, EMRClusterConfig, BackfillConfig
from shared import entities, data_sources
new_emr = EMRClusterConfig(
    instance_type="m4.xlarge",
    number_of_workers=4,
    extra_pip_dependencies=["tensorflow==2.2.0"],
)
@batch_feature_view(
    mode="pyspark"
    Inputs = {"user_clicks" : Input(data_sources.user_clicks)}
    entities=[entities.product],
    online=False,
    offline=False,
    feature_start_time=datetime(year=2021, month=1, day=1),
    batch_cluster_config=new_emr,
    stream_cluster_config=new_emr,
    backfill_config=BackfillConfig("multiple_batch_schedule_intervals_per_job"),
)
def my_example_feature_view(user_clicks):
  from pyspark.sql import functions as F
  from pyspark.sql.types import LongType
  def my_udf(x):
      import tensorflow as tf
      return int(tf.math.log1p(float(x)).numpy())
  extra_udf = F.udf(my_udf, LongType())
  return transformation_input.select(
      "product_id",
      extra_udf("clicked").alias("log1p_clicked"),
      F.to_utc_timestamp(F.col("timestamp"), "UTC").alias("timestamp"),
  )