Skip to content

Stream Feature View

A Stream Feature View defines transformations against a Stream Data Source and can compute features in near-real-time. It processes raw data from a streaming source (e.g. Kafka or Kinesis) and can be backfilled from a batch source that contains a historical log of stream events.

Info

Currently, Stream Feature Views can only be used with Tecton on Spark (Databricks or EMR).

Stream Feature Transformations

Stream Feature Views can run row-level Spark SQL or PySpark transformations and can apply optional time-windowed aggregations (see section below). Tecton executes these transformations as Spark jobs on your connected data platform (Databricks or EMR).

from tecton import stream_feature_view

@stream_feature_view(
    source=transactions_stream,
    entities=[user],
    mode='spark_sql'
)
def user_last_transaction_amount(transactions_stream):
    return f'''
        SELECT
            USER_ID,
            AMOUNT,
            TIMESTAMP
        FROM
            {transactions_stream}
        '''

Materialization to the Online and Offline Feature Store

Online Materialization

When online=True is set, Tecton will run the Stream Feature View transformation on each event that comes in from the underlying stream source and write it to the online store. Any previous values will be overwritten, so the online store only has the most recent value. Feature data will be backfilled from the Stream Data Source's log of historical events (configured via its batch_config). The feature_start_time specifies how far back to backfill features.

from tecton import stream_feature_view
from datetime import datetime, timedelta

@stream_feature_view(
    source=transactions_stream,
    entities=[user],
    mode='spark_sql',
    online=True,
    feature_start_time=datetime(2020, 10, 10)
)
def user_last_transaction_amount(transactions_stream):
    return f'''
        SELECT
            USER_ID,
            AMOUNT,
            TIMESTAMP
        FROM
            {transactions_stream}
        '''

Offline Materialization

Feature data can also be materialized to the Offline Feature Store in order to speed up offline queries (for testing and training data generation). When offline=True is set, Tecton will run the same Stream Feature View transformation pipeline against the batch source (the historical log of stream events) that backs the stream source. The batch_schedule parameter determines how often Tecton will run offline materialization jobs.

from tecton import stream_feature_view
from datetime import datetime, timedelta

@stream_feature_view(
    source=transactions_stream,
    entities=[user],
    mode='spark_sql',
    online=True,
    offline=True,
    feature_start_time=datetime(2020, 10, 10),
    batch_schedule=timedelta(days=1)
)
def user_last_transaction_amount(transactions_stream):
    return f'''
        SELECT
            USER_ID,
            AMOUNT,
            TIMESTAMP
        FROM
            {transactions_stream}
        '''

Time-Windowed Aggregations

Tecton provides built-in implementations of common time-windowed aggregations that can be applied to Stream Feature Views. These time-windowed aggregations are optimized for performance and efficiency and are applied consistently online and offline.

Tip

For a technical deep dive, check out our two-part blog post on Real-Time Aggregation Features for Machine Learning.

Tecton currently supports the following built-in aggregations: countsummeanmaxminlast-distinct,  var_samp,  variance - alias for  var_samp,  var_pop,  stddev_samp,  stddev - alias for  stddev_samp,  stddev_pop. Time-windowed aggregations can be specified in the Stream Feature View decorator using the aggregations parameter. Tecton expects the Stream Feature View transformation to select the raw events (with timestamps) to be aggregated.

Aggregations can be updated either on a sliding time window or with continuous processing for the fresher feature values.

Sliding Time-Windows

Sliding time-windows are configured with the aggregation_interval parameter. Tecton will update the feature value in the online store after the aggregation interval has elapsed, assuming there was at least one event for that key.

from tecton import stream_feature_view, Aggregation
from datetime import datetime, timedelta

@stream_feature_view(
    source=transactions_stream,
    entities=[user],
    mode='spark_sql',
    online=True,
    offline=True,
    feature_start_time=datetime(2020, 10, 10),
    batch_schedule=timedelta(days=1),
    aggregation_interval=timedelta(minutes=10),
    aggregations=[
        Aggregation(column='AMOUNT',function='mean', time_window=timedelta(hours=1), name='average_amount_1h'),
        Aggregation(column='AMOUNT',function='mean', time_window=timedelta(hours=12), name='average_amount_12h'),
        Aggregation(column='AMOUNT',function='mean', time_window=timedelta(hours=24), name='average_amount_24h'),
    ]
)
def user_transaction_amount_averages(transactions_stream):
    return f'''
        SELECT
            USER_ID,
            AMOUNT,
            TIMESTAMP
        FROM
            {transactions_stream}
        '''

When using a sliding time-window, Tecton stores partial aggregations in the form of tiles. The tile size is defined by the aggregation_interval parameter. At feature request-time, Tecton's online and offline feature serving capabilities automatically roll up the persisted tiles. This design has several key benefits:

  • Significantly reduced storage requirements if you define several time windows.
  • Reduced stream job memory requirements.
  • Streaming features with a time window that exceeds the streaming source's retention period can be backfilled from the batch source (the historical log of stream events) that backs the stream source. Tecton will backfill historical tiles from the batch source and combine tiles that were written by the streaming source transparently at request time.

Continuous Processing for Low-Latency Ingestion

Continuous processing for Stream Feature Views can update feature values in less than a second after the event is available in the stream data source, rather than waiting for the slide interval to complete. To enable continuous mode, set aggregation_mode=AggregationMode.CONTINUOUS as shown in the example below:

from tecton import stream_feature_view, Aggregation, AggregationMode
from datetime import datetime, timedelta

@stream_feature_view(
    source=transactions_stream,
    entities=[user],
    mode='spark_sql',
    online=True,
    offline=True,
    feature_start_time=datetime(2020, 10, 10),
    batch_schedule=timedelta(days=1),
    aggregation_mode=AggregationMode.CONTINUOUS,
    aggregations=[
        Aggregation(column='AMOUNT',function='mean', time_window=timedelta(hours=1), name='average_amount_1h'),
        Aggregation(column='AMOUNT',function='mean', time_window=timedelta(hours=12), name='average_amount_12h'),
        Aggregation(column='AMOUNT',function='mean', time_window=timedelta(hours=24), name='average_amount_24h'),
    ]
)
def user_transaction_amount_averages(transactions_stream):
    return f'''
        SELECT
            USER_ID,
            AMOUNT,
            TIMESTAMP
        FROM
            {transactions_stream}
        '''

When using continuous processing, Tecton will store all transformed events in the online store and run the full aggregations at the time of request.

When to Use Continuous Processing

You should use continuous processing if your model performance depends on extremely fresh features. For example, it may be important for a fraud detection use-case for features to include previous transactions made even a few seconds prior.

You should not use continuous processing if your model can tolerate features updating every few minutes, and you are trying to optimize costs. Continuous processing may lead to higher infrastructure costs due to more frequent feature writes and checkpointing updates. Using a longer aggregation interval can have lower costs, especially if a single key may have multiple events in a short period of time that can be grouped into a single interval.

Checkpointing costs with continuous processing on EMR

Continuous mode can cause significant S3 costs for customers using Tecton with EMR due to the frequency of writing Spark Streaming checkpoints to S3. The Databricks implementation of Spark Streaming has much lower checkpointing costs.

Productionizing a Stream

For a Stream Feature View used in production where late data loss is unacceptable, it is recommended to set the Stream Data Source watermark_delay_threshold to your stream retention period, or at least 24 hours. This will configure Spark Structured Streaming to not drop data in the event that it processes the events late or out-of-order. The tradeoff of a longer watermark delay is greater amount of in-memory state used by the streaming job.

Parameters

See the API reference for the full list of parameters.