Skip to content

Commit e01f569

Browse files
committed
WriteMetrics takes map of existing points and doesn't write any that match. Accept start/end flags or ExtractDays config.
1 parent 41a84dd commit e01f569

File tree

2 files changed

+45
-28
lines changed

2 files changed

+45
-28
lines changed

internal/app/main.go

Lines changed: 37 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,19 @@
11
package app
22

33
import (
4+
"flag"
45
"fmt"
56
"log"
67
"os"
8+
"time"
79

810
"gopkg.in/yaml.v3"
911
)
1012

11-
var (
12-
startDate = "09/29/2022"
13-
endDate = "09/30/2022"
14-
)
13+
//var (
14+
// startDate = "09/29/2022"
15+
// endDate = "09/30/2022"
16+
//)
1517

1618
type InfluxDB struct {
1719
Host string
@@ -25,6 +27,7 @@ type Config struct {
2527
Password string
2628
LoginUrl string
2729
DownloadDir string
30+
ExtractDays int
2831
InfluxDB InfluxDB
2932
}
3033

@@ -41,16 +44,43 @@ func Main() {
4144
log.Fatal(err)
4245
}
4346

44-
path, err := DownloadCsv(config, startDate, endDate)
47+
// parse time flags
48+
startFlag := flag.String("start", "", "Start date of period to extract from electric co.")
49+
endFlag := flag.String("end", "", "End date of period to extract from electric co.")
50+
flag.Parse()
51+
var startDate, endDate time.Time
52+
if *startFlag != "" {
53+
startDate, err = time.Parse("2006-01-02", *startFlag)
54+
if err != nil {
55+
log.Fatal(err)
56+
}
57+
if *endFlag == "" {
58+
log.Fatal("start and end parameters must both be provided")
59+
}
60+
endDate, err = time.Parse("2006-01-02", *endFlag)
61+
if err != nil {
62+
log.Fatal(err)
63+
}
64+
} else {
65+
endDate = time.Now().Truncate(24 * time.Hour)
66+
startDate = endDate.Add(time.Duration(-config.ExtractDays) * 24 * time.Hour)
67+
}
68+
69+
path, err := DownloadCsv(config, startDate.Format("01/02/2006"), endDate.Format("01/02/2002"))
4570
if err != nil {
4671
log.Fatal(err)
4772
}
4873
fmt.Printf("file downloaded: %s", path)
49-
// parse csv!
74+
75+
// parse csv
5076
records, err := ParseCsv(path)
5177
if err != nil {
5278
log.Fatal(err)
5379
}
5480
fmt.Printf("%+v", records)
55-
WriteMetrics(records, config.InfluxDB)
81+
existingPoints, err := QueryPreviousMetrics(startDate, endDate, config.InfluxDB)
82+
if err != nil {
83+
log.Fatal(err)
84+
}
85+
WriteMetrics(records, config.InfluxDB, existingPoints)
5686
}

internal/app/metrics.go

Lines changed: 8 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -13,24 +13,20 @@ import (
1313
"github.com/influxdata/influxdb-client-go/v2/api/write"
1414
)
1515

16-
//type Metric struct {
17-
// Name string `json:"__name__"`
18-
// Db string
19-
//}
20-
2116
type MetricLine struct {
22-
//Metric Metric
23-
//Values []float64
2417
Timestamps []int64
2518
}
2619

27-
func WriteMetrics(records []*ElectricUsage, config InfluxDB) {
20+
func WriteMetrics(records []*ElectricUsage, config InfluxDB, existingPoints map[int64]struct{}) {
2821
client := influxdb2.NewClient(config.Host, config.User+":"+config.Password)
2922
writeApi := client.WriteAPIBlocking("", config.Database)
3023
points := make([]*write.Point, 0, 15*2*len(records))
3124
for _, record := range records {
3225
divisor := record.EndTime.Sub(record.StartTime).Minutes()
3326
for t := record.StartTime; record.EndTime.After(t); t = t.Add(time.Minute) {
27+
if _, ok := existingPoints[t.Unix()]; ok {
28+
continue
29+
}
3430
watts := influxdb2.NewPointWithMeasurement("electric").
3531
SetTime(t).
3632
AddField("usage", float64(record.WattHours)/divisor)
@@ -45,18 +41,9 @@ func WriteMetrics(records []*ElectricUsage, config InfluxDB) {
4541
if err != nil {
4642
log.Fatal(err)
4743
}
48-
// query VM for metrics that already exist in the range we're trying to insert?
49-
// if that's too much work, then just maintain last-inserted-time and don't insert newer
50-
log.Println(points)
5144
}
5245

53-
// the goal here is to not double write any metrics
54-
// therefore, we should simply filter inserted points by any points that
55-
// already exist.
56-
// the algorithm for that is to run this query first, making a map[time]struct{}
57-
// and discarding any point in WriteMetrics that exists in the map.
58-
59-
func QueryPreviousMetrics(startTime time.Time, endTime time.Time, config InfluxDB) map[int64]struct{} {
46+
func QueryPreviousMetrics(startTime time.Time, endTime time.Time, config InfluxDB) (map[int64]struct{}, error) {
6047
client := &http.Client{}
6148
v := url.Values{
6249
"match[]": {"sensor_temperature"},
@@ -65,13 +52,13 @@ func QueryPreviousMetrics(startTime time.Time, endTime time.Time, config InfluxD
6552
}
6653
req, err := http.NewRequest("POST", config.Host+"/api/v1/export", strings.NewReader(v.Encode()))
6754
if err != nil {
68-
log.Fatal(err)
55+
return nil, err
6956
}
7057
req.Header.Add("Content-Type", "application/x-www-form-urlencoded")
7158
req.SetBasicAuth(config.User, config.Password)
7259
resp, err := client.Do(req)
7360
if err != nil {
74-
log.Fatal(err)
61+
return nil, err
7562
}
7663
defer func() {
7764
err := resp.Body.Close()
@@ -90,5 +77,5 @@ func QueryPreviousMetrics(startTime time.Time, endTime time.Time, config InfluxD
9077
existing[ts] = struct{}{}
9178
}
9279
}
93-
return existing
80+
return existing, nil
9481
}

0 commit comments

Comments
 (0)