Defining Transformations Outside of Feature Views
Compared to defining a transformation inside of a Feature View, the main advantages of defining a transformation outside of a Feature View are:
- Reusability:
  - Transformations can be reused by multiple Feature Views (see the reuse sketch after the first example below).
  - A Feature View can call multiple transformations.
- Discoverability: Transformations can be searched in the Web UI.
To define a transformation outside of a Feature View, define a function using the `@transformation` decorator, with `mode` set to a supported transformation mode. For example:
```python
@transformation(mode="spark_sql")
def my_transformation(input_data):
    return f"""
        SELECT
            entity_id,
            timestamp,
            column_a AS feature_one,
            column_b AS feature_two
        FROM {input_data}
        """
```
In the definition of the Feature View that will use the transformation, specify `mode="pipeline"`, and call the transformation function from the Feature View function.
```python
@batch_feature_view(
    mode="pipeline",
    ...
)
def my_feature_view(input_data):
    return my_transformation(input_data)
```
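Because `my_transformation` is defined outside of any Feature View, it can be shared. As a minimal sketch of the reusability advantage noted above (the second Feature View name is hypothetical and its parameters are elided the same way as in the example above), another `mode="pipeline"` Feature View can call the same function:

```python
# Hypothetical second Feature View reusing the same transformation.
@batch_feature_view(
    mode="pipeline",
    ...
)
def my_other_feature_view(input_data):
    # The same my_transformation function is shared by both Feature Views.
    return my_transformation(input_data)
```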
Guidelines for using `@transformation` functions
- `@transformation` function outputs can only be referenced inside `mode="pipeline"` Feature View definitions (see the sketch after this list).
- A `@transformation` function cannot reference another `@transformation` function. Workarounds:
  - Chained invocations must be entirely declared inside a `mode="pipeline"` Feature View definition; all `@transformation` references must be inside the Feature View function.
  - Convert upstream `@transformation`s to native Python functions to reference them inside a `@transformation`.
- `mode="pipeline"` Feature View definitions cannot invoke any PySpark operations. They can only pass DataFrame objects in and out of upstream `@transformation`s.
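For example, the first guideline means that a sketch like the following (hypothetical names, with parameters elided) would fail validation, because it references a `@transformation` output from a Feature View that does not use `mode="pipeline"`:

```python
@transformation(mode="pyspark")
def select_columns(input_df):
    return input_df.select("content_id", "event_timestamp", "other_vals")

# Invalid: a mode="pyspark" Feature View cannot reference a @transformation output.
@batch_feature_view(
    mode="pyspark",
    ...
)
def invalid_fv(input_df):
    return select_columns(input_df)
```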
An alternative to using a `@transformation` function
As an alternative to implementing a transformation function using the `@transformation` decorator, a native Python function can be used. Native Python functions:

- Can be called from inside Feature View definitions (that use any `mode`), as shown in the sketch after this list.
- Can be called from `@transformation` functions.
- Are not discoverable in the Web UI.
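As a minimal sketch of the first point (the helper and Feature View names are hypothetical, with parameters elided), a native Python function can be called from a `mode="spark_sql"` Feature View, which would not be possible with a `@transformation`:

```python
# Native Python helper; not registered as a @transformation.
def select_clause(columns):
    return ", ".join(columns)

@batch_feature_view(
    mode="spark_sql",
    ...
)
def my_sql_feature_view(input_data):
    # The helper runs as ordinary Python while the SQL string is built.
    return f"""
        SELECT {select_clause(["content_id", "event_timestamp", "other_vals"])}
        FROM {input_data}
        """
```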
Examples of transformations defined outside of Feature Views
Note: The following code is referenced by the examples that follow it. The Hive table `catalog.content` has the following schema and data:
| content_id | other_vals | event_timestamp |
|---|---|---|
| c1 | 1 | 2022-01-01T12 |
| c2 | 2 | 2022-01-02T12 |
```python
# Shared imports, data source, and entity used by the examples below.
from datetime import datetime, timedelta

from tecton import Aggregation, BatchSource, Entity, HiveConfig, batch_feature_view, transformation

test_source = BatchSource(
    name="test_source",
    batch_config=HiveConfig(
        table="content",
        database="catalog",
        timestamp_field="event_timestamp",
    ),
)

content = Entity(
    name="Content",
    join_keys=["content_id"],
)
```
A Feature View that calls a `pyspark` `@transformation`
```python
@transformation(mode="pyspark")
def transform1(input_df):
    return input_df.select("content_id", "event_timestamp", "other_vals")

@batch_feature_view(
    mode="pipeline",
    sources=[test_source],
    entities=[content],
    feature_start_time=datetime(2022, 2, 1),
    timestamp_field="event_timestamp",
    aggregation_interval=timedelta(days=1),
    aggregations=[
        Aggregation(
            column='other_vals',
            function='sum',
            time_window=timedelta(days=7)
        ),
    ],
)
def fv(input_df):
    return transform1(input_df)
```
A Feature View that calls two `pyspark` `@transformation`s with chaining
```python
@transformation(mode="pyspark")
def add_impression(input_df):
    from pyspark.sql import functions as F
    return input_df.withColumn("impression", F.lit(1))

@transformation(mode="pyspark")
def transform1(input_df):
    return input_df.select("content_id", "event_timestamp", "other_vals")

@batch_feature_view(
    mode="pipeline",
    sources=[test_source],
    entities=[content],
    feature_start_time=datetime(2022, 2, 1),
    timestamp_field="event_timestamp",
    aggregation_interval=timedelta(days=1),
    aggregations=[
        Aggregation(
            column='other_vals',
            function='sum',
            time_window=timedelta(days=7)
        ),
    ],
)
def fv(input_df):
    return transform1(add_impression(input_df))
```
A Feature View that calls a `pyspark` `@transformation`, which calls another `pyspark` `@transformation`
```python
@transformation(mode="pyspark")
def add_impression(input_df):
    from pyspark.sql import functions as F
    return input_df.withColumn("impression", F.lit(1))

@transformation(mode="pyspark")
def transform1(input_df):
    input_df = add_impression(input_df)
    # Invoking .select() will raise an error, since @transformations
    # cannot invoke methods on the output of other @transformations.
    return input_df.select("content_id", "event_timestamp", "other_vals")

@batch_feature_view(
    mode="pipeline",
    sources=[test_source],
    entities=[content],
    feature_start_time=datetime(2022, 2, 1),
    timestamp_field="event_timestamp",
    aggregation_interval=timedelta(days=1),
    aggregations=[
        Aggregation(
            column='other_vals',
            function='sum',
            time_window=timedelta(days=7)
        ),
    ],
)
def fv(input_df):
    return transform1(input_df)
```
A Feature View that calls a `pyspark` `@transformation`, which calls a native Python function
```python
def add_impression(input_df):
    from pyspark.sql import functions as F
    return input_df.withColumn("impression", F.lit(1))

@transformation(mode="pyspark")
def transform1(input_df):
    input_df = add_impression(input_df)
    return input_df.select("content_id", "event_timestamp", "other_vals")

@batch_feature_view(
    mode="pipeline",
    sources=[test_source],
    entities=[content],
    feature_start_time=datetime(2022, 2, 1),
    timestamp_field="event_timestamp",
    aggregation_interval=timedelta(days=1),
    aggregations=[
        Aggregation(
            column='other_vals',
            function='sum',
            time_window=timedelta(days=7)
        ),
    ],
)
def fv(input_df):
    return transform1(input_df)
```
A `pyspark` Feature View that calls two native Python functions
```python
def add_impression(input_df):
    from pyspark.sql import functions as F
    return input_df.withColumn("impression", F.lit(1))

def transform1(input_df):
    input_df = add_impression(input_df)
    return input_df.select("content_id", "event_timestamp", "other_vals")

@batch_feature_view(
    mode="pyspark",
    sources=[test_source],
    entities=[content],
    feature_start_time=datetime(2022, 2, 1),
    timestamp_field="event_timestamp",
    aggregation_interval=timedelta(days=1),
    aggregations=[
        Aggregation(
            column='other_vals',
            function='sum',
            time_window=timedelta(days=7)
        ),
    ],
)
def fv(input_df):
    intermediate_df = add_impression(input_df)
    return transform1(intermediate_df)
```
A `pyspark` Feature View that calls a native Python function and invokes the PySpark method `withColumn`
```python
def add_impression(input_df):
    from pyspark.sql import functions as F
    return input_df.withColumn("impression", F.lit(1))

def transform1(input_df):
    input_df = add_impression(input_df)
    return input_df.select("content_id", "event_timestamp", "other_vals")

@batch_feature_view(
    mode="pyspark",
    sources=[test_source],
    entities=[content],
    feature_start_time=datetime(2022, 2, 1),
    timestamp_field="event_timestamp",
    aggregation_interval=timedelta(days=1),
    aggregations=[
        Aggregation(
            column='other_vals',
            function='sum',
            time_window=timedelta(days=7)
        ),
    ],
)
def fv(input_df):
    from pyspark.sql import functions as F
    intermediate_df = input_df.withColumn("impression", F.lit(1))
    return transform1(intermediate_df)
```
A Feature View that calls a `pyspark` `@transformation`, passing two `pyspark` `@transformation` outputs
```python
@transformation(mode="pyspark")
def add_impression(input_df):
    from pyspark.sql import functions as F
    return input_df.withColumn("impression", F.lit(1))

@transformation(mode="pyspark")
def transform1(input_df):
    return input_df.select("content_id", "event_timestamp", "other_vals")

@transformation(mode="pyspark")
def pick1(df1, df2):
    return df1

@batch_feature_view(
    mode="pipeline",
    sources=[test_source],
    entities=[content],
    feature_start_time=datetime(2022, 2, 1),
    timestamp_field="event_timestamp",
    aggregation_interval=timedelta(days=1),
    aggregations=[
        Aggregation(
            column='other_vals',
            function='sum',
            time_window=timedelta(days=7)
        ),
    ],
)
def fv(input_df):
    return pick1(transform1(input_df), add_impression(input_df))
```
A `pipeline` Feature View that calls a `spark_sql` `@transformation`, passing two `pyspark` `@transformation` outputs
```python
@transformation(mode="pyspark")
def add_impression(input_df):
    from pyspark.sql import functions as F
    return input_df.withColumn("impression", F.lit(1))

@transformation(mode="pyspark")
def transform1(input_df):
    return input_df.select("content_id", "event_timestamp", "other_vals")

@transformation(mode="spark_sql")
def pick1(df1, df2):
    return f"SELECT * FROM {df1}"

@batch_feature_view(
    mode="pipeline",
    sources=[test_source],
    entities=[content],
    feature_start_time=datetime(2022, 2, 1),
    timestamp_field="event_timestamp",
    aggregation_interval=timedelta(days=1),
    aggregations=[
        Aggregation(
            column='other_vals',
            function='sum',
            time_window=timedelta(days=7)
        ),
    ],
)
def fv(input_df):
    return pick1(transform1(input_df), add_impression(input_df))
```
A `str_split` implementation that uses two `@transformation`s
In this example, we implement a generic `str_split` transformation on a specified column, followed by another transformation that calculates summary statistics for the feature.
Note that passing constants to a transformation requires using `const`, which can be imported from `tecton`.
```python
from tecton import transformation, batch_feature_view, const, FilteredSource
from entities import auction
from data_sources.ad_impressions import ad_impressions_batch
from datetime import datetime, timedelta

@transformation(mode="spark_sql")
def str_split(input_data, column_to_split, new_column_name, delimiter):
    return f"""
        SELECT
            *,
            split({column_to_split}, {delimiter}) AS {new_column_name}
        FROM {input_data}
        """

@transformation(mode="spark_sql")
def keyword_stats(input_data, keyword_column):
    return f"""
        SELECT
            auction_id,
            timestamp,
            {keyword_column} AS keyword_list,
            size({keyword_column}) AS num_keywords,
            array_contains({keyword_column}, "bitcoin") AS keyword_contains_bitcoin
        FROM {input_data}
        """

@batch_feature_view(
    mode='pipeline',
    sources=[FilteredSource(ad_impressions_batch)],
    entities=[auction],
    batch_schedule=timedelta(days=1),
    online=True,
    offline=True,
    feature_start_time=datetime(2020, 5, 1),
    ttl=timedelta(days=365),
)
def auction_keywords(ad_impressions):
    split_keywords = str_split(ad_impressions, const("content_keyword"), const("keywords"), const("' '"))
    return keyword_stats(split_keywords, const("keywords"))
```
FAQ: Why can't I directly invoke PySpark methods on the output from a `@transformation`?
Transformations are intended to be interchangeable within a supported compute environment. For Spark, this means `mode="pyspark"` and `mode="spark_sql"` transformations can be mixed. Because a `@transformation` output may be consumed by either kind of transformation, a `mode="pipeline"` Feature View can only pass these outputs between transformations; it cannot invoke PySpark methods on them directly. For example, this is a completely valid pipeline:
```python
@transformation(mode="pyspark")
def transform1(input_df):
    return input_df.select("a", "b", "c")

@transformation(mode="spark_sql")
def select_all(df):
    return f"SELECT * FROM {df}"

@batch_feature_view(
    mode="pipeline",
    sources=[test_source],
    entities=[content],
    feature_start_time=datetime(2022, 2, 1),
    timestamp_field="event_timestamp",
    aggregation_interval=timedelta(days=1),
    aggregations=[
        Aggregation(
            column='other_vals',
            function='sum',
            time_window=timedelta(days=7)
        ),
    ],
)
def fv(input_df):
    return select_all(transform1(input_df))
```