feat: DataFrame optimization #97

Merged: 6 commits, Jun 2, 2020
1 change: 1 addition & 0 deletions .gitignore
@@ -114,3 +114,4 @@ sandbox
# OpenAPI-generator
/.openapi-generator*
/tests/writer.pickle
/tests/data_frame_file.csv
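(The ignored file is the CSV fixture generated by the big-file benchmark added in tests/test_WriteApiDataFrame.py below.)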
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -2,6 +2,7 @@

### Features
1. [#79](https://github.com/influxdata/influxdb-client-python/issues/79): Added support for writing Pandas DataFrame
2. [#92](https://github.com/influxdata/influxdb-client-python/issues/92): Optimize serializing Pandas DataFrame for writing

### Bug Fixes
1. [#85](https://github.com/influxdata/influxdb-client-python/issues/85): Fixed a possibility to generate empty write batch
1 change: 1 addition & 0 deletions extra-requirements.txt
@@ -1 +1,2 @@
pandas>=0.25.3
numpy
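(numpy now ships alongside pandas in the optional extras; assuming the project's usual packaging, both install with pip install 'influxdb-client[extra]'.)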
56 changes: 34 additions & 22 deletions influxdb_client/client/write_api.py
@@ -1,8 +1,11 @@
# coding: utf-8
import logging
import os
import re
from datetime import timedelta
from enum import Enum
from functools import reduce
from itertools import chain
from random import random
from time import sleep
from typing import Union, List
@@ -14,7 +17,7 @@

from influxdb_client import WritePrecision, WriteService
from influxdb_client.client.abstract_client import AbstractClient
from influxdb_client.client.write.point import Point, DEFAULT_WRITE_PRECISION
from influxdb_client.client.write.point import Point, DEFAULT_WRITE_PRECISION, _ESCAPE_KEY
from influxdb_client.rest import ApiException

logger = logging.getLogger(__name__)
@@ -253,10 +256,8 @@ def _serialize(self, record, write_precision, **kwargs) -> bytes:
_result = self._serialize(Point.from_dict(record, write_precision=write_precision),
write_precision, **kwargs)
elif 'DataFrame' in type(record).__name__:
_result = self._serialize(self._data_frame_to_list_of_points(record,
precision=write_precision, **kwargs),
write_precision,
**kwargs)
_data = self._data_frame_to_list_of_points(record, precision=write_precision, **kwargs)
_result = self._serialize(_data, write_precision, **kwargs)

elif isinstance(record, list):
_result = b'\n'.join([self._serialize(item, write_precision,
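(With this refactor a DataFrame is first flattened into a list of line-protocol strings, so it re-enters _serialize through the list branch above, which joins the items with newlines.)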
@@ -297,8 +298,12 @@ def _write_batching(self, bucket, org, data,

return None

def _itertuples(self, data_frame):
cols = [data_frame.iloc[:, k] for k in range(len(data_frame.columns))]
return zip(data_frame.index, *cols)

def _data_frame_to_list_of_points(self, data_frame, precision, **kwargs):
from ..extras import pd
from ..extras import pd, np
if not isinstance(data_frame, pd.DataFrame):
raise TypeError('Must be DataFrame, but type was: {0}.'
.format(type(data_frame)))
@@ -314,28 +319,35 @@ def _data_frame_to_list_of_points(self, data_frame, precision, **kwargs):
if data_frame.index.tzinfo is None:
data_frame.index = data_frame.index.tz_localize('UTC')

data = []
measurement_name = kwargs.get('data_frame_measurement_name')
data_frame_tag_columns = kwargs.get('data_frame_tag_columns')
data_frame_tag_columns = set(data_frame_tag_columns or [])

for c, (row) in enumerate(data_frame.values):
point = Point(measurement_name=kwargs.get('data_frame_measurement_name'))
tags = []
fields = []

for count, (value) in enumerate(row):
column = data_frame.columns[count]
data_frame_tag_columns = kwargs.get('data_frame_tag_columns')
if data_frame_tag_columns and column in data_frame_tag_columns:
point.tag(column, value)
else:
point.field(column, value)
if self._point_settings.defaultTags:
for key, value in self._point_settings.defaultTags.items():
data_frame[key] = value
data_frame_tag_columns.add(key)

point.time(data_frame.index[c], precision)
for index, (key, value) in enumerate(data_frame.dtypes.items()):
key = str(key).translate(_ESCAPE_KEY)

if self._point_settings.defaultTags:
for key, val in self._point_settings.defaultTags.items():
point.tag(key, val)
if key in data_frame_tag_columns:
tags.append(f"{key}={{p[{index + 1}].translate(_ESCAPE_KEY)}}")
elif issubclass(value.type, np.integer):
fields.append(f"{key}={{p[{index + 1}]}}i")
elif issubclass(value.type, (np.float, np.bool_)):
fields.append(f"{key}={{p[{index + 1}]}}")
else:
fields.append(f"{key}=\"{{p[{index + 1}].translate(_ESCAPE_KEY)}}\"")

data.append(point)
fmt = (f'{measurement_name}', f'{"," if tags else ""}', ','.join(tags),
' ', ','.join(fields), ' {p[0].value}')
f = eval("lambda p: f'{}'".format(''.join(fmt)))

return data
return list(map(f, self._itertuples(data_frame)))

def _http(self, batch_item: _BatchItem):

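The core of this optimization is compiling one f-string template per DataFrame and eval-ing it into a row formatter, instead of building a Point object per row. A minimal standalone sketch of the same idea follows; data_frame_to_lines and measurement are illustrative names, and the escaping via _ESCAPE_KEY is simplified away, so this is not the client's actual API:

import pandas as pd
import numpy as np

def data_frame_to_lines(df, measurement, tag_columns=()):
    # One "{p[i]}" placeholder per column; p[0] is the timestamp index.
    tag_columns = set(tag_columns)
    tags, fields = [], []
    for i, (key, dtype) in enumerate(df.dtypes.items()):
        if key in tag_columns:
            tags.append(f"{key}={{p[{i + 1}]}}")
        elif np.issubdtype(dtype, np.integer):
            fields.append(f"{key}={{p[{i + 1}]}}i")  # integers carry the "i" suffix
        elif np.issubdtype(dtype, np.floating) or np.issubdtype(dtype, np.bool_):
            fields.append(f"{key}={{p[{i + 1}]}}")
        else:
            fields.append(f'{key}="{{p[{i + 1}]}}"')  # everything else becomes a quoted string
    separator = "," if tags else ""
    template = f"{measurement}{separator}{','.join(tags)} {','.join(fields)} {{p[0].value}}"
    # Compile the template into a formatter once, then map it over all row tuples.
    line_of = eval("lambda p: f'{}'".format(template))
    columns = [df.iloc[:, k] for k in range(len(df.columns))]
    return [line_of(p) for p in zip(df.index, *columns)]

df = pd.DataFrame(data=[["coyote_creek", 1.0]],
                  index=[pd.Timestamp('1970-01-01 01:00+00:00')],
                  columns=["location", "water_level"])
print(data_frame_to_lines(df, "h2o_feet", tag_columns=["location"]))
# -> ['h2o_feet,location=coyote_creek water_level=1.0 3600000000000']

Building the formatter once and mapping it over zipped columns avoids the per-row Point allocations and attribute lookups of the previous loop, which is where the speedup comes from.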
7 changes: 6 additions & 1 deletion influxdb_client/extras.py
@@ -3,4 +3,9 @@
except ModuleNotFoundError as err:
raise ImportError(f"`query_data_frame` requires Pandas which couldn't be imported due: {err}")

__all__ = ['pd']
try:
import numpy as np
except ModuleNotFoundError as err:
raise ImportError(f"`data_frame` requires numpy which couldn't be imported due: {err}")

__all__ = ['pd', 'np']
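Downstream code, including the tests below, imports both aliases through this module so that a missing optional dependency surfaces as a clear ImportError rather than a bare ModuleNotFoundError:

from influxdb_client.extras import pd, np

df = pd.DataFrame({"water_level": np.arange(3.0)})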
1 change: 1 addition & 0 deletions tests/test_BucketsApi.py
@@ -37,6 +37,7 @@ def test_create_delete_bucket(self):
assert self.buckets_api.find_bucket_by_id(my_bucket.id)
assert "bucket not found" in e.value.body

@pytest.mark.skip(reason="https://github.com/influxdata/influxdb/issues/14900")
def test_find_by_name(self):
my_org = self.find_my_org()

9 changes: 4 additions & 5 deletions tests/test_WriteApi.py
@@ -5,7 +5,6 @@
import datetime
import os
import unittest
import time
from datetime import timedelta
from multiprocessing.pool import ApplyResult

@@ -231,9 +230,9 @@ def test_write_data_frame(self):
bucket = self.create_test_bucket()

now = pd.Timestamp('1970-01-01 00:00+00:00')
data_frame = pd.DataFrame(data=[["coyote_creek", 1.0], ["coyote_creek", 2.0]],
data_frame = pd.DataFrame(data=[["coyote_creek", 1], ["coyote_creek", 2]],
index=[now + timedelta(hours=1), now + timedelta(hours=2)],
columns=["location", "water_level"])
columns=["location", "water water_level"])

self.write_client.write(bucket.name, record=data_frame, data_frame_measurement_name='h2o_feet',
data_frame_tag_columns=['location'])
Expand All @@ -247,14 +246,14 @@ def test_write_data_frame(self):
self.assertEqual(result[0].records[0].get_measurement(), "h2o_feet")
self.assertEqual(result[0].records[0].get_value(), 1.0)
self.assertEqual(result[0].records[0].values.get("location"), "coyote_creek")
self.assertEqual(result[0].records[0].get_field(), "water_level")
self.assertEqual(result[0].records[0].get_field(), "water water_level")
self.assertEqual(result[0].records[0].get_time(),
datetime.datetime(1970, 1, 1, 1, 0, tzinfo=datetime.timezone.utc))

self.assertEqual(result[0].records[1].get_measurement(), "h2o_feet")
self.assertEqual(result[0].records[1].get_value(), 2.0)
self.assertEqual(result[0].records[1].values.get("location"), "coyote_creek")
self.assertEqual(result[0].records[1].get_field(), "water_level")
self.assertEqual(result[0].records[1].get_field(), "water water_level")
self.assertEqual(result[0].records[1].get_time(),
datetime.datetime(1970, 1, 1, 2, 0, tzinfo=datetime.timezone.utc))

88 changes: 88 additions & 0 deletions tests/test_WriteApiDataFrame.py
@@ -0,0 +1,88 @@
import csv
import os
import time
import unittest
from datetime import timedelta

from influxdb_client import InfluxDBClient, WriteOptions, WriteApi
from influxdb_client.client.write_api import SYNCHRONOUS
from tests.base_test import BaseTest


class DataFrameWriteTest(BaseTest):

def setUp(self) -> None:
super().setUp()
self.influxDb_client = InfluxDBClient(url="http://localhost:9999", token="my-token", debug=False)

self.write_options = WriteOptions(batch_size=10_000, flush_interval=5_000, retry_interval=3_000)
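# 10k-point batches flushed every 5 s, sized for the 1.5M-row benchmark below (assumed intent)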
self._write_client = WriteApi(influxdb_client=self.influxDb_client, write_options=self.write_options)

def tearDown(self) -> None:
super().tearDown()
self._write_client.__del__()

@unittest.skip('Test big file')
def test_write_data_frame(self):
import random
from influxdb_client.extras import pd

if not os.path.isfile("data_frame_file.csv"):
with open('data_frame_file.csv', mode='w+') as csv_file:
_writer = csv.writer(csv_file, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
_writer.writerow(['time', 'col1', 'col2', 'col3', 'col4', 'col5', 'col6', 'col7', 'col8'])

for i in range(1, 1500000):
choice = ['test_a', 'test_b', 'test_c']
_writer.writerow([i, random.choice(choice), 'test', random.random(), random.random(),
random.random(), random.random(), random.random(), random.random()])

csv_file.close()

with open('data_frame_file.csv', mode='rb') as csv_file:

data_frame = pd.read_csv(csv_file, index_col='time')
print(data_frame)

print('Writing...')

start = time.time()

self._write_client.write("my-bucket", "my-org", record=data_frame,
data_frame_measurement_name='h2o_feet',
data_frame_tag_columns=['location'])

self._write_client.__del__()

print("Time elapsed: ", (time.time() - start))

csv_file.close()

def test_write_num_py(self):
from influxdb_client.extras import pd, np

bucket = self.create_test_bucket()

now = pd.Timestamp('2020-04-05 00:00+00:00')
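# np.int64(100.5) below truncates to 100; integer columns serialize with the line-protocol "i" suffix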

data_frame = pd.DataFrame(data=[["coyote_creek", np.int64(100.5)], ["coyote_creek", np.int64(200)]],
index=[now + timedelta(hours=1), now + timedelta(hours=2)],
columns=["location", "water_level"])

write_api = self.client.write_api(write_options=SYNCHRONOUS)
write_api.write(bucket.name, record=data_frame, data_frame_measurement_name='h2o_feet',
data_frame_tag_columns=['location'])

write_api.__del__()

result = self.query_api.query(
"from(bucket:\"" + bucket.name + "\") |> range(start: 1970-01-01T00:00:00.000000001Z)",
self.my_organization.id)

self.assertEqual(1, len(result))
self.assertEqual(2, len(result[0].records))

self.assertEqual(result[0].records[0].get_value(), 100.0)
self.assertEqual(result[0].records[1].get_value(), 200.0)

pass