Defining Transformations Outside of Feature Views
Compared to defining a transformation inside of a Feature View, the main advantages of defining a transformation outside of a Feature View are:
- Reusability:
  - Transformations can be reused by multiple Feature Views.
  - A Feature View can call multiple transformations.
- Discoverability: Transformations can be searched in the Web UI.
To define a transformation outside of a Feature View, define a function using the @transformation decorator, with mode set to a supported transformation mode. For example:
@transformation(mode="spark_sql")
def my_transformation(input_data):
    return f"""
        SELECT
            entity_id,
            timestamp,
            column_a AS feature_one,
            column_b AS feature_two
        FROM {input_data}
        """
In the definition of the Feature View that will use the transformation, specify mode="pipeline", and call the transformation function from the Feature View function.
@batch_feature_view(
    mode="pipeline",
    ...
)
def my_feature_view(input_data):
    return my_transformation(input_data)
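Because the transformation is a standalone, registered object, other Feature Views can reuse it. A minimal sketch of the reusability advantage (the second Feature View name and its input are hypothetical):

@batch_feature_view(
    mode="pipeline",
    ...
)
def my_other_feature_view(other_input):
    # Hypothetical second Feature View reusing the same transformation.
    return my_transformation(other_input)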
Guidelines for using @transformation functions
- @transformation function outputs can only be referenced inside mode="pipeline" Feature View definitions.
- A @transformation function cannot reference another @transformation function. Workarounds:
  - Chained invocations must be declared entirely inside a mode="pipeline" Feature View definition; all @transformation references must be inside the Feature View function.
  - Convert the upstream @transformation to a native Python function so that it can be referenced inside a @transformation.
- mode="pipeline" Feature View definitions cannot invoke any PySpark operations. They can only pass DataFrame objects in and out of upstream @transformations.
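A minimal sketch of the last rule, assuming the pyspark @transformations add_impression and transform1 defined in the examples below (the Feature View names here are hypothetical):

@batch_feature_view(mode="pipeline", ...)
def fv_not_allowed(input_df):
    # Not allowed: a pipeline Feature View cannot invoke PySpark methods
    # on the output of a @transformation.
    return add_impression(input_df).select("content_id")

@batch_feature_view(mode="pipeline", ...)
def fv_allowed(input_df):
    # Allowed: the pipeline only passes outputs between @transformations.
    return transform1(add_impression(input_df))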
An alternative to using a @transformation function
As an alternative to implementing a transformation with the @transformation decorator, a native Python function can be used. Native Python functions:
- Can be called from inside Feature View definitions (that use any mode).
- Can be called from @transformation functions.
- Are not discoverable in the Web UI.
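For example, a minimal sketch of a native Python helper used from a mode="spark_sql" Feature View (the helper name feature_columns is hypothetical):

def feature_columns():
    # Plain Python helper; not registered with Tecton and not discoverable
    # in the Web UI.
    return "content_id, event_timestamp, other_vals"

@batch_feature_view(
    mode="spark_sql",
    ...
)
def fv(input_df):
    return f"SELECT {feature_columns()} FROM {input_df}"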
Examples of transformations defined outside of Feature Views
Note
The following code is referenced by the examples that follow it. ds_func is a data source function that generates a DataFrame, which is used by the transformations in the examples.
# Imports shared by the setup code and the examples below.
from datetime import datetime, timedelta
from tecton import (
    Aggregation,
    BatchSource,
    Entity,
    batch_feature_view,
    spark_batch_config,
    transformation,
)

@spark_batch_config()
def ds_func(spark):
    import pandas as pd
    content_ids = ["c1", "c2"]
    other_vals = [1, 2]
    ts = [pd.Timestamp('2022-01-01T12'), pd.Timestamp('2022-01-02T12')]
    return spark.createDataFrame(pd.DataFrame({
        "content_id": content_ids,
        "other_vals": other_vals,
        "event_timestamp": ts
    }))

test_source = BatchSource(
    name="test_source",
    batch_config=ds_func
)

content = Entity(
    name="Content",
    join_keys=["content_id"]
)
A Feature View that calls a pyspark @transformation
@transformation(mode="pyspark")
def transform1(input_df):
    return input_df.select("content_id", "event_timestamp", "other_vals")
@batch_feature_view(
    mode="pipeline",
    sources=[test_source],
    entities=[content],
    feature_start_time=datetime(2022, 2, 1),
    timestamp_field="event_timestamp",
    aggregation_interval=timedelta(days=1),
    aggregations=[
        Aggregation(
            column='other_vals',
            function='sum',
            time_window=timedelta(days=7)
        ),
    ],
)
def fv(input_df):
    return transform1(input_df)
A Feature View that calls two pyspark @transformations with chaining
@transformation(mode="pyspark")
def add_impression(input_df):
from pyspark.sql import functions as F
return input_df.withColumn("impression", F.lit(1))
@transformation(mode="pyspark")
def transform1(input_df):
return input_df.select("content_id", "event_timestamp", "other_vals")
@batch_feature_view(
    mode="pipeline",
    sources=[test_source],
    entities=[content],
    feature_start_time=datetime(2022, 2, 1),
    timestamp_field="event_timestamp",
    aggregation_interval=timedelta(days=1),
    aggregations=[
        Aggregation(
            column='other_vals',
            function='sum',
            time_window=timedelta(days=7)
        ),
    ],
)
def fv(input_df):
    return transform1(add_impression(input_df))
A Feature View that calls a pyspark @transformation, which calls another pyspark @transformation (raises an error)
@transformation(mode="pyspark")
def add_impression(input_df):
from pyspark.sql import functions as F
return input_df.withColumn("impression", F.lit(1))
@transformation(mode="pyspark")
def transform1(input_df):
input_df = add_impression(input_df)
# Invoking .select() will raise an error
# since @transformations cannot invoke methods on the output of other @transformations.**
return input_df.select("content_id", "event_timestamp", "other_vals")
@batch_feature_view(
    mode="pipeline",
    sources=[test_source],
    entities=[content],
    feature_start_time=datetime(2022, 2, 1),
    timestamp_field="event_timestamp",
    aggregation_interval=timedelta(days=1),
    aggregations=[
        Aggregation(
            column='other_vals',
            function='sum',
            time_window=timedelta(days=7)
        ),
    ],
)
def fv(input_df):
    return transform1(input_df)
A Feature View that calls a pyspark @transformation, which calls a native Python function
def add_impression(input_df):
    from pyspark.sql import functions as F
    return input_df.withColumn("impression", F.lit(1))

@transformation(mode="pyspark")
def transform1(input_df):
    input_df = add_impression(input_df)
    return input_df.select("content_id", "event_timestamp", "other_vals")
@batch_feature_view(
    mode="pipeline",
    sources=[test_source],
    entities=[content],
    feature_start_time=datetime(2022, 2, 1),
    timestamp_field="event_timestamp",
    aggregation_interval=timedelta(days=1),
    aggregations=[
        Aggregation(
            column='other_vals',
            function='sum',
            time_window=timedelta(days=7)
        ),
    ],
)
def fv(input_df):
    return transform1(input_df)
A pyspark Feature View that calls two native Python functions
def add_impression(input_df):
    from pyspark.sql import functions as F
    return input_df.withColumn("impression", F.lit(1))

def transform1(input_df):
    input_df = add_impression(input_df)
    return input_df.select("content_id", "event_timestamp", "other_vals")
@batch_feature_view(
    mode="pyspark",
    sources=[test_source],
    entities=[content],
    feature_start_time=datetime(2022, 2, 1),
    timestamp_field="event_timestamp",
    aggregation_interval=timedelta(days=1),
    aggregations=[
        Aggregation(
            column='other_vals',
            function='sum',
            time_window=timedelta(days=7)
        ),
    ],
)
def fv(input_df):
    intermediate_df = add_impression(input_df)
    return transform1(intermediate_df)
A pyspark Feature View that calls a native Python function and invokes the pyspark method withColumn
def add_impression(input_df):
    from pyspark.sql import functions as F
    return input_df.withColumn("impression", F.lit(1))

def transform1(input_df):
    input_df = add_impression(input_df)
    return input_df.select("content_id", "event_timestamp", "other_vals")
@batch_feature_view(
    mode="pyspark",
    sources=[test_source],
    entities=[content],
    feature_start_time=datetime(2022, 2, 1),
    timestamp_field="event_timestamp",
    aggregation_interval=timedelta(days=1),
    aggregations=[
        Aggregation(
            column='other_vals',
            function='sum',
            time_window=timedelta(days=7)
        ),
    ],
)
def fv(input_df):
    from pyspark.sql import functions as F
    intermediate_df = input_df.withColumn("impression", F.lit(1))
    return transform1(intermediate_df)
A Feature View that calls a pyspark @transformation, passing two pyspark @transformation outputs
@transformation(mode="pyspark")
def add_impression(input_df):
from pyspark.sql import functions as F
return input_df.withColumn("impression", F.lit(1))
@transformation(mode="pyspark")
def transform1(input_df):
return input_df.select("content_id", "event_timestamp", "other_vals")
@transformation(mode="pyspark")
def pick1(df1, df2):
return df1
@batch_feature_view(
    mode="pipeline",
    sources=[test_source],
    entities=[content],
    feature_start_time=datetime(2022, 2, 1),
    timestamp_field="event_timestamp",
    aggregation_interval=timedelta(days=1),
    aggregations=[
        Aggregation(
            column='other_vals',
            function='sum',
            time_window=timedelta(days=7)
        ),
    ],
)
def fv(input_df):
    return pick1(transform1(input_df), add_impression(input_df))
A pipeline Feature View that calls a spark_sql @transformation, passing two pyspark @transformation outputs
@transformation(mode="pyspark")
def add_impression(input_df):
from pyspark.sql import functions as F
return input_df.withColumn("impression", F.lit(1))
@transformation(mode="pyspark")
def transform1(input_df):
return input_df.select("content_id", "event_timestamp", "other_vals")
@transformation(mode="spark_sql")
def pick1(df1, df2):
return f"SELECT * FROM {df1}"
@batch_feature_view(
    mode="pipeline",
    sources=[test_source],
    entities=[content],
    feature_start_time=datetime(2022, 2, 1),
    timestamp_field="event_timestamp",
    aggregation_interval=timedelta(days=1),
    aggregations=[
        Aggregation(
            column='other_vals',
            function='sum',
            time_window=timedelta(days=7)
        ),
    ],
)
def fv(input_df):
    return pick1(transform1(input_df), add_impression(input_df))
A str_split implementation that uses two @transformations
In this example, we implement a generic str_split transformation on a specified column, followed by another transformation that calculates summary statistics for the feature.
Note that passing constants to a transformation requires using const, which can be imported from tecton.
from tecton import transformation, batch_feature_view, const, FilteredSource
from entities import auction
from data_sources.ad_impressions import ad_impressions_batch
from datetime import datetime, timedelta
@transformation(mode="spark_sql")
def str_split(input_data, column_to_split, new_column_name, delimiter):
return f"""
SELECT
*,
split({column_to_split}, {delimiter}) AS {new_column_name}
FROM {input_data}
"""
@transformation(mode="spark_sql")
def keyword_stats(input_data, keyword_column):
return f"""
SELECT
auction_id,
timestamp,
{keyword_column} AS keyword_list,
size({keyword_column}) AS num_keywords,
array_contains({keyword_column}, "bitcoin") AS keyword_contains_bitcoin
FROM {input_data}
"""
@batch_feature_view(
mode='pipeline',
sources=[FilteredSource(ad_impressions_batch)],
entities=[auction],
batch_schedule=timedelta(days=1),
online=True,
offline=True,
feature_start_time=datetime(2020, 5, 1),
ttl=timedelta(days=365),
)
def auction_keywords(ad_impressions):
split_keywords = str_split(ad_impressions, const("content_keyword"), const("keywords"), const("\' \'"))
return keyword_stats(split_keywords, const("keywords"))
FAQ: Why can't I directly invoke PySpark methods on the output from a @transformation?
Transformations are intended to be interchangeable within a supported compute environment. For Spark, this means mode="pyspark" and mode="spark_sql" transformations can be mixed. Invoking PySpark methods directly on a @transformation output would tie the pipeline to a specific mode and break this interchangeability. For example, this is a completely valid pipeline:
@transformation(mode="pyspark")
def transform1(input_df):
return input_df.select("a", "b", "c")
@transformation(mode="spark_sql")
def select_all(df):
return f"SELECT * FROM {df}"
@batch_feature_view(
    mode="pipeline",
    sources=[test_source],
    entities=[content],
    feature_start_time=datetime(2022, 2, 1),
    timestamp_field="event_timestamp",
    aggregation_interval=timedelta(days=1),
    aggregations=[
        Aggregation(
            column='other_vals',
            function='sum',
            time_window=timedelta(days=7)
        ),
    ],
)
def fv(input_df):
    return select_all(transform1(input_df))