Skip to content

Commit

Permalink
fix: Optimize serializing data into Pandas DataFrame (#74)
Browse files Browse the repository at this point in the history
  • Loading branch information
bednar authored Mar 18, 2020
1 parent ab18935 commit 11b9034
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 7 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
## 1.6.0 [unreleased]

### Bugs
1. [#72](https://github.com/influxdata/influxdb-client-python/issues/72): Optimize serializing data into Pandas DataFrame

## 1.5.0 [2020-03-13]

### Features
Expand Down
18 changes: 13 additions & 5 deletions influxdb_client/client/flux_csv_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ def __init__(self, response: HTTPResponse, serialization_mode: FluxSerialization
self.tables = []
self._serialization_mode = serialization_mode
self._data_frame_index = data_frame_index
self._data_frame_values = []
pass

def __enter__(self):
Expand Down Expand Up @@ -135,20 +136,27 @@ def _parse_flux_response(self):
yield flux_record

if self._serialization_mode is FluxSerializationMode.dataFrame:
self._data_frame.loc[len(self._data_frame.index)] = flux_record.values
self._data_frame_values.append(flux_record.values)
pass

# debug
# print(flux_record)

# Return latest DataFrame
if (self._serialization_mode is FluxSerializationMode.dataFrame) & hasattr(self, '_data_frame'):
yield self._prepare_data_frame()

def _prepare_data_frame(self):
from ..extras import pd

# We have to create temporary DataFrame because we want to preserve default column values
_temp_df = pd.DataFrame(self._data_frame_values)
self._data_frame_values = []

# Custom DataFrame index
if self._data_frame_index:
self._data_frame = self._data_frame.set_index(self._data_frame_index)
return self._data_frame
_temp_df = _temp_df.set_index(self._data_frame_index)

# Append data
return self._data_frame.append(_temp_df)

def parse_record(self, table_index, table, csv):
record = FluxRecord(table_index)
Expand Down
42 changes: 40 additions & 2 deletions tests/test_QueryApiDataFrame.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
import random

import httpretty
import rx
from pandas import DataFrame
from pandas._libs.tslibs.timestamps import Timestamp
from rx import operators as ops

from influxdb_client import InfluxDBClient
from tests.base_test import BaseTest
from influxdb_client import InfluxDBClient, Point, WritePrecision, WriteOptions
from tests.base_test import BaseTest, current_milli_time


class QueryDataFrameApi(BaseTest):
Expand Down Expand Up @@ -250,3 +254,37 @@ def test_more_table_custom_index(self):
Timestamp('2019-11-12 08:09:07+0000'), Timestamp('2019-11-12 08:09:08+0000'),
Timestamp('2019-11-12 08:09:09+0000')], list(_dataFrames[2].index))
self.assertEqual(5, len(_dataFrames[2]))


class QueryDataFrameIntegrationApi(BaseTest):

def test_large_amount_of_data(self):
_measurement_name = "data_frame_" + str(current_milli_time())

def _create_point(index) -> Point:
return Point(_measurement_name) \
.tag("deviceType", str(random.choice(['A', 'B']))) \
.tag("name", random.choice(['A', 'B'])) \
.field("uuid", random.randint(0, 10_000)) \
.field("co2", random.randint(0, 10_000)) \
.field("humid", random.randint(0, 10_000)) \
.field("lux", random.randint(0, 10_000)) \
.field("water", random.randint(0, 10_000)) \
.field("shine", random.randint(0, 10_000)) \
.field("temp", random.randint(0, 10_000)) \
.field("voc", random.randint(0, 10_000)) \
.time(time=(1583828781 + index), write_precision=WritePrecision.S)

data = rx.range(0, 2_000).pipe(ops.map(lambda index: _create_point(index)))

write_api = self.client.write_api(write_options=WriteOptions(batch_size=500))
write_api.write(org="my-org", bucket="my-bucket", record=data, write_precision=WritePrecision.S)
write_api.__del__()

query = 'from(bucket: "my-bucket")' \
'|> range(start: 2020-02-19T23:30:00Z, stop: now())' \
f'|> filter(fn: (r) => r._measurement == "{_measurement_name}")'

result = self.client.query_api().query_data_frame(org="my-org", query=query)

self.assertGreater(len(result), 1)

0 comments on commit 11b9034

Please sign in to comment.