Defining Transformations Outside of Feature Views

Compared to defining a transformation inside of a Feature View, the main advantages of defining a transformation outside of a Feature View are:

  • Reusability
    • Transformations can be reused by multiple Feature Views (see the reuse sketch below).
    • A Feature View can call multiple transformations.
  • Discoverability: Transformations can be searched in the Web UI.

To define a transformation outside of a Feature View, define a function using the @transformation decorator, with mode set to a supported transformation mode. For example:

@transformation(mode="spark_sql")
def my_transformation(input_data):
    return f"""
        SELECT
          entity_id,
          timestamp,
          column_a AS feature_one,
          column_b AS feature_two
        FROM {input_data}
    """

In the definition of the Feature View that will use the transformation, specify mode="pipeline", and call the transformation function from the Feature View function.

@batch_feature_view(
    mode="pipeline",
    ...
)
def my_feature_view(input_data):
    return my_transformation(input_data)
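
Because my_transformation is defined outside of any Feature View, a second Feature View can reuse it without duplicating the SQL. A minimal sketch (my_other_feature_view and its input are hypothetical; required parameters are omitted as above):

@batch_feature_view(
    mode="pipeline",
    ...
)
def my_other_feature_view(other_input_data):
    return my_transformation(other_input_data)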

Guidelines for using @transformation functions

  • @transformation function outputs can only be referenced inside mode="pipeline" Feature View definitions.
  • A @transformation function cannot reference another @transformation function. Workarounds:
    • Chained invocations must be declared entirely inside a mode="pipeline" Feature View definition; all @transformation references must be inside the Feature View function.
    • Convert upstream @transformations to native Python functions so that they can be referenced inside a @transformation.
  • mode="pipeline" Feature View definitions cannot invoke any PySpark operations. They can only pass DataFrame objects in and out of upstream @transformations (see the sketch after this list).

An alternative to using a @transformation function

As an alternative to implementing a transformation function using the @transformation decorator, a native Python function can be used. Native Python functions:

  • Can be called from inside Feature View definitions (that use any mode; see the sketch after this list).
  • Can be called from @transformation functions.
  • Are not discoverable in the Web UI.
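
For example, a native Python helper that builds a SQL fragment can be called from a mode="spark_sql" Feature View (a minimal sketch; the helper and Feature View names are hypothetical, and required parameters are omitted):

def not_null_clause(column):
    # Native Python helper: returns a SQL fragment as a plain string.
    return f"{column} IS NOT NULL"

@batch_feature_view(
    mode="spark_sql",
    ...
)
def my_sql_feature_view(input_data):
    return f"""
        SELECT
          entity_id,
          timestamp,
          column_a AS feature_one
        FROM {input_data}
        WHERE {not_null_clause('column_a')}
    """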

Examples of transformations defined outside of Feature Views

Note

The following code is referenced by the examples that follow it.

ds_func is a data source function. The function generates a DataFrame, which is used by the transformations in the examples.

from datetime import datetime, timedelta

from tecton import (
    Aggregation,
    BatchSource,
    batch_feature_view,
    spark_batch_config,
    transformation,
)

@spark_batch_config()
def ds_func(spark):

    import pandas as pd

    content_ids = ["c1", "c2"]
    other_vals = [1,2]
    ts = [pd.Timestamp('2022-01-01T12'), pd.Timestamp('2022-01-02T12')]
    return spark.createDataFrame(pd.DataFrame({
        "content_id": content_ids,
        "other_vals": other_vals,
        "event_timestamp": ts
    }))

test_source = BatchSource(
    name="test_source",
    batch_config=ds_func
)

from tecton import Entity

content = Entity(
    name="Content",
    join_keys=["content_id"]
)

A Feature View that calls a pyspark @transformation

@transformation(mode="pyspark")
def transform1(input_df):
    return input_df.select("content_id", "event_timestamp", "other_vals")

@batch_feature_view(
    mode="pipeline",
    sources=[test_source],
    entities=[content],
    feature_start_time=datetime(2022, 2, 1),
    timestamp_field="event_timestamp",
    aggregation_interval=timedelta(days=1),
    aggregations=[
        Aggregation(
            column='other_vals',
            function='sum',
            time_window=timedelta(days=7)
        ),
    ],
)
def fv(input_df):
    return transform1(input_df)

A Feature View that calls two pyspark @transformations with chaining

@transformation(mode="pyspark")
def add_impression(input_df):
    from pyspark.sql import functions as F
    return input_df.withColumn("impression", F.lit(1))

@transformation(mode="pyspark")
def transform1(input_df):
    return input_df.select("content_id", "event_timestamp", "other_vals")

@batch_feature_view(
    mode="pipeline",
    sources=[test_source],
    entities=[content],
    feature_start_time=datetime(2022, 2, 1),
    timestamp_field="event_timestamp",
    aggregation_interval=timedelta(days=1),
    aggregations=[
        Aggregation(
            column='other_vals',
            function='sum',
            time_window=timedelta(days=7)
        ),
    ],
)
def fv(input_df):
    return transform1(add_impression(input_df))

A Feature View that calls a pyspark @transformation, which calls another pyspark @transformation

@transformation(mode="pyspark")
def add_impression(input_df):
    from pyspark.sql import functions as F
    return input_df.withColumn("impression", F.lit(1))

@transformation(mode="pyspark")
def transform1(input_df):
    input_df = add_impression(input_df)
    # Invoking .select() will raise an error
    # since @transformations cannot invoke methods on the output of other @transformations.
    return input_df.select("content_id", "event_timestamp", "other_vals")

@batch_feature_view(
    mode="pipeline",
    sources=[test_source],
    entities=[content],
    feature_start_time=datetime(2022, 2, 1),
    timestamp_field="event_timestamp",
    aggregation_interval=timedelta(days=1),
    aggregations=[
        Aggregation(
            column='other_vals',
            function='sum',
            time_window=timedelta(days=7)
        ),
    ],
)
def fv(input_df):
    return transform1(input_df)

A Feature View that calls a pyspark @transformation, which calls a native Python function

def add_impression(input_df):
    from pyspark.sql import functions as F
    return input_df.withColumn("impression", F.lit(1))

@transformation(mode="pyspark")
def transform1(input_df):
    input_df = add_impression(input_df)
    return input_df.select("content_id", "event_timestamp", "other_vals")

@batch_feature_view(
    mode="pipeline",
    sources=[test_source],
    entities=[content],
    feature_start_time=datetime(2022, 2, 1),
    timestamp_field="event_timestamp",
    aggregation_interval=timedelta(days=1),
    aggregations=[
        Aggregation(
            column='other_vals',
            function='sum',
            time_window=timedelta(days=7)
        ),
    ],
)
def fv(input_df):
    return transform1(input_df)

A pyspark Feature View that calls two native Python functions

def add_impression(input_df):
    from pyspark.sql import functions as F
    return input_df.withColumn("impression", F.lit(1))

def transform1(input_df):
    input_df = add_impression(input_df)
    return input_df.select("content_id", "event_timestamp", "other_vals")

@batch_feature_view(
    mode="pyspark",
    sources=[test_source],
    entities=[content],
    feature_start_time=datetime(2022, 2, 1),
    timestamp_field="event_timestamp",
    aggregation_interval=timedelta(days=1),
    aggregations=[
        Aggregation(
            column='other_vals',
            function='sum',
            time_window=timedelta(days=7)
        ),
    ],
)
def fv(input_df):
    intermediate_df = add_impression(input_df)
    return transform1(intermediate_df)

A pyspark Feature View that calls a native Python function and invokes the pyspark method withColumn

def add_impression(input_df):
    from pyspark.sql import functions as F
    return input_df.withColumn("impression", F.lit(1))

def transform1(input_df):
    input_df = add_impression(input_df)
    return input_df.select("content_id", "event_timestamp", "other_vals")

@batch_feature_view(
    mode="pyspark",
    sources=[test_source],
    entities=[content],
    feature_start_time=datetime(2022, 2, 1),
    timestamp_field="event_timestamp",
    aggregation_interval=timedelta(days=1),
    aggregations=[
        Aggregation(
            column='other_vals',
            function='sum',
            time_window=timedelta(days=7)
        ),
    ],
)
def fv(input_df):
    from pyspark.sql import functions as F
    intermediate_df = input_df.withColumn("impression", F.lit(1))
    return transform1(intermediate_df)

A Feature View that calls a pyspark @transformation, passing two pyspark @transformation outputs

@transformation(mode="pyspark")
def add_impression(input_df):
    from pyspark.sql import functions as F
    return input_df.withColumn("impression", F.lit(1))

@transformation(mode="pyspark")
def transform1(input_df):
    return input_df.select("content_id", "event_timestamp", "other_vals")

@transformation(mode="pyspark")
def pick1(df1, df2):
    return df1

@batch_feature_view(
    mode="pipeline",
    sources=[test_source],
    entities=[content],
    feature_start_time=datetime(2022, 2, 1),
    timestamp_field="event_timestamp",
    aggregation_interval=timedelta(days=1),
    aggregations=[
        Aggregation(
            column='other_vals',
            function='sum',
            time_window=timedelta(days=7)
        ),
    ],
)
def fv(input_df):
    return pick1(transform1(input_df), add_impression(input_df))

A pipeline Feature View that calls a spark_sql @transformation, passing two pyspark @transformation outputs

@transformation(mode="pyspark")
def add_impression(input_df):
    from pyspark.sql import functions as F
    return input_df.withColumn("impression", F.lit(1))

@transformation(mode="pyspark")
def transform1(input_df):
    return input_df.select("content_id", "event_timestamp", "other_vals")

@transformation(mode="spark_sql")
def pick1(df1, df2):
    return f"SELECT * FROM {df1}"

@batch_feature_view(
    mode="pipeline",
    sources=[test_source],
    entities=[content],
    feature_start_time=datetime(2022, 2, 1),
    timestamp_field="event_timestamp",
    aggregation_interval=timedelta(days=1),
    aggregations=[
        Aggregation(
            column='other_vals',
            function='sum',
            time_window=timedelta(days=7)
        ),
    ],
)
def fv(input_df):
    return pick1(transform1(input_df), add_impression(input_df))

A str_split implementation that uses two @transformations

In this example, we implement a generic str_split transformation on a specified column, followed by another transformation to calculate some summary statistics for the feature.

Note that passing constants to a transformation requires using const, which can be imported from tecton.

from tecton import transformation, batch_feature_view, const, FilteredSource
from entities import auction
from data_sources.ad_impressions import ad_impressions_batch
from datetime import datetime, timedelta

@transformation(mode="spark_sql")
def str_split(input_data, column_to_split, new_column_name, delimiter):
    return f"""
        SELECT
            *,
            split({column_to_split}, {delimiter}) AS {new_column_name}
        FROM {input_data}
        """

@transformation(mode="spark_sql")
def keyword_stats(input_data, keyword_column):
    return f"""
        SELECT
            auction_id,
            timestamp,
            {keyword_column} AS keyword_list,
            size({keyword_column}) AS num_keywords,
            array_contains({keyword_column}, "bitcoin") AS keyword_contains_bitcoin
        FROM {input_data}
        """

@batch_feature_view(
    mode='pipeline',
    sources=[FilteredSource(ad_impressions_batch)],
    entities=[auction],
    batch_schedule=timedelta(days=1),
    online=True,
    offline=True,
    feature_start_time=datetime(2020, 5, 1),
    ttl=timedelta(days=365),
)
def auction_keywords(ad_impressions):
    split_keywords = str_split(ad_impressions, const("content_keyword"), const("keywords"), const("\' \'"))
    return keyword_stats(split_keywords, const("keywords"))

FAQ: Why can’t I directly invoke PySpark methods on the output from a @transformation?

Transformations are intended to be interchangeable within a supported compute environment. For Spark, this means mode="pyspark" and mode="spark_sql" transformations can be mixed. For example, this is a completely valid pipeline:

@transformation(mode="pyspark")
def transform1(input_df):
    return input_df.select("a", "b", "c")

@transformation(mode="spark_sql")
def select_all(df):
    return f"SELECT * FROM {df}"

@batch_feature_view(
    mode="pipeline",
    sources=[test_source],
    entities=[content],
    feature_start_time=datetime(2022, 2, 1),
    timestamp_field="event_timestamp",
    aggregation_interval=timedelta(days=1),
    aggregations=[
        Aggregation(
            column='other_vals',
            function='sum',
            time_window=timedelta(days=7)
        ),
    ],
)
def fv(input_df):
    return select_all(transform1(input_df))