Batch Feature Usage
```python
import tecton
import pandas
from datetime import datetime
import pyspark.sql.functions as F

ws = tecton.get_workspace("prod")
```
Load a Batch Feature
```python
fv = ws.get_feature_view("user_distinct_merchant_transaction_count_30d")
fv.summary()
```
| Property | Value |
|---|---|
| Name | user_distinct_merchant_transaction_count_30d |
| Workspace | prod |
| Description | How many transactions the user has made to distinct merchants in the last 30 days. |
| Created At | 2022-06-02 23:52:09 UTC |
| Owner | david@tecton.ai |
| Last Modified By | david@tecton.ai |
| Family | |
| Source Filename | fraud/features/batch_features/user_distinct_merchant_transaction_count_30d.py |
| Tags | {'release': 'production'} |
| Type | BatchFeatureView |
| URL | https://staging.tecton.ai/app/repo/prod/features/user_distinct_merchant_transaction_count_30d |
| Entities | fraud_user |
| Features | distinct_merchant_transaction_count_30d |
| Feature Services | fraud_detection_feature_service |
| Transformation | user_distinct_merchant_transaction_count_30d |
| Timestamp Key | timestamp |
| Online Materialization | Enabled |
| Offline Materialization | Enabled |
| Feature Start Time | 2022-04-01 00:00:00 UTC |
| Online Join Keys | user_id |
| Offline Join Keys | user_id |
| Serving TTL | 2 days |
| Schedule Interval | 1 days |
| Online Serving Freshness | 16h 9m 43s |
| Materialization Status | [2022-03-02 00:00:00 UTC, 2022-06-07 00:00:00 UTC] -> Ok |
Run Feature View Transformation Pipeline
See the API reference for the specific parameters available for each type of Feature View.

- If `end_time` is not set, it defaults to `datetime.now()`.
- If `start_time` is not set, it defaults to `end_time - <one materialization schedule interval>`.
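For example, under these defaults the two calls below should be roughly equivalent for this feature view, whose schedule interval is one day per the summary above. This is a minimal sketch of the default behavior, not taken from the original notebook:

```python
from datetime import datetime, timedelta

# With a 1-day schedule interval, these two calls are roughly equivalent;
# the second simply spells out the defaults described above.
result_a = fv.run()

end = datetime.now()
result_b = fv.run(start_time=end - timedelta(days=1), end_time=end)
```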
```python
result_dataframe = fv.run(start_time=datetime(2022, 5, 1), end_time=datetime(2022, 5, 2))
display(result_dataframe.to_spark().limit(5))
```
| user_id | timestamp | distinct_merchant_transaction_count_30d |
|---|---|---|
| user_268308151877 | 2022-05-01T23:59:59.999+0000 | 669 |
| user_722584453020 | 2022-05-01T23:59:59.999+0000 | 690 |
| user_457435146833 | 2022-05-01T23:59:59.999+0000 | 692 |
| user_782510788708 | 2022-05-01T23:59:59.999+0000 | 607 |
| user_402539845901 | 2022-05-01T23:59:59.999+0000 | 693 |
Run with Mock Inputs
```sql
%sql
DESCRIBE demo_fraud_v2.transactions
```
| col_name | data_type | comment |
|---|---|---|
| user_id | string | null |
| transaction_id | string | null |
| category | string | null |
| amt | double | null |
| is_fraud | bigint | null |
| merchant | string | null |
| merch_lat | double | null |
| merch_long | double | null |
| timestamp | string | null |
| partition_0 | string | null |
| partition_1 | string | null |
| partition_2 | string | null |
| partition_3 | string | null |
| # Partition Information | | |
| # col_name | data_type | comment |
| partition_0 | string | null |
| partition_1 | string | null |
| partition_2 | string | null |
| partition_3 | string | null |
```python
# The mock data's schema must match the schema of the Feature View's Data Source.
transactions_batch_data = pandas.DataFrame({
    "user_id": ["user_1", "user_1", "user_2"],
    "transaction_id": ["", "", ""],
    "category": ["", "", ""],
    "amt": [0.0, 0.0, 0.0],
    "is_fraud": [0, 0, 0],
    "merchant": ["merchant_1", "merchant_2", "merchant_1"],
    "merch_lat": [0.0, 0.0, 0.0],
    "merch_long": [0.0, 0.0, 0.0],
    "timestamp": [datetime(2022, 4, 10, 0), datetime(2022, 4, 25, 0), datetime(2022, 5, 1, 0)],
    "partition_0": ["2022", "2022", "2022"],
    "partition_1": ["04", "04", "05"],
    "partition_2": ["10", "25", "01"],
    "partition_3": ["00", "00", "00"],
})

result_dataframe = fv.run(
    start_time=datetime(2022, 5, 1),
    end_time=datetime(2022, 5, 2),
    transactions_batch=transactions_batch_data)  # `transactions_batch` is the name of this Feature View's input.

display(result_dataframe.to_spark().limit(10))
```
| user_id | timestamp | distinct_merchant_transaction_count_30d |
|---|---|---|
| user_1 | 2022-05-01T23:59:59.999+0000 | 2 |
| user_2 | 2022-05-01T23:59:59.999+0000 | 1 |
Run for Batch Feature View with Tiled Aggregations
`BatchFeatureView::run` for feature views with aggregations is quite similar, the only difference being that it also supports the `aggregation_level` parameter.

When a feature view has tiled aggregations, the query operates in three logical steps:

1. The feature view query is run over the provided time range, applying the user-defined transformations to the data source.
2. The result of step 1 is aggregated into tiles the size of the `aggregation_interval`.
3. The tiles from step 2 are combined to form the final feature values. The number of tiles combined is determined by the `time_window` of the aggregation.

To see the output of step 1, use `aggregation_level="disabled"`. For step 2, use `aggregation_level="partial"`. For step 3, use `aggregation_level="full"`, which is the default behavior.

For more details on tiled aggregations, please see the Tecton Documentation.
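As a concrete illustration of steps 2 and 3 (the interval and window below are assumed values for illustration, not read from this feature view): with a one-day `aggregation_interval` and a 30-day `time_window`, each full feature value combines 30 daily tiles.

```python
from datetime import timedelta

# Assumed values for illustration only.
aggregation_interval = timedelta(days=1)  # tile size produced in step 2
time_window = timedelta(days=30)          # window combined in step 3

# Number of partial tiles combined into each full feature value.
tiles_combined = time_window // aggregation_interval
print(tiles_combined)  # 30
```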
```python
agg_fv = ws.get_feature_view('user_transaction_counts')

result_dataframe = agg_fv.run(
    start_time=datetime(2022, 5, 1),
    end_time=datetime(2022, 5, 2),
    aggregation_level="disabled")

display(result_dataframe.to_spark().orderBy(F.col("user_id")).limit(5))
```
| user_id | transaction | timestamp |
|---|---|---|
| user_131340471060 | 1 | 2022-05-01T08:48:36.622+0000 |
| user_131340471060 | 1 | 2022-05-01T22:22:42.916+0000 |
| user_131340471060 | 1 | 2022-05-01T17:53:13.465+0000 |
| user_131340471060 | 1 | 2022-05-01T08:47:22.993+0000 |
| user_131340471060 | 1 | 2022-05-01T22:21:42.763+0000 |
```python
result_dataframe = agg_fv.run(
    start_time=datetime(2022, 5, 1),
    end_time=datetime(2022, 5, 2),
    aggregation_level="partial")

display(result_dataframe.to_spark().orderBy(F.col("user_id")).limit(5))
```
| user_id | transaction_count_1d | tile_start_time | tile_end_time |
|---|---|---|---|
| user_131340471060 | 406 | 2022-05-01T00:00:00.000+0000 | 2022-05-02T00:00:00.000+0000 |
| user_200441593087 | 3 | 2022-05-01T00:00:00.000+0000 | 2022-05-02T00:00:00.000+0000 |
| user_205125746682 | 508 | 2022-05-01T00:00:00.000+0000 | 2022-05-02T00:00:00.000+0000 |
| user_212730160038 | 152 | 2022-05-01T00:00:00.000+0000 | 2022-05-02T00:00:00.000+0000 |
| user_222506789984 | 2075 | 2022-05-01T00:00:00.000+0000 | 2022-05-02T00:00:00.000+0000 |
```python
result_dataframe = agg_fv.run(
    start_time=datetime(2022, 5, 1),
    end_time=datetime(2022, 5, 30),  # Note: to get an interesting "full" aggregation, we need to run over multiple periods.
    aggregation_level="full")

display(result_dataframe.to_spark().orderBy(F.col("user_id"), F.col("timestamp").desc()).limit(5))
```
```
WARNING - 06/07/2022 06:59:27 PM - InteractiveRunApi - Run time range (2022-05-01 00:00:00, 2022-05-30 00:00:00) is smaller than the maximum aggregation size: 90 days, 0:00:00. This may lead to incorrect aggregate feature values.
```
| user_id | timestamp | transaction_count_1d_1d | transaction_count_30d_1d | transaction_count_90d_1d |
|---|---|---|---|---|
| user_131340471060 | 2022-05-30T00:00:00.000+0000 | 353 | 10061 | 10061 |
| user_131340471060 | 2022-05-29T00:00:00.000+0000 | 350 | 9708 | 9708 |
| user_131340471060 | 2022-05-28T00:00:00.000+0000 | 352 | 9358 | 9358 |
| user_131340471060 | 2022-05-27T00:00:00.000+0000 | 342 | 9006 | 9006 |
| user_131340471060 | 2022-05-26T00:00:00.000+0000 | 362 | 8664 | 8664 |
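The warning above fires because the run's time range is shorter than the feature view's largest aggregation window (90 days), so the 90-day counts only see the in-range data; note that `transaction_count_30d_1d` and `transaction_count_90d_1d` are identical in the output. A minimal sketch of how to avoid it, assuming at least 90 days of source data are available (dates here are illustrative):

```python
# Extend start_time so the largest window (90 days) is fully covered for the
# timestamps of interest.
result_dataframe = agg_fv.run(
    start_time=datetime(2022, 3, 1),  # >= 90 days before end_time
    end_time=datetime(2022, 5, 30),
    aggregation_level="full")
```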
Get a Range of Feature Values from the Offline Feature Store
```python
fv.get_historical_features(start_time=datetime(2022, 5, 1), end_time=datetime(2022, 5, 2)).to_pandas().head(5)
```
| | user_id | distinct_merchant_transaction_count_30d | timestamp |
|---|---|---|---|
| 0 | user_722584453020 | 690 | 2022-05-01 23:59:59.999999 |
| 1 | user_461615966685 | 691 | 2022-05-01 23:59:59.999999 |
| 2 | user_724235628997 | 691 | 2022-05-01 23:59:59.999999 |
| 3 | user_205125746682 | 669 | 2022-05-01 23:59:59.999999 |
| 4 | user_609904782486 | 674 | 2022-05-01 23:59:59.999999 |
Read the Latest Features from the Online Feature Store
```python
fv.get_online_features({"user_id": "user_609904782486"}).to_dict()
```
```python
{'distinct_merchant_transaction_count_30d': 678}
```
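To fetch this feature alongside the rest of the features served with it, you can query the feature service instead. A minimal sketch, assuming the `fraud_detection_feature_service` listed in the summary above:

```python
# Hedged sketch: read every feature in the service for a single join key.
fs = ws.get_feature_service("fraud_detection_feature_service")
fs.get_online_features(join_keys={"user_id": "user_609904782486"}).to_dict()
```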
Read Historical Features from the Offline Feature Store with Time-Travel
```python
spine_df = pandas.DataFrame({
    'user_id': ['user_722584453020', 'user_461615966685'],
    'timestamp': [datetime(2022, 5, 1, 3, 20, 0), datetime(2022, 6, 6, 2, 30, 0)]
})
display(spine_df)
```
| user_id | timestamp |
|---|---|
| user_722584453020 | 2022-05-01T03:20:00.000+0000 |
| user_461615966685 | 2022-06-06T02:30:00.000+0000 |
```python
result_dataframe = fv.get_historical_features(spine_df).to_spark()
display(result_dataframe)
```
```
INFO - 06/07/2022 04:52:43 PM - Query Builder - user_distinct_merchant_transaction_count_30d: (FV 1/1) Start Build [1/3]
INFO - 06/07/2022 04:52:43 PM - Query Builder - user_distinct_merchant_transaction_count_30d: Building Tile DataFrame [2/3]
INFO - 06/07/2022 04:52:44 PM - Query Builder - user_distinct_merchant_transaction_count_30d: Generating Join [3/3]
```
| user_id | timestamp | user_distinct_merchant_transaction_count_30d__distinct_merchant_transaction_count_30d |
|---|---|---|
| user_722584453020 | 2022-05-01T03:20:00.000+0000 | 690 |
| user_461615966685 | 2022-06-06T02:30:00.000+0000 | 691 |
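If the range you need has not yet been materialized to the offline store, the same spine query can, in SDK versions that support it, be computed directly from the raw data source. A sketch, assuming the `from_source` parameter is available in your SDK version:

```python
# Hedged sketch: recompute features from the raw data source instead of reading
# the materialized offline store (parameter support depends on SDK version).
result_dataframe = fv.get_historical_features(spine_df, from_source=True).to_spark()
display(result_dataframe)
```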