Stream Feature Usage
In [0]:
import tecton
import pandas
from datetime import datetime
import dateutil.parser
Load a Stream Feature
In [0]:
fv = tecton.get_feature_view("last_transaction_amount_sql")
fv.summary()
Name | last_transaction_amount_sql |
Workspace | prod |
Description | Last user transaction amount (stream calculated) |
Created At | 2021-06-24 19:21:44 UTC |
Owner | |
Last Modified By | matt@tecton.ai |
Family | fraud |
Source Filename | .direnv/python-3.7.4/lib/python3.7/site-packages/tecton/cli/common.py |
Tags | {} |
Type | StreamFeatureView |
URL | https://app.tecton.ai/app/repo/prod/features/last_transaction_amount_sql |
Entities | fraud_user |
Features | amount |
Feature Services | fraud_detection_feature_service, continuous_feature_service |
Transformation | last_transaction_amount_sql |
Timestamp Key | timestamp |
Online Materialization | Enabled |
Offline Materialization | Enabled |
Feature Start Time | 2021-05-20 00:00:00 UTC |
Online Join Keys | user_id |
Offline Join Keys | user_id |
Serving TTL | 30 days |
Schedule Interval | 1 days |
Online Serving Status | Running |
Online Serving Freshness | 31s |
Materialization Status | [2021-05-19 00:00:00 UTC, 2021-06-24 00:00:00 UTC] -> Ok |
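The summary reports a Serving TTL of 30 days. As a rough illustration of what that setting implies, the sketch below checks whether a feature value is still inside the TTL window at request time. It uses only the standard library and does not call Tecton. The helper `is_servable`, the request time, and the inclusive boundary are assumptions made for illustration; the platform's exact expiry rule may differ.

```python
from datetime import datetime, timedelta, timezone

# Taken from the "Serving TTL" row of the summary above.
SERVING_TTL = timedelta(days=30)


def is_servable(feature_timestamp, request_time, ttl=SERVING_TTL):
    """Return True if the feature value is no older than the TTL.

    Hypothetical helper: the inclusive boundary is an assumption.
    """
    return request_time - feature_timestamp <= ttl


# Hypothetical request time, shortly after the "Created At" time above.
request_time = datetime(2021, 6, 24, 19, 30, tzinfo=timezone.utc)

fresh = datetime(2021, 6, 8, 21, 23, 44, tzinfo=timezone.utc)  # about 16 days old
stale = datetime(2021, 5, 20, 0, 0, 0, tzinfo=timezone.utc)    # about 35 days old

print(is_servable(fresh, request_time))  # True
print(is_servable(stale, request_time))  # False
```

A value older than the TTL would be treated as missing rather than returned, which is why a long gap between transactions can yield an empty result for a user.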
Start a Streaming Job for FeatureView Transformation Results
In [ ]:
fv.run_stream(output_temp_table="output_temp_table")
In [ ]:
# Query the result from the streaming output table.
display(spark.sql("SELECT * FROM output_temp_table ORDER BY timestamp DESC LIMIT 5"))
Get a Range of Feature Values from Offline Feature Store
In [0]:
fv.get_historical_features(start_time=datetime(2021, 6, 1), end_time=datetime(2021, 6, 30)).to_pandas().head()
 | user_id | amount | timestamp |
---|---|---|---|
0 | C1026457687 | 397567.87 | 2021-06-08 21:23:44.155591 |
1 | C2041618417 | 159743.98 | 2021-06-08 21:23:47.198705 |
2 | C1852835046 | 317763.09 | 2021-06-08 21:23:49.845551 |
3 | C62770848 | 11338.01 | 2021-06-08 21:23:53.194352 |
4 | C791189536 | 70157.52 | 2021-06-08 21:23:57.874188 |
Read the Latest Features from Online Feature Store
In [0]:
fv.get_online_features(join_keys={"user_id": "C1026457687"}).to_dict()
Out[5]: {'amount': 38050.21}
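Conceptually, the online read returns the most recent value stored for a join key. The sketch below mimics that "last value per key" behaviour in plain Python, without calling Tecton. The first two events reuse rows from the historical output shown earlier (timestamps truncated to seconds). The third event's timestamp is hypothetical, chosen only so that the latest amount matches the output above. `latest_by_key` is an illustrative helper, not a Tecton function.

```python
from datetime import datetime

# (user_id, amount, timestamp) events in arrival order.
events = [
    ("C1026457687", 397567.87, datetime(2021, 6, 8, 21, 23, 44)),
    ("C2041618417", 159743.98, datetime(2021, 6, 8, 21, 23, 47)),
    ("C1026457687", 38050.21, datetime(2021, 6, 24, 18, 0, 0)),  # hypothetical timestamp
]


def latest_by_key(rows):
    """Keep only the most recent amount for each user_id."""
    latest = {}
    for user_id, amount, ts in rows:
        current = latest.get(user_id)
        if current is None or ts > current[1]:
            latest[user_id] = (amount, ts)
    return {user_id: {"amount": value[0]} for user_id, value in latest.items()}


online_view = latest_by_key(events)
print(online_view["C1026457687"])  # {'amount': 38050.21}
```

Comparing timestamps rather than relying on arrival order keeps the result stable if events arrive out of order.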
Read Historical Features from Offline Feature Store with Time-Travel
In [0]:
spine_df = pandas.DataFrame({
    'user_id': ['C1026457687', 'C2041618417'],
    'timestamp': [
        dateutil.parser.parse("2021-06-22 03:20:00"),
        dateutil.parser.parse("2021-06-14 18:00:00"),
    ],
})
display(spine_df)
user_id | timestamp |
---|---|
C1026457687 | 2021-06-22T03:20:00.000+0000 |
C2041618417 | 2021-06-14T18:00:00.000+0000 |
In [0]:
features_df = fv.get_historical_features(spine=spine_df).to_pandas()
display(features_df)
user_id | timestamp | last_transaction_amount_sql.amount |
---|---|---|
C2041618417 | 2021-06-14T18:00:00.000+0000 | 159743.98 |
C1026457687 | 2021-06-22T03:20:00.000+0000 | 9214.19 |
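The time-travel lookup can be pictured as a point-in-time join: for each spine row, take the latest feature value whose timestamp is not later than the spine timestamp. The sketch below reproduces the two results above in plain Python, without calling Tecton. The amounts come from outputs in this notebook, but the timestamps marked hypothetical are invented for illustration, and `point_in_time_lookup` is not a Tecton function. Treating a value stamped exactly at the spine time as visible is an assumption, and the serving TTL is ignored here.

```python
from bisect import bisect_right
from datetime import datetime

# Per-user feature history, sorted by timestamp.
feature_log = {
    "C1026457687": [
        (datetime(2021, 6, 8, 21, 23, 44), 397567.87),
        (datetime(2021, 6, 21, 12, 0, 0), 9214.19),   # hypothetical timestamp
        (datetime(2021, 6, 24, 18, 0, 0), 38050.21),  # hypothetical timestamp
    ],
    "C2041618417": [
        (datetime(2021, 6, 8, 21, 23, 47), 159743.98),
    ],
}


def point_in_time_lookup(log, user_id, as_of):
    """Return the latest amount with timestamp <= as_of, or None."""
    history = log.get(user_id, [])
    timestamps = [ts for ts, _ in history]
    idx = bisect_right(timestamps, as_of)  # count of entries at or before as_of
    return history[idx - 1][1] if idx > 0 else None


spine = [
    ("C1026457687", datetime(2021, 6, 22, 3, 20)),
    ("C2041618417", datetime(2021, 6, 14, 18, 0)),
]

for user_id, ts in spine:
    print(user_id, ts, point_in_time_lookup(feature_log, user_id, ts))
```

Because the lookup never reads values later than the spine timestamp, training data built this way does not leak information from the future, which is the reason the second user's value differs from what an online read would return today.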