From 852c3f8fbddca577477f4d91b7983ce371e9e97d Mon Sep 17 00:00:00 2001 From: Rakesh Kashyap Hanasoge Padmanabha Date: Mon, 20 Mar 2023 17:32:18 -0700 Subject: [PATCH 1/2] backfill prototype --- .../swa/SlidingWindowAggregationJoiner.scala | 11 +- .../localAnchorTestObsData.avro.json | 150 +++++--------- .../resources/slidingWindowAgg/test.avro.json | 111 ++++++++++ .../offline/SlidingWindowAggIntegTest.scala | 189 ++++++------------ .../NebulaCounterKeyExtractor.scala | 34 ++++ gradle.properties | 2 +- 6 files changed, 260 insertions(+), 237 deletions(-) create mode 100644 feathr-impl/src/test/resources/slidingWindowAgg/test.avro.json create mode 100644 feathr-impl/src/test/scala/com/linkedin/feathr/offline/anchored/keyExtractor/NebulaCounterKeyExtractor.scala diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/swa/SlidingWindowAggregationJoiner.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/swa/SlidingWindowAggregationJoiner.scala index 6f3458df7..5873ddf93 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/swa/SlidingWindowAggregationJoiner.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/swa/SlidingWindowAggregationJoiner.scala @@ -258,8 +258,11 @@ private[offline] class SlidingWindowAggregationJoiner( contextDF = if (swaHandler.isDefined) swaHandler.get.join(labelDataDef, factDataDefs.toList) else SlidingWindowJoin.join(labelDataDef, factDataDefs.toList) + val missingFeatures = factDataDefs.flatMap (factData => factData.aggFeatures.map(feature => feature.name)) diff (contextDF.columns diff origContextObsColumns) + log.warn("The following features are missing because of an IO Exception" + missingFeatures) + val finalJoinedFeatures = joinedFeatures diff missingFeatures contextDF = if (shouldFilterNulls && !factDataRowsWithNulls.isEmpty) { - val nullDfWithFeatureCols = joinedFeatures.foldLeft(factDataRowsWithNulls)((s, x) => s.withColumn(x, lit(null))) + val nullDfWithFeatureCols = finalJoinedFeatures.foldLeft(factDataRowsWithNulls)((s, x) => s.withColumn(x, lit(null))) contextDF.union(nullDfWithFeatureCols) } else contextDF @@ -272,13 +275,13 @@ private[offline] class SlidingWindowAggregationJoiner( .asInstanceOf[TimeWindowConfigurableAnchorExtractor].features(nameToFeatureAnchor._1).columnFormat) val FeatureDataFrame(withFDSFeatureDF, inferredTypes) = - SlidingWindowFeatureUtils.convertSWADFToFDS(contextDF, joinedFeatures.toSet, featureNameToColumnFormat, userSpecifiedTypesConfig) + SlidingWindowFeatureUtils.convertSWADFToFDS(contextDF, finalJoinedFeatures.toSet, featureNameToColumnFormat, userSpecifiedTypesConfig) // apply default on FDS dataset val withFeatureContextDF = - substituteDefaults(withFDSFeatureDF, defaults.keys.filter(joinedFeatures.contains).toSeq, defaults, userSpecifiedTypesConfig, ss) + substituteDefaults(withFDSFeatureDF, defaults.keys.filter(finalJoinedFeatures.contains).toSeq, defaults, userSpecifiedTypesConfig, ss) allInferredFeatureTypes ++= inferredTypes - contextDF = standardizeFeatureColumnNames(origContextObsColumns, withFeatureContextDF, joinedFeatures, keyTags.map(keyTagList)) + contextDF = standardizeFeatureColumnNames(origContextObsColumns, withFeatureContextDF, finalJoinedFeatures, keyTags.map(keyTagList)) if (shouldCheckPoint(ss)) { // checkpoint complicated dataframe for each stage to avoid Spark failure contextDF = contextDF.checkpoint(true) diff --git a/feathr-impl/src/test/resources/slidingWindowAgg/localAnchorTestObsData.avro.json b/feathr-impl/src/test/resources/slidingWindowAgg/localAnchorTestObsData.avro.json index 616b0f409..c2b5d4452 100644 --- a/feathr-impl/src/test/resources/slidingWindowAgg/localAnchorTestObsData.avro.json +++ b/feathr-impl/src/test/resources/slidingWindowAgg/localAnchorTestObsData.avro.json @@ -1,119 +1,65 @@ { "schema": { - "type": "record", - "name": "NTVObs", - "doc": "Daily or multi-day aggregated a activity features generated from similar data sources.", - "namespace": "com.linkedin.feathr.offline.data", - "fields": [ - { - "name": "x", - "type": [ - "null", - "string" - ], - "doc": "" - }, - { - "name": "y", - "type": "string", - "doc": "" - }, - { - "name": "timestamp", - "type": "string" - }, - { - "name": "passthroughFeatures", - "type": { - "type": "array", - "items": { - "type": "record", - "name": "features", - "fields": [ - { - "name": "name", - "type": "string" - }, - { - "name": "term", - "type": "string" - }, - { - "name": "value", - "type": "float" - } - ] - } - } + "type" : "record", + "name" : "NTVObs2", + "fields" : [ { + "name" : "header", + "type" : { + "type" : "record", + "name" : "EventHeader", + "fields" : [ { + "name" : "memberId", + "type" : "long", + "doc" : "The LinkedIn member ID of the user initiating the action. LinkedIn member IDs are integers greater than zero. Guests are represented either as zero or a negative number." + }, { + "name" : "time", + "type" : "long", + "doc" : "The time of the event" + }, { + "name" : "bucket_time", + "type" : "long", + "doc" : "The time of the event" + }] } - ] + }] }, "data": [ { - "x": { - "string": "a1" - }, - "y": "a2", - "timestamp": "2018-05-03", - "passthroughFeatures": [ - { - "name": "f1f1", - "term": "f1t1", - "value": 12 - } - ] + "header" : { + "memberId" : 1561, + "time" : 1613361620, + "bucket_time": 1613361600 + } }, { - "x": { - "string": "a2" - }, - "y": "a1", - "timestamp": "2018-05-03", - "passthroughFeatures": [ - { - "name": "f1f1", - "term": "f1t1", - "value": 12 - } - ] + "header" : { + "memberId" : 1561, + "time" : 1613361679, + "bucket_time": 1613361660 + } }, { - "x": null, - "y": "a5", - "timestamp": "2018-05-03", - "passthroughFeatures": [ - { - "name": "f1f1", - "term": "f1t1", - "value": 12 - } - ] + "header": { + "memberId": 1561, + "time": 1613361610, + "bucket_time": 1613361600 + } }, { - "x": null, - "y": "a8", - "timestamp": "2018-05-03", - "passthroughFeatures": [ - { - "name": "f1f1", - "term": "f1t1", - "value": 12 - } - ] + "header" : { + "memberId" : 1561, + "time" : 1613361639, + "bucket_time": 1613361630 + } }, { - "x": { - "string": "xyz" - }, - "y": "abc", - "timestamp": "2018-04-30", - "passthroughFeatures": [ - { - "name": "f2f2", - "term": "f2t2", - "value": 12 - } - ] + "header" : { + "memberId" : 1561, + "time" : 1613361643, + "bucket_time": 1613361630 + + } } ] + } \ No newline at end of file diff --git a/feathr-impl/src/test/resources/slidingWindowAgg/test.avro.json b/feathr-impl/src/test/resources/slidingWindowAgg/test.avro.json new file mode 100644 index 000000000..35a9d1d91 --- /dev/null +++ b/feathr-impl/src/test/resources/slidingWindowAgg/test.avro.json @@ -0,0 +1,111 @@ +{ + "schema": { + "type": "record", + "name": "NTVObs", + "doc": "Daily or multi-day aggregated member activity features generated from similar data sources.", + "fields": [ + { + "name": "memberId", + "type": "long", + "doc": "id of the member" + }, + { + "name": "time", + "type": "long", + "doc": "timestamp" + }, + { + "name" : "bucket_time", + "type" : "long", + "doc" : "The time of the event" + }, + { + "name": "passthroughFeatures", + "type": { + "type": "array", + "items": { + "type": "record", + "name": "features", + "fields": [ + { + "name": "name", + "type": "string" + }, + { + "name": "term", + "type": "string" + }, + { + "name": "value", + "type": "float" + } + ] + } + } + } + ] + }, + "data": [ + { + "memberId": 1561, + "time": 1613361699, + "bucket_time": 1613361690, + "passthroughFeatures": [ + { + "name": "f1f1", + "term": "f1t1", + "value": 12 + } + ] + }, + { + "memberId": 2, + "time": 1613361670, + "bucket_time": 1613361660, + "passthroughFeatures": [ + { + "name": "f1f1", + "term": "f1t1", + "value": 12 + } + ] + }, + { + "memberId": 1561, + "time": 1613361650, + "bucket_time": 1613361630, + "passthroughFeatures": [ + { + "name": "f1f1", + "term": "f1t1", + "value": 12 + } + ] + }, + { + "memberId": 1561, + "time": 1613361649, + "bucket_time": 1613361630, + "passthroughFeatures": [ + { + "name": "f1f1", + "term": "f1t1", + "value": 12 + } + ] + }, + { + "memberId": 1561, + "time": 1613361655, + "bucket_time": 1613361630, + "passthroughFeatures": [ + { + "name": "f1f1", + "term": "f1t1", + "value": 12 + } + ] + } + + ] +} \ No newline at end of file diff --git a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/SlidingWindowAggIntegTest.scala b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/SlidingWindowAggIntegTest.scala index 3370a6646..92a687fde 100644 --- a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/SlidingWindowAggIntegTest.scala +++ b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/SlidingWindowAggIntegTest.scala @@ -22,155 +22,84 @@ class SlidingWindowAggIntegTest extends FeathrIntegTest { @Test def testLocalAnchorSWATest: Unit = { val df = runLocalFeatureJoinForTest( - joinConfigAsString = """ - | settings: { - | observationDataTimeSettings: { - | absoluteTimeRange: { - | startTime: "2018-05-01" - | endTime: "2018-05-03" - | timeFormat: "yyyy-MM-dd" - | } - | } - | joinTimeSettings: { - | timestampColumn: { - | def: timestamp - | format: "yyyy-MM-dd" - | } - | } - |} - | - |features: [ - | { - | key: [x], - | featureList: ["f1", "f1Sum", "f2", "f1f1"] - | }, - | { - | key: [x, y] - | featureList: ["f3", "f4"] - | } - |] + joinConfigAsString = + """ + |settings: { + | joinTimeSettings: { + | timestampColumn: { + | def: "time" + | format: "epoch" + | } + | } + |} + |features: [ { + | key: [memberId, bucket_time] + | featureList: ["totalInvitesSentIn1Hour"] + | } + |] """.stripMargin, - featureDefAsString = """ + featureDefAsString = + """ |sources: { - | ptSource: { - | type: "PASSTHROUGH" - | } - | swaSource: { - | location: { path: "slidingWindowAgg/localSWAAnchorTestFeatureData/daily" } - | timePartitionPattern: "yyyy/MM/dd" - | timeWindowParameters: { - | timestampColumn: "timestamp" - | timestampColumnFormat: "yyyy-MM-dd" - | } + | Metrics-InviteSends: { + | location: {path: "slidingWindowAgg/localAnchorTestObsData.avro.json" } + | timeWindowParameters: { + | // resumeUploadTime is the field name after any time-based keyExtractor is applied. + | timestamp: header.time + | timestamp_format: "epoch" | } |} | + |Training-Data: { + | type: "PASSTHROUGH" + |} + |} |anchors: { - | ptAnchor: { - | source: "ptSource" - | key: "x" - | features: { - | f1f1: { - | def: "([$.term:$.value] in passthroughFeatures if $.name == 'f1f1')" - | } - | } - | } - | swaAnchor: { - | source: "swaSource" - | key: "substring(x, 0)" - | lateralViewParameters: { - | lateralViewDef: explode(features) - | lateralViewItemAlias: feature - | } - | features: { - | f1: { - | def: "feature.col.value" - | filter: "feature.col.name = 'f1'" - | aggregation: SUM - | groupBy: "feature.col.term" - | window: 3d - | } - | } - | } - | - | swaAnchor2: { - | source: "swaSource" - | key: "x" - | lateralViewParameters: { - | lateralViewDef: explode(features) - | lateralViewItemAlias: feature - | } + | invitationtimebased: { + | source: Metrics-InviteSends + | // keyExtractor is really more of a pre-processor that performs feature extraction for Sliding Window Aggregation + | key: [header.memberId, header.bucket_time] | features: { - | f1Sum: { - | def: "feature.col.value" - | filter: "feature.col.name = 'f1'" - | aggregation: SUM - | groupBy: "feature.col.term" - | window: 3d + | totalInvitesSentIn1Hour: { + | def: header.memberId + | type: NUMERIC + | default: 0.0 + | aggregation: COUNT + | window: 30s | } | } | } - | swaAnchorWithKeyExtractor: { - | source: "swaSource" - | keyExtractor: "com.linkedin.feathr.offline.anchored.keyExtractor.SimpleSampleKeyExtractor" + | passthroughAnchor: { + | source: Training-Data + | key: "memberId" | features: { - | f3: { - | def: "aggregationWindow" - | aggregation: SUM - | window: 3d - | } - | } - | } - | swaAnchorWithKeyExtractor2: { - | source: "swaSource" - | keyExtractor: "com.linkedin.feathr.offline.anchored.keyExtractor.SimpleSampleKeyExtractor" - | features: { - | f4: { - | def: "aggregationWindow" - | aggregation: SUM - | window: 3d - | } - | } - | } - | swaAnchorWithKeyExtractor3: { - | source: "swaSource" - | keyExtractor: "com.linkedin.feathr.offline.anchored.keyExtractor.SimpleSampleKeyExtractor2" - | lateralViewParameters: { - | lateralViewDef: explode(features) - | lateralViewItemAlias: feature - | } - | features: { - | f2: { - | def: "feature.col.value" - | filter: "feature.col.name = 'f2'" - | aggregation: SUM - | groupBy: "feature.col.term" - | window: 3d + | cv_invitation_scorer_total_invitations_sent_to_members_24_one_hour: { + | def: "cv_invitation_scorer_total_invitations_sent_to_members_24_one_hour" + | type: DENSE_VECTOR | } | } | } |} """.stripMargin, - "slidingWindowAgg/localAnchorTestObsData.avro.json").data + "slidingWindowAgg/test.avro.json").data df.show() - // validate output in name term value format - val featureList = df.collect().sortBy(row => if (row.get(0) != null) row.getAs[String]("x") else "null") - val row0 = featureList(0) - val row0f1 = row0.getAs[Row]("f1") - assertEquals(row0f1, TestUtils.build1dSparseTensorFDSRow(Array("f1t1", "f1t2"), Array(2.0f, 3.0f))) - val row0f2 = row0.getAs[Row]("f2") - assertEquals(row0f2, TestUtils.build1dSparseTensorFDSRow(Array("f2t1"), Array(4.0f))) - val row0f1f1 = row0.getAs[Row]("f1f1") - assertEquals(row0f1f1, TestUtils.build1dSparseTensorFDSRow(Array("f1t1"), Array(12.0f))) + // val featureList = df.collect().sortBy(row => if (row.get(0) != null) row.getAs[String]("memberId") else "null") + // val row0 = featureList(0) + // val row0f1 = row0.getAs[Row]("f1") + // assertEquals(row0f1, TestUtils.build1dSparseTensorFDSRow(Array("f1t1", "f1t2"), Array(2.0f, 3.0f))) + // val row0f2 = row0.getAs[Row]("f2") + // assertEquals(row0f2, TestUtils.build1dSparseTensorFDSRow(Array("f2t1"), Array(4.0f))) + // val row0f1f1 = row0.getAs[Row]("f1f1") + // assertEquals(row0f1f1, TestUtils.build1dSparseTensorFDSRow(Array("f1t1"), Array(12.0f))) - val row1 = featureList(1) - val row1f1 = row1.getAs[Row]("f1") - assertEquals(row1f1, TestUtils.build1dSparseTensorFDSRow(Array("f1t1", "f1t2"), Array(5.0f, 6.0f))) - val row1f2 = row1.getAs[Row]("f2") - assertEquals(row1f2, TestUtils.build1dSparseTensorFDSRow(Array("f2t1"), Array(7.0f))) - val row1f1f1 = row1.getAs[Row]("f1f1") - assertEquals(row1f1f1, TestUtils.build1dSparseTensorFDSRow(Array("f1t1"), Array(12.0f))) + // val row1 = featureList(1) + // val row1f1 = row1.getAs[Row]("f1") + // assertEquals(row1f1, TestUtils.build1dSparseTensorFDSRow(Array("f1t1", "f1t2"), Array(5.0f, 6.0f))) + // val row1f2 = row1.getAs[Row]("f2") + // assertEquals(row1f2, TestUtils.build1dSparseTensorFDSRow(Array("f2t1"), Array(7.0f))) + // val row1f1f1 = row1.getAs[Row]("f1f1") + // assertEquals(row1f1f1, TestUtils.build1dSparseTensorFDSRow(Array("f1t1"), Array(12.0f))) } diff --git a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/anchored/keyExtractor/NebulaCounterKeyExtractor.scala b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/anchored/keyExtractor/NebulaCounterKeyExtractor.scala new file mode 100644 index 000000000..4f57cc297 --- /dev/null +++ b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/anchored/keyExtractor/NebulaCounterKeyExtractor.scala @@ -0,0 +1,34 @@ +package com.linkedin.feathr.offline.anchored.keyExtractor + +import com.linkedin.feathr.sparkcommon.SourceKeyExtractor +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.DataFrame +/** + * This Spark job pulls online counters and features, computes derived features and labels the data to train the xgboost model in invitation scorer for ATO detection. + */ +class NebulaCounterKeyExtractor extends SourceKeyExtractor { + + override def getKeyColumnNames(datum: Option[Any]): Seq[String] = Seq("memberId") + + // This extractor is used as a pre-processor. Columns not present in the resulting DF will be unavailable to FRAME features + override def appendKeyColumns(dataFrame: DataFrame): DataFrame = { + import dataFrame.sparkSession.implicits._ + // dataFrame.withColumn("roundedSentTimeToHr", from_unixtime($"send_time"/1000, "yyyy-MM-dd-HH")) + // .withColumn("roundedMin", (from_unixtime($"send_time"/1000, "mm")/5).cast("integer")*5) + // .withColumn("bucketizedSentTimeToFiveMin", + // concat($"roundedSentTimeToHr", when($"roundedMin">=10, lit("-")).otherwise(lit("-0")), $"roundedMin")) + // .select($"sender_id" as "memberId", + // unix_timestamp(col("bucketizedSentTimeToFiveMin"),"yyyy-MM-dd-HH-mm").as("unixBucket"), + // $"send_time" as "time") + + val df = dataFrame.withColumn("roundedSentTimeToHr", from_unixtime($"header.time"/1000, "yyyy-MM-dd-HH")) + .withColumn("roundedMin", (from_unixtime($"header.time"/1000, "mm")/5).cast("integer")*5) + .withColumn("bucketizedSentTimeToFiveMin", + concat($"roundedSentTimeToHr", when($"roundedMin">=10, lit("-")).otherwise(lit("-0")), $"roundedMin")) + .select($"header.memberid" as "memberId", + unix_timestamp(col("bucketizedSentTimeToFiveMin"),"yyyy-MM-dd-HH-mm").as("unixBucket"), + $"header.time" as "time") + df.show() + df + } +} diff --git a/gradle.properties b/gradle.properties index 620ac933f..2611072c7 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,3 +1,3 @@ -version=1.0.1-rc1 +version=1.0.1-rc2 SONATYPE_AUTOMATIC_RELEASE=true POM_ARTIFACT_ID=feathr_2.12 From e832eefbf43e8705f7edbf1450f9e291c0c9725f Mon Sep 17 00:00:00 2001 From: Rakesh Kashyap Hanasoge Padmanabha Date: Thu, 30 Mar 2023 21:37:27 -0700 Subject: [PATCH 2/2] gaps filled --- .../localAnchorTestObsData.avro.json | 68 +++++++++++++++++-- .../resources/slidingWindowAgg/test.avro.json | 59 ++++++++++++++-- .../offline/SlidingWindowAggIntegTest.scala | 43 +++++++++--- 3 files changed, 146 insertions(+), 24 deletions(-) diff --git a/feathr-impl/src/test/resources/slidingWindowAgg/localAnchorTestObsData.avro.json b/feathr-impl/src/test/resources/slidingWindowAgg/localAnchorTestObsData.avro.json index c2b5d4452..820e5e41c 100644 --- a/feathr-impl/src/test/resources/slidingWindowAgg/localAnchorTestObsData.avro.json +++ b/feathr-impl/src/test/resources/slidingWindowAgg/localAnchorTestObsData.avro.json @@ -35,7 +35,7 @@ "header" : { "memberId" : 1561, "time" : 1613361679, - "bucket_time": 1613361660 + "bucket_time": 1613361600 } }, { @@ -48,18 +48,72 @@ { "header" : { "memberId" : 1561, - "time" : 1613361639, - "bucket_time": 1613361630 + "time" : 1613361643, + "bucket_time": 1613361600 } }, { "header" : { "memberId" : 1561, - "time" : 1613361643, - "bucket_time": 1613361630 - + "time" : 1613361650, + "bucket_time": 1613361600 + } + }, + { + "header" : { + "memberId" : 1561, + "time" : 1613361249, + "bucket_time": 1613361000 + } + }, + { + "header" : { + "memberId" : 1561, + "time" : 1613358000, + "bucket_time": 1613358000 + } + }, + { + "header" : { + "memberId" : 1561, + "time" : 1613358300, + "bucket_time": 1613358300 + } + }, + { + "header" : { + "memberId" : 1561, + "time" : 1613361248, + "bucket_time": 1613361000 + } + }, + { + "header" : { + "memberId" : 1561, + "time" : 1613361049, + "bucket_time": 1613361000 + } + }, + { + "header" : { + "memberId" : 1561, + "time" : 1613358660, + "bucket_time": 1613358600 + } + }, + { + "header": { + "memberId": 1561, + "time": 1613358900, + "bucket_time": 1613358900 + } + }, + { + "header": { + "memberId": 1561, + "time": 1613362200, + "bucket_time": 1613362200 } } ] - } \ No newline at end of file diff --git a/feathr-impl/src/test/resources/slidingWindowAgg/test.avro.json b/feathr-impl/src/test/resources/slidingWindowAgg/test.avro.json index 35a9d1d91..1388b8299 100644 --- a/feathr-impl/src/test/resources/slidingWindowAgg/test.avro.json +++ b/feathr-impl/src/test/resources/slidingWindowAgg/test.avro.json @@ -49,7 +49,7 @@ { "memberId": 1561, "time": 1613361699, - "bucket_time": 1613361690, + "bucket_time": 1613361600, "passthroughFeatures": [ { "name": "f1f1", @@ -61,7 +61,7 @@ { "memberId": 2, "time": 1613361670, - "bucket_time": 1613361660, + "bucket_time": 1613361600, "passthroughFeatures": [ { "name": "f1f1", @@ -73,7 +73,7 @@ { "memberId": 1561, "time": 1613361650, - "bucket_time": 1613361630, + "bucket_time": 1613361600, "passthroughFeatures": [ { "name": "f1f1", @@ -85,7 +85,7 @@ { "memberId": 1561, "time": 1613361649, - "bucket_time": 1613361630, + "bucket_time": 1613361600, "passthroughFeatures": [ { "name": "f1f1", @@ -97,7 +97,55 @@ { "memberId": 1561, "time": 1613361655, - "bucket_time": 1613361630, + "bucket_time": 1613361600, + "passthroughFeatures": [ + { + "name": "f1f1", + "term": "f1t1", + "value": 12 + } + ] + }, + { + "memberId": 1561, + "time": 1613361610, + "bucket_time": 1613361600, + "passthroughFeatures": [ + { + "name": "f1f1", + "term": "f1t1", + "value": 12 + } + ] + }, + { + "memberId": 1561, + "time": 1613361249, + "bucket_time": 1613361000, + "passthroughFeatures": [ + { + "name": "f1f1", + "term": "f1t1", + "value": 12 + } + ] + }, + { + "memberId": 1561, + "time": 1613361180, + "bucket_time": 1613361000, + "passthroughFeatures": [ + { + "name": "f1f1", + "term": "f1t1", + "value": 12 + } + ] + }, + { + "memberId": 1561, + "time": 1613362200, + "bucket_time": 1613362200, "passthroughFeatures": [ { "name": "f1f1", @@ -106,6 +154,5 @@ } ] } - ] } \ No newline at end of file diff --git a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/SlidingWindowAggIntegTest.scala b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/SlidingWindowAggIntegTest.scala index 92a687fde..45699bb47 100644 --- a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/SlidingWindowAggIntegTest.scala +++ b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/SlidingWindowAggIntegTest.scala @@ -27,15 +27,20 @@ class SlidingWindowAggIntegTest extends FeathrIntegTest { |settings: { | joinTimeSettings: { | timestampColumn: { - | def: "time" + | def: time | format: "epoch" | } | } |} |features: [ { - | key: [memberId, bucket_time] + | key: [memberId] | featureList: ["totalInvitesSentIn1Hour"] | } + | + | { + | key: [memberId, "bucket_time - 3600"] + | featureList: ["totalInvitesSentIn1HourMinus", "finalFeature"] + | } |] """.stripMargin, featureDefAsString = @@ -45,7 +50,7 @@ class SlidingWindowAggIntegTest extends FeathrIntegTest { | location: {path: "slidingWindowAgg/localAnchorTestObsData.avro.json" } | timeWindowParameters: { | // resumeUploadTime is the field name after any time-based keyExtractor is applied. - | timestamp: header.time + | timestamp: "header.time" | timestamp_format: "epoch" | } |} @@ -58,28 +63,44 @@ class SlidingWindowAggIntegTest extends FeathrIntegTest { | invitationtimebased: { | source: Metrics-InviteSends | // keyExtractor is really more of a pre-processor that performs feature extraction for Sliding Window Aggregation - | key: [header.memberId, header.bucket_time] + | key: [header.memberId] | features: { | totalInvitesSentIn1Hour: { | def: header.memberId | type: NUMERIC | default: 0.0 | aggregation: COUNT - | window: 30s + | window: 3600s | } | } | } - | passthroughAnchor: { - | source: Training-Data - | key: "memberId" + | + | invitationtimebased2: { + | source: Metrics-InviteSends + | // keyExtractor is really more of a pre-processor that performs feature extraction for Sliding Window Aggregation + | key: [header.memberId, "header.bucket_time"] | features: { - | cv_invitation_scorer_total_invitations_sent_to_members_24_one_hour: { - | def: "cv_invitation_scorer_total_invitations_sent_to_members_24_one_hour" - | type: DENSE_VECTOR + | totalInvitesSentIn1HourMinus: { + | def: header.memberId + | type: NUMERIC + | default: 0.0 + | aggregation: COUNT + | window: 3600s | } | } | } |} + | + |derivations: { + | finalFeature: { + | key: [header.memberId, "header.bucket_time"] + | inputs: { + | a1: { key: [header.memberId], feature: totalInvitesSentIn1Hour } + | a2: { key: [header.memberId, header.bucket_time], feature: totalInvitesSentIn1HourMinus } + | } + | definition.sqlExpr: "a1 - a2" + | } + |} """.stripMargin, "slidingWindowAgg/test.avro.json").data df.show()