# Batch Feature Usage
```python
import tecton
import pandas
from datetime import datetime
import dateutil.parser
```
## Load a Batch Feature
```python
fv = tecton.get_feature_view("user_distinct_merchant_transaction_count_30d")
fv.summary()
```
| Property | Value |
|---|---|
| Name | user_distinct_merchant_transaction_count_30d |
| Workspace | prod |
| Description | How many transactions the user has made to distinct merchants in the last 30 days. |
| Created At | 2021-06-24 19:21:49 UTC |
| Owner | matt@tecton.ai |
| Last Modified By | matt@tecton.ai |
| Family | fraud |
| Source Filename | .direnv/python-3.7.4/lib/python3.7/site-packages/tecton/cli/common.py |
| Tags | {'release': 'production'} |
| Type | BatchFeatureView |
| URL | https://app.tecton.ai/app/repo/prod/features/user_distinct_merchant_transaction_count_30d |
| Entities | fraud_user |
| Features | distinct_merchant_transaction_count_30d |
| Feature Services | fraud_detection_feature_service |
| Transformation | user_distinct_merchant_transaction_count_30d |
| Timestamp Key | timestamp |
| Online Materialization | Enabled |
| Offline Materialization | Enabled |
| Feature Start Time | 2021-04-01 00:00:00 UTC |
| Online Join Keys | user_id |
| Offline Join Keys | user_id |
| Serving TTL | 1 days |
| Schedule Interval | 1 days |
| Online Serving Freshness | 20h 29s |
| Materialization Status | [2021-03-02 00:00:00 UTC, 2021-06-24 00:00:00 UTC] -> Ok |
## Run the Feature View Transformation Pipeline
See the API reference for the parameters available for each type of Feature View.

- If `feature_end_time` is not set, it defaults to `datetime.now()`.
- If `feature_start_time` is not set, it defaults to `feature_end_time` minus one materialization scheduling interval.
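The two defaulting rules above can be sketched in plain Python. This is illustrative only; `schedule_interval` here is an ordinary `timedelta` standing in for this FeatureView's one-day batch schedule, not a Tecton API object:

```python
from datetime import datetime, timedelta

# Assumed 1-day scheduling interval, matching the Schedule Interval
# shown in fv.summary() above.
schedule_interval = timedelta(days=1)

# Default for feature_end_time: the current time.
feature_end_time = datetime.now()

# Default for feature_start_time: one scheduling interval earlier.
feature_start_time = feature_end_time - schedule_interval

print(feature_start_time, feature_end_time)
```
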
```python
result_dataframe = fv.run(feature_start_time=datetime(2021, 6, 21), feature_end_time=datetime(2021, 6, 22))
display(result_dataframe.to_spark().limit(5))
```
## Run with Mock Inputs
```python
# Mock data must match the schema of the FeatureView's DataSource.
transactions_batch_data = [{
    "amount": 1.0,
    "nameorig": "user_1",
    "namedest": "user_2",
    "isfraud": 0,
    "isflaggedfraud": 0,
    "timestamp": datetime(2021, 6, 21, 18, 0, 0, 0),
    "type_cash_in": 0,
    "type_cash_out": 0,
    "type_payment": 1,
    "type_transfer": 0,
    "type_debit": 0,
    "__index_level_0__": 0,
    "partition_0": "", "partition_1": "", "partition_2": "", "partition_3": "",
}]

result_dataframe = fv.run(
    feature_start_time=datetime(2021, 6, 21),
    feature_end_time=datetime(2021, 6, 22),
    # `transactions_batch` is the name of this FeatureView's input.
    transactions_batch=spark.createDataFrame(transactions_batch_data),
)
display(result_dataframe.to_spark().limit(10))
```
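To exercise the transformation with more than one mock row, a small helper can generate rows matching the schema above. `make_mock_transaction` is a hypothetical convenience function for this notebook, not part of the Tecton API; the resulting list can be passed to `spark.createDataFrame` exactly as in the cell above:

```python
from datetime import datetime, timedelta

def make_mock_transaction(ts, tx_type="payment", amount=1.0):
    # One mock row matching the transactions schema used above.
    # Exactly one of the one-hot `type_*` columns is set to 1.
    row = {
        "amount": amount,
        "nameorig": "user_1",
        "namedest": "user_2",
        "isfraud": 0,
        "isflaggedfraud": 0,
        "timestamp": ts,
        "type_cash_in": 0,
        "type_cash_out": 0,
        "type_payment": 0,
        "type_transfer": 0,
        "type_debit": 0,
        "__index_level_0__": 0,
        "partition_0": "", "partition_1": "", "partition_2": "", "partition_3": "",
    }
    row["type_" + tx_type] = 1
    return row

# Three mock transactions, one hour apart.
mock_rows = [
    make_mock_transaction(datetime(2021, 6, 21, 18) + timedelta(hours=i))
    for i in range(3)
]
print(len(mock_rows))
```
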
## Run for a Batch Window Aggregate Feature View
`BatchWindowAggregateFeatureView.run` works much like `BatchFeatureView.run`, with one difference: it also accepts an `aggregate_tiles` flag.

- If `aggregate_tiles` is `True` (the default), result rows whose timestamps fall within the same aggregation tile are rolled up into a single output row.
- If `aggregate_tiles` is `False`, the feature rows are not aggregated and are all listed individually.

For more details on `aggregate_tiles`, see the Tecton documentation.
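The tiling idea can be illustrated in plain Python. This is a simplified sketch, not Tecton's implementation: each event timestamp is snapped down to the start of its tile (here an assumed 1-hour tile), and counts within the same tile are summed into one output row.

```python
from datetime import datetime, timedelta
from collections import defaultdict

tile_size = timedelta(hours=1)   # assumed tile width for illustration
epoch = datetime(2021, 6, 21)    # reference point for tile boundaries

# (timestamp, count) events; the first two fall in the 00:00 tile,
# the third in the 01:00 tile.
events = [
    (datetime(2021, 6, 21, 0, 10), 1),
    (datetime(2021, 6, 21, 0, 50), 1),
    (datetime(2021, 6, 21, 1, 5), 1),
]

tiles = defaultdict(int)
for ts, count in events:
    # Snap the timestamp down to the start of its tile.
    tile_start = epoch + ((ts - epoch) // tile_size) * tile_size
    tiles[tile_start] += count

print(dict(tiles))
```

With `aggregate_tiles=False`, the analogue would be returning `events` unchanged, one row per event.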
```python
agg_fv = tecton.get_feature_view('user_transaction_counts')
result_dataframe = agg_fv.run(
    feature_start_time=datetime(2021, 6, 21),
    feature_end_time=datetime(2021, 6, 22),
    aggregate_tiles=True)
display(result_dataframe.to_spark())
```
## Get a Range of Feature Values from the Offline Feature Store
```python
fv.get_historical_features(start_time=datetime(2021, 6, 21), end_time=datetime(2021, 6, 22)).to_pandas().head()
```
| | user_id | distinct_merchant_transaction_count_30d | timestamp |
|---|---|---|---|
| 0 | C1267200339 | 1 | 2021-06-21 23:59:59 |
| 1 | C80644064 | 2 | 2021-06-21 23:59:59 |
| 2 | C1126579915 | 4 | 2021-06-21 23:59:59 |
| 3 | C810566550 | 3 | 2021-06-21 23:59:59 |
| 4 | C1160723146 | 1 | 2021-06-21 23:59:59 |
## Read the Latest Features from the Online Feature Store
```python
fv.get_online_features(join_keys={"user_id": "C1126579915"}).to_dict()
```

Output:

```
{'distinct_merchant_transaction_count_30d': 4}
```
## Read Historical Features from the Offline Feature Store with Time Travel
```python
spine_df = pandas.DataFrame({
    'user_id': ['C1126579915', 'C810566550'],
    'timestamp': [dateutil.parser.parse("2021-06-22 03:20:00"), dateutil.parser.parse("2021-06-14 18:00:00")]
})
display(spine_df)
```
| user_id | timestamp |
|---|---|
| C1126579915 | 2021-06-22T03:20:00.000+0000 |
| C810566550 | 2021-06-14T18:00:00.000+0000 |
```python
features_df = fv.get_historical_features(spine=spine_df).to_pandas()
display(features_df)
| user_id | timestamp | user_distinct_merchant_transaction_count_30d.distinct_merchant_transaction_count_30d |
|---|---|---|
| C1126579915 | 2021-06-22T03:20:00.000+0000 | 4 |
| C810566550 | 2021-06-14T18:00:00.000+0000 | 2 |
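Conceptually, the time-travel join looks up, for each spine row, the latest feature value written at or before that row's timestamp. A minimal plain-Python sketch of this "as of" lookup, using the example values above (illustrative only, not Tecton's implementation):

```python
from datetime import datetime

# Per-user feature log: (write_timestamp, value) pairs, oldest first.
# Values mirror the example output above.
feature_log = {
    "C1126579915": [(datetime(2021, 6, 21, 23, 59, 59), 4)],
    "C810566550": [(datetime(2021, 6, 13, 23, 59, 59), 2)],
}

def as_of(user_id, ts):
    # Latest feature value written at or before `ts` for this join key,
    # or None if no such row exists.
    rows = [value for written, value in feature_log.get(user_id, []) if written <= ts]
    return rows[-1] if rows else None

spine = [
    ("C1126579915", datetime(2021, 6, 22, 3, 20)),
    ("C810566550", datetime(2021, 6, 14, 18, 0)),
]
joined = [(user_id, ts, as_of(user_id, ts)) for user_id, ts in spine]
print(joined)
```

This is why each spine row needs a timestamp: the same key queried at different timestamps can return different feature values.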