From 28b1fb7dfff6c05ea6cd4d55d08749538ec605fc Mon Sep 17 00:00:00 2001 From: Pavlina Rolincova Date: Wed, 20 May 2020 11:55:31 +0200 Subject: [PATCH 1/6] feat: Optimize serializing Pandas DataFrame for writing #92 --- CHANGELOG.md | 1 + extra-requirements.txt | 1 + influxdb_client/client/write_api.py | 59 ++++++++++++++++++----------- influxdb_client/extras.py | 7 +++- tests/test_WriteApi.py | 6 +-- tests/test_WriteApiDataFrame.py | 53 ++++++++++++++++++++++++++ 6 files changed, 101 insertions(+), 26 deletions(-) create mode 100644 tests/test_WriteApiDataFrame.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 5122bb1c..1ffae3ab 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ ### Features 1. [#79](https://github.com/influxdata/influxdb-client-python/issues/79): Added support for writing Pandas DataFrame +2. [#92](https://github.com/influxdata/influxdb-client-python/issues/92): Optimize serializing Pandas DataFrame for writing ### Bug Fixes 1. [#85](https://github.com/influxdata/influxdb-client-python/issues/85): Fixed a possibility to generate empty write batch diff --git a/extra-requirements.txt b/extra-requirements.txt index ad1a0235..620a88f8 100644 --- a/extra-requirements.txt +++ b/extra-requirements.txt @@ -1 +1,2 @@ pandas>=0.25.3 +numpy diff --git a/influxdb_client/client/write_api.py b/influxdb_client/client/write_api.py index 4ddf778a..c5b76c6e 100644 --- a/influxdb_client/client/write_api.py +++ b/influxdb_client/client/write_api.py @@ -1,8 +1,11 @@ # coding: utf-8 import logging import os +import re from datetime import timedelta from enum import Enum +from functools import reduce +from itertools import chain from random import random from time import sleep from typing import Union, List @@ -14,7 +17,7 @@ from influxdb_client import WritePrecision, WriteService from influxdb_client.client.abstract_client import AbstractClient -from influxdb_client.client.write.point import Point, DEFAULT_WRITE_PRECISION +from influxdb_client.client.write.point import Point, DEFAULT_WRITE_PRECISION, _ESCAPE_KEY from influxdb_client.rest import ApiException logger = logging.getLogger(__name__) @@ -253,10 +256,8 @@ def _serialize(self, record, write_precision, **kwargs) -> bytes: _result = self._serialize(Point.from_dict(record, write_precision=write_precision), write_precision, **kwargs) elif 'DataFrame' in type(record).__name__: - _result = self._serialize(self._data_frame_to_list_of_points(record, - precision=write_precision, **kwargs), - write_precision, - **kwargs) + _data = self._data_frame_to_list_of_points(record, precision=write_precision, **kwargs) + _result = self._serialize(_data, write_precision, **kwargs) elif isinstance(record, list): _result = b'\n'.join([self._serialize(item, write_precision, @@ -297,8 +298,15 @@ def _write_batching(self, bucket, org, data, return None + def _itertuples(self, data_frame): + """Custom implementation of ``DataFrame.itertuples`` that + returns plain tuples instead of namedtuples. About 50% faster. + """ + cols = [data_frame.iloc[:, k] for k in range(len(data_frame.columns))] + return zip(data_frame.index, *cols) + def _data_frame_to_list_of_points(self, data_frame, precision, **kwargs): - from ..extras import pd + from ..extras import pd, np if not isinstance(data_frame, pd.DataFrame): raise TypeError('Must be DataFrame, but type was: {0}.' .format(type(data_frame))) @@ -314,28 +322,35 @@ def _data_frame_to_list_of_points(self, data_frame, precision, **kwargs): if data_frame.index.tzinfo is None: data_frame.index = data_frame.index.tz_localize('UTC') - data = [] + measurement_name = kwargs.get('data_frame_measurement_name') + data_frame_tag_columns = kwargs.get('data_frame_tag_columns') + data_frame_tag_columns = set(data_frame_tag_columns or []) - for c, (row) in enumerate(data_frame.values): - point = Point(measurement_name=kwargs.get('data_frame_measurement_name')) + tags = [] + fields = [] - for count, (value) in enumerate(row): - column = data_frame.columns[count] - data_frame_tag_columns = kwargs.get('data_frame_tag_columns') - if data_frame_tag_columns and column in data_frame_tag_columns: - point.tag(column, value) - else: - point.field(column, value) + if self._point_settings.defaultTags: + for key, value in self._point_settings.defaultTags.items(): + data_frame[key] = value + data_frame_tag_columns.add(key) - point.time(data_frame.index[c], precision) + for index, (key, value) in enumerate(data_frame.dtypes.items()): + key = str(key).translate(_ESCAPE_KEY) - if self._point_settings.defaultTags: - for key, val in self._point_settings.defaultTags.items(): - point.tag(key, val) + if key in data_frame_tag_columns: + tags.append(f"{key}={{p[{index + 1}].translate(_ESCAPE_KEY)}}") + elif issubclass(value.type, np.integer): + fields.append(f"{key}={{p[{index + 1}]}}i") + elif issubclass(value.type, (np.float, np.bool_)): + fields.append(f"{key}={{p[{index + 1}]}}") + else: + fields.append(f"{key}=\"{{p[{index + 1}].translate(_ESCAPE_KEY)}}\"") - data.append(point) + fmt = (f'{measurement_name}', f'{"," if tags else ""}', ','.join(tags), + ' ', ','.join(fields), ' {p[0].value}') + f = eval("lambda p: f'{}'".format(''.join(fmt))) - return data + return list(map(f, self._itertuples(data_frame))) def _http(self, batch_item: _BatchItem): diff --git a/influxdb_client/extras.py b/influxdb_client/extras.py index cb0eaff7..a6e89985 100644 --- a/influxdb_client/extras.py +++ b/influxdb_client/extras.py @@ -3,4 +3,9 @@ except ModuleNotFoundError as err: raise ImportError(f"`query_data_frame` requires Pandas which couldn't be imported due: {err}") -__all__ = ['pd'] +try: + import numpy as np +except ModuleNotFoundError as err: + raise ImportError(f"`data_frame` requires numpy which couldn't be imported due: {err}") + +__all__ = ['pd', 'np'] diff --git a/tests/test_WriteApi.py b/tests/test_WriteApi.py index 74aa92f9..af851729 100644 --- a/tests/test_WriteApi.py +++ b/tests/test_WriteApi.py @@ -231,7 +231,7 @@ def test_write_data_frame(self): bucket = self.create_test_bucket() now = pd.Timestamp('1970-01-01 00:00+00:00') - data_frame = pd.DataFrame(data=[["coyote_creek", 1.0], ["coyote_creek", 2.0]], + data_frame = pd.DataFrame(data=[["coyote_creek", 1], ["coyote_creek", 2]], index=[now + timedelta(hours=1), now + timedelta(hours=2)], columns=["location", "water_level"]) @@ -247,14 +247,14 @@ def test_write_data_frame(self): self.assertEqual(result[0].records[0].get_measurement(), "h2o_feet") self.assertEqual(result[0].records[0].get_value(), 1.0) self.assertEqual(result[0].records[0].values.get("location"), "coyote_creek") - self.assertEqual(result[0].records[0].get_field(), "water_level") + self.assertEqual(result[0].records[0].get_field(), "water water_level") self.assertEqual(result[0].records[0].get_time(), datetime.datetime(1970, 1, 1, 1, 0, tzinfo=datetime.timezone.utc)) self.assertEqual(result[0].records[1].get_measurement(), "h2o_feet") self.assertEqual(result[0].records[1].get_value(), 2.0) self.assertEqual(result[0].records[1].values.get("location"), "coyote_creek") - self.assertEqual(result[0].records[1].get_field(), "water_level") + self.assertEqual(result[0].records[1].get_field(), "water water_level") self.assertEqual(result[0].records[1].get_time(), datetime.datetime(1970, 1, 1, 2, 0, tzinfo=datetime.timezone.utc)) diff --git a/tests/test_WriteApiDataFrame.py b/tests/test_WriteApiDataFrame.py new file mode 100644 index 00000000..b235b9a7 --- /dev/null +++ b/tests/test_WriteApiDataFrame.py @@ -0,0 +1,53 @@ +import csv +import os +import time +import unittest + +from influxdb_client import InfluxDBClient, WriteOptions, WriteApi, Point +from tests.base_test import BaseTest + + +class DataFrameWriteTest(BaseTest): + + def setUp(self) -> None: + self.influxDb_client = InfluxDBClient(url="http://localhost:9999", token="my-token") + + self.write_options = WriteOptions(batch_size=10_000, flush_interval=5_000, retry_interval=3_000) + self._write_client = WriteApi(influxdb_client=self.influxDb_client, write_options=self.write_options) + + def tearDown(self) -> None: + self._write_client.__del__() + + @unittest.skip('Test big file') + def test_write_data_frame(self): + import random + from influxdb_client.extras import pd + + if not os.path.isfile("data_frame_file.csv"): + with open('data_frame_file.csv', mode='w+') as csv_file: + _writer = csv.writer(csv_file, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL) + _writer.writerow(['time', 'col1', 'col2', 'col3', 'col4', 'col5', 'col6', 'col7', 'col8']) + + for i in range(1, 1500000): + choice = ['test_a', 'test_b', 'test_c'] + _writer.writerow([i, random.choice(choice), 'test', random.random(), random.random(), + random.random(), random.random(), random.random(), random.random()]) + + csv_file.close() + + with open('data_frame_file.csv', mode='rb') as csv_file: + + data_frame = pd.read_csv(csv_file, index_col='time') + print(data_frame) + + print('Writing...') + + start = time.time() + + self._write_client.write("my-bucket", "my-org", record=data_frame, + data_frame_measurement_name='h2o_feet', + data_frame_tag_columns=['location']) + + print("Time elapsed: ", (time.time() - start)) + + csv_file.close() From f613fc6f01c2aeefc185d442cba9872f261f033c Mon Sep 17 00:00:00 2001 From: Pavlina Rolincova Date: Wed, 20 May 2020 12:01:38 +0200 Subject: [PATCH 2/6] feat: Optimize serializing Pandas DataFrame for writing #92 --- tests/test_WriteApi.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_WriteApi.py b/tests/test_WriteApi.py index af851729..0a4733f7 100644 --- a/tests/test_WriteApi.py +++ b/tests/test_WriteApi.py @@ -233,7 +233,7 @@ def test_write_data_frame(self): now = pd.Timestamp('1970-01-01 00:00+00:00') data_frame = pd.DataFrame(data=[["coyote_creek", 1], ["coyote_creek", 2]], index=[now + timedelta(hours=1), now + timedelta(hours=2)], - columns=["location", "water_level"]) + columns=["location", "water water_level"]) self.write_client.write(bucket.name, record=data_frame, data_frame_measurement_name='h2o_feet', data_frame_tag_columns=['location']) From 6a642ed71fd8a4674af98678046d853e85456882 Mon Sep 17 00:00:00 2001 From: Pavlina Rolincova Date: Wed, 20 May 2020 14:00:00 +0200 Subject: [PATCH 3/6] feat: Optimize serializing Pandas DataFrame for writing #92 --- influxdb_client/client/write_api.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/influxdb_client/client/write_api.py b/influxdb_client/client/write_api.py index c5b76c6e..4ba706ab 100644 --- a/influxdb_client/client/write_api.py +++ b/influxdb_client/client/write_api.py @@ -299,9 +299,6 @@ def _write_batching(self, bucket, org, data, return None def _itertuples(self, data_frame): - """Custom implementation of ``DataFrame.itertuples`` that - returns plain tuples instead of namedtuples. About 50% faster. - """ cols = [data_frame.iloc[:, k] for k in range(len(data_frame.columns))] return zip(data_frame.index, *cols) From 8de8ed999f0dc530b952bdde79a1aeff5836215a Mon Sep 17 00:00:00 2001 From: Pavlina Rolincova Date: Thu, 21 May 2020 08:12:18 +0200 Subject: [PATCH 4/6] feat: Optimize serializing Pandas DataFrame for writing #92 --- tests/test_WriteApiDataFrame.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/test_WriteApiDataFrame.py b/tests/test_WriteApiDataFrame.py index b235b9a7..51ffb6dd 100644 --- a/tests/test_WriteApiDataFrame.py +++ b/tests/test_WriteApiDataFrame.py @@ -48,6 +48,8 @@ def test_write_data_frame(self): data_frame_measurement_name='h2o_feet', data_frame_tag_columns=['location']) + self._write_client.__del__() + print("Time elapsed: ", (time.time() - start)) csv_file.close() From e7ff7a09dca9a52285b688f077015abe75823683 Mon Sep 17 00:00:00 2001 From: Jakub Bednar Date: Mon, 25 May 2020 10:25:32 +0200 Subject: [PATCH 5/6] chore: check support of numpy.int64 --- .gitignore | 1 + tests/test_WriteApi.py | 1 - tests/test_WriteApiDataFrame.py | 37 +++++++++++++++++++++++++++++++-- 3 files changed, 36 insertions(+), 3 deletions(-) diff --git a/.gitignore b/.gitignore index 4fcf06fc..c3810457 100644 --- a/.gitignore +++ b/.gitignore @@ -114,3 +114,4 @@ sandbox # OpenAPI-generator /.openapi-generator* /tests/writer.pickle +/tests/data_frame_file.csv diff --git a/tests/test_WriteApi.py b/tests/test_WriteApi.py index 0a4733f7..331bb026 100644 --- a/tests/test_WriteApi.py +++ b/tests/test_WriteApi.py @@ -5,7 +5,6 @@ import datetime import os import unittest -import time from datetime import timedelta from multiprocessing.pool import ApplyResult diff --git a/tests/test_WriteApiDataFrame.py b/tests/test_WriteApiDataFrame.py index 51ffb6dd..01f52c95 100644 --- a/tests/test_WriteApiDataFrame.py +++ b/tests/test_WriteApiDataFrame.py @@ -2,20 +2,24 @@ import os import time import unittest +from datetime import timedelta -from influxdb_client import InfluxDBClient, WriteOptions, WriteApi, Point +from influxdb_client import InfluxDBClient, WriteOptions, WriteApi +from influxdb_client.client.write_api import SYNCHRONOUS from tests.base_test import BaseTest class DataFrameWriteTest(BaseTest): def setUp(self) -> None: - self.influxDb_client = InfluxDBClient(url="http://localhost:9999", token="my-token") + super().setUp() + self.influxDb_client = InfluxDBClient(url="http://localhost:9999", token="my-token", debug=False) self.write_options = WriteOptions(batch_size=10_000, flush_interval=5_000, retry_interval=3_000) self._write_client = WriteApi(influxdb_client=self.influxDb_client, write_options=self.write_options) def tearDown(self) -> None: + super().tearDown() self._write_client.__del__() @unittest.skip('Test big file') @@ -53,3 +57,32 @@ def test_write_data_frame(self): print("Time elapsed: ", (time.time() - start)) csv_file.close() + + def test_write_num_py(self): + from influxdb_client.extras import pd, np + + bucket = self.create_test_bucket() + + now = pd.Timestamp('2020-04-05 00:00+00:00') + + data_frame = pd.DataFrame(data=[["coyote_creek", np.int64(100.5)], ["coyote_creek", np.int64(200)]], + index=[now + timedelta(hours=1), now + timedelta(hours=2)], + columns=["location", "water_level"]) + + write_api = self.client.write_api(write_options=SYNCHRONOUS) + write_api.write(bucket.name, record=data_frame, data_frame_measurement_name='h2o_feet', + data_frame_tag_columns=['location']) + + write_api.__del__() + + result = self.query_api.query( + "from(bucket:\"" + bucket.name + "\") |> range(start: 1970-01-01T00:00:00.000000001Z)", + self.my_organization.id) + + self.assertEqual(1, len(result)) + self.assertEqual(2, len(result[0].records)) + + self.assertEqual(result[0].records[0].get_value(), 100.0) + self.assertEqual(result[0].records[1].get_value(), 200.0) + + pass From 3a56509d1217d3f803640e8456fe99dc0b7e35fb Mon Sep 17 00:00:00 2001 From: Jakub Bednar Date: Mon, 25 May 2020 10:38:42 +0200 Subject: [PATCH 6/6] fix: disable not working tests --- tests/test_BucketsApi.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/test_BucketsApi.py b/tests/test_BucketsApi.py index a528bf03..876e5e67 100644 --- a/tests/test_BucketsApi.py +++ b/tests/test_BucketsApi.py @@ -37,6 +37,7 @@ def test_create_delete_bucket(self): assert self.buckets_api.find_bucket_by_id(my_bucket.id) assert "bucket not found" in e.value.body + @pytest.mark.skip(reason="https://github.com/influxdata/influxdb/issues/14900") def test_find_by_name(self): my_org = self.find_my_org()