diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/client/FeathrClient.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/client/FeathrClient.scala index d7cf748fe..2bb7bccce 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/client/FeathrClient.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/client/FeathrClient.scala @@ -83,7 +83,7 @@ class FeathrClient private[offline] (sparkSession: SparkSession, featureGroups: def joinFeaturesWithSuppressedExceptions(joinConfig: FeatureJoinConfig, obsData: SparkFeaturizedDataset, jobContext: JoinJobContext = JoinJobContext()): (SparkFeaturizedDataset, Map[String, String]) = { (joinFeatures(joinConfig, obsData, jobContext), Map(SuppressedExceptionHandlerUtils.MISSING_DATA_EXCEPTION - -> SuppressedExceptionHandlerUtils.missingDataSuppressedExceptionMsgs)) + -> SuppressedExceptionHandlerUtils.missingFeatures.mkString)) } /** @@ -231,7 +231,7 @@ class FeathrClient private[offline] (sparkSession: SparkSession, featureGroups: val (joinedDF, header) = doJoinObsAndFeatures(joinConfig, jobContext, obsData) (joinedDF, header, Map(SuppressedExceptionHandlerUtils.MISSING_DATA_EXCEPTION - -> SuppressedExceptionHandlerUtils.missingDataSuppressedExceptionMsgs)) + -> SuppressedExceptionHandlerUtils.missingFeatures.mkString)) } /** diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/derived/DerivedFeatureEvaluator.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/derived/DerivedFeatureEvaluator.scala index 4d9f45af5..61bc48c4a 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/derived/DerivedFeatureEvaluator.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/derived/DerivedFeatureEvaluator.scala @@ -10,7 +10,7 @@ import com.linkedin.feathr.offline.join.algorithms.{SequentialJoinConditionBuild import com.linkedin.feathr.offline.logical.FeatureGroups import com.linkedin.feathr.offline.mvel.plugins.FeathrExpressionExecutionContext import com.linkedin.feathr.offline.source.accessor.DataPathHandler -import com.linkedin.feathr.offline.util.FeaturizedDatasetUtils +import com.linkedin.feathr.offline.util.{FeaturizedDatasetUtils, SuppressedExceptionHandlerUtils} import com.linkedin.feathr.offline.{ErasedEntityTaggedFeature, FeatureDataFrame} import com.linkedin.feathr.sparkcommon.FeatureDerivationFunctionSpark import com.linkedin.feathr.{common, offline} @@ -37,6 +37,9 @@ private[offline] class DerivedFeatureEvaluator(derivationStrategies: DerivationS def evaluate(keyTag: Seq[Int], keyTagList: Seq[String], contextDF: DataFrame, derivedFeature: DerivedFeature): FeatureDataFrame = { val tags = Some(keyTag.map(keyTagList).toList) val producedFeatureColName = DataFrameColName.genFeatureColumnName(derivedFeature.producedFeatureNames.head, tags) + if (derivedFeature.consumedFeatureNames.exists(x => SuppressedExceptionHandlerUtils.missingFeatures.contains(x.getFeatureName))) { + SuppressedExceptionHandlerUtils.missingFeatures.add(derivedFeature.producedFeatureNames.head) + } derivedFeature.derivation match { case g: SeqJoinDerivationFunction => diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/derived/strategies/SequentialJoinAsDerivation.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/derived/strategies/SequentialJoinAsDerivation.scala index d35fc5486..02b14820f 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/derived/strategies/SequentialJoinAsDerivation.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/derived/strategies/SequentialJoinAsDerivation.scala @@ -7,6 +7,7 @@ import com.linkedin.feathr.common.{FeatureAggregationType, FeatureValue} import com.linkedin.feathr.offline.PostTransformationUtil import com.linkedin.feathr.offline.anchored.feature.FeatureAnchorWithSource import com.linkedin.feathr.offline.client.DataFrameColName +import com.linkedin.feathr.offline.client.DataFrameColName.genFeatureColumnName import com.linkedin.feathr.offline.derived.DerivedFeature import com.linkedin.feathr.offline.derived.functions.SeqJoinDerivationFunction import com.linkedin.feathr.offline.derived.strategies.SequentialJoinAsDerivation._ @@ -18,7 +19,7 @@ import com.linkedin.feathr.offline.mvel.plugins.FeathrExpressionExecutionContext import com.linkedin.feathr.offline.source.accessor.DataPathHandler import com.linkedin.feathr.offline.transformation.DataFrameDefaultValueSubstituter.substituteDefaults import com.linkedin.feathr.offline.transformation.{AnchorToDataSourceMapper, MvelDefinition} -import com.linkedin.feathr.offline.util.{CoercionUtilsScala, DataFrameSplitterMerger, FeathrUtils, FeaturizedDatasetUtils} +import com.linkedin.feathr.offline.util.{CoercionUtilsScala, DataFrameSplitterMerger, FeathrUtils, FeaturizedDatasetUtils, SuppressedExceptionHandlerUtils} import com.linkedin.feathr.sparkcommon.{ComplexAggregation, SeqJoinCustomAggregation} import org.apache.logging.log4j.LogManager import org.apache.spark.sql.functions._ @@ -52,6 +53,22 @@ private[offline] class SequentialJoinAsDerivation(ss: SparkSession, val seqJoinDerivationFunction = derivationFunction val baseFeatureName = seqJoinDerivationFunction.left.feature val expansionFeatureName = seqJoinDerivationFunction.right.feature + val shouldAddDefault = FeathrUtils.getFeathrJobParam(ss, FeathrUtils.ADD_DEFAULT_COL_FOR_MISSING_DATA).toBoolean + + // If expansion feature is missing because of data issues, then we just copy the expansion feature column to be the final feature value. + if (shouldAddDefault && SuppressedExceptionHandlerUtils.missingFeatures.contains(expansionFeatureName)) { + val seqJoinedFeatureResult = df.withColumn(genFeatureColumnName(FEATURE_NAME_PREFIX + derivedFeature.producedFeatureNames.head), + col(genFeatureColumnName(FEATURE_NAME_PREFIX + expansionFeatureName))) + + // Add the additional column with the keytags to mimic the exact behavior of the seq join flow, this will get dropped later. + val seqJoinFeatureResultWithRenamed = seqJoinedFeatureResult.withColumn(genFeatureColumnName(FEATURE_NAME_PREFIX + + derivedFeature.producedFeatureNames.head, Some(keyTags.map(keyTagList).toList)), + col(genFeatureColumnName(FEATURE_NAME_PREFIX + expansionFeatureName))) + val missingFeature = derivedFeature.producedFeatureNames.head + log.warn(s"Missing data for features ${missingFeature}. Default values will be populated for this column.") + SuppressedExceptionHandlerUtils.missingFeatures += missingFeature + return seqJoinFeatureResultWithRenamed + } val aggregationFunction = seqJoinDerivationFunction.aggregation val tagStrList = Some(keyTags.map(keyTagList).toList) val outputKey = seqJoinDerivationFunction.left.outputKey @@ -219,8 +236,6 @@ private[offline] class SequentialJoinAsDerivation(ss: SparkSession, val anchorDFMap1 = anchorToDataSourceMapper.getBasicAnchorDFMapForJoin(ss, Seq(featureAnchor), failOnMissingPartition) val updatedAnchorDFMap = anchorDFMap1.filter(anchorEntry => anchorEntry._2.isDefined) .map(anchorEntry => anchorEntry._1 -> anchorEntry._2.get) - // We dont need to check if the anchored feature's dataframes are missing (due to skip missing feature) as such - // seq join features have already been removed in the FeatureGroupsUpdater#getUpdatedFeatureGroupsWithoutInvalidPaths. val featureInfo = FeatureTransformation.directCalculate( anchorGroup: AnchorFeatureGroups, updatedAnchorDFMap(featureAnchor), diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/join/workflow/AnchoredFeatureJoinStep.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/join/workflow/AnchoredFeatureJoinStep.scala index 51d97ebe6..8c126ca81 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/join/workflow/AnchoredFeatureJoinStep.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/join/workflow/AnchoredFeatureJoinStep.scala @@ -17,7 +17,7 @@ import com.linkedin.feathr.offline.mvel.plugins.FeathrExpressionExecutionContext import com.linkedin.feathr.offline.source.accessor.DataSourceAccessor import com.linkedin.feathr.offline.transformation.DataFrameDefaultValueSubstituter.substituteDefaults import com.linkedin.feathr.offline.transformation.DataFrameExt._ -import com.linkedin.feathr.offline.util.{DataFrameUtils, FeathrUtils, FeaturizedDatasetUtils} +import com.linkedin.feathr.offline.util.{DataFrameUtils, FeathrUtils, FeaturizedDatasetUtils, SuppressedExceptionHandlerUtils} import com.linkedin.feathr.offline.util.FeathrUtils.shouldCheckPoint import org.apache.logging.log4j.LogManager import org.apache.spark.sql.{DataFrame, SparkSession} @@ -113,10 +113,12 @@ private[offline] class AnchoredFeatureJoinStep( val containsFeature: Seq[Boolean] = anchorDFMap.map(y => y._1.selectedFeatures.contains(x)).toSeq !containsFeature.contains(true) }) + log.warn(s"Missing data for features ${missingFeatures.mkString}. Default values will be populated for this column.") + SuppressedExceptionHandlerUtils.missingFeatures ++= missingFeatures val missingAnchoredFeatures = ctx.featureGroups.allAnchoredFeatures.filter(featureName => missingFeatures.contains(featureName._1)) substituteDefaultsForDataMissingFeatures(ctx.sparkSession, observationDF, ctx.logicalPlan, missingAnchoredFeatures) - }else observationDF + } else observationDF val allAnchoredFeatures: Map[String, FeatureAnchorWithSource] = ctx.featureGroups.allAnchoredFeatures val joinStages = ctx.logicalPlan.joinStages diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/swa/SlidingWindowAggregationJoiner.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/swa/SlidingWindowAggregationJoiner.scala index 0683c3601..6e3f962d9 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/swa/SlidingWindowAggregationJoiner.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/swa/SlidingWindowAggregationJoiner.scala @@ -179,7 +179,7 @@ private[offline] class SlidingWindowAggregationJoiner( res.map(emptyFeatures.add) val exceptionMsg = emptyFeatures.mkString log.warn(s"Missing data for features ${emptyFeatures}. Default values will be populated for this column.") - SuppressedExceptionHandlerUtils.missingDataSuppressedExceptionMsgs += exceptionMsg + SuppressedExceptionHandlerUtils.missingFeatures ++= emptyFeatures anchors.map(anchor => (anchor, originalSourceDf)) } else { val sourceDF: DataFrame = preprocessedDf match { diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/transformation/AnchorToDataSourceMapper.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/transformation/AnchorToDataSourceMapper.scala index cd18e512a..eb0cc2804 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/transformation/AnchorToDataSourceMapper.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/transformation/AnchorToDataSourceMapper.scala @@ -86,7 +86,6 @@ private[offline] class AnchorToDataSourceMapper(dataPathHandlers: List[DataPathH } catch { case e: Exception => if (shouldSkipFeature || shouldAddDefaultCol) None else throw e } - anchorsWithDate.map(anchor => (anchor, timeSeriesSource)) }) } diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/util/SuppressedExceptionHandlerUtils.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/util/SuppressedExceptionHandlerUtils.scala index 95e1fe90c..d38d33b2f 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/util/SuppressedExceptionHandlerUtils.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/util/SuppressedExceptionHandlerUtils.scala @@ -1,9 +1,13 @@ package com.linkedin.feathr.offline.util +import com.linkedin.feathr.offline.anchored.feature.FeatureAnchorWithSource + /** * Util classes and methods to handle suppressed exceptions. */ object SuppressedExceptionHandlerUtils { val MISSING_DATA_EXCEPTION = "missing_data_exception" - var missingDataSuppressedExceptionMsgs = "" + + // Set of features that may be missing because of missing data. + var missingFeatures = scala.collection.mutable.Set.empty[String] } diff --git a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/AnchoredFeaturesIntegTest.scala b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/AnchoredFeaturesIntegTest.scala index 071db5034..3d35101a1 100644 --- a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/AnchoredFeaturesIntegTest.scala +++ b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/AnchoredFeaturesIntegTest.scala @@ -6,7 +6,7 @@ import com.linkedin.feathr.offline.config.location.SimplePath import com.linkedin.feathr.offline.generation.SparkIOUtils import com.linkedin.feathr.offline.job.PreprocessedDataFrameManager import com.linkedin.feathr.offline.source.dataloader.{AvroJsonDataLoader, CsvDataLoader} -import com.linkedin.feathr.offline.util.FeathrTestUtils +import com.linkedin.feathr.offline.util.{FeathrTestUtils, SuppressedExceptionHandlerUtils} import com.linkedin.feathr.offline.util.FeathrUtils.{ADD_DEFAULT_COL_FOR_MISSING_DATA, SKIP_MISSING_FEATURE, setFeathrJobParam} import org.apache.spark.sql.Row import org.apache.spark.sql.functions.col @@ -409,7 +409,7 @@ class AnchoredFeaturesIntegTest extends FeathrIntegTest { | key: a_id | featureList: ["featureWithNull", "derived_featureWithNull", "featureWithNull2", "featureWithNull3", "featureWithNull4", | "featureWithNull5", "derived_featureWithNull2", "featureWithNull6", "featureWithNull7", "derived_featureWithNull7" - | "aEmbedding", "memberEmbeddingAutoTZ", "aEmbedding", "featureWithNullSql"] + | "aEmbedding", "memberEmbeddingAutoTZ", "aEmbedding", "featureWithNullSql", "seqJoin_featureWithNull"] | } """.stripMargin, featureDefAsString = @@ -529,6 +529,14 @@ class AnchoredFeaturesIntegTest extends FeathrIntegTest { | derived_featureWithNull: "featureWithNull * 2" | derived_featureWithNull2: "featureWithNull2 * 2" | derived_featureWithNull7: "featureWithNull7 * 2" + | seqJoin_featureWithNull: { + | key: x + | join: { + | base: {key: x, feature: featureWithNull2} + | expansion: {key: y, feature: featureWithNull5} + | } + | aggregation: "SUM" + | } |} """.stripMargin, observationDataPath = "anchorAndDerivations/testMVELLoopExpFeature-observations.csv") @@ -550,6 +558,11 @@ class AnchoredFeaturesIntegTest extends FeathrIntegTest { assertEquals(featureList(0).getAs[Row]("derived_featureWithNull2"), Row(mutable.WrappedArray.make(Array("")), mutable.WrappedArray.make(Array(2.0f)))) assertEquals(featureList(0).getAs[Row]("featureWithNullSql"), 1.0f) + assertEquals(featureList(0).getAs[Row]("seqJoin_featureWithNull"), + Row(mutable.WrappedArray.make(Array("")), mutable.WrappedArray.make(Array(1.0f)))) + assertEquals(SuppressedExceptionHandlerUtils.missingFeatures, + Set("featureWithNull", "featureWithNull3", "featureWithNull5", "featureWithNull4", "featureWithNull7", + "aEmbedding", "featureWithNull6", "derived_featureWithNull", "seqJoin_featureWithNull", "derived_featureWithNull7")) setFeathrJobParam(ADD_DEFAULT_COL_FOR_MISSING_DATA, "false") } diff --git a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/derived/TestSequentialJoinAsDerivation.scala b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/derived/TestSequentialJoinAsDerivation.scala index e11a8cd2a..f061029c2 100644 --- a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/derived/TestSequentialJoinAsDerivation.scala +++ b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/derived/TestSequentialJoinAsDerivation.scala @@ -11,8 +11,9 @@ import com.linkedin.feathr.offline.job.FeatureTransformation.FEATURE_NAME_PREFIX import com.linkedin.feathr.offline.join.algorithms.{SeqJoinExplodedJoinKeyColumnAppender, SequentialJoinConditionBuilder, SparkJoinWithJoinCondition} import com.linkedin.feathr.offline.logical.FeatureGroups import com.linkedin.feathr.offline.mvel.plugins.FeathrExpressionExecutionContext +import com.linkedin.feathr.offline.util.FeathrUtils import com.linkedin.feathr.offline.{TestFeathr, TestUtils} -import org.apache.spark.SparkException +import org.apache.spark.{SparkConf, SparkContext, SparkException} import org.apache.spark.sql.functions.{when => _, _} import org.apache.spark.sql.types._ import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SparkSession} @@ -985,6 +986,12 @@ class TestSequentialJoinAsDerivation extends TestFeathr with MockitoSugar { val mockDerivationFunction = mock[SeqJoinDerivationFunction] val mockBaseTaggedDependency = mock[BaseTaggedDependency] val mockTaggedDependency = mock[TaggedDependency] + val mockSparkContext = mock[SparkContext] + val mockSparkConf = mock[SparkConf] + when(mockSparkContext.getConf).thenReturn(mockSparkConf) + when(mockSparkSession.sparkContext).thenReturn(mockSparkContext) + when(mockSparkConf.get(s"${FeathrUtils.FEATHR_PARAMS_PREFIX}${FeathrUtils.ADD_DEFAULT_COL_FOR_MISSING_DATA}", "false")) + .thenReturn("false") // mock derivation function when(mockDerivedFeature.derivation.asInstanceOf[SeqJoinDerivationFunction]).thenReturn(mockDerivationFunction) when(mockDerivedFeature.producedFeatureNames).thenReturn(Seq("seqJoinFeature")) @@ -1059,6 +1066,13 @@ class TestSequentialJoinAsDerivation extends TestFeathr with MockitoSugar { val mockDerivationFunction = mock[SeqJoinDerivationFunction] val mockBaseTaggedDependency = mock[BaseTaggedDependency] val mockTaggedDependency = mock[TaggedDependency] + val mockSparkConf = mock[SparkConf] + val mockSparkContext = mock[SparkContext] + when(mockSparkSession.sparkContext).thenReturn(mockSparkContext) + when(mockSparkContext.getConf).thenReturn(mockSparkConf) + when(mockSparkConf.get(s"${FeathrUtils.FEATHR_PARAMS_PREFIX}${FeathrUtils.ADD_DEFAULT_COL_FOR_MISSING_DATA}", + "false")) + .thenReturn("false") // mock derivation function when(mockDerivedFeature.derivation.asInstanceOf[SeqJoinDerivationFunction]).thenReturn(mockDerivationFunction) when(mockDerivedFeature.producedFeatureNames).thenReturn(Seq("seqJoinFeature")) diff --git a/gradle.properties b/gradle.properties index bd4e6fb38..570596788 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,3 +1,3 @@ -version=1.0.4-rc3 +version=1.0.4-rc5 SONATYPE_AUTOMATIC_RELEASE=true POM_ARTIFACT_ID=feathr_2.12