Skip to content

Using Third Party Dependencies in PySpark Transformations

Overview

You can use third-party Python packages in UDFs used by PySpark transformations (mode="pyspark") by declaring them in the extra_pip_dependencies parameter of the cluster config object, which is either tecton.EMRClusterConfig or tecton.DatabricksClusterConfig.

Limitations

The installed packages will be available to use from Spark UDFs but not from the transformation code itself. For example, if you set extra_pip_dependencies=["tensorflow"], you can use tensorflow in a UDF like this:

@transformation(mode="pyspark")
def test_transformation(transformation_input):
    """Illustrate where a pip-installed package may be used in a PySpark transformation."""
    from pyspark.sql import functions as F
    from pyspark.sql.types import LongType

    # tensorflow is only usable inside the body of my_tensorflow_udf;
    # the surrounding transformation code cannot import or call it.
    @F.udf(returnType=LongType())
    def my_tensorflow_udf(x):
        import tensorflow as tf

        log1p_value = tf.math.log1p(float(x))
        # int() is needed because the UDF is declared to return LongType.
        return int(log1p_value.numpy())

    ...
Note that the import statement has to be within the UDF body as well. Putting it at the top of the file or inside the transformation function (but outside the UDF) won't work.

Complete example: using a third-party Python package with EMR

This example uses the log1p function imported from TensorFlow in a UDF applied within a PySpark transformation.

from datetime import datetime, timedelta
from tecton import batch_feature_view, EMRClusterConfig, FilteredSource
from shared import entities, data_sources

# EMR cluster config that declares tensorflow as an extra pip dependency,
# so it can be imported from Spark UDFs in feature views using this config.
new_emr = EMRClusterConfig(
    extra_pip_dependencies=["tensorflow==2.2.0"],
    number_of_workers=4,
    instance_type="m4.xlarge",
)

@batch_feature_view(
    mode="pyspark",
    # The data source must be referenced through the imported `data_sources`
    # module; a bare `user_clicks` is not defined at module scope.
    sources=[FilteredSource(source=data_sources.user_clicks)],
    entities=[entities.product],
    online=False,
    offline=False,
    feature_start_time=datetime(year=2021, month=1, day=1),
    batch_schedule=timedelta(days=1),
    ttl=timedelta(days=30),
    batch_compute=new_emr,
)
def my_example_feature_view(user_clicks):
    """Select product click rows with a log1p-transformed click value.

    Args:
        user_clicks: Input DataFrame for the feature view's source; the code
            reads its "product_id", "clicked" and "timestamp" columns.

    Returns:
        A DataFrame with the columns "product_id", "log1p_clicked" (log1p of
        "clicked", truncated to an integer) and "timestamp".
    """
    from pyspark.sql import functions as F
    from pyspark.sql.types import LongType

    def my_udf(x):
        # The import must live inside the UDF body: the pip-installed package
        # is usable from UDFs only, not from the transformation code itself.
        import tensorflow as tf

        # int() is needed because the UDF is declared to return LongType.
        return int(tf.math.log1p(float(x)).numpy())

    extra_udf = F.udf(my_udf, LongType())

    return user_clicks.select(
        "product_id",
        extra_udf("clicked").alias("log1p_clicked"),
        F.to_utc_timestamp(F.col("timestamp"), "UTC").alias("timestamp"),
    )