feat: DataFrame optimization (#97)
rolincova authored Jun 2, 2020
1 parent cd50aa3 commit 4ecce15
Showing 7 changed files with 135 additions and 28 deletions.
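The optimization replaces per-row Point construction with a serializer that is compiled once per DataFrame: column dtypes are inspected up front, tag and field fragments are assembled into a line-protocol template, and eval() turns that template into an f-string lambda that is mapped over the rows, so the per-row work collapses to a single string format. A minimal standalone sketch of the technique (not the commit's code; the schema, measurement name, and variable names below are invented for illustration):

    # Build the template once from a hypothetical schema, compile it with eval,
    # then format every row tuple through the resulting lambda.
    columns = [('location', 'tag'), ('water_level', 'field')]  # invented schema

    tags = [f"{name}={{p[{i + 1}]}}" for i, (name, kind) in enumerate(columns) if kind == 'tag']
    fields = [f"{name}={{p[{i + 1}]}}" for i, (name, kind) in enumerate(columns) if kind == 'field']

    template = 'h2o_feet' + (',' if tags else '') + ','.join(tags) + ' ' + ','.join(fields) + ' {p[0]}'
    serialize = eval("lambda p: f'{}'".format(template))

    print(serialize((1257894000000000000, 'coyote_creek', 8.12)))
    # h2o_feet,location=coyote_creek water_level=8.12 1257894000000000000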
1 change: 1 addition & 0 deletions .gitignore
@@ -114,3 +114,4 @@ sandbox
 # OpenAPI-generator
 /.openapi-generator*
 /tests/writer.pickle
+/tests/data_frame_file.csv
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -4,6 +4,7 @@

 ### Features
 1. [#79](https://github.com/influxdata/influxdb-client-python/issues/79): Added support for writing Pandas DataFrame
+2. [#92](https://github.com/influxdata/influxdb-client-python/issues/92): Optimize serializing Pandas DataFrame for writing
 
 ### Bug Fixes
 1. [#85](https://github.com/influxdata/influxdb-client-python/issues/85): Fixed a possibility to generate empty write batch
1 change: 1 addition & 0 deletions extra-requirements.txt
@@ -1 +1,2 @@
 pandas>=0.25.3
+numpy
56 changes: 34 additions & 22 deletions influxdb_client/client/write_api.py
@@ -1,8 +1,11 @@
 # coding: utf-8
 import logging
 import os
+import re
 from datetime import timedelta
 from enum import Enum
+from functools import reduce
+from itertools import chain
 from random import random
 from time import sleep
 from typing import Union, List
@@ -14,7 +17,7 @@

 from influxdb_client import WritePrecision, WriteService
 from influxdb_client.client.abstract_client import AbstractClient
-from influxdb_client.client.write.point import Point, DEFAULT_WRITE_PRECISION
+from influxdb_client.client.write.point import Point, DEFAULT_WRITE_PRECISION, _ESCAPE_KEY
 from influxdb_client.rest import ApiException
 
 logger = logging.getLogger(__name__)
@@ -253,10 +256,8 @@ def _serialize(self, record, write_precision, **kwargs) -> bytes:
             _result = self._serialize(Point.from_dict(record, write_precision=write_precision),
                                       write_precision, **kwargs)
         elif 'DataFrame' in type(record).__name__:
-            _result = self._serialize(self._data_frame_to_list_of_points(record,
-                                                                         precision=write_precision, **kwargs),
-                                      write_precision,
-                                      **kwargs)
+            _data = self._data_frame_to_list_of_points(record, precision=write_precision, **kwargs)
+            _result = self._serialize(_data, write_precision, **kwargs)
 
         elif isinstance(record, list):
             _result = b'\n'.join([self._serialize(item, write_precision,
@@ -297,8 +298,12 @@ def _write_batching(self, bucket, org, data,

         return None
 
+    def _itertuples(self, data_frame):
+        cols = [data_frame.iloc[:, k] for k in range(len(data_frame.columns))]
+        return zip(data_frame.index, *cols)
+
     def _data_frame_to_list_of_points(self, data_frame, precision, **kwargs):
-        from ..extras import pd
+        from ..extras import pd, np
         if not isinstance(data_frame, pd.DataFrame):
             raise TypeError('Must be DataFrame, but type was: {0}.'
                             .format(type(data_frame)))
@@ -314,28 +319,35 @@ def _data_frame_to_list_of_points(self, data_frame, precision, **kwargs):
         if data_frame.index.tzinfo is None:
             data_frame.index = data_frame.index.tz_localize('UTC')
 
-        data = []
+        measurement_name = kwargs.get('data_frame_measurement_name')
+        data_frame_tag_columns = kwargs.get('data_frame_tag_columns')
+        data_frame_tag_columns = set(data_frame_tag_columns or [])
 
-        for c, (row) in enumerate(data_frame.values):
-            point = Point(measurement_name=kwargs.get('data_frame_measurement_name'))
+        tags = []
+        fields = []
 
-            for count, (value) in enumerate(row):
-                column = data_frame.columns[count]
-                data_frame_tag_columns = kwargs.get('data_frame_tag_columns')
-                if data_frame_tag_columns and column in data_frame_tag_columns:
-                    point.tag(column, value)
-                else:
-                    point.field(column, value)
+        if self._point_settings.defaultTags:
+            for key, value in self._point_settings.defaultTags.items():
+                data_frame[key] = value
+                data_frame_tag_columns.add(key)
 
-            point.time(data_frame.index[c], precision)
+        for index, (key, value) in enumerate(data_frame.dtypes.items()):
+            key = str(key).translate(_ESCAPE_KEY)
 
-            if self._point_settings.defaultTags:
-                for key, val in self._point_settings.defaultTags.items():
-                    point.tag(key, val)
+            if key in data_frame_tag_columns:
+                tags.append(f"{key}={{p[{index + 1}].translate(_ESCAPE_KEY)}}")
+            elif issubclass(value.type, np.integer):
+                fields.append(f"{key}={{p[{index + 1}]}}i")
+            elif issubclass(value.type, (np.float, np.bool_)):
+                fields.append(f"{key}={{p[{index + 1}]}}")
+            else:
+                fields.append(f"{key}=\"{{p[{index + 1}].translate(_ESCAPE_KEY)}}\"")
 
-            data.append(point)
+        fmt = (f'{measurement_name}', f'{"," if tags else ""}', ','.join(tags),
+               ' ', ','.join(fields), ' {p[0].value}')
+        f = eval("lambda p: f'{}'".format(''.join(fmt)))
 
-        return data
+        return list(map(f, self._itertuples(data_frame)))
 
     def _http(self, batch_item: _BatchItem):
 
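To see what the compiled serializer looks like, here is a hand-traced sketch for a frame with a 'location' tag column and a float 'water_level' field column (illustrative only; _ESCAPE_KEY is redefined locally so the snippet is self-contained):

    import pandas as pd

    # Same idea as point.py's _ESCAPE_KEY: escape ',', ' ' and '=' in keys and tag values.
    _ESCAPE_KEY = str.maketrans({',': r'\,', ' ': r'\ ', '=': r'\='})

    # fmt as assembled by the dtypes loop above for this particular frame:
    fmt = ('h2o_feet', ',', 'location={p[1].translate(_ESCAPE_KEY)}',
           ' ', 'water_level={p[2]}', ' {p[0].value}')
    f = eval("lambda p: f'{}'".format(''.join(fmt)))

    # _itertuples yields (index, col1, col2, ...); p[0].value is epoch nanoseconds.
    row = (pd.Timestamp('1970-01-01 01:00+00:00'), 'coyote creek', 1.0)
    print(f(row))  # h2o_feet,location=coyote\ creek water_level=1.0 3600000000000

Precompiling the template this way avoids constructing a Point object and re-resolving column metadata for every row, which is where the speedup comes from.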
7 changes: 6 additions & 1 deletion influxdb_client/extras.py
@@ -3,4 +3,9 @@
 except ModuleNotFoundError as err:
     raise ImportError(f"`query_data_frame` requires Pandas which couldn't be imported due: {err}")
 
-__all__ = ['pd']
+try:
+    import numpy as np
+except ModuleNotFoundError as err:
+    raise ImportError(f"`data_frame` requires numpy which couldn't be imported due: {err}")
+
+__all__ = ['pd', 'np']
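Downstream code imports the optional dependencies through this module so a missing install surfaces as the descriptive ImportError above rather than a bare ModuleNotFoundError. A minimal usage sketch (the frame below is invented):

    from influxdb_client.extras import pd, np

    # Both optional dependencies arrive via one guarded import.
    df = pd.DataFrame({'water_level': [np.int64(1)]},
                      index=pd.to_datetime(['1970-01-01T01:00:00Z']))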
9 changes: 4 additions & 5 deletions tests/test_WriteApi.py
@@ -5,7 +5,6 @@
 import datetime
 import os
 import unittest
-import time
 from datetime import timedelta
 from multiprocessing.pool import ApplyResult
 
@@ -231,9 +230,9 @@ def test_write_data_frame(self):
         bucket = self.create_test_bucket()
 
         now = pd.Timestamp('1970-01-01 00:00+00:00')
-        data_frame = pd.DataFrame(data=[["coyote_creek", 1.0], ["coyote_creek", 2.0]],
+        data_frame = pd.DataFrame(data=[["coyote_creek", 1], ["coyote_creek", 2]],
                                   index=[now + timedelta(hours=1), now + timedelta(hours=2)],
-                                  columns=["location", "water_level"])
+                                  columns=["location", "water water_level"])
 
         self.write_client.write(bucket.name, record=data_frame, data_frame_measurement_name='h2o_feet',
                                 data_frame_tag_columns=['location'])
@@ -247,14 +246,14 @@ def test_write_data_frame(self):
         self.assertEqual(result[0].records[0].get_measurement(), "h2o_feet")
         self.assertEqual(result[0].records[0].get_value(), 1.0)
         self.assertEqual(result[0].records[0].values.get("location"), "coyote_creek")
-        self.assertEqual(result[0].records[0].get_field(), "water_level")
+        self.assertEqual(result[0].records[0].get_field(), "water water_level")
         self.assertEqual(result[0].records[0].get_time(),
                          datetime.datetime(1970, 1, 1, 1, 0, tzinfo=datetime.timezone.utc))
 
         self.assertEqual(result[0].records[1].get_measurement(), "h2o_feet")
         self.assertEqual(result[0].records[1].get_value(), 2.0)
         self.assertEqual(result[0].records[1].values.get("location"), "coyote_creek")
-        self.assertEqual(result[0].records[1].get_field(), "water_level")
+        self.assertEqual(result[0].records[1].get_field(), "water water_level")
         self.assertEqual(result[0].records[1].get_time(),
                          datetime.datetime(1970, 1, 1, 2, 0, tzinfo=datetime.timezone.utc))
 
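The renamed column deliberately contains a space, exercising _ESCAPE_KEY on field keys, and the integer data now takes the int64 path. Traced by hand, the two rows above should serialize to roughly this line protocol (illustrative, not captured from a test run):

    h2o_feet,location=coyote_creek water\ water_level=1i 3600000000000
    h2o_feet,location=coyote_creek water\ water_level=2i 7200000000000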
88 changes: 88 additions & 0 deletions tests/test_WriteApiDataFrame.py
@@ -0,0 +1,88 @@
import csv
import os
import time
import unittest
from datetime import timedelta

from influxdb_client import InfluxDBClient, WriteOptions, WriteApi
from influxdb_client.client.write_api import SYNCHRONOUS
from tests.base_test import BaseTest


class DataFrameWriteTest(BaseTest):

    def setUp(self) -> None:
        super().setUp()
        self.influxDb_client = InfluxDBClient(url="http://localhost:9999", token="my-token", debug=False)

        self.write_options = WriteOptions(batch_size=10_000, flush_interval=5_000, retry_interval=3_000)
        self._write_client = WriteApi(influxdb_client=self.influxDb_client, write_options=self.write_options)

    def tearDown(self) -> None:
        super().tearDown()
        self._write_client.__del__()

    @unittest.skip('Test big file')
    def test_write_data_frame(self):
        import random
        from influxdb_client.extras import pd

        if not os.path.isfile("data_frame_file.csv"):
            with open('data_frame_file.csv', mode='w+') as csv_file:
                _writer = csv.writer(csv_file, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
                _writer.writerow(['time', 'col1', 'col2', 'col3', 'col4', 'col5', 'col6', 'col7', 'col8'])

                for i in range(1, 1500000):
                    choice = ['test_a', 'test_b', 'test_c']
                    _writer.writerow([i, random.choice(choice), 'test', random.random(), random.random(),
                                      random.random(), random.random(), random.random(), random.random()])

                csv_file.close()

        with open('data_frame_file.csv', mode='rb') as csv_file:

            data_frame = pd.read_csv(csv_file, index_col='time')
            print(data_frame)

            print('Writing...')

            start = time.time()

            self._write_client.write("my-bucket", "my-org", record=data_frame,
                                     data_frame_measurement_name='h2o_feet',
                                     data_frame_tag_columns=['location'])

            self._write_client.__del__()

            print("Time elapsed: ", (time.time() - start))

            csv_file.close()

    def test_write_num_py(self):
        from influxdb_client.extras import pd, np

        bucket = self.create_test_bucket()

        now = pd.Timestamp('2020-04-05 00:00+00:00')

        data_frame = pd.DataFrame(data=[["coyote_creek", np.int64(100.5)], ["coyote_creek", np.int64(200)]],
                                  index=[now + timedelta(hours=1), now + timedelta(hours=2)],
                                  columns=["location", "water_level"])

        write_api = self.client.write_api(write_options=SYNCHRONOUS)
        write_api.write(bucket.name, record=data_frame, data_frame_measurement_name='h2o_feet',
                        data_frame_tag_columns=['location'])

        write_api.__del__()

        result = self.query_api.query(
            "from(bucket:\"" + bucket.name + "\") |> range(start: 1970-01-01T00:00:00.000000001Z)",
            self.my_organization.id)

        self.assertEqual(1, len(result))
        self.assertEqual(2, len(result[0].records))

        self.assertEqual(result[0].records[0].get_value(), 100.0)
        self.assertEqual(result[0].records[1].get_value(), 200.0)

        pass
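Note that np.int64(100.5) truncates to 100, and the int64 column takes the integer-suffix branch of the serializer. Hand-derived line protocol for the two rows (assuming the default nanosecond precision, not captured output):

    h2o_feet,location=coyote_creek water_level=100i 1586048400000000000
    h2o_feet,location=coyote_creek water_level=200i 1586052000000000000

(assertEqual treats 100 and 100.0 as equal, so the test passes whether the query returns ints or floats.)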
