Using Third-Party Dependencies in PySpark Transformations
Overview
You can use third-party Python packages in UDFs used by PySpark transformations (mode="pyspark") by declaring them in extra_pip_dependencies
within the cluster config object, which is either tecton.EMRClusterConfig or tecton.DatabricksClusterConfig.
Limitations
The installed packages are available to Spark UDFs, but not to the transformation code itself.
For example, if you set extra_pip_dependencies=["tensorflow"], you can use tensorflow inside a UDF like this:
@transformation(mode="pyspark")
def test_transformation(transformation_input):
    """Show where a third-party package may be imported in a PySpark transformation.

    Packages declared in ``extra_pip_dependencies`` (here ``tensorflow``) can be
    imported inside a Spark UDF, but not in the transformation body itself.

    Args:
        transformation_input: Input of the transformation; its use is elided
            in this snippet.
    """
    from pyspark.sql import functions as F
    from pyspark.sql.types import LongType

    # tensorflow can be used within my_tensorflow_udf,
    # but not outside of it
    def my_tensorflow_udf(x):
        # Spark passes SQL NULL to Python UDFs as None; float(None) would
        # raise and fail the task, so propagate the null instead.
        if x is None:
            return None
        import tensorflow as tf

        # int() truncates toward zero to fit the declared LongType.
        return int(tf.math.log1p(float(x)).numpy())

    my_tensorflow_udf = F.udf(my_tensorflow_udf, LongType())
    ...
Complete example: using a third-party Python package with EMR
In this example, we use the log1p function from TensorFlow in a UDF applied within a PySpark transformation.
from datetime import datetime, timedelta
from tecton import batch_feature_view, EMRClusterConfig, FilteredSource
from shared import entities, data_sources
# EMR cluster used as batch compute for the feature view below. TensorFlow is
# pip-installed on the cluster so that Spark UDFs can import it.
new_emr = EMRClusterConfig(
    extra_pip_dependencies=["tensorflow==2.2.0"],
    number_of_workers=4,
    instance_type="m4.xlarge",
)
@batch_feature_view(
    mode="pyspark",
    # Source object defined in the shared data_sources module.
    sources=[FilteredSource(source=data_sources.user_clicks)],
    entities=[entities.product],
    online=False,
    offline=False,
    feature_start_time=datetime(year=2021, month=1, day=1),
    batch_schedule=timedelta(days=1),
    ttl=timedelta(days=30),
    # NOTE(review): with online=False and offline=False this view is presumably
    # not materialized; confirm the EMR cluster (and its pip dependencies) is
    # actually used in that configuration.
    batch_compute=new_emr,
)
def my_example_feature_view(user_clicks):
    """Compute a log1p-transformed click count per product using TensorFlow.

    Args:
        user_clicks: Spark DataFrame from the user_clicks source; expected to
            contain ``product_id``, ``clicked`` and ``timestamp`` columns.

    Returns:
        DataFrame with ``product_id``, ``log1p_clicked`` (LongType) and
        ``timestamp`` columns.
    """
    from pyspark.sql import functions as F
    from pyspark.sql.types import LongType

    def my_udf(x):
        # Spark passes SQL NULL to Python UDFs as None; float(None) would
        # raise and fail the task, so propagate the null instead.
        if x is None:
            return None
        # Imported inside the UDF: TensorFlow is installed via
        # extra_pip_dependencies and is only importable where the UDF runs.
        import tensorflow as tf

        # int() truncates toward zero to fit the declared LongType.
        return int(tf.math.log1p(float(x)).numpy())

    extra_udf = F.udf(my_udf, LongType())
    return user_clicks.select(
        "product_id",
        extra_udf("clicked").alias("log1p_clicked"),
        F.to_utc_timestamp(F.col("timestamp"), "UTC").alias("timestamp"),
    )