
Add support for serialise query response into Pandas Data Frame (#41)
bednar authored Nov 18, 2019
1 parent 1e2fae1 commit fb97b11
Showing 17 changed files with 1,185 additions and 12 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,8 @@
## 1.1.0 [unreleased]

### Features
1. [#29](https://github.com/influxdata/influxdb-client-python/issues/29): Added support for serializing a query response into a Pandas DataFrame

## 1.0.0 [2019-11-11]

### Features
76 changes: 73 additions & 3 deletions README.rst
@@ -43,7 +43,7 @@ InfluxDB 2.0 client features

- Querying data
- using the Flux language
- into csv, raw data, `flux_table <https://github.com/influxdata/influxdb-client-python/blob/master/influxdb_client/client/flux_table.py#L5>`_ structure
- into csv, raw data, `flux_table <https://github.com/influxdata/influxdb-client-python/blob/master/influxdb_client/client/flux_table.py#L5>`_ structure, `Pandas DataFrame <https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html>`_
- `How to queries <#queries>`_
- Writing data using
- `Line Protocol <https://docs.influxdata.com/influxdb/v1.6/write_protocols/line_protocol_tutorial>`_
@@ -65,6 +65,7 @@ InfluxDB 2.0 client features
- `Connect to InfluxDB Cloud`_
- `How to efficiently import large dataset`_
- `Efficiency write data from IOT sensor`_
- `How to use Jupyter + Pandas + InfluxDB 2`_

Installation
------------
@@ -300,6 +301,7 @@ The result retrieved by `QueryApi <https://github.com/influxdata/influxdb-client
1. Flux data structure: `FluxTable <https://github.com/influxdata/influxdb-client-python/blob/master/influxdb_client/client/flux_table.py#L5>`_, `FluxColumn <https://github.com/influxdata/influxdb-client-python/blob/master/influxdb_client/client/flux_table.py#L22>`_ and `FluxRecord <https://github.com/influxdata/influxdb-client-python/blob/master/influxdb_client/client/flux_table.py#L31>`_
2. `csv.reader <https://docs.python.org/3.4/library/csv.html#reader-objects>`__ which will iterate over CSV lines
3. Raw unprocessed results as a ``str`` iterator
4. `Pandas DataFrame <https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html>`_

The API also supports streaming ``FluxRecord`` via `query_stream <https://github.com/influxdata/influxdb-client-python/blob/master/influxdb_client/client/query_api.py#L77>`_, see example below:

@@ -372,6 +374,57 @@ The API also support streaming ``FluxRecord`` via `query_stream <https://github.
"""
client.__del__()
Pandas DataFrame
""""""""""""""""
.. marker-pandas-start
.. note:: If a query returns more than one table, the client generates a ``DataFrame`` for each of them.

The ``client`` is able to retrieve data in `Pandas DataFrame <https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html>`_ format through ``query_data_frame``:

.. code-block:: python

    from influxdb_client import InfluxDBClient, Point, Dialect
    from influxdb_client.client.write_api import SYNCHRONOUS

    client = InfluxDBClient(url="http://localhost:9999", token="my-token", org="my-org")

    write_api = client.write_api(write_options=SYNCHRONOUS)
    query_api = client.query_api()

    """
    Prepare data
    """
    _point1 = Point("my_measurement").tag("location", "Prague").field("temperature", 25.3)
    _point2 = Point("my_measurement").tag("location", "New York").field("temperature", 24.3)

    write_api.write(bucket="my-bucket", org="my-org", record=[_point1, _point2])

    """
    Query: using Pandas DataFrame
    """
    data_frame = query_api.query_data_frame('from(bucket:"my-bucket") '
                                            '|> range(start: -10m) '
                                            '|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") '
                                            '|> keep(columns: ["location", "temperature"])')
    print(data_frame.to_string())

    """
    Close client
    """
    client.__del__()
Output:

.. code-block::

        result  table  location  temperature
    0  _result      0  New York         24.3
    1  _result      1    Prague         25.3

.. marker-pandas-end
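This commit also adds a ``data_frame_index`` parameter to ``query_data_frame``, a list of column names to use as the ``DataFrame`` index. Under the hood the parser applies ``DataFrame.set_index`` with that list. A minimal sketch of the effect, where the frame is a hypothetical stand-in for a query result shaped like the sample output above:

```python
import pandas as pd

# Hypothetical stand-in for a query_data_frame(...) result, shaped
# like the sample output above.
df = pd.DataFrame({"location": ["New York", "Prague"],
                   "temperature": [24.3, 25.3]})

# Passing data_frame_index=["location"] asks the client to index the
# result by that column; the effect is equivalent to:
indexed = df.set_index(["location"])

# Rows can then be addressed by location instead of positional index.
print(indexed.loc["Prague", "temperature"])
```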
Examples
^^^^^^^^

@@ -560,7 +613,7 @@ Efficiency write data from IOT sensor
.. marker-iot-end
Connect to InfluxDB Cloud
^^^^^^^^^^^^^^^^^^^^^^^^^
"""""""""""""""""""""""""
The following example demonstrates the simplest way to write and query data with InfluxDB Cloud.

First, create an authentication token as described `here <https://v2.docs.influxdata.com/v2.0/security/tokens/create-token/>`_.
@@ -634,7 +687,24 @@ The last step is run a python script via: ``python3 influx_cloud.py``.
finally:
client.close()
.. marker-iot-end
How to use Jupyter + Pandas + InfluxDB 2
""""""""""""""""""""""""""""""""""""""""
The first example shows how to use the client to predict stock prices via `Keras <https://keras.io>`_, `TensorFlow <https://www.tensorflow.org>`_, `sklearn <https://scikit-learn.org/stable/>`_:

* sources - `stock-predictions.ipynb <notebooks/stock-predictions.ipynb>`_

.. image:: docs/images/stock-price-prediction.gif

Result:

.. image:: docs/images/stock-price-prediction-results.png

The second example shows how to use the client for realtime visualization via `hvPlot <https://hvplot.pyviz.org>`_, `Streamz <https://streamz.readthedocs.io/en/latest/>`_, `RxPY <https://rxpy.readthedocs.io/en/latest/>`_:

* sources - `realtime-stream.ipynb <notebooks/realtime-stream.ipynb>`_

.. image:: docs/images/realtime-result.gif


Advanced Usage
--------------
5 changes: 5 additions & 0 deletions docs/api.rst
@@ -55,3 +55,8 @@ TasksApi

.. autoclass:: influxdb_client.domain.Task
:members:

DeleteApi
""""""""
.. autoclass:: influxdb_client.DeleteApi
:members:
Binary file added docs/images/realtime-result.gif
Binary file added docs/images/stock-price-prediction-results.png
Binary file added docs/images/stock-price-prediction.gif
6 changes: 6 additions & 0 deletions docs/usage.rst
@@ -10,6 +10,12 @@ Query
:start-after: marker-query-start
:end-before: marker-query-end

Pandas DataFrame
^^^^^^^^^^^^^^^^
.. include:: ../README.rst
:start-after: marker-pandas-start
:end-before: marker-pandas-end

Write
^^^^^
.. include:: ../README.rst
12 changes: 12 additions & 0 deletions examples/query.py
@@ -60,6 +60,18 @@
if not len(csv_line) == 0:
print(f'Temperature in {csv_line[9]} is {csv_line[6]}')

print()
print()

"""
Query: using Pandas DataFrame
"""
data_frame = query_api.query_data_frame('from(bucket:"my-bucket") '
'|> range(start: -10m) '
'|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") '
'|> keep(columns: ["location", "temperature"])')
print(data_frame.to_string())

"""
Close client
"""
46 changes: 41 additions & 5 deletions influxdb_client/client/flux_csv_parser.py
@@ -1,8 +1,11 @@
import base64
import codecs
import csv as csv_parser
from enum import Enum
from typing import List

import ciso8601
from pandas import DataFrame
from urllib3 import HTTPResponse

from influxdb_client.client.flux_table import FluxTable, FluxColumn, FluxRecord
@@ -18,12 +21,20 @@ class FluxCsvParserException(Exception):
pass


class FluxSerializationMode(Enum):
tables = 1
stream = 2
dataFrame = 3


class FluxCsvParser(object):

def __init__(self, response: HTTPResponse, stream: bool) -> None:
def __init__(self, response: HTTPResponse, serialization_mode: FluxSerializationMode,
data_frame_index: List[str] = None) -> None:
self._response = response
self.tables = []
self._stream = stream
self._serialization_mode = serialization_mode
self._data_frame_index = data_frame_index
pass

def __enter__(self):
@@ -64,6 +75,11 @@ def _parse_flux_response(self):
token = csv[0]
# start new table
if "#datatype" == token:

# Return already parsed DataFrame
                if (self._serialization_mode is FluxSerializationMode.dataFrame) and hasattr(self, '_data_frame'):
yield self._prepare_data_frame()

start_new_table = True
table = FluxTable()
self._insert_table(table, table_index)
@@ -86,6 +102,12 @@
if start_new_table:
self.add_column_names_and_tags(table, csv)
start_new_table = False
# Create DataFrame with default values
if self._serialization_mode is FluxSerializationMode.dataFrame:
self._data_frame = DataFrame(data=[], columns=[], index=None)
for column in table.columns:
self._data_frame[column.label] = column.default_value
pass
continue

# to int converions todo
@@ -101,14 +123,28 @@

flux_record = self.parse_record(table_index - 1, table, csv)

if not self._stream:
if self._serialization_mode is FluxSerializationMode.tables:
self.tables[table_index - 1].records.append(flux_record)

yield flux_record
if self._serialization_mode is FluxSerializationMode.stream:
yield flux_record

if self._serialization_mode is FluxSerializationMode.dataFrame:
self._data_frame.loc[len(self._data_frame.index)] = flux_record.values
pass

# debug
# print(flux_record)

# Return latest DataFrame
        if (self._serialization_mode is FluxSerializationMode.dataFrame) and hasattr(self, '_data_frame'):
yield self._prepare_data_frame()

def _prepare_data_frame(self):
if self._data_frame_index:
self._data_frame = self._data_frame.set_index(self._data_frame_index)
return self._data_frame

def parse_record(self, table_index, table, csv):
record = FluxRecord(table_index)

@@ -180,5 +216,5 @@ def add_column_names_and_tags(table, csv):
i += 1

def _insert_table(self, table, table_index):
if not self._stream:
if self._serialization_mode is FluxSerializationMode.tables:
self.tables.insert(table_index, table)
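In ``dataFrame`` mode the parser first creates an empty frame from the table's column labels, then appends each parsed record positionally via ``DataFrame.loc``. A standalone sketch of that accumulation pattern using pandas directly (the column names and values are illustrative, taken from the README example):

```python
import pandas as pd

# Start from an empty frame with the table's column labels, as
# FluxCsvParser does when it sees a new #datatype header.
data_frame = pd.DataFrame(data=[], columns=["location", "temperature"])

# Append each record at the next positional index, mirroring
# data_frame.loc[len(data_frame.index)] = flux_record.values
for row in (["Prague", 25.3], ["New York", 24.3]):
    data_frame.loc[len(data_frame.index)] = row

print(data_frame.to_string())
```

Appending row by row with ``loc`` keeps the parser streaming-friendly, since the frame grows as CSV lines arrive rather than being built from a pre-collected list.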
35 changes: 32 additions & 3 deletions influxdb_client/client/query_api.py
@@ -2,9 +2,11 @@
import csv
from typing import List, Generator, Any

from pandas import DataFrame

from influxdb_client import Dialect
from influxdb_client import Query, QueryService
from influxdb_client.client.flux_csv_parser import FluxCsvParser
from influxdb_client.client.flux_csv_parser import FluxCsvParser, FluxSerializationMode
from influxdb_client.client.flux_table import FluxTable, FluxRecord


@@ -68,7 +70,7 @@ def query(self, query: str, org=None) -> List['FluxTable']:
response = self._query_api.post_query(org=org, query=self._create_query(query, self.default_dialect),
async_req=False, _preload_content=False, _return_http_data_only=False)

_parser = FluxCsvParser(response=response, stream=False)
_parser = FluxCsvParser(response=response, serialization_mode=FluxSerializationMode.tables)

list(_parser.generator())

@@ -88,10 +90,37 @@ def query_stream(self, query: str, org=None) -> Generator['FluxRecord', Any, Non
response = self._query_api.post_query(org=org, query=self._create_query(query, self.default_dialect),
async_req=False, _preload_content=False, _return_http_data_only=False)

_parser = FluxCsvParser(response=response, stream=True)
_parser = FluxCsvParser(response=response, serialization_mode=FluxSerializationMode.stream)

return _parser.generator()

def query_data_frame(self, query: str, org=None, data_frame_index: List[str] = None):
"""
Synchronously executes the Flux query and return Pandas DataFrame.
Note that if a query returns more then one table than the client generates a DataFrame for each of them.
:param query: the Flux query
:param org: organization name (optional if already specified in InfluxDBClient)
:param data_frame_index: the list of columns that are used as DataFrame index
:return:
"""
if org is None:
org = self._influxdb_client.org

response = self._query_api.post_query(org=org, query=self._create_query(query, self.default_dialect),
async_req=False, _preload_content=False, _return_http_data_only=False)

_parser = FluxCsvParser(response=response, serialization_mode=FluxSerializationMode.dataFrame,
data_frame_index=data_frame_index)
_dataFrames = list(_parser.generator())

if len(_dataFrames) == 0:
return DataFrame(columns=[], index=None)
elif len(_dataFrames) == 1:
return _dataFrames[0]
else:
return _dataFrames

# private helper for c
@staticmethod
def _create_query(query, dialect=default_dialect):
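Because of the branching at the end of ``query_data_frame``, callers may receive either a single ``DataFrame`` (one table) or a list of them (several tables). A small sketch that normalizes both shapes; the helper name is hypothetical, not part of the client:

```python
import pandas as pd

def as_data_frame_list(result):
    # query_data_frame returns one DataFrame for a single-table result
    # and a list of DataFrames otherwise; normalize both to a list so
    # downstream code can iterate uniformly.
    return [result] if isinstance(result, pd.DataFrame) else list(result)
```

A caller can then write ``for df in as_data_frame_list(query_api.query_data_frame(q)): ...`` without checking the result type first.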
