Skip to content

Commit 4d9e439

Browse files
rolincovarhajek
authored andcommitted
Add support for default tags (#50) (#51)
1 parent 73a2141 commit 4d9e439

File tree

5 files changed

+225
-7
lines changed

5 files changed

+225
-7
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
## 1.3.0 [unreleased]
22

3+
### Features
4+
1. [#49](https://github.com/influxdata/influxdb-client-python/issues/50): Implemented default tags
5+
36
### API
47
1. [#47](https://github.com/influxdata/influxdb-client-python/pull/47): Updated swagger to latest version
58

README.rst

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,34 @@ The batching is configurable by ``write_options``\ :
252252
253253
.. marker-batching-end
254254
255+
Default Tags
256+
""""""""""""
257+
.. marker-default-tags-start
258+
259+
Sometimes is useful to store same information in every measurement e.g. ``hostname``, ``location``, ``customer``.
260+
The client is able to use static value or env property as a tag value.
261+
262+
The expressions:
263+
264+
- ``California Miner`` - static value
265+
- ``${env.hostname}`` - environment property
266+
267+
.. code-block:: python
268+
269+
point_settings = PointSettings()
270+
point_settings.add_default_tag("id", "132-987-655")
271+
point_settings.add_default_tag("customer", "California Miner")
272+
point_settings.add_default_tag("data_center", "${env.data_center}")
273+
274+
self.write_client = self.client.write_api(write_options=SYNCHRONOUS, point_settings=point_settings)
275+
276+
.. code-block:: python
277+
278+
self.write_client = self.client.write_api(write_options=SYNCHRONOUS,
279+
point_settings=PointSettings(**{"id": "132-987-655",
280+
"customer": "California Miner"}))
281+
.. marker-default-tags-end
282+
255283
Asynchronous client
256284
"""""""""""""""""""
257285

influxdb_client/client/influxdb_client.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
from influxdb_client.client.query_api import QueryApi
1010
from influxdb_client.client.tasks_api import TasksApi
1111
from influxdb_client.client.users_api import UsersApi
12-
from influxdb_client.client.write_api import WriteApi, WriteOptions
12+
from influxdb_client.client.write_api import WriteApi, WriteOptions, PointSettings
1313

1414

1515
class InfluxDBClient(object):
@@ -45,14 +45,15 @@ def __init__(self, url, token, debug=None, timeout=10000, enable_gzip=False, org
4545
self.api_client = ApiClient(configuration=conf, header_name=auth_header_name,
4646
header_value=auth_header_value)
4747

48-
def write_api(self, write_options=WriteOptions()) -> WriteApi:
48+
def write_api(self, write_options=WriteOptions(), point_settings=PointSettings()) -> WriteApi:
4949
"""
5050
Creates a Write API instance
5151
52+
:param point_settings:
5253
:param write_options: write api configuration
5354
:return: write api instance
5455
"""
55-
return WriteApi(influxdb_client=self, write_options=write_options)
56+
return WriteApi(influxdb_client=self, write_options=write_options, point_settings=point_settings)
5657

5758
def query_api(self) -> QueryApi:
5859
"""

influxdb_client/client/write_api.py

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
from time import sleep
77
from typing import Union, List
88

9+
import os
10+
911
import rx
1012
from rx import operators as ops, Observable
1113
from rx.scheduler import ThreadPoolScheduler
@@ -55,6 +57,33 @@ def __init__(self, write_type: WriteType = WriteType.batching,
5557
ASYNCHRONOUS = WriteOptions(write_type=WriteType.asynchronous)
5658

5759

60+
class PointSettings(object):
61+
62+
def __init__(self, **default_tags) -> None:
63+
64+
"""
65+
Creates point settings for write api.
66+
67+
:param default_tags: Default tags which will be added to each point written by api.
68+
"""
69+
70+
self.defaultTags = dict()
71+
72+
for key, val in default_tags.items():
73+
self.add_default_tag(key, val)
74+
75+
@staticmethod
76+
def _get_value(value):
77+
78+
if value.startswith("${env."):
79+
return os.environ.get(value[6:-1])
80+
81+
return value
82+
83+
def add_default_tag(self, key, value) -> None:
84+
self.defaultTags[key] = self._get_value(value)
85+
86+
5887
class _BatchItem(object):
5988
def __init__(self, key, data, size=1) -> None:
6089
self.key = key
@@ -103,10 +132,12 @@ def _body_reduce(batch_items):
103132

104133
class WriteApi(AbstractClient):
105134

106-
def __init__(self, influxdb_client, write_options: WriteOptions = WriteOptions()) -> None:
135+
def __init__(self, influxdb_client, write_options: WriteOptions = WriteOptions(),
136+
point_settings: PointSettings = PointSettings()) -> None:
107137
self._influxdb_client = influxdb_client
108138
self._write_service = WriteService(influxdb_client.api_client)
109139
self._write_options = write_options
140+
self._point_settings = point_settings
110141
if self._write_options.write_type is WriteType.batching:
111142
# Define Subject that listen incoming data and produces writes into InfluxDB
112143
self._subject = Subject()
@@ -153,6 +184,17 @@ def write(self, bucket: str, org: str = None,
153184
if self._write_options.write_type is WriteType.batching:
154185
return self._write_batching(bucket, org, record, write_precision)
155186

187+
if self._point_settings.defaultTags and record:
188+
for key, val in self._point_settings.defaultTags.items():
189+
if isinstance(record, dict):
190+
record.get("tags")[key] = val
191+
else:
192+
for r in record:
193+
if isinstance(r, dict):
194+
r.get("tags")[key] = val
195+
elif isinstance(r, Point):
196+
r.tag(key, val)
197+
156198
final_string = self._serialize(record, write_precision)
157199

158200
_async_req = True if self._write_options.write_type == WriteType.asynchronous else False

tests/test_WriteApi.py

Lines changed: 147 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,13 @@
33
from __future__ import absolute_import
44

55
import datetime
6+
import os
67
import unittest
78
import time
89
from multiprocessing.pool import ApplyResult
910

1011
from influxdb_client import Point, WritePrecision
11-
from influxdb_client.client.write_api import SYNCHRONOUS, ASYNCHRONOUS
12+
from influxdb_client.client.write_api import SYNCHRONOUS, ASYNCHRONOUS, PointSettings
1213
from influxdb_client.rest import ApiException
1314
from tests.base_test import BaseTest
1415

@@ -17,7 +18,18 @@ class SynchronousWriteTest(BaseTest):
1718

1819
def setUp(self) -> None:
1920
super().setUp()
20-
self.write_client = self.client.write_api(write_options=SYNCHRONOUS)
21+
22+
os.environ['data_center'] = "LA"
23+
24+
self.id_tag = "132-987-655"
25+
self.customer_tag = "California Miner"
26+
self.data_center_key = "data_center"
27+
28+
self.write_client = self.client.write_api(write_options=SYNCHRONOUS,
29+
point_settings=PointSettings(**{"id": self.id_tag,
30+
"customer": self.customer_tag,
31+
self.data_center_key:
32+
'${env.data_center}'}))
2133

2234
def tearDown(self) -> None:
2335
self.write_client.__del__()
@@ -97,6 +109,7 @@ def test_write_points_unicode(self):
97109
p.field(field_name, utf8_val)
98110
p.tag(tag, tag_value)
99111
record_list = [p]
112+
print(record_list)
100113

101114
self.write_client.write(bucket.name, self.org, record_list)
102115

@@ -105,10 +118,55 @@ def test_write_points_unicode(self):
105118
self.assertEqual(1, len(flux_result))
106119
rec = flux_result[0].records[0]
107120

121+
self.assertEqual(self.id_tag, rec["id"])
122+
self.assertEqual(self.customer_tag, rec["customer"])
123+
self.assertEqual("LA", rec[self.data_center_key])
124+
108125
self.assertEqual(measurement, rec.get_measurement())
109126
self.assertEqual(utf8_val, rec.get_value())
110127
self.assertEqual(field_name, rec.get_field())
111128

129+
def test_write_using_default_tags(self):
130+
bucket = self.create_test_bucket()
131+
132+
measurement = "h2o_feet"
133+
field_name = "water_level"
134+
val = "1.0"
135+
val2 = "2.0"
136+
tag = "location"
137+
tag_value = "creek level"
138+
139+
p = Point(measurement)
140+
p.field(field_name, val)
141+
p.tag(tag, tag_value)
142+
p.time(1)
143+
144+
p2 = Point(measurement)
145+
p2.field(field_name, val2)
146+
p2.tag(tag, tag_value)
147+
p2.time(2)
148+
149+
record_list = [p, p2]
150+
print(record_list)
151+
152+
self.write_client.write(bucket.name, self.org, record_list)
153+
154+
query = 'from(bucket:"' + bucket.name + '") |> range(start: 1970-01-01T00:00:00.000000001Z)'
155+
flux_result = self.client.query_api().query(query)
156+
self.assertEqual(1, len(flux_result))
157+
rec = flux_result[0].records[0]
158+
rec2 = flux_result[0].records[1]
159+
160+
self.assertEqual(self.id_tag, rec["id"])
161+
self.assertEqual(self.customer_tag, rec["customer"])
162+
self.assertEqual("LA", rec[self.data_center_key])
163+
164+
self.assertEqual(self.id_tag, rec2["id"])
165+
self.assertEqual(self.customer_tag, rec2["customer"])
166+
self.assertEqual("LA", rec2[self.data_center_key])
167+
168+
self.delete_test_bucket(bucket)
169+
112170
def test_write_result(self):
113171
_bucket = self.create_test_bucket()
114172

@@ -205,7 +263,18 @@ class AsynchronousWriteTest(BaseTest):
205263

206264
def setUp(self) -> None:
207265
super().setUp()
208-
self.write_client = self.client.write_api(write_options=ASYNCHRONOUS)
266+
267+
os.environ['data_center'] = "LA"
268+
269+
self.id_tag = "132-987-655"
270+
self.customer_tag = "California Miner"
271+
self.data_center_key = "data_center"
272+
273+
self.write_client = self.client.write_api(write_options=ASYNCHRONOUS,
274+
point_settings=PointSettings(**{"id": self.id_tag,
275+
"customer": self.customer_tag,
276+
self.data_center_key:
277+
'${env.data_center}'}))
209278

210279
def tearDown(self) -> None:
211280
self.write_client.__del__()
@@ -261,6 +330,43 @@ def test_write_dictionaries(self):
261330

262331
self.delete_test_bucket(bucket)
263332

333+
def test_use_default_tags_with_dictionaries(self):
334+
bucket = self.create_test_bucket()
335+
336+
_point1 = {"measurement": "h2o_feet", "tags": {"location": "coyote_creek"},
337+
"time": "2009-11-10T22:00:00Z", "fields": {"water_level": 1.0}}
338+
_point2 = {"measurement": "h2o_feet", "tags": {"location": "coyote_creek"},
339+
"time": "2009-11-10T23:00:00Z", "fields": {"water_level": 2.0}}
340+
341+
_point_list = [_point1, _point2]
342+
343+
self.write_client.write(bucket.name, self.org, _point_list)
344+
time.sleep(1)
345+
346+
query = 'from(bucket:"' + bucket.name + '") |> range(start: 1970-01-01T00:00:00.000000001Z)'
347+
print(query)
348+
349+
flux_result = self.client.query_api().query(query)
350+
351+
self.assertEqual(1, len(flux_result))
352+
353+
records = flux_result[0].records
354+
355+
self.assertEqual(2, len(records))
356+
357+
rec = records[0]
358+
rec2 = records[1]
359+
360+
self.assertEqual(self.id_tag, rec["id"])
361+
self.assertEqual(self.customer_tag, rec["customer"])
362+
self.assertEqual("LA", rec[self.data_center_key])
363+
364+
self.assertEqual(self.id_tag, rec2["id"])
365+
self.assertEqual(self.customer_tag, rec2["customer"])
366+
self.assertEqual("LA", rec2[self.data_center_key])
367+
368+
self.delete_test_bucket(bucket)
369+
264370
def test_write_bytes(self):
265371
bucket = self.create_test_bucket()
266372

@@ -300,5 +406,43 @@ def test_write_bytes(self):
300406
self.delete_test_bucket(bucket)
301407

302408

409+
class PointSettingTest(BaseTest):
410+
411+
def setUp(self) -> None:
412+
super().setUp()
413+
self.id_tag = "132-987-655"
414+
self.customer_tag = "California Miner"
415+
416+
def tearDown(self) -> None:
417+
self.write_client.__del__()
418+
super().tearDown()
419+
420+
def test_point_settings(self):
421+
self.write_client = self.client.write_api(write_options=SYNCHRONOUS,
422+
point_settings=PointSettings(**{"id": self.id_tag,
423+
"customer": self.customer_tag}))
424+
425+
default_tags = self.write_client._point_settings.defaultTags
426+
427+
self.assertEqual(self.id_tag, default_tags.get("id"))
428+
self.assertEqual(self.customer_tag, default_tags.get("customer"))
429+
430+
def test_point_settings_with_add(self):
431+
os.environ['data_center'] = "LA"
432+
433+
point_settings = PointSettings()
434+
point_settings.add_default_tag("id", self.id_tag)
435+
point_settings.add_default_tag("customer", self.customer_tag)
436+
point_settings.add_default_tag("data_center", "${env.data_center}")
437+
438+
self.write_client = self.client.write_api(write_options=SYNCHRONOUS, point_settings=point_settings)
439+
440+
default_tags = self.write_client._point_settings.defaultTags
441+
442+
self.assertEqual(self.id_tag, default_tags.get("id"))
443+
self.assertEqual(self.customer_tag, default_tags.get("customer"))
444+
self.assertEqual("LA", default_tags.get("data_center"))
445+
446+
303447
if __name__ == '__main__':
304448
unittest.main()

0 commit comments

Comments
 (0)