
Defining Transformations Outside of Feature Views

Compared to defining a transformation inside of a Feature View, the main advantages of defining a transformation outside of a Feature View are:

  • Reusability
    • Transformations can be reused by multiple Feature Views.
    • A Feature View can call multiple transformations.
  • Discoverability: Transformations can be searched in the Web UI.

To define a transformation outside of a Feature View, define a function using the @transformation decorator, with mode set to a supported transformation mode. For example:

@transformation(mode="spark_sql")
def my_transformation(input_data):
    return f"""
        SELECT
          entity_id,
          timestamp,
          column_a AS feature_one,
          column_b AS feature_two
        FROM {input_data}
    """

In the definition of the Feature View that will use the transformation, specify mode="pipeline", and call the transformation function from the Feature View function.

@batch_feature_view(
    mode="pipeline",
    ...
)
def my_feature_view(input_data):
    return my_transformation(input_data)
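
Because the transformation is defined on its own, other Feature Views can reuse it. The following sketch (the second Feature View's name and input are illustrative) shows the same transformation called from another pipeline Feature View:

@batch_feature_view(
    mode="pipeline",
    ...
)
def my_other_feature_view(other_input_data):
    # Reuses the transformation defined above.
    return my_transformation(other_input_data)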

Guidelines for using @transformation functions

  • @transformation function outputs can only be referenced inside mode="pipeline" Feature View definitions.
  • A @transformation function cannot reference another @transformation function. Workarounds:
    • Declare the chained invocations entirely inside a mode="pipeline" Feature View definition; all @transformation references must be inside the Feature View function.
    • Convert the upstream @transformation to a native Python function so that it can be referenced inside another @transformation.
  • mode="pipeline" Feature View definitions cannot invoke any PySpark operations. They can only pass DataFrame objects in and out of upstream @transformations (see the invalid sketch below).
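
For example, the following sketch (with illustrative names) is invalid because the mode="pipeline" Feature View function invokes a PySpark method on a @transformation output:

@transformation(mode="pyspark")
def transform1(input_df):
    return input_df.select("content_id", "event_timestamp", "other_vals")

@batch_feature_view(
    mode="pipeline",
    ...
)
def invalid_fv(input_df):
    # Invalid: a mode="pipeline" Feature View function cannot invoke PySpark
    # operations such as .select() on the output of a @transformation.
    return transform1(input_df).select("content_id")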

An alternative to using a @transformation function

As an alternative to implementing a transformation with the @transformation decorator, a native Python function can be used. Native Python functions:

  • Can be called from inside Feature View definitions that use any mode (see the sketch below).
  • Can be called from @transformation functions.
  • Are not discoverable in the Web UI.
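
For example, this sketch (the helper and Feature View names are illustrative) calls a native Python function from a mode="spark_sql" Feature View; the helper simply builds the SQL string that the Feature View returns:

def select_content_columns(input_data):
    # A plain Python helper that returns a SQL string; it is not tracked by Tecton.
    return f"SELECT content_id, event_timestamp, other_vals FROM {input_data}"

@batch_feature_view(
    mode="spark_sql",
    ...
)
def content_fv(input_data):
    return select_content_columns(input_data)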

Examples of transformations defined outside of Feature Views

Note

The following code is referenced by the examples that follow it. The Hive table catalog.content has the following schema and data:

content_id  other_vals  event_timestamp
c1          1           2022-01-01T12
c2          2           2022-01-02T12

from tecton import BatchSource, HiveConfig

test_source = BatchSource(
    name="test_source",
    batch_config=HiveConfig(
        table="content",
        database="catalog",
        timestamp_field="event_timestamp",
    ),
)

from tecton import Entity

content = Entity(
    name="Content",
    join_keys=["content_id"]
)

A Feature View that calls a pyspark @transformation

@transformation(mode="pyspark")
def transform1(input_df):
    return input_df.select("content_id", "event_timestamp", "other_vals")

@batch_feature_view(
    mode="pipeline",
    sources=[test_source],
    entities=[content],
    feature_start_time=datetime(2022, 2, 1),
    timestamp_field="event_timestamp",
    aggregation_interval=timedelta(days=1),
    aggregations=[
        Aggregation(
            column='other_vals',
            function='sum',
            time_window=timedelta(days=7)
        ),
    ],
)
def fv(input_df):
    return transform1(input_df)

A Feature View that calls two pyspark @transformations with chaining

@transformation(mode="pyspark")
def add_impression(input_df):
    from pyspark.sql import functions as F
    return input_df.withColumn("impression", F.lit(1))

@transformation(mode="pyspark")
def transform1(input_df):
    return input_df.select("content_id", "event_timestamp", "other_vals")

@batch_feature_view(
    mode="pipeline",
    sources=[test_source],
    entities=[content],
    feature_start_time=datetime(2022, 2, 1),
    timestamp_field="event_timestamp",
    aggregation_interval=timedelta(days=1),
    aggregations=[
        Aggregation(
            column='other_vals',
            function='sum',
            time_window=timedelta(days=7)
        ),
    ],
)
def fv(input_df):
    return transform1(add_impression(input_df))

A Feature View that calls a pyspark @transformation, which calls another pyspark @transformation (not allowed)

@transformation(mode="pyspark")
def add_impression(input_df):
    from pyspark.sql import functions as F
    return input_df.withColumn("impression", F.lit(1))

@transformation(mode="pyspark")
def transform1(input_df):
    input_df = add_impression(input_df)
    # Invoking .select() will raise an error
    # since @transformations cannot invoke methods on the output of other @transformations.
    return input_df.select("content_id", "event_timestamp", "other_vals")

@batch_feature_view(
    mode="pipeline",
    sources=[test_source],
    entities=[content],
    feature_start_time=datetime(2022, 2, 1),
    timestamp_field="event_timestamp",
    aggregation_interval=timedelta(days=1),
    aggregations=[
        Aggregation(
            column='other_vals',
            function='sum',
            time_window=timedelta(days=7)
        ),
    ],
)
def fv(input_df):
    return transform1(input_df)

A Feature View that calls a pyspark @transformation, which calls a native Python function

def add_impression(input_df):
    from pyspark.sql import functions as F
    return input_df.withColumn("impression", F.lit(1))

@transformation(mode="pyspark")
def transform1(input_df):
    input_df = add_impression(input_df)
    return input_df.select("content_id", "event_timestamp", "other_vals")

@batch_feature_view(
    mode="pipeline",
    sources=[test_source],
    entities=[content],
    feature_start_time=datetime(2022, 2, 1),
    timestamp_field="event_timestamp",
    aggregation_interval=timedelta(days=1),
    aggregations=[
        Aggregation(
            column='other_vals',
            function='sum',
            time_window=timedelta(days=7)
        ),
    ],
)
def fv(input_df):
    return transform1(input_df)

A pyspark Feature View that calls two native Python functions

def add_impression(input_df):
    from pyspark.sql import functions as F
    return input_df.withColumn("impression", F.lit(1))

def transform1(input_df):
    input_df = add_impression(input_df)
    return input_df.select("content_id", "event_timestamp", "other_vals")

@batch_feature_view(
    mode="pyspark",
    sources=[test_source],
    entities=[content],
    feature_start_time=datetime(2022, 2, 1),
    timestamp_field="event_timestamp",
    aggregation_interval=timedelta(days=1),
    aggregations=[
        Aggregation(
            column='other_vals',
            function='sum',
            time_window=timedelta(days=7)
        ),
    ],
)
def fv(input_df):
    intermediate_df = add_impression(input_df)
    return transform1(intermediate_df)

A pyspark Feature View that calls a native Python function and invokes the pyspark method withColumn

def add_impression(input_df):
    from pyspark.sql import functions as F
    return input_df.withColumn("impression", F.lit(1))

def transform1(input_df):
    input_df = add_impression(input_df)
    return input_df.select("content_id", "event_timestamp", "other_vals")

@batch_feature_view(
    mode="pyspark",
    sources=[test_source],
    entities=[content],
    feature_start_time=datetime(2022, 2, 1),
    timestamp_field="event_timestamp",
    aggregation_interval=timedelta(days=1),
    aggregations=[
        Aggregation(
            column='other_vals',
            function='sum',
            time_window=timedelta(days=7)
        ),
    ],
)
def fv(input_df):
    from pyspark.sql import functions as F
    intermediate_df = input_df.withColumn("impression", F.lit(1))
    return transform1(intermediate_df)

A Feature View that calls a pyspark @transformation, passing two pyspark @transformation outputs

@transformation(mode="pyspark")
def add_impression(input_df):
    from pyspark.sql import functions as F
    return input_df.withColumn("impression", F.lit(1))

@transformation(mode="pyspark")
def transform1(input_df):
    return input_df.select("content_id", "event_timestamp", "other_vals")

@transformation(mode="pyspark")
def pick1(df1, df2):
    return df1

@batch_feature_view(
    mode="pipeline",
    sources=[test_source],
    entities=[content],
    feature_start_time=datetime(2022, 2, 1),
    timestamp_field="event_timestamp",
    aggregation_interval=timedelta(days=1),
    aggregations=[
        Aggregation(
            column='other_vals',
            function='sum',
            time_window=timedelta(days=7)
        ),
    ],
)
def fv(input_df):
    return pick1(transform1(input_df), add_impression(input_df))

A pipeline Feature View that calls a spark_sql @transformation, passing two pyspark @transformation outputs

@transformation(mode="pyspark")
def add_impression(input_df):
    from pyspark.sql import functions as F
    return input_df.withColumn("impression", F.lit(1))

@transformation(mode="pyspark")
def transform1(input_df):
    return input_df.select("content_id", "event_timestamp", "other_vals")

@transformation(mode="spark_sql")
def pick1(df1, df2):
    return f"SELECT * FROM {df1}"

@batch_feature_view(
    mode="pipeline",
    sources=[test_source],
    entities=[content],
    feature_start_time=datetime(2022, 2, 1),
    timestamp_field="event_timestamp",
    aggregation_interval=timedelta(days=1),
    aggregations=[
        Aggregation(
            column='other_vals',
            function='sum',
            time_window=timedelta(days=7)
        ),
    ],
)
def fv(input_df):
    return pick1(transform1(input_df), add_impression(input_df))

A str_split implementation that uses two @transformations

In this example, we implement a generic str_split transformation on a specified column, followed by another transformation to calculate some summary statistics for the feature.

Note that passing constants to a transformation requires using const, which can be imported from tecton.

from tecton import transformation, batch_feature_view, const, FilteredSource
from entities import auction
from data_sources.ad_impressions import ad_impressions_batch
from datetime import datetime, timedelta

@transformation(mode="spark_sql")
def str_split(input_data, column_to_split, new_column_name, delimiter):
    return f"""
        SELECT
            *,
            split({column_to_split}, {delimiter}) AS {new_column_name}
        FROM {input_data}
        """

@transformation(mode="spark_sql")
def keyword_stats(input_data, keyword_column):
    return f"""
        SELECT
            auction_id,
            timestamp,
            {keyword_column} AS keyword_list,
            size({keyword_column}) AS num_keywords,
            array_contains({keyword_column}, "bitcoin") AS keyword_contains_bitcoin
        FROM {input_data}
        """

@batch_feature_view(
    mode='pipeline',
    sources=[FilteredSource(ad_impressions_batch)],
    entities=[auction],
    batch_schedule=timedelta(days=1),
    online=True,
    offline=True,
    feature_start_time=datetime(2020, 5, 1),
    ttl=timedelta(days=365),
)
def auction_keywords(ad_impressions):
    split_keywords = str_split(ad_impressions, const("content_keyword"), const("keywords"), const("\' \'"))
    return keyword_stats(split_keywords, const("keywords"))

FAQ: Why can’t I directly invoke PySpark methods on the output from a @transformation?

Transformations are intended to be interchangeable within a supported compute environment. For Spark, this means mode="pyspark" and mode="spark_sql" transformations can be mixed. For example, this is a completely valid pipeline:

@transformation(mode="pyspark")
def transform1(input_df):
    return input_df.select("a", "b", "c")

@transformation(mode="spark_sql")
def select_all(df):
    return f"SELECT * FROM {df}"

@batch_feature_view(
    mode="pipeline",
    sources=[test_source],
    entities=[content],
    feature_start_time=datetime(2022, 2, 1),
    timestamp_field="event_timestamp",
    aggregation_interval=timedelta(days=1),
    aggregations=[
        Aggregation(
            column='other_vals',
            function='sum',
            time_window=timedelta(days=7)
        ),
    ],
)
def fv(input_df):
    return select_all(transform1(input_df))