feat: DataFrame optimization #97

Merged 6 commits on Jun 2, 2020. (The diff below shows changes from 2 commits.)
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -2,6 +2,7 @@

### Features
1. [#79](https://github.com/influxdata/influxdb-client-python/issues/79): Added support for writing Pandas DataFrame
2. [#92](https://github.com/influxdata/influxdb-client-python/issues/92): Optimize serializing Pandas DataFrame for writing

### Bug Fixes
1. [#85](https://github.com/influxdata/influxdb-client-python/issues/85): Fixed a possibility to generate empty write batch
1 change: 1 addition & 0 deletions extra-requirements.txt
@@ -1 +1,2 @@
pandas>=0.25.3
numpy
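
These optional dependencies ship in the extra requirements group; presumably they are pulled in with something like pip install influxdb-client[extra] (the extras name is an assumption based on this file's name).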
59 changes: 37 additions & 22 deletions influxdb_client/client/write_api.py
@@ -1,8 +1,11 @@
# coding: utf-8
import logging
import os
import re
from datetime import timedelta
from enum import Enum
from functools import reduce
from itertools import chain
from random import random
from time import sleep
from typing import Union, List
@@ -14,7 +17,7 @@

from influxdb_client import WritePrecision, WriteService
from influxdb_client.client.abstract_client import AbstractClient
from influxdb_client.client.write.point import Point, DEFAULT_WRITE_PRECISION
from influxdb_client.client.write.point import Point, DEFAULT_WRITE_PRECISION, _ESCAPE_KEY
from influxdb_client.rest import ApiException

logger = logging.getLogger(__name__)
@@ -253,10 +256,8 @@ def _serialize(self, record, write_precision, **kwargs) -> bytes:
_result = self._serialize(Point.from_dict(record, write_precision=write_precision),
write_precision, **kwargs)
elif 'DataFrame' in type(record).__name__:
_result = self._serialize(self._data_frame_to_list_of_points(record,
precision=write_precision, **kwargs),
write_precision,
**kwargs)
_data = self._data_frame_to_list_of_points(record, precision=write_precision, **kwargs)
_result = self._serialize(_data, write_precision, **kwargs)

elif isinstance(record, list):
_result = b'\n'.join([self._serialize(item, write_precision,
@@ -297,8 +298,15 @@ def _write_batching(self, bucket, org, data,

return None

def _itertuples(self, data_frame):
"""Custom implementation of ``DataFrame.itertuples`` that
returns plain tuples instead of namedtuples. About 50% faster.
"""
cols = [data_frame.iloc[:, k] for k in range(len(data_frame.columns))]
return zip(data_frame.index, *cols)
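
As an aside for reviewers, a minimal sketch (illustrative data, not part of this diff) of what the helper yields compared to DataFrame.itertuples:

import pandas as pd

df = pd.DataFrame({'location': ['coyote_creek'], 'water_level': [1.0]},
                  index=[pd.Timestamp('1970-01-01 01:00+00:00')])

# DataFrame.itertuples() constructs a namedtuple per row:
#   Pandas(Index=Timestamp(...), location='coyote_creek', water_level=1.0)
# Zipping the index with the raw columns yields plain tuples instead:
cols = [df.iloc[:, k] for k in range(len(df.columns))]
print(next(zip(df.index, *cols)))
# (Timestamp('1970-01-01 01:00:00+0000', tz='UTC'), 'coyote_creek', 1.0)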

def _data_frame_to_list_of_points(self, data_frame, precision, **kwargs):
from ..extras import pd
from ..extras import pd, np
if not isinstance(data_frame, pd.DataFrame):
raise TypeError('Must be DataFrame, but type was: {0}.'
.format(type(data_frame)))
@@ -314,28 +322,35 @@ def _data_frame_to_list_of_points(self, data_frame, precision, **kwargs):
if data_frame.index.tzinfo is None:
data_frame.index = data_frame.index.tz_localize('UTC')

data = []
measurement_name = kwargs.get('data_frame_measurement_name')
data_frame_tag_columns = kwargs.get('data_frame_tag_columns')
data_frame_tag_columns = set(data_frame_tag_columns or [])

for c, (row) in enumerate(data_frame.values):
point = Point(measurement_name=kwargs.get('data_frame_measurement_name'))
tags = []
fields = []

for count, (value) in enumerate(row):
column = data_frame.columns[count]
data_frame_tag_columns = kwargs.get('data_frame_tag_columns')
if data_frame_tag_columns and column in data_frame_tag_columns:
point.tag(column, value)
else:
point.field(column, value)
if self._point_settings.defaultTags:
for key, value in self._point_settings.defaultTags.items():
data_frame[key] = value
data_frame_tag_columns.add(key)
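# Broadcasting each scalar default-tag value to a whole column lets default
# tags flow through the same per-column formatting path as user tag columns.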

point.time(data_frame.index[c], precision)
for index, (key, value) in enumerate(data_frame.dtypes.items()):
key = str(key).translate(_ESCAPE_KEY)
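# Escape special characters (e.g. spaces, commas, '=') in the column name once, up front.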

if self._point_settings.defaultTags:
for key, val in self._point_settings.defaultTags.items():
point.tag(key, val)
if key in data_frame_tag_columns:
tags.append(f"{key}={{p[{index + 1}].translate(_ESCAPE_KEY)}}")
elif issubclass(value.type, np.integer):
fields.append(f"{key}={{p[{index + 1}]}}i")
elif issubclass(value.type, (np.float, np.bool_)):
fields.append(f"{key}={{p[{index + 1}]}}")
else:
fields.append(f"{key}=\"{{p[{index + 1}].translate(_ESCAPE_KEY)}}\"")

data.append(point)
fmt = (f'{measurement_name}', f'{"," if tags else ""}', ','.join(tags),
' ', ','.join(fields), ' {p[0].value}')
f = eval("lambda p: f'{}'".format(''.join(fmt)))

return data
return list(map(f, self._itertuples(data_frame)))
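
To illustrate the generated formatter (column names here are hypothetical): for a DataFrame with tag column location and float field water_level, ''.join(fmt) produces a single f-string template that is compiled once via eval and then applied to every row tuple:

# ''.join(fmt) for measurement 'h2o_feet' would be:
#   "h2o_feet,location={p[1].translate(_ESCAPE_KEY)} water_level={p[2]} {p[0].value}"
# so the eval'd lambda is equivalent to:
f = lambda p: f"h2o_feet,location={p[1].translate(_ESCAPE_KEY)} water_level={p[2]} {p[0].value}"
# applied to the row tuple (pd.Timestamp('1970-01-01 01:00+00:00'), 'coyote_creek', 1.0):
#   'h2o_feet,location=coyote_creek water_level=1.0 3600000000000'

Building the formatter once per DataFrame moves the per-row cost down to tuple indexing and string interpolation, instead of constructing a Point object per row.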

def _http(self, batch_item: _BatchItem):

7 changes: 6 additions & 1 deletion influxdb_client/extras.py
@@ -3,4 +3,9 @@
except ModuleNotFoundError as err:
raise ImportError(f"`query_data_frame` requires Pandas which couldn't be imported due: {err}")

__all__ = ['pd']
try:
import numpy as np
except ModuleNotFoundError as err:
raise ImportError(f"`data_frame` requires numpy which couldn't be imported due: {err}")

__all__ = ['pd', 'np']
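
Putting the pieces together, a minimal end-to-end sketch of the optimized write path (endpoint, token, and bucket values are illustrative, mirroring the tests below):

from influxdb_client import InfluxDBClient
from influxdb_client.extras import pd

client = InfluxDBClient(url="http://localhost:9999", token="my-token", org="my-org")
write_api = client.write_api()

data_frame = pd.DataFrame(data=[["coyote_creek", 1.0], ["coyote_creek", 2.0]],
                          index=pd.to_datetime(['1970-01-01 01:00+00:00',
                                                '1970-01-01 02:00+00:00']),
                          columns=["location", "water_level"])

# _data_frame_to_list_of_points serializes this to two line-protocol rows:
#   h2o_feet,location=coyote_creek water_level=1.0 3600000000000
#   h2o_feet,location=coyote_creek water_level=2.0 7200000000000
write_api.write("my-bucket", record=data_frame,
                data_frame_measurement_name='h2o_feet',
                data_frame_tag_columns=['location'])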
8 changes: 4 additions & 4 deletions tests/test_WriteApi.py
@@ -231,9 +231,9 @@ def test_write_data_frame(self):
bucket = self.create_test_bucket()

now = pd.Timestamp('1970-01-01 00:00+00:00')
data_frame = pd.DataFrame(data=[["coyote_creek", 1.0], ["coyote_creek", 2.0]],
data_frame = pd.DataFrame(data=[["coyote_creek", 1], ["coyote_creek", 2]],
index=[now + timedelta(hours=1), now + timedelta(hours=2)],
columns=["location", "water_level"])
columns=["location", "water water_level"])

self.write_client.write(bucket.name, record=data_frame, data_frame_measurement_name='h2o_feet',
data_frame_tag_columns=['location'])
@@ -247,14 +247,14 @@
self.assertEqual(result[0].records[0].get_measurement(), "h2o_feet")
self.assertEqual(result[0].records[0].get_value(), 1.0)
self.assertEqual(result[0].records[0].values.get("location"), "coyote_creek")
self.assertEqual(result[0].records[0].get_field(), "water_level")
self.assertEqual(result[0].records[0].get_field(), "water water_level")
self.assertEqual(result[0].records[0].get_time(),
datetime.datetime(1970, 1, 1, 1, 0, tzinfo=datetime.timezone.utc))

self.assertEqual(result[0].records[1].get_measurement(), "h2o_feet")
self.assertEqual(result[0].records[1].get_value(), 2.0)
self.assertEqual(result[0].records[1].values.get("location"), "coyote_creek")
self.assertEqual(result[0].records[1].get_field(), "water_level")
self.assertEqual(result[0].records[1].get_field(), "water water_level")
self.assertEqual(result[0].records[1].get_time(),
datetime.datetime(1970, 1, 1, 2, 0, tzinfo=datetime.timezone.utc))
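
The renamed "water water_level" column exercises key escaping via _ESCAPE_KEY: a space in a field key must be backslash-escaped in line protocol, and the now-integer data serializes with the i suffix, so the first row should come out roughly as:

h2o_feet,location=coyote_creek water\ water_level=1i 3600000000000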

53 changes: 53 additions & 0 deletions tests/test_WriteApiDataFrame.py
@@ -0,0 +1,53 @@
import csv
import os
import time
import unittest

from influxdb_client import InfluxDBClient, WriteOptions, WriteApi, Point
from tests.base_test import BaseTest


class DataFrameWriteTest(BaseTest):

def setUp(self) -> None:
self.influxDb_client = InfluxDBClient(url="http://localhost:9999", token="my-token")

self.write_options = WriteOptions(batch_size=10_000, flush_interval=5_000, retry_interval=3_000)
self._write_client = WriteApi(influxdb_client=self.influxDb_client, write_options=self.write_options)

def tearDown(self) -> None:
self._write_client.__del__()

@unittest.skip('Test big file')
def test_write_data_frame(self):
import random
from influxdb_client.extras import pd

if not os.path.isfile("data_frame_file.csv"):
with open('data_frame_file.csv', mode='w+') as csv_file:
_writer = csv.writer(csv_file, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
_writer.writerow(['time', 'col1', 'col2', 'col3', 'col4', 'col5', 'col6', 'col7', 'col8'])

for i in range(1, 1500000):
choice = ['test_a', 'test_b', 'test_c']
_writer.writerow([i, random.choice(choice), 'test', random.random(), random.random(),
random.random(), random.random(), random.random(), random.random()])

csv_file.close()

with open('data_frame_file.csv', mode='rb') as csv_file:

data_frame = pd.read_csv(csv_file, index_col='time')
print(data_frame)

print('Writing...')

start = time.time()

self._write_client.write("my-bucket", "my-org", record=data_frame,
data_frame_measurement_name='h2o_feet',
data_frame_tag_columns=['location'])

print("Time elapsed: ", (time.time() - start))

csv_file.close()
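
A standalone version of this benchmark outside the unittest harness (same endpoint and token assumptions as setUp above; note the serializer expects a DatetimeIndex, so this sketch converts the integer time column first, and 'col1' is picked as a tag column purely for illustration):

import time
from influxdb_client import InfluxDBClient, WriteOptions
from influxdb_client.extras import pd

client = InfluxDBClient(url="http://localhost:9999", token="my-token", org="my-org")
write_api = client.write_api(write_options=WriteOptions(batch_size=10_000))

data_frame = pd.read_csv('data_frame_file.csv', index_col='time')
data_frame.index = pd.to_datetime(data_frame.index, unit='s')  # DatetimeIndex required

start = time.time()
write_api.write("my-bucket", "my-org", record=data_frame,
                data_frame_measurement_name='h2o_feet',
                data_frame_tag_columns=['col1'])
write_api.__del__()  # flush remaining batches, as tearDown does
print("Time elapsed: ", (time.time() - start))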