diff --git a/CHANGELOG.md b/CHANGELOG.md index 8b861896..29df4801 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,8 @@ +## 1.4.0 [unreleased] + +### Features +1. [#52](https://github.com/influxdata/influxdb-client-python/issues/52): Initialize client library from config file and environmental properties + ## 1.3.0 [2020-01-17] ### Features diff --git a/README.rst b/README.rst index 7efaa295..cd38b71b 100644 --- a/README.rst +++ b/README.rst @@ -136,7 +136,50 @@ Please follow the `Installation`_ and then run the following: for cell in row: val_count += 1 + .. marker-query-end + +Client configuration +-------------------- + +Via File +^^^^^^^^ +A client can be configured via ``*.ini`` file in segment ``influx2``. + +The following options are supported: + +- ``url`` - the url to connect to InfluxDB +- ``org`` - default destination organization for writes and queries +- ``token`` - the token to use for the authorization +- ``timeout`` - socket timeout in ms (default value is 10000) + +.. code-block:: python + + self.client = InfluxDBClient.from_config_file("config.ini") + +.. code-block:: + + [influx2] + url=http://localhost:9999 + org=my-org + token=my-token + timeout=6000 + +Via Environment Properties +^^^^^^^^^^^^^^^^^^^^^^^^^^ +A client can be configured via environment properties. + +Supported properties are: + +- ``INFLUXDB_V2_URL`` - the url to connect to InfluxDB +- ``INFLUXDB_V2_ORG`` - default destination organization for writes and queries +- ``INFLUXDB_V2_TOKEN`` - the token to use for the authorization +- ``INFLUXDB_V2_TIMEOUT`` - socket timeout in ms (default value is 10000) + +.. code-block:: python + + self.client = InfluxDBClient.from_env_properties() + .. marker-index-end @@ -264,6 +307,9 @@ The expressions: - ``California Miner`` - static value - ``${env.hostname}`` - environment property +Via API +_______ + .. code-block:: python point_settings = PointSettings() @@ -278,6 +324,42 @@ The expressions: self.write_client = self.client.write_api(write_options=SYNCHRONOUS, point_settings=PointSettings(**{"id": "132-987-655", "customer": "California Miner"})) + +Via Configuration file +______________________ + +In a ini configuration file you are able to specify default tags by ``tags`` segment. + +.. code-block:: python + + self.client = InfluxDBClient.from_config_file("config.ini") + +.. code-block:: + + [influx2] + url=http://localhost:9999 + org=my-org + token=my-token + timeout=6000 + + [tags] + id = 132-987-655 + customer = California Miner + data_center = ${env.data_center} + +Via Environment Properties +__________________________ +You are able to specify default tags by environment properties with prefix ``INFLUXDB_V2_TAG_``. + +Examples: + +- ``INFLUXDB_V2_TAG_ID`` +- ``INFLUXDB_V2_TAG_HOSTNAME`` + +.. code-block:: python + + self.client = InfluxDBClient.from_env_properties() + .. marker-default-tags-end Asynchronous client diff --git a/influxdb_client/client/influxdb_client.py b/influxdb_client/client/influxdb_client.py index b0fb64c2..a8ca4eea 100644 --- a/influxdb_client/client/influxdb_client.py +++ b/influxdb_client/client/influxdb_client.py @@ -1,5 +1,8 @@ from __future__ import absolute_import +import configparser +import os + from influxdb_client import Configuration, ApiClient, HealthCheck, HealthService, Ready, ReadyService from influxdb_client.client.authorizations_api import AuthorizationsApi from influxdb_client.client.bucket_api import BucketsApi @@ -14,7 +17,8 @@ class InfluxDBClient(object): - def __init__(self, url, token, debug=None, timeout=10000, enable_gzip=False, org: str = None) -> None: + def __init__(self, url, token, debug=None, timeout=10000, enable_gzip=False, org: str = None, + default_tags: dict = None) -> None: """ :class:`influxdb_client.InfluxDBClient` is client for HTTP API defined in https://github.com/influxdata/influxdb/blob/master/http/swagger.yml. @@ -33,6 +37,8 @@ def __init__(self, url, token, debug=None, timeout=10000, enable_gzip=False, org self.timeout = timeout self.org = org + self.default_tags = default_tags + conf = _Configuration() conf.host = self.url conf.enable_gzip = enable_gzip @@ -45,6 +51,50 @@ def __init__(self, url, token, debug=None, timeout=10000, enable_gzip=False, org self.api_client = ApiClient(configuration=conf, header_name=auth_header_name, header_value=auth_header_value) + @classmethod + def from_config_file(cls, config_file: str = "config.ini", debug=None, enable_gzip=False): + config = configparser.ConfigParser() + config.read(config_file) + + url = config['influx2']['url'] + token = config['influx2']['token'] + + timeout = None + + if config.has_option('influx2', 'timeout'): + timeout = config['influx2']['timeout'] + + org = None + + if config.has_option('influx2', 'org'): + org = config['influx2']['org'] + + default_tags = None + + if config.has_section('tags'): + default_tags = dict(config.items('tags')) + + if timeout: + return cls(url, token, debug=debug, timeout=int(timeout), org=org, default_tags=default_tags, + enable_gzip=enable_gzip) + + return cls(url, token, debug=debug, org=org, default_tags=default_tags, enable_gzip=enable_gzip) + + @classmethod + def from_env_properties(cls, debug=None, enable_gzip=False): + url = os.getenv('INFLUXDB_V2_URL', "http://localhost:9999") + token = os.getenv('INFLUXDB_V2_TOKEN', "my-token") + timeout = os.getenv('INFLUXDB_V2_TIMEOUT', "10000") + org = os.getenv('INFLUXDB_V2_ORG', "my-org") + + default_tags = dict() + + for key, value in os.environ.items(): + if key.startswith("INFLUXDB_V2_TAG_"): + default_tags[key[16:].lower()] = value + + return cls(url, token, debug=debug, timeout=int(timeout), org=org, default_tags=default_tags, enable_gzip=enable_gzip) + def write_api(self, write_options=WriteOptions(), point_settings=PointSettings()) -> WriteApi: """ Creates a Write API instance diff --git a/influxdb_client/client/write_api.py b/influxdb_client/client/write_api.py index b69e0646..3d621cee 100644 --- a/influxdb_client/client/write_api.py +++ b/influxdb_client/client/write_api.py @@ -138,6 +138,11 @@ def __init__(self, influxdb_client, write_options: WriteOptions = WriteOptions() self._write_service = WriteService(influxdb_client.api_client) self._write_options = write_options self._point_settings = point_settings + + if influxdb_client.default_tags: + for key, value in influxdb_client.default_tags.items(): + self._point_settings.add_default_tag(key, value) + if self._write_options.write_type is WriteType.batching: # Define Subject that listen incoming data and produces writes into InfluxDB self._subject = Subject() diff --git a/tests/config.ini b/tests/config.ini new file mode 100644 index 00000000..54ab60f7 --- /dev/null +++ b/tests/config.ini @@ -0,0 +1,10 @@ +[influx2] +url=http://192.168.0.2:9999 +org=my-org +token=my-token +timeout=6000 + +[tags] +id = 132-987-655 +customer = California Miner +data_center = ${env.data_center} \ No newline at end of file diff --git a/tests/test_WriteApi.py b/tests/test_WriteApi.py index cfdf8e65..805385a9 100644 --- a/tests/test_WriteApi.py +++ b/tests/test_WriteApi.py @@ -8,7 +8,7 @@ import time from multiprocessing.pool import ApplyResult -from influxdb_client import Point, WritePrecision +from influxdb_client import Point, WritePrecision, InfluxDBClient from influxdb_client.client.write_api import SYNCHRONOUS, ASYNCHRONOUS, PointSettings from influxdb_client.rest import ApiException from tests.base_test import BaseTest @@ -77,7 +77,6 @@ def test_write_records_list(self): self.write_client.write(bucket.name, self.org, record_list) query = 'from(bucket:"' + bucket.name + '") |> range(start: 1970-01-01T00:00:00.000000001Z)' - print(query) flux_result = self.client.query_api().query(query) @@ -109,7 +108,6 @@ def test_write_points_unicode(self): p.field(field_name, utf8_val) p.tag(tag, tag_value) record_list = [p] - print(record_list) self.write_client.write(bucket.name, self.org, record_list) @@ -147,7 +145,6 @@ def test_write_using_default_tags(self): p2.time(2) record_list = [p, p2] - print(record_list) self.write_client.write(bucket.name, self.org, record_list) @@ -304,7 +301,6 @@ def test_write_dictionaries(self): time.sleep(1) query = 'from(bucket:"' + bucket.name + '") |> range(start: 1970-01-01T00:00:00.000000001Z)' - print(query) flux_result = self.client.query_api().query(query) @@ -344,7 +340,6 @@ def test_use_default_tags_with_dictionaries(self): time.sleep(1) query = 'from(bucket:"' + bucket.name + '") |> range(start: 1970-01-01T00:00:00.000000001Z)' - print(query) flux_result = self.client.query_api().query(query) @@ -379,7 +374,6 @@ def test_write_bytes(self): time.sleep(1) query = 'from(bucket:"' + bucket.name + '") |> range(start: 1970-01-01T00:00:00.000000001Z)' - print(query) flux_result = self.client.query_api().query(query) @@ -444,5 +438,91 @@ def test_point_settings_with_add(self): self.assertEqual("LA", default_tags.get("data_center")) +class DefaultTagsConfiguration(BaseTest): + + def setUp(self) -> None: + super().setUp() + + os.environ['data_center'] = "LA" + + self.id_tag = "132-987-655" + self.customer_tag = "California Miner" + self.data_center_key = "data_center" + + os.environ['INFLUXDB_V2_TOKEN'] = "my-token" + os.environ['INFLUXDB_V2_TIMEOUT'] = "6000" + os.environ['INFLUXDB_V2_ORG'] = "my-org" + + os.environ['INFLUXDB_V2_TAG_ID'] = self.id_tag + os.environ['INFLUXDB_V2_TAG_CUSTOMER'] = self.customer_tag + os.environ['INFLUXDB_V2_TAG_DATA_CENTER'] = "${env.data_center}" + + def tearDown(self) -> None: + self.write_client.__del__() + super().tearDown() + + def test_connection_option_from_conf_file(self): + self.client.close() + self.client = InfluxDBClient.from_config_file(os.getcwd() + "/tests/config.ini", self.debug) + + self._check_connection_settings() + + def test_connection_option_from_env(self): + self.client.close() + self.client = InfluxDBClient.from_env_properties(self.debug) + + self._check_connection_settings() + + def _check_connection_settings(self): + self.write_client = self.client.write_api(write_options=SYNCHRONOUS) + + self.assertEqual(os.getenv("INFLUXDB_V2_URL"), self.client.url) + self.assertEqual("my-org", self.client.org) + self.assertEqual("my-token", self.client.token) + self.assertEqual(6000, self.client.timeout) + + def test_default_tags_from_conf_file(self): + self.client.close() + self.client = InfluxDBClient.from_config_file(os.getcwd() + "/tests/config.ini", self.debug) + + self._write_point() + + def test_default_tags_from_env(self): + self.client.close() + self.client = InfluxDBClient.from_env_properties(self.debug) + + self._write_point() + + def _write_point(self): + self.write_client = self.client.write_api(write_options=SYNCHRONOUS) + + bucket = self.create_test_bucket() + + measurement = "h2o_feet" + field_name = "water_level" + val = "1.0" + tag = "location" + tag_value = "creek level" + + p = Point(measurement) + p.field(field_name, val) + p.tag(tag, tag_value) + + record_list = [p] + + self.write_client.write(bucket.name, self.org, record_list) + + query = 'from(bucket:"' + bucket.name + '") |> range(start: 1970-01-01T00:00:00.000000001Z)' + flux_result = self.client.query_api().query(query) + self.assertEqual(1, len(flux_result)) + rec = flux_result[0].records[0] + + self.assertEqual(self.id_tag, rec["id"]) + self.assertEqual(self.customer_tag, rec["customer"]) + self.assertEqual("LA", rec[self.data_center_key]) + + self.delete_test_bucket(bucket) + + if __name__ == '__main__': unittest.main()