Batch Feature View aggregations with tecton_sliding_window
Note: tecton_sliding_window is deprecated in later versions of Tecton
We recommend upgrading to the latest version of Tecton and using the alternate functionality for windowed aggregations.
Introduction
The built-in tecton_sliding_window transformation enables running backfills for time-windowed aggregations in a single job. This notebook shows how tecton_sliding_window works and how to use it to write a custom aggregation.
For example, suppose we want to count the number of distinct merchants a user purchased from in the last 7 days, and we want to update that feature daily. We'll configure batch_schedule='1d' and window='7d'. Because Batch Window Aggregate Feature Views don't currently support count distinct aggregations, we need to implement this feature with a Batch Feature View.
In steady state, Tecton will schedule a job every day and expect output only for the most recent time period. For our example, if the job runs at 2021-01-15 00:00, Tecton will pass in a user's transactions from 2021-01-08 00:00 (inclusive) up until 2021-01-15 00:00 (exclusive), and our pipeline should output one row with the number of distinct merchants, timestamped at the end of the period, 2021-01-14 23:59:59.999.
For more efficient backfills, Tecton will run multiple days' worth of feature transformations in a single job. Continuing our example, if we're backfilling the feature values from 2021-01-01 to 2021-01-15, then the job needs to output feature values for each day in between, where each day's value is an aggregation over the prior 7 days.
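To make that schedule concrete, here is a minimal sketch in plain Python (not a Tecton API) of the daily feature rows such a backfill job would emit, assuming day-aligned windows:

from datetime import datetime, timedelta

# Hypothetical illustration: enumerate the daily rows a single backfill
# job covering 2021-01-01 .. 2021-01-15 would produce.
backfill_start = datetime(2021, 1, 1)
backfill_end = datetime(2021, 1, 15)

day = backfill_start
while day < backfill_end:
    next_day = day + timedelta(days=1)
    # Each row is stamped at the last millisecond of its day and aggregates
    # the 7 days of transactions leading up to that point.
    window_end = next_day - timedelta(milliseconds=1)
    window_start = next_day - timedelta(days=7)
    print(f"row at {window_end}: aggregates [{window_start}, {next_day})")
    day = next_day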
Feature Definition in Feature Repo
Here's what the full Batch Feature View definition will look like.
@transformation(mode='spark_sql')
def user_distinct_merchant_transaction_count_transformation(window_input_df):
    return f'''
        SELECT
            nameorig AS user_id,
            COUNT(DISTINCT namedest) AS distinct_merchant_count,
            window_end AS timestamp
        FROM {window_input_df}
        GROUP BY
            nameorig,
            window_end
        '''
@batch_feature_view(
    inputs={'transactions_batch': Input(transactions_batch, window='7d')},
    entities=[user],
    mode='pipeline',
    ttl='1d',
    batch_schedule='1d',
    online=True,
    offline=True,
    feature_start_time=datetime(2021, 4, 1),
    family='fraud',
    tags={'release': 'production'},
    owner='matt@tecton.ai',
    description='How many transactions the user has made to distinct merchants in the last 7 days.'
)
def user_distinct_merchant_transaction_count_7d(transactions_batch):
    return user_distinct_merchant_transaction_count_transformation(
        tecton_sliding_window(transactions_batch,
                              timestamp_key=const('timestamp'),
                              window_size=const('7d')))
Using tecton_sliding_window for time-windowed aggregations
First, add the tecton_sliding_window() transformation to your transformation pipeline. tecton_sliding_window() has 3 primary inputs:

- df: the input data.
- timestamp_key: the timestamp column in your input data that represents the time of the event.
- window_size: how far back in time the window should go. For example, if the feature is the number of distinct IDs in the last week, then the window size is 7 days.
In the example above, our transformation pipeline now looks like this:
def user_distinct_merchant_transaction_count_7d(transactions_batch):
    return user_distinct_merchant_transaction_count_transformation(
        tecton_sliding_window(transactions_batch,
                              timestamp_key=const('timestamp'),
                              window_size=const('7d')))
In the following transformation, you group by the window_end column alongside any entity columns. In the example above, our second transformation looks like this:
@transformation(mode='spark_sql')
def user_distinct_merchant_transaction_count_transformation(window_input_df):
    return f'''
        SELECT
            nameorig AS user_id,
            COUNT(DISTINCT namedest) AS distinct_merchant_count,
            window_end AS timestamp
        FROM {window_input_df}
        GROUP BY
            nameorig,
            window_end
        '''
And that's it! Tecton will now be able to calculate your feature that aggregates over the trailing 7 days. Read on if you want to learn more about what happens inside tecton_sliding_window.
tecton_sliding_window definition
tecton_sliding_window explodes the input data such that a single input row turns into window_size / slide_interval rows. This explode step allows you to easily group by each trailing time window. For example, if our aggregation window is 7 days and the slide interval is 1 day, then each input row produces 7 output rows, with window_end timestamps corresponding to the 7 daily windows that contain the event.
The following method illustrates how the tecton_sliding_window transformation is defined.
def tecton_sliding_window_demo(
    df,
    timestamp_key,
    window_size,
    slide_interval,       # defaults to the batch schedule
    feature_start_time,   # determined by Tecton at runtime
    feature_end_time,     # determined by Tecton at runtime
    window_column_name="window_end",
):
    from pyspark.sql import functions as F
    from tecton_spark.udfs import tecton_sliding_window_udf

    # Explode each input row into one row per window that contains it.
    return df.withColumn(
        window_column_name,
        F.explode(
            tecton_sliding_window_udf(
                F.col(timestamp_key),
                F.lit(window_size),
                F.lit(slide_interval),
                F.lit(feature_start_time),
                F.lit(feature_end_time),
            )
        ),
    )
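The tecton_sliding_window_udf itself is internal to tecton_spark, but its per-row behavior can be approximated in plain Python. The helper below (sliding_window_ends is our name for this illustration, not a Tecton API) computes the window-end timestamps a single event explodes into, assuming half-open [window_end - window_size, window_end) windows on a slide grid anchored at feature_start_time:

from datetime import datetime, timedelta

def sliding_window_ends(event_ts, window_size, slide_interval,
                        feature_start_time, feature_end_time):
    # Illustrative approximation of the exploded window_end values for one
    # event; the real UDF is internal to tecton_spark and may differ.
    ends = []
    # First grid point strictly after the event (grid anchored at
    # feature_start_time, spaced slide_interval apart).
    steps = (event_ts - feature_start_time) // slide_interval
    end = feature_start_time + (steps + 1) * slide_interval
    # The event belongs to every window [end - window_size, end) that
    # contains it, up to the end of the materialization range.
    while end - window_size <= event_ts and end <= feature_end_time:
        # Report the last included millisecond, matching the
        # 23:59:59.999 timestamps in the output below.
        ends.append(end - timedelta(milliseconds=1))
        end += slide_interval
    return ends

# The 2021-05-01 event from the sample data, with a 7d window sliding by 1d:
for window_end in sliding_window_ends(
    datetime(2021, 5, 1),
    window_size=timedelta(days=7),
    slide_interval=timedelta(days=1),
    feature_start_time=datetime(2021, 4, 1),
    feature_end_time=datetime(2021, 6, 1),
):
    print(window_end)

This prints the seven window ends from 2021-05-01 23:59:59.999 through 2021-05-07 23:59:59.999, matching the exploded rows in the walkthrough below.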
Example walk-through
In the next few cells we'll show how some sample input data is transformed by tecton_sliding_window, and then how to aggregate the results in your feature.
Our sample data here contains historical transactions with the amount, time of transaction, and the destination account.
from pyspark.sql.functions import col, to_timestamp

df = spark.createDataFrame(
    [
        ('2021-05-01', 100, 1, 'a'),
        ('2021-05-02', 200, 1, 'a'),
        ('2021-05-03', 300, 1, 'b'),
        ('2021-05-04', 400, 1, 'a'),
        ('2021-05-05', 500, 1, 'a'),
    ],
    ["timestamp", "amount", "user_id", "merchant_id"],
)
df = df.select(
    to_timestamp(col("timestamp")).alias("timestamp"),
    col("amount"),
    col("user_id"),
    col("merchant_id"),
)
display(df)
timestamp | amount | user_id | merchant_id |
---|---|---|---|
2021-05-01T00:00:00.000+0000 | 100 | 1 | a |
2021-05-02T00:00:00.000+0000 | 200 | 1 | a |
2021-05-03T00:00:00.000+0000 | 300 | 1 | b |
2021-05-04T00:00:00.000+0000 | 400 | 1 | a |
2021-05-05T00:00:00.000+0000 | 500 | 1 | a |
tecton_sliding_window output
Now that we've run the input through tecton_sliding_window, you can see that each transaction has been repeated 7 times, each with a different value for window_end. Each row represents one of the windows the event is included in.
For example, the transaction occurring on 2021-05-01 is included in the 7-day windows ending on 2021-05-01, 2021-05-02, ..., 2021-05-07.
import datetime

end_time = datetime.datetime.utcnow() - datetime.timedelta(days=1)
start_time = datetime.datetime.utcnow() - datetime.timedelta(days=180)

window_df = tecton_sliding_window_demo(
    df,
    timestamp_key="timestamp",
    window_size="7d",
    slide_interval="1d",
    feature_start_time=start_time,
    feature_end_time=end_time,
)
window_df.createOrReplaceTempView("window_df")
display(window_df)
timestamp | amount | user_id | merchant_id | window_end |
---|---|---|---|---|
2021-05-01T00:00:00.000+0000 | 100 | 1 | a | 2021-05-01T23:59:59.999+0000 |
2021-05-01T00:00:00.000+0000 | 100 | 1 | a | 2021-05-02T23:59:59.999+0000 |
2021-05-01T00:00:00.000+0000 | 100 | 1 | a | 2021-05-03T23:59:59.999+0000 |
2021-05-01T00:00:00.000+0000 | 100 | 1 | a | 2021-05-04T23:59:59.999+0000 |
2021-05-01T00:00:00.000+0000 | 100 | 1 | a | 2021-05-05T23:59:59.999+0000 |
2021-05-01T00:00:00.000+0000 | 100 | 1 | a | 2021-05-06T23:59:59.999+0000 |
2021-05-01T00:00:00.000+0000 | 100 | 1 | a | 2021-05-07T23:59:59.999+0000 |
2021-05-02T00:00:00.000+0000 | 200 | 1 | a | 2021-05-02T23:59:59.999+0000 |
2021-05-02T00:00:00.000+0000 | 200 | 1 | a | 2021-05-03T23:59:59.999+0000 |
2021-05-02T00:00:00.000+0000 | 200 | 1 | a | 2021-05-04T23:59:59.999+0000 |
2021-05-02T00:00:00.000+0000 | 200 | 1 | a | 2021-05-05T23:59:59.999+0000 |
2021-05-02T00:00:00.000+0000 | 200 | 1 | a | 2021-05-06T23:59:59.999+0000 |
2021-05-02T00:00:00.000+0000 | 200 | 1 | a | 2021-05-07T23:59:59.999+0000 |
2021-05-02T00:00:00.000+0000 | 200 | 1 | a | 2021-05-08T23:59:59.999+0000 |
2021-05-03T00:00:00.000+0000 | 300 | 1 | b | 2021-05-03T23:59:59.999+0000 |
2021-05-03T00:00:00.000+0000 | 300 | 1 | b | 2021-05-04T23:59:59.999+0000 |
2021-05-03T00:00:00.000+0000 | 300 | 1 | b | 2021-05-05T23:59:59.999+0000 |
2021-05-03T00:00:00.000+0000 | 300 | 1 | b | 2021-05-06T23:59:59.999+0000 |
2021-05-03T00:00:00.000+0000 | 300 | 1 | b | 2021-05-07T23:59:59.999+0000 |
2021-05-03T00:00:00.000+0000 | 300 | 1 | b | 2021-05-08T23:59:59.999+0000 |
2021-05-03T00:00:00.000+0000 | 300 | 1 | b | 2021-05-09T23:59:59.999+0000 |
2021-05-04T00:00:00.000+0000 | 400 | 1 | a | 2021-05-04T23:59:59.999+0000 |
2021-05-04T00:00:00.000+0000 | 400 | 1 | a | 2021-05-05T23:59:59.999+0000 |
2021-05-04T00:00:00.000+0000 | 400 | 1 | a | 2021-05-06T23:59:59.999+0000 |
2021-05-04T00:00:00.000+0000 | 400 | 1 | a | 2021-05-07T23:59:59.999+0000 |
2021-05-04T00:00:00.000+0000 | 400 | 1 | a | 2021-05-08T23:59:59.999+0000 |
2021-05-04T00:00:00.000+0000 | 400 | 1 | a | 2021-05-09T23:59:59.999+0000 |
2021-05-04T00:00:00.000+0000 | 400 | 1 | a | 2021-05-10T23:59:59.999+0000 |
2021-05-05T00:00:00.000+0000 | 500 | 1 | a | 2021-05-05T23:59:59.999+0000 |
2021-05-05T00:00:00.000+0000 | 500 | 1 | a | 2021-05-06T23:59:59.999+0000 |
2021-05-05T00:00:00.000+0000 | 500 | 1 | a | 2021-05-07T23:59:59.999+0000 |
2021-05-05T00:00:00.000+0000 | 500 | 1 | a | 2021-05-08T23:59:59.999+0000 |
2021-05-05T00:00:00.000+0000 | 500 | 1 | a | 2021-05-09T23:59:59.999+0000 |
2021-05-05T00:00:00.000+0000 | 500 | 1 | a | 2021-05-10T23:59:59.999+0000 |
2021-05-05T00:00:00.000+0000 | 500 | 1 | a | 2021-05-11T23:59:59.999+0000 |
Feature Aggregation
Finally, we group by our user_id and window_end columns to count the distinct merchants for each 7-day period ending at the window_end time.
%sql
SELECT
window_end,
user_id,
COUNT(DISTINCT merchant_id) AS distinct_merchant_count
FROM window_df
GROUP BY
user_id,
window_end
ORDER BY user_id, window_end DESC
window_end | user_id | distinct_merchant_count |
---|---|---|
2021-05-11T23:59:59.999+0000 | 1 | 1 |
2021-05-10T23:59:59.999+0000 | 1 | 1 |
2021-05-09T23:59:59.999+0000 | 1 | 2 |
2021-05-08T23:59:59.999+0000 | 1 | 2 |
2021-05-07T23:59:59.999+0000 | 1 | 2 |
2021-05-06T23:59:59.999+0000 | 1 | 2 |
2021-05-05T23:59:59.999+0000 | 1 | 2 |
2021-05-04T23:59:59.999+0000 | 1 | 2 |
2021-05-03T23:59:59.999+0000 | 1 | 2 |
2021-05-02T23:59:59.999+0000 | 1 | 1 |
2021-05-01T23:59:59.999+0000 | 1 | 1 |
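The same aggregation can also be expressed with the DataFrame API instead of %sql; a minimal equivalent sketch:

from pyspark.sql import functions as F

# Same aggregation as the SQL cell above, using the DataFrame API.
result = (
    window_df
    .groupBy("user_id", "window_end")
    .agg(F.countDistinct("merchant_id").alias("distinct_merchant_count"))
    .orderBy("user_id", F.col("window_end").desc())
)
display(result)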