|
14 | 14 | }, |
15 | 15 | "name": "AzureDataLakeSource", |
16 | 16 | "description": "Source dataset in Azure Data Lake" |
| 17 | + }, |
| 18 | + { |
| 19 | + "linkedService": { |
| 20 | + "referenceName": "AzureDataExplorer", |
| 21 | + "type": "LinkedServiceReference" |
| 22 | + }, |
| 23 | + "name": "KustoSinkReference", |
| 24 | + "description": "Kusto Sink Reference Dataset" |
17 | 25 | } |
18 | 26 | ], |
19 | 27 | "sinks": [ |
|
30 | 38 | } |
31 | 39 | } |
32 | 40 | ], |
33 | | - "transformations": [], |
| 41 | + "transformations": [ |
| 42 | + { |
| 43 | + "name": "ExistingHashes" |
| 44 | + } |
| 45 | + ], |
34 | 46 | "scriptLines": [ |
35 | 47 | "parameters{", |
36 | 48 | " sourceFileSystem as string,", |
|
53 | 65 | " DeviceVendor as string,", |
54 | 66 | " HappinessScore as float,", |
55 | 67 | " UserId as long,", |
56 | | - " EndTime as timestamp", |
| 68 | + " EndTime as timestamp,", |
| 69 | + " Hash as string", |
57 | 70 | " ),", |
58 | 71 | " allowSchemaDrift: true,", |
59 | 72 | " validateSchema: false,", |
60 | 73 | " ignoreNoFilesFound: false,", |
61 | 74 | " format: 'delta',", |
62 | 75 | " fileSystem: ($sourceFileSystem),", |
63 | 76 | " folderPath: ($sourceFolderPath)) ~> AzureDataLakeSource", |
64 | | - "AzureDataLakeSource sink(allowSchemaDrift: true,", |
| 77 | + "source(output(", |
| 78 | + " Country as string,", |
| 79 | + " Isp as string,", |
| 80 | + " CdnNodeHost as string,", |
| 81 | + " Type as string,", |
| 82 | + " Title as string,", |
| 83 | + " SelectedQuality as string,", |
| 84 | + " DeviceType as string,", |
| 85 | + " Version as string,", |
| 86 | + " Connection as string,", |
| 87 | + " CommercilizationType as string,", |
| 88 | + " DeviceVendor as string,", |
| 89 | + " HappinessScore as double,", |
| 90 | + " UserId as string,", |
| 91 | + " EndTime as timestamp,", |
| 92 | + " Hash as string", |
| 93 | + " ),", |
| 94 | + " allowSchemaDrift: true,", |
| 95 | + " validateSchema: false,", |
| 96 | + " format: 'table',", |
| 97 | + " tableName: ($sinkTable),", |
| 98 | + " store: 'azuredataexplorer') ~> KustoSinkReference", |
| 99 | + "AzureDataLakeSource, KustoSinkReference exists(AzureDataLakeSource@Hash == KustoSinkReference@Hash,", |
| 100 | + " negate:true,", |
| 101 | + " broadcast: 'auto')~> ExistingHashes", |
| 102 | + "ExistingHashes sink(allowSchemaDrift: true,", |
65 | 103 | " validateSchema: false,", |
66 | 104 | " format: 'table',", |
67 | 105 | " tableName: ($sinkTable),", |
68 | 106 | " store: 'azuredataexplorer',", |
| 107 | + " postSQLs:[(concat('.delete table ', $sinkTable, ' records with (whatif=false) <| ', $sinkTable, ' | sort by Hash, ingestion_time() desc | where row_cumsum(1,prev(Hash) != Hash) > 1'))],", |
69 | 108 | " skipDuplicateMapInputs: true,", |
70 | 109 | " skipDuplicateMapOutputs: true,", |
71 | 110 | " outputAssertFailedRows: true,", |
|
0 commit comments