Using Third Party Dependencies in Spark UDFs
Overview
You can use third-party Python packages in UDFs used by Transformations by declaring them in `extra_pip_dependencies` within the cluster config object, which is either `tecton.EMRClusterConfig` or `tecton.DatabricksClusterConfig`.
Limitations
The installed packages will be available to use from Spark UDFs but not from the transformation code itself.
For example, if you set `extra_pip_dependencies=["tensorflow"]`, you can use tensorflow in a UDF like this:
@transformation(mode="pyspark")
def test_transformation(transformation_input):
    """Show where a package from ``extra_pip_dependencies`` may be imported.

    tensorflow is imported inside the UDF body rather than in the
    transformation body, because the installed packages are available to
    Spark UDFs but not to the transformation code itself.
    """
    from pyspark.sql import functions as F
    from pyspark.sql.types import LongType

    # tensorflow can be used within my_tensorflow_udf,
    # but not outside of it
    def my_tensorflow_udf(x):
        import tensorflow as tf

        return int(tf.math.log1p(float(x)).numpy())

    # Wrap the plain Python function as a Spark UDF with a LongType result.
    my_tensorflow_udf = F.udf(my_tensorflow_udf, LongType())
    ...
Complete example: using a third-party Python package with EMR
In this example we use the `log1p` function imported from TensorFlow in a UDF applied within a PySpark transformation.
from datetime import datetime

from tecton import (
    BackfillConfig,
    EMRClusterConfig,
    Input,
    MaterializationConfig,
    batch_feature_view,
)

from shared import entities, data_sources
# EMR cluster configuration for this example; it declares a pinned
# tensorflow release as an extra pip dependency.
new_emr = EMRClusterConfig(
    extra_pip_dependencies=["tensorflow==2.2.0"],
    number_of_workers=4,
    instance_type="m4.xlarge",
)
@batch_feature_view(
    mode="pyspark",
    inputs={"user_clicks": Input(data_sources.user_clicks)},
    entities=[entities.product],
    online=False,
    offline=False,
    feature_start_time=datetime(year=2021, month=1, day=1),
    batch_cluster_config=new_emr,
    stream_cluster_config=new_emr,
    backfill_config=BackfillConfig("multiple_batch_schedule_intervals_per_job"),
)
def my_example_feature_view(user_clicks):
    """Select ``product_id``, ``log1p_clicked`` and ``timestamp`` from the input.

    ``log1p_clicked`` is produced by a Spark UDF that applies TensorFlow's
    ``log1p`` to the ``clicked`` column and truncates the result to an int.

    Args:
        user_clicks: The input bound to the ``"user_clicks"`` key of
            ``inputs``; it must expose ``select`` and the ``product_id``,
            ``clicked`` and ``timestamp`` columns.

    Returns:
        The result of ``user_clicks.select(...)`` with the three columns above.
    """
    from pyspark.sql import functions as F
    from pyspark.sql.types import LongType

    def my_udf(x):
        # tensorflow is imported inside the UDF: extra pip dependencies are
        # usable from Spark UDFs, not from the transformation body itself.
        import tensorflow as tf

        return int(tf.math.log1p(float(x)).numpy())

    extra_udf = F.udf(my_udf, LongType())

    # The parameter name must match the "user_clicks" key declared in `inputs`.
    return user_clicks.select(
        "product_id",
        extra_udf("clicked").alias("log1p_clicked"),
        F.to_utc_timestamp(F.col("timestamp"), "UTC").alias("timestamp"),
    )