Simplifying Custom Batch Aggregations with Incremental Backfills
When Tecton’s built-in aggregations are not an option, defining your own aggregation features can be simplified by configuring backfills to execute incrementally for each batch_schedule interval using incremental_backfills=True.
In this case, when Tecton backfills your feature, it will execute your query for every batch_schedule interval between the specified feature_start_time and the feature registration time, as opposed to executing the provided query once against the entire set of raw data. This allows each job to be responsible for producing a single time-window aggregation.

The context object can be used to filter for the right time window of raw data needed for each transformation run. That data can then be aggregated to produce the feature values for that time period.
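As a rough illustration of the intervals each backfill job covers, the sketch below uses hypothetical dates and a plain loop; it is not Tecton's actual scheduling code, only a picture of the idea:

from datetime import datetime, timedelta

# Hypothetical sketch: incremental backfills split the backfill range into one
# job per batch_schedule interval. Tecton performs this scheduling internally.
feature_start_time = datetime(2020, 10, 10)
registration_time = datetime(2020, 10, 20)  # example time when the feature view is applied
batch_schedule = timedelta(days=1)

interval_start = feature_start_time
while interval_start < registration_time:
    interval_end = interval_start + batch_schedule
    # One backfill job runs the query with context.start_time = interval_start
    # and context.end_time = interval_end.
    print(interval_start, "->", interval_end)
    interval_start = interval_end

The feature view below applies this pattern, using context.start_time and context.end_time in its WHERE clause to select the raw data for each run.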
Spark:

from tecton import batch_feature_view, materialization_context
from datetime import datetime, timedelta


@batch_feature_view(
    sources=[transactions],
    entities=[user],
    mode='spark_sql',
    online=True,
    offline=True,
    batch_schedule=timedelta(days=1),
    feature_start_time=datetime(2020, 10, 10),
    ttl=timedelta(days=1),
    incremental_backfills=True
)
def user_distinct_merchant_transaction_count_7d(transactions, context=materialization_context()):
    return f'''
        SELECT
            USER_ID,
            TO_TIMESTAMP("{context.end_time}") - INTERVAL 1 MICROSECOND AS TIMESTAMP,
            COUNT(DISTINCT MERCHANT) AS DISTINCT_MERCHANT_COUNT_7D
        FROM
            {transactions}
        WHERE TIMESTAMP >= TO_TIMESTAMP("{context.start_time}") - INTERVAL 6 DAYS
            AND TIMESTAMP < TO_TIMESTAMP("{context.end_time}")
        GROUP BY USER_ID
    '''
Snowflake:

from tecton import batch_feature_view, materialization_context
from datetime import datetime, timedelta


@batch_feature_view(
    sources=[transactions],
    entities=[user],
    mode='snowflake_sql',
    online=True,
    offline=True,
    batch_schedule=timedelta(days=1),
    feature_start_time=datetime(2020, 10, 10),
    ttl=timedelta(days=1),
    incremental_backfills=True
)
def user_distinct_merchant_transaction_count_7d(transactions, context=materialization_context()):
    return f'''
        SELECT
            USER_ID,
            TO_TIMESTAMP('{context.end_time}') - INTERVAL '1 MICROSECOND' AS TIMESTAMP,
            COUNT(DISTINCT MERCHANT) AS DISTINCT_MERCHANT_COUNT_7D
        FROM
            {transactions}
        WHERE TIMESTAMP >= TO_TIMESTAMP('{context.start_time}') - INTERVAL '6 DAYS'
            AND TIMESTAMP < TO_TIMESTAMP('{context.end_time}')
        GROUP BY USER_ID
    '''
Note

The timestamps of the features are also set to the end of the aggregation window: TO_TIMESTAMP("{context.end_time}") - INTERVAL 1 MICROSECOND (context.end_time is not inclusive).
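For example, for a single daily materialization window (hypothetical values):

from datetime import datetime, timedelta

# Hypothetical values for one daily materialization window.
context_end_time = datetime(2020, 10, 11)  # exclusive end of the window
feature_timestamp = context_end_time - timedelta(microseconds=1)
print(feature_timestamp)  # 2020-10-10 23:59:59.999999, i.e. inside the window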
For this example, you could also use a FilteredSource with a start_time_offset to replace the WHERE clause. This is often helpful in cases with complicated partition filtering. Note that the context object is still needed to set the feature timestamp.
Spark:

from tecton import batch_feature_view, FilteredSource, materialization_context
from datetime import datetime, timedelta


@batch_feature_view(
    sources=[FilteredSource(transactions, start_time_offset=timedelta(days=-6))],
    entities=[user],
    mode='spark_sql',
    online=True,
    batch_schedule=timedelta(days=1),
    feature_start_time=datetime(2020, 10, 10),
    offline=True,
    ttl=timedelta(days=1),
    incremental_backfills=True
)
def user_distinct_merchant_transaction_count_7d(transactions, context=materialization_context()):
    return f'''
        SELECT
            USER_ID,
            TO_TIMESTAMP("{context.end_time}") - INTERVAL 1 MICROSECOND AS TIMESTAMP,
            COUNT(DISTINCT MERCHANT) AS DISTINCT_MERCHANT_COUNT_7D
        FROM
            {transactions}
        GROUP BY USER_ID
    '''
Snowflake:

from tecton import batch_feature_view, FilteredSource, materialization_context
from datetime import datetime, timedelta


@batch_feature_view(
    sources=[FilteredSource(transactions, start_time_offset=timedelta(days=-6))],
    entities=[user],
    mode='snowflake_sql',
    online=True,
    batch_schedule=timedelta(days=1),
    feature_start_time=datetime(2020, 10, 10),
    offline=True,
    ttl=timedelta(days=1),
    incremental_backfills=True
)
def user_distinct_merchant_transaction_count_7d(transactions, context=materialization_context()):
    return f'''
        SELECT
            USER_ID,
            TO_TIMESTAMP('{context.end_time}') - INTERVAL '1 MICROSECOND' AS TIMESTAMP,
            COUNT(DISTINCT MERCHANT) AS DISTINCT_MERCHANT_COUNT_7D
        FROM
            {transactions}
        GROUP BY USER_ID
    '''
Warning

While incremental backfills simplify your transformation logic, they also result in many more backfill jobs, which can become inefficient at scale. We recommend checking the output of tecton plan to see how many materialization jobs will be created when your changes are applied.

Because of the need for many jobs, Batch Feature Views with incremental backfills must materialize feature data to the Offline Feature Store in order to be fetched offline.
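As a rough back-of-the-envelope way to anticipate the job count before running tecton plan (hypothetical dates; tecton plan remains the source of truth):

from datetime import datetime, timedelta

# Hypothetical estimate: with incremental_backfills=True, expect roughly one
# backfill job per batch_schedule interval between feature_start_time and the
# time the feature view is applied. `tecton plan` reports the actual count.
feature_start_time = datetime(2020, 10, 10)
apply_time = datetime(2022, 10, 10)
batch_schedule = timedelta(days=1)

num_backfill_jobs = (apply_time - feature_start_time) // batch_schedule
print(num_backfill_jobs)  # 730 daily jobs for a two-year backfill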