Commit a1f9826

feat: support for writing pandas DataFrame (#88)

rolincova authored May 4, 2020
1 parent ab3915f commit a1f9826

Showing 5 changed files with 206 additions and 15 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,8 @@
## 1.7.0 [unreleased]

### Features
1. [#79](https://github.com/influxdata/influxdb-client-python/issues/79): Added support for writing Pandas DataFrame

### Bug Fixes
1. [#85](https://github.com/influxdata/influxdb-client-python/issues/85): Fixed a possibility to generate empty write batch

12 changes: 12 additions & 0 deletions README.rst
@@ -49,6 +49,7 @@ InfluxDB 2.0 client features
- `Line Protocol <https://docs.influxdata.com/influxdb/v1.6/write_protocols/line_protocol_tutorial>`_
- `Data Point <https://github.com/influxdata/influxdb-client-python/blob/master/influxdb_client/client/write/point.py#L16>`__
- `RxPY <https://rxpy.readthedocs.io/en/latest/>`__ Observable
- `Pandas DataFrame <https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html>`_
- `How to write <#writes>`_
- `InfluxDB 2.0 API <https://github.com/influxdata/influxdb/blob/master/http/swagger.yml>`_ client for management
- the client is generated from the `swagger <https://github.com/influxdata/influxdb/blob/master/http/swagger.yml>`_ by using the `openapi-generator <https://github.com/OpenAPITools/openapi-generator>`_
@@ -219,6 +220,7 @@ The data could be written as
3. Dictionary style mapping with keys: ``measurement``, ``tags``, ``fields`` and ``time`` (see the sketch after this list)
4. List of above items
5. A ``batching`` type of write also supports an ``Observable`` that produces any of the above items
6. `Pandas DataFrame <https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html>`_
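
For example, the dictionary-style mapping (item 3) looks like this — a minimal sketch with placeholder bucket and org names:

_write_client.write("my-bucket", "my-org",
                    record={"measurement": "h2o_feet",
                            "tags": {"location": "coyote_creek"},
                            "fields": {"water_level": 1.0},
                            "time": 1})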


Batching
@@ -302,6 +304,16 @@ The batching is configurable by ``write_options``\ :
_write_client.write("my-bucket", "my-org", _data)
"""
Write Pandas DataFrame
"""
_now = pd.Timestamp.now('UTC')
_data_frame = pd.DataFrame(data=[["coyote_creek", 1.0], ["coyote_creek", 2.0]],
                           index=[_now, _now + timedelta(hours=1)],
                           columns=["location", "water_level"])
_write_client.write(bucket.name, record=_data_frame, data_frame_measurement_name='h2o_feet',
                    data_frame_tag_columns=['location'])
"""
Close client

kakila commented on Feb 14, 2021

    The ``_write_client.write(bucket.name, ...)`` line is not right:

    1. ``bucket`` does not exist in this example
    2. the ``org`` argument is missing

bednar (Contributor) replied on Feb 15, 2021

    @kakila Thanks for the note, I will take a look.

bednar (Contributor) replied on Feb 19, 2021

    The docs are fixed in PR #196.
89 changes: 74 additions & 15 deletions influxdb_client/client/write_api.py
@@ -183,21 +183,23 @@ def __init__(self, influxdb_client, write_options: WriteOptions = WriteOptions()
def write(self, bucket: str, org: str = None,
record: Union[
str, List['str'], Point, List['Point'], dict, List['dict'], bytes, List['bytes'], Observable] = None,
write_precision: WritePrecision = DEFAULT_WRITE_PRECISION) -> None:
write_precision: WritePrecision = DEFAULT_WRITE_PRECISION, **kwargs) -> None:
"""
Writes time-series data into InfluxDB.
:param str org: specifies the destination organization for writes; takes either the ID or name interchangeably; if both orgID and org are specified, org takes precedence. (required)
:param str bucket: specifies the destination bucket for writes (required)
:param WritePrecision write_precision: specifies the precision for the unix timestamps within the body line-protocol
:param record: Points, line protocol, RxPY Observable to write
:param record: Points, line protocol, Pandas DataFrame, RxPY Observable to write
:param data_frame_measurement_name: name of measurement for writing Pandas DataFrame
:param data_frame_tag_columns: list of DataFrame columns which are tags; the remaining columns will be fields
"""

if org is None:
org = self._influxdb_client.org

if self._point_settings.defaultTags and record:
if self._point_settings.defaultTags and record is not None:
for key, val in self._point_settings.defaultTags.items():
if isinstance(record, dict):
record.get("tags")[key] = val
@@ -209,9 +211,10 @@ def write(self, bucket: str, org: str = None,
r.tag(key, val)

if self._write_options.write_type is WriteType.batching:
return self._write_batching(bucket, org, record, write_precision)
return self._write_batching(bucket, org, record,
write_precision, **kwargs)

final_string = self._serialize(record, write_precision)
final_string = self._serialize(record, write_precision, **kwargs)

_async_req = True if self._write_options.write_type == WriteType.asynchronous else False

@@ -235,7 +238,7 @@ def __del__(self):
self._disposable = None
pass

def _serialize(self, record, write_precision) -> bytes:
def _serialize(self, record, write_precision, **kwargs) -> bytes:
_result = b''
if isinstance(record, bytes):
_result = record
@@ -244,40 +247,96 @@ def _serialize(self, record, write_precision) -> bytes:
_result = record.encode("utf-8")

elif isinstance(record, Point):
_result = self._serialize(record.to_line_protocol(), write_precision=write_precision)
_result = self._serialize(record.to_line_protocol(), write_precision, **kwargs)

elif isinstance(record, dict):
_result = self._serialize(Point.from_dict(record, write_precision=write_precision),
write_precision=write_precision)
write_precision, **kwargs)
elif 'DataFrame' in type(record).__name__:
_result = self._serialize(self._data_frame_to_list_of_points(record,
precision=write_precision, **kwargs),
write_precision,
**kwargs)

elif isinstance(record, list):
_result = b'\n'.join([self._serialize(item, write_precision=write_precision) for item in record])
_result = b'\n'.join([self._serialize(item, write_precision,
**kwargs) for item in record])

return _result

def _write_batching(self, bucket, org, data, precision=DEFAULT_WRITE_PRECISION):
def _write_batching(self, bucket, org, data,
precision=DEFAULT_WRITE_PRECISION,
**kwargs):
_key = _BatchItemKey(bucket, org, precision)
if isinstance(data, bytes):
self._subject.on_next(_BatchItem(key=_key, data=data))

elif isinstance(data, str):
self._write_batching(bucket, org, data.encode("utf-8"), precision)
self._write_batching(bucket, org, data.encode("utf-8"),
precision, **kwargs)

elif isinstance(data, Point):
self._write_batching(bucket, org, data.to_line_protocol(), precision)
self._write_batching(bucket, org, data.to_line_protocol(),
precision, **kwargs)

elif isinstance(data, dict):
self._write_batching(bucket, org, Point.from_dict(data, write_precision=precision), precision)
self._write_batching(bucket, org, Point.from_dict(data, write_precision=precision),
precision, **kwargs)

elif 'DataFrame' in type(data).__name__:
self._write_batching(bucket, org, self._data_frame_to_list_of_points(data, precision, **kwargs),
precision, **kwargs)

elif isinstance(data, list):
for item in data:
self._write_batching(bucket, org, item, precision)
self._write_batching(bucket, org, item, precision, **kwargs)

elif isinstance(data, Observable):
data.subscribe(lambda it: self._write_batching(bucket, org, it, precision))
data.subscribe(lambda it: self._write_batching(bucket, org, it, precision, **kwargs))
pass

return None
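
For context, a minimal sketch of how the batching path above is driven from user code — the URL, token, bucket/org names and ``batch_size`` value are placeholders, not part of this commit:

from influxdb_client import InfluxDBClient, WriteOptions
from influxdb_client.extras import pd

client = InfluxDBClient(url="http://localhost:9999", token="my-token", org="my-org")
write_api = client.write_api(write_options=WriteOptions(batch_size=2))

data_frame = pd.DataFrame(data=[["coyote_creek", 1.0], ["coyote_creek", 2.0]],
                          index=[1, 2], columns=["location", "water_level"])

# The DataFrame is expanded into Points by _data_frame_to_list_of_points
# below; the Points are then queued and flushed in batches of batch_size.
write_api.write("my-bucket", "my-org", record=data_frame,
                data_frame_measurement_name='h2o_feet',
                data_frame_tag_columns=['location'])

write_api.__del__()  # flush any pending batch before shutdown
client.__del__()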

def _data_frame_to_list_of_points(self, data_frame, precision, **kwargs):
from ..extras import pd
if not isinstance(data_frame, pd.DataFrame):
raise TypeError('Must be DataFrame, but type was: {0}.'
.format(type(data_frame)))

if 'data_frame_measurement_name' not in kwargs:
raise TypeError('"data_frame_measurement_name" is a Required Argument')

# normalize the index to timezone-aware timestamps before generating points
if isinstance(data_frame.index, pd.PeriodIndex):
data_frame.index = data_frame.index.to_timestamp()
else:
data_frame.index = pd.to_datetime(data_frame.index)

if data_frame.index.tzinfo is None:
# naive timestamps are interpreted as UTC
data_frame.index = data_frame.index.tz_localize('UTC')

data = []

for c, (row) in enumerate(data_frame.values):
point = Point(measurement_name=kwargs.get('data_frame_measurement_name'))

for count, (value) in enumerate(row):
column = data_frame.columns[count]
data_frame_tag_columns = kwargs.get('data_frame_tag_columns')
if data_frame_tag_columns and column in data_frame_tag_columns:
point.tag(column, value)
else:
point.field(column, value)

point.time(data_frame.index[c], precision)

if self._point_settings.defaultTags:
for key, val in self._point_settings.defaultTags.items():
point.tag(key, val)

data.append(point)

return data
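
To illustrate the conversion, a rough sketch (not part of the commit) of what ``_data_frame_to_list_of_points`` produces for a two-row frame; the measurement and column names are borrowed from the tests below:

from influxdb_client.extras import pd

data_frame = pd.DataFrame(data=[["coyote_creek", 1.0], ["coyote_creek", 2.0]],
                          index=pd.to_datetime(["1970-01-01 01:00+00:00",
                                                "1970-01-01 02:00+00:00"]),
                          columns=["location", "water_level"])

# With data_frame_measurement_name='h2o_feet' and
# data_frame_tag_columns=['location'], each row becomes one Point whose
# line protocol (at the default nanosecond precision) is:
#
#   h2o_feet,location=coyote_creek water_level=1.0 3600000000000
#   h2o_feet,location=coyote_creek water_level=2.0 7200000000000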

def _http(self, batch_item: _BatchItem):

logger.debug("Write time series data into InfluxDB: %s", batch_item)
90 changes: 90 additions & 0 deletions tests/test_WriteApi.py
@@ -6,6 +6,7 @@
import os
import unittest
import time
from datetime import timedelta
from multiprocessing.pool import ApplyResult

from influxdb_client import Point, WritePrecision, InfluxDBClient
@@ -224,6 +225,57 @@ def test_write_bytes(self):

self.delete_test_bucket(_bucket)

def test_write_data_frame(self):
from influxdb_client.extras import pd

bucket = self.create_test_bucket()

now = pd.Timestamp('1970-01-01 00:00+00:00')
data_frame = pd.DataFrame(data=[["coyote_creek", 1.0], ["coyote_creek", 2.0]],
index=[now + timedelta(hours=1), now + timedelta(hours=2)],
columns=["location", "water_level"])

self.write_client.write(bucket.name, record=data_frame, data_frame_measurement_name='h2o_feet',
data_frame_tag_columns=['location'])

result = self.query_api.query(
"from(bucket:\"" + bucket.name + "\") |> range(start: 1970-01-01T00:00:00.000000001Z)", self.org)

self.assertEqual(1, len(result))
self.assertEqual(2, len(result[0].records))

self.assertEqual(result[0].records[0].get_measurement(), "h2o_feet")
self.assertEqual(result[0].records[0].get_value(), 1.0)
self.assertEqual(result[0].records[0].values.get("location"), "coyote_creek")
self.assertEqual(result[0].records[0].get_field(), "water_level")
self.assertEqual(result[0].records[0].get_time(),
datetime.datetime(1970, 1, 1, 1, 0, tzinfo=datetime.timezone.utc))

self.assertEqual(result[0].records[1].get_measurement(), "h2o_feet")
self.assertEqual(result[0].records[1].get_value(), 2.0)
self.assertEqual(result[0].records[1].values.get("location"), "coyote_creek")
self.assertEqual(result[0].records[1].get_field(), "water_level")
self.assertEqual(result[0].records[1].get_time(),
datetime.datetime(1970, 1, 1, 2, 0, tzinfo=datetime.timezone.utc))

self.delete_test_bucket(bucket)

def test_write_data_frame_without_measurement_name(self):
from influxdb_client.extras import pd

bucket = self.create_test_bucket()

now = pd.Timestamp('1970-01-01 00:00+00:00')
data_frame = pd.DataFrame(data=[["coyote_creek", 1.0], ["coyote_creek", 2.0]],
index=[now + timedelta(hours=1), now + timedelta(hours=2)],
columns=["location", "water_level"])

with self.assertRaises(TypeError) as cm:
self.write_client.write(bucket.name, record=data_frame)
exception = cm.exception

self.assertEqual('"data_frame_measurement_name" is a Required Argument', exception.__str__())

def test_use_default_org(self):
bucket = self.create_test_bucket()

@@ -362,6 +414,44 @@ def test_use_default_tags_with_dictionaries(self):

self.delete_test_bucket(bucket)

def test_use_default_tags_with_data_frame(self):
from influxdb_client.extras import pd

bucket = self.create_test_bucket()

now = pd.Timestamp('1970-01-01 00:00+00:00')
data_frame = pd.DataFrame(data=[["coyote_creek", 1.0], ["coyote_creek", 2.0]],
index=[now + timedelta(hours=1), now + timedelta(hours=2)],
columns=["location", "water_level"])

self.write_client.write(bucket.name, record=data_frame, data_frame_measurement_name='h2o_feet',
data_frame_tag_columns=['location'])

time.sleep(1)

query = 'from(bucket:"' + bucket.name + '") |> range(start: 1970-01-01T00:00:00.000000001Z)'

flux_result = self.client.query_api().query(query)

self.assertEqual(1, len(flux_result))

records = flux_result[0].records

self.assertEqual(2, len(records))

rec = records[0]
rec2 = records[1]

self.assertEqual(self.id_tag, rec["id"])
self.assertEqual(self.customer_tag, rec["customer"])
self.assertEqual("LA", rec[self.data_center_key])

self.assertEqual(self.id_tag, rec2["id"])
self.assertEqual(self.customer_tag, rec2["customer"])
self.assertEqual("LA", rec2[self.data_center_key])

self.delete_test_bucket(bucket)

def test_write_bytes(self):
bucket = self.create_test_bucket()

27 changes: 27 additions & 0 deletions tests/test_WriteApiBatching.py
@@ -407,6 +407,33 @@ def test_to_low_flush_interval(self):

httpretty.reset()

def test_batching_data_frame(self):
from influxdb_client.extras import pd

httpretty.register_uri(httpretty.POST, uri="http://localhost/api/v2/write", status=204)
httpretty.register_uri(httpretty.POST, uri="http://localhost/api/v2/write", status=204)

data_frame = pd.DataFrame(data=[["coyote_creek", 1.0], ["coyote_creek", 2.0],
["coyote_creek", 3.0], ["coyote_creek", 4.0]],
index=[1, 2, 3, 4],
columns=["location", "level water_level"])

self._write_client.write("my-bucket", "my-org", record=data_frame,
data_frame_measurement_name='h2o_feet',
data_frame_tag_columns=['location'])

time.sleep(1)

_requests = httpretty.httpretty.latest_requests

self.assertEqual(2, len(_requests))
_request1 = "h2o_feet,location=coyote_creek level\\ water_level=1.0 1\n" \
"h2o_feet,location=coyote_creek level\\ water_level=2.0 2"
_request2 = "h2o_feet,location=coyote_creek level\\ water_level=3.0 3\n" \
"h2o_feet,location=coyote_creek level\\ water_level=4.0 4"

self.assertEqual(_request1, _requests[0].parsed_body)
self.assertEqual(_request2, _requests[1].parsed_body)
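
Note: the space in the field column name ``level water_level`` is escaped with a backslash in line protocol, which is why the expected request bodies above contain ``level\ water_level``.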

if __name__ == '__main__':
unittest.main()
