Simplifying Custom Batch Aggregations with Incremental Backfills

When Tecton’s built-in aggregations do not fit your use case, you can define your own aggregation features. Setting incremental_backfills=True simplifies this by making Tecton execute backfills incrementally, one job per batch_schedule interval.

With this setting, Tecton backfills your feature by executing your query once for every batch_schedule interval between the specified feature_start_time and the feature registration time, rather than executing the query once against the entire set of raw data. Each job is therefore responsible for producing a single time-window aggregation.
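To make the scheduling concrete, here is a minimal sketch in plain Python (not Tecton code) that lists the intervals such a backfill would cover. The helper name backfill_intervals and the registration time are hypothetical, and the sketch assumes intervals are aligned to feature_start_time and that only fully elapsed intervals are backfilled.

```python
from datetime import datetime, timedelta


def backfill_intervals(feature_start_time, registration_time, batch_schedule):
    """Return the [start, end) windows, one per batch_schedule interval.

    Hypothetical helper for illustration only; it is not part of the Tecton SDK.
    Only intervals that have fully elapsed by registration_time are included.
    """
    intervals = []
    start = feature_start_time
    while start + batch_schedule <= registration_time:
        intervals.append((start, start + batch_schedule))
        start += batch_schedule
    return intervals


intervals = backfill_intervals(
    feature_start_time=datetime(2020, 10, 10),
    registration_time=datetime(2020, 10, 13, 12),  # assumed registration time
    batch_schedule=timedelta(days=1),
)
for start, end in intervals:
    print(start.date(), "->", end.date())
```

Each printed pair corresponds to one run of the query, with the pair's values playing the role of that run's start and end times.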

[Figure: Materialization Incremental Backfills]

The materialization context object (context in the examples below) provides the start_time and end_time of each transformation run. Use them to filter the raw data down to the time window that run needs, then aggregate that data to produce the feature values for the period.

# Spark SQL version. `transactions` and `user` are a data source and an entity defined elsewhere.
from tecton import batch_feature_view, materialization_context
from datetime import datetime, timedelta

@batch_feature_view(
    sources=[transactions],
    entities=[user],
    mode='spark_sql',
    online=True,
    offline=True,
    batch_schedule=timedelta(days=1),
    feature_start_time=datetime(2020, 10, 10),
    ttl=timedelta(days=1),
    incremental_backfills=True
)
def user_distinct_merchant_transaction_count_7d(transactions, context=materialization_context()):
    # Each run aggregates a 7-day window: the current 1-day interval plus the 6 days before it.
    # context.end_time is exclusive, so rows are stamped 1 microsecond earlier.
    return f'''
        SELECT
            USER_ID,
            TO_TIMESTAMP("{context.end_time}") - INTERVAL 1 MICROSECOND AS TIMESTAMP,
            COUNT(DISTINCT MERCHANT) AS DISTINCT_MERCHANT_COUNT_7D
        FROM
            {transactions}
        WHERE TIMESTAMP >= TO_TIMESTAMP("{context.start_time}") - INTERVAL 6 DAYS
            AND TIMESTAMP < TO_TIMESTAMP("{context.end_time}")
        GROUP BY USER_ID
        '''

# Snowflake SQL version of the same Feature View.
from tecton import batch_feature_view, materialization_context
from datetime import datetime, timedelta

@batch_feature_view(
    sources=[transactions],
    entities=[user],
    mode='snowflake_sql',
    online=True,
    offline=True,
    batch_schedule=timedelta(days=1),
    feature_start_time=datetime(2020, 10, 10),
    ttl=timedelta(days=1),
    incremental_backfills=True
)
def user_distinct_merchant_transaction_count_7d(transactions, context=materialization_context()):
    # Snowflake treats double-quoted text as an identifier, so string literals and
    # interval constants use single quotes here.
    return f'''
        SELECT
            USER_ID,
            TO_TIMESTAMP('{context.end_time}') - INTERVAL '1 MICROSECOND' AS TIMESTAMP,
            COUNT(DISTINCT MERCHANT) AS DISTINCT_MERCHANT_COUNT_7D
        FROM
            {transactions}
        WHERE TIMESTAMP >= TO_TIMESTAMP('{context.start_time}') - INTERVAL '6 DAYS'
            AND TIMESTAMP < TO_TIMESTAMP('{context.end_time}')
        GROUP BY USER_ID
        '''

Note

The feature timestamps are set to the end of the aggregation window: TO_TIMESTAMP("{context.end_time}") - INTERVAL 1 MICROSECOND. One microsecond is subtracted because context.end_time is exclusive.
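The window and timestamp logic of the query can be traced in plain Python. The following is a minimal sketch of what a single run computes, assuming a 1-day batch_schedule; the function run_once and the sample rows are hypothetical and stand in for the SQL, they are not Tecton code.

```python
from collections import defaultdict
from datetime import datetime, timedelta


def run_once(rows, start_time, end_time):
    """Mimic one materialization run over (user_id, merchant, timestamp) rows.

    Hypothetical stand-in for the SQL query; not part of the Tecton SDK.
    """
    # 7-day window: the current 1-day interval plus the 6 days before it.
    window_start = start_time - timedelta(days=6)
    merchants = defaultdict(set)
    for user_id, merchant, ts in rows:
        if window_start <= ts < end_time:  # end_time is exclusive
            merchants[user_id].add(merchant)
    # Stamp features with the last instant that still falls inside the window.
    feature_ts = end_time - timedelta(microseconds=1)
    return {user: (feature_ts, len(ms)) for user, ms in merchants.items()}


rows = [
    ("u1", "m1", datetime(2020, 10, 10, 9)),
    ("u1", "m2", datetime(2020, 10, 15, 9)),
    ("u1", "m2", datetime(2020, 10, 16, 23)),  # repeat merchant, counted once
    ("u1", "m3", datetime(2020, 10, 17, 0)),   # exactly at end_time, so excluded
    ("u2", "m1", datetime(2020, 10, 9, 23)),   # before the window, so excluded
]
result = run_once(rows, datetime(2020, 10, 16), datetime(2020, 10, 17))
print(result)
```

Here the row that falls exactly on end_time is left for the next run, which is why stamping the output one microsecond before end_time keeps each feature value inside the window it summarizes.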

For this example, you could also replace the WHERE clause with a FilteredSource that has a start_time_offset. Here, start_time_offset=timedelta(days=-6) moves the start of the filtered window back by six days. This is often helpful when partition filtering is complicated.

# Spark SQL version using FilteredSource.
from tecton import batch_feature_view, FilteredSource, materialization_context
from datetime import datetime, timedelta

@batch_feature_view(
    sources=[FilteredSource(transactions, start_time_offset=timedelta(days=-6))],
    entities=[user],
    mode='spark_sql',
    online=True,
    batch_schedule=timedelta(days=1),
    feature_start_time=datetime(2020, 10, 10),
    offline=True,
    ttl=timedelta(days=1),
    incremental_backfills=True
)
def user_distinct_merchant_transaction_count_7d(transactions, context=materialization_context()):
    # FilteredSource already limits the rows to the 7-day window, so no WHERE clause is needed.
    # The context is still required to compute the feature timestamp.
    return f'''
        SELECT
            USER_ID,
            TO_TIMESTAMP("{context.end_time}") - INTERVAL 1 MICROSECOND AS TIMESTAMP,
            COUNT(DISTINCT MERCHANT) AS DISTINCT_MERCHANT_COUNT_7D
        FROM
            {transactions}
        GROUP BY USER_ID
        '''

# Snowflake SQL version using FilteredSource.
from tecton import batch_feature_view, FilteredSource, materialization_context
from datetime import datetime, timedelta

@batch_feature_view(
    sources=[FilteredSource(transactions, start_time_offset=timedelta(days=-6))],
    entities=[user],
    mode='snowflake_sql',
    online=True,
    batch_schedule=timedelta(days=1),
    feature_start_time=datetime(2020, 10, 10),
    offline=True,
    ttl=timedelta(days=1),
    incremental_backfills=True
)
def user_distinct_merchant_transaction_count_7d(transactions, context=materialization_context()):
    # The context is still required to compute the feature timestamp.
    # Snowflake string literals and interval constants use single quotes.
    return f'''
        SELECT
            USER_ID,
            TO_TIMESTAMP('{context.end_time}') - INTERVAL '1 MICROSECOND' AS TIMESTAMP,
            COUNT(DISTINCT MERCHANT) AS DISTINCT_MERCHANT_COUNT_7D
        FROM
            {transactions}
        GROUP BY USER_ID
        '''

Warning

While incremental backfills simplify your transformation logic, they also result in many more backfill jobs, which can become inefficient at scale. We recommend checking the output of tecton plan to see how many materialization jobs will be created when the change is applied.
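For a rough sense of scale before running tecton plan, the job count can be estimated by hand. This is only a back-of-the-envelope sketch that assumes one backfill job per complete batch_schedule interval; the function name and the registration date are hypothetical, and the output of tecton plan remains the authoritative number.

```python
from datetime import datetime, timedelta


def estimated_backfill_jobs(feature_start_time, registration_time, batch_schedule):
    """Count complete batch_schedule intervals, assuming one backfill job each.

    Hypothetical helper for illustration only; not part of the Tecton SDK.
    """
    return (registration_time - feature_start_time) // batch_schedule


start = datetime(2020, 10, 10)
registered = datetime(2023, 10, 10)  # assumed registration time
daily = estimated_backfill_jobs(start, registered, timedelta(days=1))
hourly = estimated_backfill_jobs(start, registered, timedelta(hours=1))
print(f"daily schedule: {daily} jobs, hourly schedule: {hourly} jobs")
```

Under these assumptions, a shorter batch_schedule or an earlier feature_start_time multiplies the number of jobs, which is the cost the warning above describes.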

Because so many jobs are needed, Batch Feature Views with incremental backfills must materialize feature data to the Offline Feature Store for that data to be fetched offline.