Using Third Party Dependencies in PySpark Transformations
Overview
You can use third-party Python packages in UDFs used by PySpark transformations (mode="pyspark")
by declaring them in extra_pip_dependencies
within the cluster config object, which is either tecton.EMRClusterConfig
or tecton.DatabricksClusterConfig
.
Limitations
The installed packages will be available to use from Spark UDFs but not from the transformation code itself.
For example, if you set extra_pip_dependencies=["tensorflow"]
, you can use tensorflow in a UDF like this:
@transformation(mode="pyspark")
def test_transformation(transformation_input):
    """Illustrate where an extra pip dependency (tensorflow) is usable.

    Packages listed in extra_pip_dependencies are importable only inside
    the Spark UDF body, not in the surrounding transformation code.
    """
    from pyspark.sql import functions as F
    from pyspark.sql.types import LongType

    # tensorflow can be used within my_tensorflow_udf,
    # but not outside of it
    def my_tensorflow_udf(x):
        import tensorflow as tf
        return int(tf.math.log1p(float(x)).numpy())

    # Register the plain Python function as a Spark UDF returning LongType.
    my_tensorflow_udf = F.udf(my_tensorflow_udf, LongType())
    ...
Complete example: using third party python package with EMR
In this example we apply TensorFlow's log1p
function inside a UDF within a PySpark transformation.
from datetime import datetime, timedelta
from tecton import batch_feature_view, EMRClusterConfig, FilteredSource
from shared import entities, data_sources
# EMR cluster configuration for the feature view below.
# extra_pip_dependencies installs tensorflow (pinned to 2.2.0) on the
# cluster so it can be imported inside Spark UDFs — but NOT in the
# transformation body itself (see Limitations above).
new_emr = EMRClusterConfig(
    instance_type="m4.xlarge",
    number_of_workers=4,
    extra_pip_dependencies=["tensorflow==2.2.0"],
)
@batch_feature_view(
    mode="pyspark",
    # `user_clicks` is not defined at module scope; reference it through the
    # imported `data_sources` module, mirroring `entities.product` below.
    sources=[FilteredSource(source=data_sources.user_clicks)],
    entities=[entities.product],
    online=False,
    offline=False,
    feature_start_time=datetime(year=2021, month=1, day=1),
    batch_schedule=timedelta(days=1),
    ttl=timedelta(days=30),
    batch_compute=new_emr,
)
def my_example_feature_view(user_clicks):
    """Batch feature view computing log1p of click counts per product.

    tensorflow is installed on the EMR cluster via extra_pip_dependencies,
    so it is importable only inside the Spark UDF, not in this body.
    """
    from pyspark.sql import functions as F
    from pyspark.sql.types import LongType

    # tensorflow can be used within my_udf, but not outside of it.
    def my_udf(x):
        import tensorflow as tf
        return int(tf.math.log1p(float(x)).numpy())

    extra_udf = F.udf(my_udf, LongType())
    return user_clicks.select(
        "product_id",
        extra_udf("clicked").alias("log1p_clicked"),
        F.to_utc_timestamp(F.col("timestamp"), "UTC").alias("timestamp"),
    )