Using Third Party Dependencies in Spark UDFs

Overview

You can use third-party Python packages in UDFs used by Transformations by declaring them in extra_pip_dependencies within the cluster config object, either tecton.EMRClusterConfig or tecton.DatabricksClusterConfig.
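Since the complete example below uses EMR, here is a minimal sketch of the equivalent declaration for Databricks (the instance type and worker count here are illustrative, assuming DatabricksClusterConfig accepts the same sizing arguments as the EMR config):

from tecton import DatabricksClusterConfig

databricks_config = DatabricksClusterConfig(
    instance_type="m5.xlarge",
    number_of_workers=4,
    # Packages listed here are installed on the cluster and become
    # importable inside Spark UDFs.
    extra_pip_dependencies=["tensorflow==2.2.0"],
)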

Limitations

The installed packages will be available to use from Spark UDFs but not from the transformation code itself. For example, if you set extra_pip_dependencies=["tensorflow"], you can use tensorflow in a UDF like this:

@transformation(mode="pyspark")
def test_transformation(transformation_input):
    from pyspark.sql import functions as F
    from pyspark.sql.types import LongType

    # tensorflow can be used within my_tensorflow_udf,
    # but not outside of it
    def my_tensorflow_udf(x):
        import tensorflow as tf
        return int(tf.math.log1p(float(x)).numpy())

    my_tensorflow_udf = F.udf(my_tensorflow_udf, LongType())

    ...
Note that the import statement has to be within the UDF body as well. Putting it at the top of the file or inside the transformation function won't work.
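For contrast, here is a sketch of the placement that won't work (the function name is illustrative):

@transformation(mode="pyspark")
def broken_transformation(transformation_input):
    # Won't work: the package is only available inside Spark UDFs,
    # not in the transformation body itself.
    import tensorflow as tf
    ...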

Complete example: using a third party Python package with EMR

In this example, we use the log1p function from TensorFlow in a UDF applied within a PySpark transformation.

from datetime import datetime
from tecton import batch_feature_view, Input, EMRClusterConfig, BackfillConfig
from shared import entities, data_sources


new_emr = EMRClusterConfig(
    instance_type="m4.xlarge",
    number_of_workers=4,
    extra_pip_dependencies=["tensorflow==2.2.0"],
)

@batch_feature_view(
    mode="pyspark",
    inputs={"user_clicks": Input(data_sources.user_clicks)},
    entities=[entities.product],
    online=False,
    offline=False,
    feature_start_time=datetime(year=2021, month=1, day=1),
    batch_cluster_config=new_emr,
    stream_cluster_config=new_emr,
    backfill_config=BackfillConfig("multiple_batch_schedule_intervals_per_job"),
)
def my_example_feature_view(user_clicks):
    from pyspark.sql import functions as F
    from pyspark.sql.types import LongType

    # tensorflow must be imported inside the UDF body
    def my_udf(x):
        import tensorflow as tf
        return int(tf.math.log1p(float(x)).numpy())

    extra_udf = F.udf(my_udf, LongType())

    return user_clicks.select(
        "product_id",
        extra_udf("clicked").alias("log1p_clicked"),
        F.to_utc_timestamp(F.col("timestamp"), "UTC").alias("timestamp"),
    )