Add support for serialise query response into Pandas Data Frame #41

Merged: 23 commits, Nov 18, 2019

Commits
20ab37c
feat: Added pandas DataFrame support
rhajek Nov 7, 2019
29714fb
feat: Add live streaming example
bednar Nov 7, 2019
9952f0c
feat: Add live streaming example
bednar Nov 7, 2019
1970dba
feat: Add live streaming example
bednar Nov 7, 2019
c63ffa3
feat: Add live streaming example
bednar Nov 8, 2019
f0ed9ab
feat: Add live streaming example
bednar Nov 11, 2019
6407789
feat: Add live streaming example
bednar Nov 11, 2019
9e96c58
feat: Add on-fly parsing of response to pandas (#29)
bednar Nov 12, 2019
41ee594
feat: Add possibility to specify DataFrame index (#29)
bednar Nov 13, 2019
3193442
feat: Prepare documentation for PandasData frame (#29)
bednar Nov 13, 2019
47be9d2
feat: Prepare documentation for PandasData frame (#29)
bednar Nov 13, 2019
cd7627d
feat: Prepare documentation for PandasData frame (#29)
bednar Nov 13, 2019
2540a53
Update README.rst
bednar Nov 13, 2019
2af15b7
feat: Prepare documentation for Jupyter (#29)
bednar Nov 13, 2019
0bdbc86
feat: realtime jupyter example (#29)
bednar Nov 13, 2019
aa87581
feat: realtime jupyter example (#29)
bednar Nov 13, 2019
7563926
feat: realtime jupyter example (#29)
bednar Nov 13, 2019
e7c59de
feat: stock price jupyter example (#29)
bednar Nov 13, 2019
7713f36
feat: stock price jupyter example (#29)
bednar Nov 13, 2019
f4c819a
feat: stock price jupyter example (#29)
bednar Nov 13, 2019
cc8898b
Update README.rst
bednar Nov 13, 2019
8dfc831
feat: improved doc (#29)
bednar Nov 13, 2019
35ced8e
feat: it there is no results than return empty DataFrame (#29)
bednar Nov 13, 2019
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,8 @@
## 1.1.0 [unreleased]

### Features
1. [#29](https://github.com/influxdata/influxdb-client-python/issues/29): Added support for serialising a response into a Pandas DataFrame

## 1.0.0 [2019-11-11]

### Features
76 changes: 73 additions & 3 deletions README.rst
@@ -43,7 +43,7 @@ InfluxDB 2.0 client features

- Querying data
- using the Flux language
- into csv, raw data, `flux_table <https://github.com/influxdata/influxdb-client-python/blob/master/influxdb_client/client/flux_table.py#L5>`_ structure
- into csv, raw data, `flux_table <https://github.com/influxdata/influxdb-client-python/blob/master/influxdb_client/client/flux_table.py#L5>`_ structure, `Pandas DataFrame <https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html>`_
- `How to queries <#queries>`_
- Writing data using
- `Line Protocol <https://docs.influxdata.com/influxdb/v1.6/write_protocols/line_protocol_tutorial>`_
@@ -65,6 +65,7 @@ InfluxDB 2.0 client features
- `Connect to InfluxDB Cloud`_
- `How to efficiently import large dataset`_
- `Efficiency write data from IOT sensor`_
- `How to use Jupyter + Pandas + InfluxDB 2`_

Installation
------------
@@ -300,6 +301,7 @@ The result retrieved by `QueryApi <https://github.com/influxdata/influxdb-client
1. Flux data structure: `FluxTable <https://github.com/influxdata/influxdb-client-python/blob/master/influxdb_client/client/flux_table.py#L5>`_, `FluxColumn <https://github.com/influxdata/influxdb-client-python/blob/master/influxdb_client/client/flux_table.py#L22>`_ and `FluxRecord <https://github.com/influxdata/influxdb-client-python/blob/master/influxdb_client/client/flux_table.py#L31>`_
2. `csv.reader <https://docs.python.org/3.4/library/csv.html#reader-objects>`__ which will iterate over CSV lines
3. Raw unprocessed results as a ``str`` iterator
4. `Pandas DataFrame <https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html>`_

The API also supports streaming ``FluxRecord`` via `query_stream <https://github.com/influxdata/influxdb-client-python/blob/master/influxdb_client/client/query_api.py#L77>`_, see example below:

@@ -372,6 +374,57 @@ The API also support streaming ``FluxRecord`` via `query_stream <https://github.
"""
client.__del__()

Pandas DataFrame
""""""""""""""""
.. marker-pandas-start

.. note:: If a query returns more than one table, the client generates a ``DataFrame`` for each of them.

The ``client`` can retrieve data in `Pandas DataFrame <https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html>`_ format through ``query_data_frame``:

.. code-block:: python

from influxdb_client import InfluxDBClient, Point, Dialect
from influxdb_client.client.write_api import SYNCHRONOUS

client = InfluxDBClient(url="http://localhost:9999", token="my-token", org="my-org")

write_api = client.write_api(write_options=SYNCHRONOUS)
query_api = client.query_api()

"""
Prepare data
"""

_point1 = Point("my_measurement").tag("location", "Prague").field("temperature", 25.3)
_point2 = Point("my_measurement").tag("location", "New York").field("temperature", 24.3)

write_api.write(bucket="my-bucket", org="my-org", record=[_point1, _point2])

"""
Query: using Pandas DataFrame
"""
data_frame = query_api.query_data_frame('from(bucket:"my-bucket") '
'|> range(start: -10m) '
'|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") '
'|> keep(columns: ["location", "temperature"])')
print(data_frame.to_string())

"""
Close client
"""
client.__del__()

Output:

.. code-block::

result table location temperature
0 _result 0 New York 24.3
1 _result 1 Prague 25.3
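The commits above also add an optional ``data_frame_index`` argument to ``query_data_frame``; internally the parser simply applies ``DataFrame.set_index`` to the accumulated frame before yielding it. A self-contained sketch of that indexing step (the row values mirror the output above; no running InfluxDB is needed):

```python
from pandas import DataFrame

# Rows as the parser would accumulate them from each FluxRecord
rows = [
    ["_result", 0, "New York", 24.3],
    ["_result", 1, "Prague", 25.3],
]
df = DataFrame(data=rows, columns=["result", "table", "location", "temperature"])

# Equivalent of passing data_frame_index=["location"] to query_data_frame
indexed = df.set_index(["location"])
print(indexed.loc["Prague", "temperature"])  # 25.3
```

Indexing by a tag column like this makes subsequent lookups and joins by that tag direct ``loc`` operations instead of boolean filters.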

.. marker-pandas-end

Examples
^^^^^^^^

@@ -560,7 +613,7 @@ Efficiency write data from IOT sensor
.. marker-iot-end

Connect to InfluxDB Cloud
^^^^^^^^^^^^^^^^^^^^^^^^^
"""""""""""""""""""""""""
The following example demonstrates the simplest way to write and query data with InfluxDB Cloud.

First, create an authentication token as described `here <https://v2.docs.influxdata.com/v2.0/security/tokens/create-token/>`_.
@@ -634,7 +687,24 @@ The last step is to run the Python script via: ``python3 influx_cloud.py``.
finally:
client.close()

How to use Jupyter + Pandas + InfluxDB 2
""""""""""""""""""""""""""""""""""""""""
The first example shows how to use the client to predict stock prices via `Keras <https://keras.io>`_, `TensorFlow <https://www.tensorflow.org>`_ and `sklearn <https://scikit-learn.org/stable/>`_:

* sources - `stock-predictions.ipynb <notebooks/stock-predictions.ipynb>`_

.. image:: docs/images/stock-price-prediction.gif

Result:

.. image:: docs/images/stock-price-prediction-results.png

The second example shows how to use the client for realtime visualization via `hvPlot <https://hvplot.pyviz.org>`_, `Streamz <https://streamz.readthedocs.io/en/latest/>`_ and `RxPY <https://rxpy.readthedocs.io/en/latest/>`_:

* sources - `realtime-stream.ipynb <notebooks/realtime-stream.ipynb>`_

.. image:: docs/images/realtime-result.gif
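The realtime notebook above wires ``query_stream`` into hvPlot/Streamz; independent of those libraries, the underlying pattern is just consuming the record generator incrementally and growing a frame. A minimal stand-in, with a hypothetical ``fake_query_stream`` generator in place of ``query_api.query_stream``:

```python
from pandas import DataFrame

def fake_query_stream():
    # Hypothetical stand-in for query_api.query_stream(...): yields
    # (time, value) pairs the way FluxRecord exposes get_time()/get_value()
    for i, temp in enumerate([24.3, 25.3, 25.1]):
        yield (i, temp)

window = DataFrame(columns=["time", "temperature"])
for time, temperature in fake_query_stream():
    window.loc[len(window.index)] = [time, temperature]
    # a plotting library would redraw from `window` here

print(window["temperature"].iloc[-1])  # 25.1
```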


Advanced Usage
--------------
5 changes: 5 additions & 0 deletions docs/api.rst
@@ -55,3 +55,8 @@ TasksApi

.. autoclass:: influxdb_client.domain.Task
:members:

DeleteApi
"""""""""
.. autoclass:: influxdb_client.DeleteApi
:members:
Binary file added docs/images/realtime-result.gif
Binary file added docs/images/stock-price-prediction-results.png
Binary file added docs/images/stock-price-prediction.gif
6 changes: 6 additions & 0 deletions docs/usage.rst
@@ -10,6 +10,12 @@ Query
:start-after: marker-query-start
:end-before: marker-query-end

Pandas DataFrame
^^^^^^^^^^^^^^^^
.. include:: ../README.rst
:start-after: marker-pandas-start
:end-before: marker-pandas-end

Write
^^^^^
.. include:: ../README.rst
12 changes: 12 additions & 0 deletions examples/query.py
@@ -60,6 +60,18 @@
if not len(csv_line) == 0:
print(f'Temperature in {csv_line[9]} is {csv_line[6]}')

print()
print()

"""
Query: using Pandas DataFrame
"""
data_frame = query_api.query_data_frame('from(bucket:"my-bucket") '
'|> range(start: -10m) '
'|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") '
'|> keep(columns: ["location", "temperature"])')
print(data_frame.to_string())

"""
Close client
"""
46 changes: 41 additions & 5 deletions influxdb_client/client/flux_csv_parser.py
@@ -1,8 +1,11 @@
import base64
import codecs
import csv as csv_parser
from enum import Enum
from typing import List

import ciso8601
from pandas import DataFrame
from urllib3 import HTTPResponse

from influxdb_client.client.flux_table import FluxTable, FluxColumn, FluxRecord
@@ -18,12 +21,20 @@ class FluxCsvParserException(Exception):
pass


class FluxSerializationMode(Enum):
tables = 1
stream = 2
dataFrame = 3


class FluxCsvParser(object):

def __init__(self, response: HTTPResponse, stream: bool) -> None:
def __init__(self, response: HTTPResponse, serialization_mode: FluxSerializationMode,
data_frame_index: List[str] = None) -> None:
self._response = response
self.tables = []
self._stream = stream
self._serialization_mode = serialization_mode
self._data_frame_index = data_frame_index
pass

def __enter__(self):
@@ -64,6 +75,11 @@ def _parse_flux_response(self):
token = csv[0]
# start new table
if "#datatype" == token:

# Return already parsed DataFrame
if (self._serialization_mode is FluxSerializationMode.dataFrame) and hasattr(self, '_data_frame'):
yield self._prepare_data_frame()

start_new_table = True
table = FluxTable()
self._insert_table(table, table_index)
@@ -86,6 +102,12 @@
if start_new_table:
self.add_column_names_and_tags(table, csv)
start_new_table = False
# Create DataFrame with default values
if self._serialization_mode is FluxSerializationMode.dataFrame:
self._data_frame = DataFrame(data=[], columns=[], index=None)
for column in table.columns:
self._data_frame[column.label] = column.default_value
pass
continue

# to int conversions todo
@@ -101,14 +123,28 @@

flux_record = self.parse_record(table_index - 1, table, csv)

if not self._stream:
if self._serialization_mode is FluxSerializationMode.tables:
self.tables[table_index - 1].records.append(flux_record)

yield flux_record
if self._serialization_mode is FluxSerializationMode.stream:
yield flux_record

if self._serialization_mode is FluxSerializationMode.dataFrame:
self._data_frame.loc[len(self._data_frame.index)] = flux_record.values
pass

# debug
# print(flux_record)

# Return latest DataFrame
if (self._serialization_mode is FluxSerializationMode.dataFrame) and hasattr(self, '_data_frame'):
yield self._prepare_data_frame()

def _prepare_data_frame(self):
if self._data_frame_index:
self._data_frame = self._data_frame.set_index(self._data_frame_index)
return self._data_frame

def parse_record(self, table_index, table, csv):
record = FluxRecord(table_index)

@@ -180,5 +216,5 @@ def add_column_names_and_tags(table, csv):
i += 1

def _insert_table(self, table, table_index):
if not self._stream:
if self._serialization_mode is FluxSerializationMode.tables:
self.tables.insert(table_index, table)
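The dataFrame branch above grows the frame one record at a time with ``self._data_frame.loc[len(self._data_frame.index)] = flux_record.values``. A self-contained sketch of that append pattern (column names are just examples; the real code assigns ``flux_record.values``, plain lists stand in here):

```python
from pandas import DataFrame

# Empty frame with known columns, as the parser builds it when it
# sees the #datatype header row of a new table
df = DataFrame(data=[], columns=["location", "temperature"], index=None)

# Append each parsed record by positional label, mirroring the
# dataFrame branch of _parse_flux_response
for values in (["Prague", 25.3], ["New York", 24.3]):
    df.loc[len(df.index)] = values

print(len(df))  # 2
```

Note that growing a ``DataFrame`` row by row is quadratic in the worst case; for large results, collecting rows in a list and constructing the frame once is usually faster.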
35 changes: 32 additions & 3 deletions influxdb_client/client/query_api.py
@@ -2,9 +2,11 @@
import csv
from typing import List, Generator, Any

from pandas import DataFrame

from influxdb_client import Dialect
from influxdb_client import Query, QueryService
from influxdb_client.client.flux_csv_parser import FluxCsvParser
from influxdb_client.client.flux_csv_parser import FluxCsvParser, FluxSerializationMode
from influxdb_client.client.flux_table import FluxTable, FluxRecord


@@ -68,7 +70,7 @@ def query(self, query: str, org=None) -> List['FluxTable']:
response = self._query_api.post_query(org=org, query=self._create_query(query, self.default_dialect),
async_req=False, _preload_content=False, _return_http_data_only=False)

_parser = FluxCsvParser(response=response, stream=False)
_parser = FluxCsvParser(response=response, serialization_mode=FluxSerializationMode.tables)

list(_parser.generator())

@@ -88,10 +90,37 @@ def query_stream(self, query: str, org=None) -> Generator['FluxRecord', Any, Non
response = self._query_api.post_query(org=org, query=self._create_query(query, self.default_dialect),
async_req=False, _preload_content=False, _return_http_data_only=False)

_parser = FluxCsvParser(response=response, stream=True)
_parser = FluxCsvParser(response=response, serialization_mode=FluxSerializationMode.stream)

return _parser.generator()

def query_data_frame(self, query: str, org=None, data_frame_index: List[str] = None):
"""
Synchronously executes the Flux query and returns a Pandas DataFrame.
Note that if a query returns more than one table, the client generates a DataFrame for each of them.

:param query: the Flux query
:param org: organization name (optional if already specified in InfluxDBClient)
:param data_frame_index: the list of columns that are used as DataFrame index
:return: a DataFrame, or a list of DataFrames if the query returns multiple tables
"""
if org is None:
org = self._influxdb_client.org

response = self._query_api.post_query(org=org, query=self._create_query(query, self.default_dialect),
async_req=False, _preload_content=False, _return_http_data_only=False)

_parser = FluxCsvParser(response=response, serialization_mode=FluxSerializationMode.dataFrame,
data_frame_index=data_frame_index)
_dataFrames = list(_parser.generator())

if len(_dataFrames) == 0:
return DataFrame(columns=[], index=None)
elif len(_dataFrames) == 1:
return _dataFrames[0]
else:
return _dataFrames

# private helper for c
@staticmethod
def _create_query(query, dialect=default_dialect):
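``query_data_frame`` above returns a single ``DataFrame`` when the response holds one table, a list of frames when it holds several, and an empty frame when there are no results. A self-contained sketch of that return convention (``_collapse`` is a hypothetical helper mirroring the tail of ``query_data_frame``, not part of the client):

```python
from pandas import DataFrame

def _collapse(data_frames):
    # Mirror query_data_frame's tail: empty frame, single frame, or list
    if len(data_frames) == 0:
        return DataFrame(columns=[], index=None)
    elif len(data_frames) == 1:
        return data_frames[0]
    return data_frames

single = _collapse([DataFrame({"temperature": [25.3]})])
many = _collapse([DataFrame({"a": [1]}), DataFrame({"b": [2]})])
empty = _collapse([])

print(type(single).__name__)      # DataFrame
print(isinstance(many, list))     # True
print(empty.empty)                # True
```

Callers therefore need to branch on the result type when a Flux query may pivot into multiple tables.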