Batch Feature View
A Batch Feature View defines transformations against one or more Batch Sources, which are data sources (e.g. S3, Hive, Redshift, Snowflake). Batch Feature Views can be scheduled to materialize new feature data to the Online and Offline Feature Stores on a regular cadence. They can also run automatic backfills when they’re first created.
Note
With Tecton on Snowflake, a Batch Feature View can only define transformations against Snowflake data sources.
Common Feature Examples:
- determining if a user's credit score is over a pre-defined threshold
- counting the total number of transactions over a time window in batch
- batch ingesting pre-computed feature values from an existing batch source
Batch Feature Transformations
A Batch Feature View defines a transformation that turns your raw data into feature values. When Tecton executes the transformation, it always runs directly in the data platform you connected to Tecton (e.g. your Snowflake warehouse or your Databricks cluster).
Spark:

from tecton import batch_feature_view

@batch_feature_view(
    sources=[transactions],
    entities=[user],
    mode='spark_sql'
)
def user_last_transaction_amount(transactions):
    return f'''
        SELECT
            USER_ID,
            AMOUNT,
            TIMESTAMP
        FROM
            {transactions}
        '''
Snowflake:

from tecton import batch_feature_view

@batch_feature_view(
    sources=[transactions],
    entities=[user],
    mode='snowflake_sql'
)
def user_last_transaction_amount(transactions):
    return f'''
        SELECT
            USER_ID,
            AMOUNT,
            TIMESTAMP
        FROM
            {transactions}
        '''
Materialization to the Online and Offline Feature Store
Online Materialization
You can easily make batch features available for low-latency online retrieval to feed an online model. Simply set online=True, a batch_schedule that determines how frequently Tecton runs your transformation, and a feature_start_time date to backfill feature data from.
Spark:

from tecton import batch_feature_view
from datetime import datetime, timedelta

@batch_feature_view(
    sources=[transactions],
    entities=[user],
    mode='spark_sql',
    online=True,
    batch_schedule=timedelta(days=1),
    feature_start_time=datetime(2020, 10, 10)
)
def user_last_transaction_amount(transactions):
    return f'''
        SELECT
            USER_ID,
            AMOUNT,
            TIMESTAMP
        FROM
            {transactions}
        '''
Snowflake:

from tecton import batch_feature_view
from datetime import datetime, timedelta

@batch_feature_view(
    sources=[transactions],
    entities=[user],
    mode='snowflake_sql',
    online=True,
    batch_schedule=timedelta(days=1),
    feature_start_time=datetime(2020, 10, 10)
)
def user_last_transaction_amount(transactions):
    return f'''
        SELECT
            USER_ID,
            AMOUNT,
            TIMESTAMP
        FROM
            {transactions}
        '''
Offline Materialization
Tecton also supports offline materialization. This can speed up some expensive queries considerably. When you set offline=True, Tecton will materialize offline feature data to an offline table according to the batch_schedule and feature_start_time.
Note
Offline materialization is not yet available for Tecton on Snowflake.
from tecton import batch_feature_view
from datetime import datetime, timedelta

@batch_feature_view(
    sources=[transactions],
    entities=[user],
    mode='spark_sql',
    online=True,
    batch_schedule=timedelta(days=1),
    feature_start_time=datetime(2020, 10, 10),
    offline=True
)
def user_last_transaction_amount(transactions):
    return f'''
        SELECT
            USER_ID,
            AMOUNT,
            TIMESTAMP
        FROM
            {transactions}
        '''
If you don’t materialize Batch Feature Views offline, Tecton will always execute your transformation when you use the Tecton SDK to generate offline data. Speaking in SQL terms, a Batch Feature View without offline materialization is simply a “View”. A Batch Feature View with offline materialization is a “Materialized View”.
Offline materialization has additional benefits including:
- Online-offline skew is minimized because the data in the online and offline store is incrementally updated at the same time.
- Offline feature data is saved so it is resilient to any losses of historical data upstream.
Materialization Feature Data Filtering
Every materialization run is expected to produce feature values for a specific time range. This time range is known as the “materialization window”. The materialization window is different for backfills and incremental runs:
- During the initial backfill of feature data to the Online and Offline Feature Store, the materialization time window starts at feature_start_time and ends at Tecton's "current wall clock time" when the feature's materialization is enabled.
- On incremental runs, the materialization time window starts at the previous run's start_time and ends at start_time + batch_schedule.
Tecton only materializes feature values that fall within the materialization time window. It automatically applies a filter to the Feature View transformation, as shown in the WHERE clause below:
-- Tecton applies this filter to the user-provided transformation
SELECT * FROM {batch_feature_view_transformation}
WHERE {timestamp_field} >= {start_time}
  AND {timestamp_field} < {end_time}
Info
The start time of the window is inclusive and the end time is exclusive. This means that a feature value whose timestamp is exactly equal to the end_time is not part of the window.
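The windowing and boundary rules above can be sketched in plain Python. This is an illustration only, not Tecton code; the helper names are made up:

```python
# Illustrative sketch of the materialization windows implied by
# feature_start_time and batch_schedule (not Tecton internals).
from datetime import datetime, timedelta

feature_start_time = datetime(2020, 10, 10)
batch_schedule = timedelta(days=1)

def incremental_windows(start, schedule, n):
    """Yield the first n incremental [start_time, end_time) windows."""
    for i in range(n):
        yield (start + i * schedule, start + (i + 1) * schedule)

def in_window(ts, start, end):
    """The start of the window is inclusive; the end is exclusive."""
    return start <= ts < end

windows = list(incremental_windows(feature_start_time, batch_schedule, 3))
# A timestamp exactly equal to a window's end_time is excluded from that
# window and instead falls into the next window.
```

Each window covers exactly one batch_schedule, and consecutive windows tile the timeline with no gaps or overlaps because each end is exclusive.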
Creating Efficient Transformations using Raw Data Filtering
In many cases, running the transformation against the full set of raw data is unnecessary and inefficient. Some data platforms will push down Tecton’s feature data filter (shown above) to the raw data, only reading what is needed, but this is not always the case.
Warning
We strongly recommend adding raw data filtering to Spark-based Feature Views in order to achieve good performance.
Note
Input data filtering is not yet available for Tecton on Snowflake; however, Snowflake's query planner will often optimize the query as noted above.
Filtering Using the Context Object
In your Batch Feature View transformation, you can filter for the raw data needed to produce feature values on each run by leveraging a context object that Tecton passes into the transformation. context.start_time and context.end_time are equal to the expected materialization time window, as shown in the diagram below.
The example transformation below filters for the required raw data in the WHERE clause.
Spark:

from tecton import batch_feature_view, materialization_context
from datetime import datetime, timedelta

@batch_feature_view(
    sources=[transactions],
    entities=[user],
    online=True,
    mode='spark_sql',
    batch_schedule=timedelta(days=1),
    feature_start_time=datetime(2020, 10, 10),
    offline=True
)
def user_last_transaction_amount(transactions, context=materialization_context()):
    return f'''
        SELECT
            USER_ID,
            AMOUNT,
            TIMESTAMP
        FROM
            {transactions}
        WHERE TIMESTAMP >= TO_TIMESTAMP("{context.start_time}")
          AND TIMESTAMP < TO_TIMESTAMP("{context.end_time}")
        '''
Snowflake:

from tecton import batch_feature_view, materialization_context
from datetime import datetime, timedelta

@batch_feature_view(
    sources=[transactions],
    entities=[user],
    online=True,
    mode='snowflake_sql',
    batch_schedule=timedelta(days=1),
    feature_start_time=datetime(2020, 10, 10),
    offline=True
)
def user_last_transaction_amount(transactions, context=materialization_context()):
    return f'''
        SELECT
            USER_ID,
            AMOUNT,
            TIMESTAMP
        FROM
            {transactions}
        WHERE TIMESTAMP >= TO_TIMESTAMP("{context.start_time}")
          AND TIMESTAMP < TO_TIMESTAMP("{context.end_time}")
        '''
Info
In cases where you read from a time-partitioned data source, like a Glue table or partitioned data on S3, you will also want to filter by partition columns.
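As a sketch, the partition values a run must read can be derived from the materialization window. The partitions_for_window helper and the daily "YYYY-MM-DD" partition layout here are hypothetical, not a Tecton API:

```python
# Hypothetical helper: the daily date partitions a partitioned source must
# read to cover a materialization window [start, end).
from datetime import datetime, timedelta

def partitions_for_window(start, end):
    """Return the daily partition values (YYYY-MM-DD) overlapping [start, end)."""
    day = start.replace(hour=0, minute=0, second=0, microsecond=0)
    parts = []
    while day < end:
        parts.append(day.strftime("%Y-%m-%d"))
        day += timedelta(days=1)
    return parts

# A one-day window starting at midnight touches exactly one partition;
# this two-day window touches two.
parts = partitions_for_window(datetime(2020, 10, 10), datetime(2020, 10, 12))
```

Restricting the scan to these partition values lets the engine prune whole partitions instead of reading and then discarding out-of-window rows.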
Filtering Using the FilteredSource Class
For convenience, Tecton offers a FilteredSource class that automatically applies timestamp and partition filtering to the data source, based on parameters set on the Batch Data Source object: timestamp_field and, if using Hive, datetime_partition_columns. This replaces the need for the WHERE clause in the transformations above.
Spark:

from tecton import batch_feature_view, FilteredSource
from datetime import datetime, timedelta

@batch_feature_view(
    sources=[FilteredSource(transactions)],
    entities=[user],
    mode='spark_sql',
    online=True,
    batch_schedule=timedelta(days=1),
    feature_start_time=datetime(2020, 10, 10),
    offline=True
)
def user_last_transaction_amount(transactions):
    return f'''
        SELECT
            USER_ID,
            AMOUNT,
            TIMESTAMP
        FROM
            {transactions}
        '''
Snowflake:

from tecton import batch_feature_view, FilteredSource
from datetime import datetime, timedelta

@batch_feature_view(
    sources=[FilteredSource(transactions)],
    entities=[user],
    mode='snowflake_sql',
    online=True,
    batch_schedule=timedelta(days=1),
    feature_start_time=datetime(2020, 10, 10),
    offline=True
)
def user_last_transaction_amount(transactions):
    return f'''
        SELECT
            USER_ID,
            AMOUNT,
            TIMESTAMP
        FROM
            {transactions}
        '''
By default, FilteredSource filters for data between context.start_time and context.end_time. If needed, the start_time_offset parameter can be set to a negative offset to increase the amount of raw data that gets read. For example, setting start_time_offset to a negative offset of 10 days (timedelta(days=-10)) will filter for data in the range [context.start_time - 10 days, context.end_time):
FilteredSource(transactions, start_time_offset=timedelta(days=-10))
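The resulting raw-data window can be worked through in plain Python. The window values below are illustrative, not what Tecton computes for any particular run:

```python
# Illustrative: the raw-data range read when a FilteredSource is configured
# with start_time_offset=timedelta(days=-10) (window values are made up).
from datetime import datetime, timedelta

context_start_time = datetime(2020, 10, 10)  # materialization window start
context_end_time = datetime(2020, 10, 11)    # materialization window end
start_time_offset = timedelta(days=-10)

# The raw data is filtered to [context.start_time + offset, context.end_time);
# a negative offset widens the range backwards in time.
raw_start = context_start_time + start_time_offset
raw_end = context_end_time
```

Widening the read range like this is useful when a transformation needs events from before the current window, e.g. to join against recent history, while the produced feature values still land inside the materialization window.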
Creating Features that use Time-Windowed Aggregations
Using Built-in Time-Windowed Aggregations
Tecton provides built-in implementations of common time-windowed aggregations that simplify transformation logic and ensure correct feature value computation.
Tecton currently supports the following built-in aggregations: count, sum, mean, max, min, last-distinct, var_samp, variance (alias for var_samp), var_pop, stddev_samp, stddev (alias for stddev_samp), and stddev_pop. If the aggregation you need is not supported, please make a feature request or check out our page on defining your own custom aggregations.
Time-windowed aggregations can be specified in the Batch Feature View decorator using the aggregations and aggregation_interval parameters, as shown below. Tecton expects the provided SQL query to select the raw events (with timestamps) to be aggregated.
Spark:

from tecton import batch_feature_view, Aggregation, materialization_context
from datetime import datetime, timedelta

@batch_feature_view(
    sources=[transactions],
    entities=[user],
    mode='spark_sql',
    online=True,
    batch_schedule=timedelta(days=1),
    feature_start_time=datetime(2020, 10, 10),
    offline=True,
    aggregation_interval=timedelta(days=1),
    aggregations=[
        Aggregation(column='AMOUNT', function='mean', time_window=timedelta(days=30), name='average_amount_30d'),
        Aggregation(column='AMOUNT', function='mean', time_window=timedelta(days=60), name='average_amount_60d'),
        Aggregation(column='AMOUNT', function='mean', time_window=timedelta(days=90), name='average_amount_90d'),
    ]
)
def user_transaction_features(transactions, context=materialization_context()):
    return f'''
        SELECT
            USER_ID,
            AMOUNT,
            TIMESTAMP
        FROM
            {transactions}
        WHERE TIMESTAMP >= TO_TIMESTAMP("{context.start_time}")
          AND TIMESTAMP < TO_TIMESTAMP("{context.end_time}")
        '''
Snowflake:

from tecton import batch_feature_view, Aggregation, materialization_context
from datetime import datetime, timedelta

@batch_feature_view(
    sources=[transactions],
    entities=[user],
    mode='snowflake_sql',
    online=True,
    batch_schedule=timedelta(days=1),
    feature_start_time=datetime(2020, 10, 10),
    offline=True,
    aggregation_interval=timedelta(days=1),
    aggregations=[
        Aggregation(column='AMOUNT', function='mean', time_window=timedelta(days=30), name='average_amount_30d'),
        Aggregation(column='AMOUNT', function='mean', time_window=timedelta(days=60), name='average_amount_60d'),
        Aggregation(column='AMOUNT', function='mean', time_window=timedelta(days=90), name='average_amount_90d'),
    ]
)
def user_transaction_features(transactions, context=materialization_context()):
    return f'''
        SELECT
            USER_ID,
            AMOUNT,
            TIMESTAMP
        FROM
            {transactions}
        WHERE TIMESTAMP >= TO_TIMESTAMP("{context.start_time}")
          AND TIMESTAMP < TO_TIMESTAMP("{context.end_time}")
        '''
Defining Custom Time-Windowed Aggregations
If you want to use a time-windowed aggregation that is not built into Tecton, you can define a custom aggregation. You can set incremental_backfills=True to simplify this process.
How Time-Windowed Aggregations are Computed
Behind the scenes, Tecton stores partial aggregations in the form of "tiles". The tile size is defined by the aggregation_interval parameter. At feature request time, Tecton's online and offline feature serving capabilities automatically roll up the persisted tiles to produce the final feature value over the entire time window. This has several key benefits:
- Significantly reduced storage requirements if you define several time windows, because all time windows will share the same underlying tiles
- Reduced precompute resource requirements, given that Tecton needs to only compute incremental tiles and not the entire time window
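As a rough sketch in plain Python (this models the idea, not Tecton's actual storage format), per-interval tiles holding partial (count, sum) pairs can serve every window length from the same data:

```python
# Illustrative model of tile-based aggregation: one (count, sum) tile per
# aggregation_interval (here, per day), rolled up at request time into a
# mean over any window. The transaction data below is made up.
from datetime import date, timedelta

# 90 daily tiles of AMOUNT partial aggregates: day i has 2 events
# totalling 10.0 * (i + 1).
tiles = {
    date(2020, 10, 10) + timedelta(days=i): (2, 10.0 * (i + 1))
    for i in range(90)
}

def windowed_mean(tiles, end, window):
    """Roll up the tiles covering [end - window, end) into a single mean."""
    start = end - window
    count = total = 0
    for day, (c, s) in tiles.items():
        if start <= day < end:
            count += c
            total += s
    return total / count if count else None

end = date(2021, 1, 8)
# Both window lengths are answered from the same shared tiles; no raw
# events are re-read and no per-window table is stored.
mean_30d = windowed_mean(tiles, end, timedelta(days=30))
mean_90d = windowed_mean(tiles, end, timedelta(days=90))
```

Because the 30-day and 90-day means both read from the same daily tiles, adding another window (say 60 days) costs no extra precompute or storage, only a different roll-up at request time.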
Parameters
See the API reference for the full list of parameters.