From 709b3eb159622c56ce9b54b3a7b31177cf3fdf3e Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jakub=20Bedn=C3=A1=C5=99?=
Date: Thu, 28 Nov 2019 12:21:01 +0100
Subject: [PATCH] feat: client performance (#44)

* fix: optimize serialization to LineProtocol
* fix: add reasonable arguments for import data
* doc: clarified how to use the client to import a large amount of data
* example: add an example showing how to stream data into InfluxDB
---
 CHANGELOG.md                                |   3 +
 README.rst                                  |  22 +-
 examples/import_data_set.py                 |  20 +-
 examples/import_data_set_multiprocessing.py | 219 ++++++++++++++++++++
 influxdb_client/client/write/point.py       |  56 +++--
 influxdb_client/client/write_api.py         |  65 +++---
 notebooks/stock_predictions_import_data.py  |  28 ++-
 tests/test_point.py                         |  10 +-
 8 files changed, 323 insertions(+), 100 deletions(-)
 create mode 100644 examples/import_data_set_multiprocessing.py

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 8c1fb8f4..1d16f6a9 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,8 @@
 ## 1.2.0 [unreleased]

+### Features
+1. [#44](https://github.com/influxdata/influxdb-client-python/pull/44): Optimized serialization into LineProtocol; clarified how to use the client to import a large amount of data
+
 ### API
 1. [#42](https://github.com/influxdata/influxdb-client-python/pull/42): Updated swagger to latest version

diff --git a/README.rst b/README.rst
index 5d7f56e0..dfdb0e28 100644
--- a/README.rst
+++ b/README.rst
@@ -428,6 +428,8 @@ Examples

 How to efficiently import large dataset
 """""""""""""""""""""""""""""""""""""""
+The following example shows how to import a dataset of a few dozen megabytes.
+If you would like to import gigabytes of data, use our multiprocessing example: `import_data_set_multiprocessing.py `_ to use the full capability of your hardware.

 * sources - `import_data_set.py `_

@@ -441,7 +443,6 @@ How to efficiently import large dataset

     from collections import OrderedDict
     from csv import DictReader
-    from datetime import datetime

     import rx
     from rx import operators as ops
@@ -466,13 +467,26 @@ How to efficiently import large dataset
         :param row: the row of CSV file
         :return: Parsed csv row to [Point]
         """
+
+        """
+        For better performance it is sometimes useful to create the LineProtocol directly and avoid unnecessary escaping overhead:
+        """
+        # from pytz import UTC
+        # import ciso8601
+        # from influxdb_client.client.write.point import EPOCH
+        #
+        # time = (UTC.localize(ciso8601.parse_datetime(row["Date"])) - EPOCH).total_seconds() * 1e9
+        # return f"financial-analysis,type=vix-daily" \
+        #        f" close={float(row['VIX Close'])},high={float(row['VIX High'])},low={float(row['VIX Low'])},open={float(row['VIX Open'])} " \
+        #        f" {int(time)}"
+
         return Point("financial-analysis") \
             .tag("type", "vix-daily") \
             .field("open", float(row['VIX Open'])) \
             .field("high", float(row['VIX High'])) \
             .field("low", float(row['VIX Low'])) \
             .field("close", float(row['VIX Close'])) \
-            .time(datetime.strptime(row['Date'], '%Y-%m-%d'))
+            .time(row['Date'])

     """
@@ -485,9 +499,9 @@ How to efficiently import large dataset

     client = InfluxDBClient(url="http://localhost:9999", token="my-token", org="my-org", debug=True)

     """
-    Create client that writes data in batches with 500 items.
+    Create client that writes data in batches of 50_000 items.
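+    A batch is written once it reaches ``batch_size`` items or, at the latest, after ``flush_interval`` milliseconds.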
""" - write_api = client.write_api(write_options=WriteOptions(batch_size=500, jitter_interval=1_000)) + write_api = client.write_api(write_options=WriteOptions(batch_size=50_000, flush_interval=10_000)) """ Write data into InfluxDB diff --git a/examples/import_data_set.py b/examples/import_data_set.py index 77c490d3..4fe59bc7 100644 --- a/examples/import_data_set.py +++ b/examples/import_data_set.py @@ -6,7 +6,6 @@ from collections import OrderedDict from csv import DictReader -from datetime import datetime import rx from rx import operators as ops @@ -32,13 +31,26 @@ def parse_row(row: OrderedDict): :param row: the row of CSV file :return: Parsed csv row to [Point] """ + + """ + For better performance is sometimes useful directly create a LineProtocol to avoid unnecessary escaping overhead: + """ + # from pytz import UTC + # import ciso8601 + # from influxdb_client.client.write.point import EPOCH + # + # time = (UTC.localize(ciso8601.parse_datetime(row["Date"])) - EPOCH).total_seconds() * 1e9 + # return f"financial-analysis,type=vix-daily" \ + # f" close={float(row['VIX Close'])},high={float(row['VIX High'])},low={float(row['VIX Low'])},open={float(row['VIX Open'])} " \ + # f" {int(time)}" + return Point("financial-analysis") \ .tag("type", "vix-daily") \ .field("open", float(row['VIX Open'])) \ .field("high", float(row['VIX High'])) \ .field("low", float(row['VIX Low'])) \ .field("close", float(row['VIX Close'])) \ - .time(datetime.strptime(row['Date'], '%Y-%m-%d')) + .time(row['Date']) """ @@ -51,9 +63,9 @@ def parse_row(row: OrderedDict): client = InfluxDBClient(url="http://localhost:9999", token="my-token", org="my-org", debug=True) """ -Create client that writes data in batches with 500 items. +Create client that writes data in batches with 50_000 items. """ -write_api = client.write_api(write_options=WriteOptions(batch_size=500, jitter_interval=1_000)) +write_api = client.write_api(write_options=WriteOptions(batch_size=50_000, flush_interval=10_000)) """ Write data into InfluxDB diff --git a/examples/import_data_set_multiprocessing.py b/examples/import_data_set_multiprocessing.py new file mode 100644 index 00000000..09ba91da --- /dev/null +++ b/examples/import_data_set_multiprocessing.py @@ -0,0 +1,219 @@ +""" +Import public NYC taxi and for-hire vehicle (Uber, Lyft, etc.) trip data into InfluxDB 2.0 + +https://github.com/toddwschneider/nyc-taxi-data +""" +import concurrent.futures +import io +import multiprocessing +from collections import OrderedDict +from csv import DictReader +from datetime import datetime +from multiprocessing import Value +from urllib.request import urlopen + +import rx +from rx import operators as ops + +from influxdb_client import Point, InfluxDBClient, WriteOptions +from influxdb_client.client.write_api import WriteType + + +class ProgressTextIOWrapper(io.TextIOWrapper): + """ + TextIOWrapper that store progress of read. + """ + def __init__(self, *args, **kwargs): + io.TextIOWrapper.__init__(self, *args, **kwargs) + self.progress = None + pass + + def readline(self, *args, **kwarg) -> str: + readline = super().readline(*args, **kwarg) + self.progress.value += len(readline) + return readline + + +class InfluxDBWriter(multiprocessing.Process): + """ + Writer that writes data in batches with 50_000 items. 
+    """
+    def __init__(self, queue):
+        multiprocessing.Process.__init__(self)
+        self.queue = queue
+        self.client = InfluxDBClient(url="http://localhost:9999", token="my-token", org="my-org", debug=False)
+        self.write_api = self.client.write_api(
+            write_options=WriteOptions(write_type=WriteType.batching, batch_size=50_000, flush_interval=10_000))
+
+    def run(self):
+        while True:
+            next_task = self.queue.get()
+            if next_task is None:
+                # Poison pill means terminate
+                self.terminate()
+                self.queue.task_done()
+                break
+            self.write_api.write(org="my-org", bucket="my-bucket", record=next_task)
+            self.queue.task_done()
+
+    def terminate(self) -> None:
+        proc_name = self.name
+        print()
+        print('Writer: flushing data...')
+        self.write_api.__del__()
+        self.client.__del__()
+        print('Writer: {} closed'.format(proc_name))
+
+
+def parse_row(row: OrderedDict):
+    """Parse row of CSV file into Point with structure:
+
+    taxi-trip-data,DOLocationID=152,PULocationID=79,dispatching_base_num=B02510 dropoff_datetime="2019-01-01 01:27:24" 1546304267000000000
+
+    CSV format:
+    dispatching_base_num,pickup_datetime,dropoff_datetime,PULocationID,DOLocationID,SR_Flag
+    B00001,2019-01-01 00:30:00,2019-01-01 02:51:55,,,
+    B00001,2019-01-01 00:45:00,2019-01-01 00:54:49,,,
+    B00001,2019-01-01 00:15:00,2019-01-01 00:54:52,,,
+    B00008,2019-01-01 00:19:00,2019-01-01 00:39:00,,,
+    B00008,2019-01-01 00:27:00,2019-01-01 00:37:00,,,
+    B00008,2019-01-01 00:48:00,2019-01-01 01:02:00,,,
+    B00008,2019-01-01 00:50:00,2019-01-01 00:59:00,,,
+    B00008,2019-01-01 00:51:00,2019-01-01 00:56:00,,,
+    B00009,2019-01-01 00:44:00,2019-01-01 00:58:00,,,
+    B00009,2019-01-01 00:19:00,2019-01-01 00:36:00,,,
+    B00009,2019-01-01 00:36:00,2019-01-01 00:49:00,,,
+    B00009,2019-01-01 00:26:00,2019-01-01 00:32:00,,,
+    ...
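+
+    Note: rows with an empty SR_Flag column produce an empty tag value, which
+    the client skips during line protocol serialization.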
+
+    :param row: the row of CSV file
+    :return: Parsed csv row to [Point]
+    """
+
+    return Point("taxi-trip-data") \
+        .tag("dispatching_base_num", row['dispatching_base_num']) \
+        .tag("PULocationID", row['PULocationID']) \
+        .tag("DOLocationID", row['DOLocationID']) \
+        .tag("SR_Flag", row['SR_Flag']) \
+        .field("dropoff_datetime", row['dropoff_datetime']) \
+        .time(row['pickup_datetime']) \
+        .to_line_protocol()
+
+
+def parse_rows(rows, total_size):
+    """
+    Parse a batch of CSV rows into LineProtocol
+
+    :param total_size: Total size of file
+    :param rows: CSV rows
+    :return: List of line protocols
+    """
+    _parsed_rows = list(map(parse_row, rows))
+
+    counter_.value += len(_parsed_rows)
+    if counter_.value % 10_000 == 0:
+        print('{0:8}{1}'.format(counter_.value, ' - {0:.2f} %'
+                                .format(100 * float(progress_.value) / float(int(total_size))) if total_size else ""))
+        pass
+
+    queue_.put(_parsed_rows)
+    return None
+
+
+def init_counter(counter, progress, queue):
+    """
+    Initialize shared counters and queue used for displaying progress
+    """
+    global counter_
+    counter_ = counter
+    global progress_
+    progress_ = progress
+    global queue_
+    queue_ = queue
+
+
+"""
+Create multiprocess shared environment
+"""
+queue_ = multiprocessing.Manager().Queue()
+counter_ = Value('i', 0)
+progress_ = Value('i', 0)
+startTime = datetime.now()
+
+url = "https://s3.amazonaws.com/nyc-tlc/trip+data/fhv_tripdata_2019-01.csv"
+# url = "file:///Users/bednar/Developer/influxdata/influxdb-client-python/examples/fhv_tripdata_2019-01.csv"
+
+"""
+Open the URL and stream the data
+"""
+response = urlopen(url)
+if response.headers:
+    content_length = response.headers['Content-length']
+io_wrapper = ProgressTextIOWrapper(response)
+io_wrapper.progress = progress_
+
+"""
+Start writer as a new process
+"""
+writer = InfluxDBWriter(queue_)
+writer.start()
+
+"""
+Create process pool for parallel encoding into LineProtocol
+"""
+cpu_count = multiprocessing.cpu_count()
+with concurrent.futures.ProcessPoolExecutor(cpu_count, initializer=init_counter,
+                                             initargs=(counter_, progress_, queue_)) as executor:
+    """
+    Convert the incoming HTTP stream into a sequence of LineProtocol
+    """
+    data = rx \
+        .from_iterable(DictReader(io_wrapper)) \
+        .pipe(ops.buffer_with_count(10_000),
+              # Parse 10_000 rows into LineProtocol on a subprocess
+              ops.flat_map(lambda rows: executor.submit(parse_rows, rows, content_length)))
+
+    """
+    Write data into InfluxDB
+    """
+    data.subscribe(on_next=lambda x: None, on_error=lambda ex: print(f'Unexpected error: {ex}'))
+
+"""
+Terminate Writer
+"""
+queue_.put(None)
+queue_.join()
+
+print()
+print(f'Import finished in: {datetime.now() - startTime}')
+print()
+
+"""
+Querying 10 pickups from dispatching 'B00008'
+"""
+query = 'from(bucket:"my-bucket")' \
+        '|> range(start: 2019-01-01T00:00:00Z, stop: now()) ' \
+        '|> filter(fn: (r) => r._measurement == "taxi-trip-data")' \
+        '|> filter(fn: (r) => r.dispatching_base_num == "B00008")' \
+        '|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")' \
+        '|> rename(columns: {_time: "pickup_datetime"})' \
+        '|> drop(columns: ["_start", "_stop"])|> limit(n:10, offset: 0)'
+
+client = InfluxDBClient(url="http://localhost:9999", token="my-token", org="my-org", debug=False)
+result = client.query_api().query(org="my-org", query=query)
+
+"""
+Processing results
+"""
+print()
+print("=== Querying 10 pickups from dispatching 'B00008' ===")
+print()
+for table in result:
+    for record in table.records:
+        print(
{record["pickup_datetime"]} dropoff: {record["dropoff_datetime"]}') + +""" +Close client +""" +client.__del__() diff --git a/influxdb_client/client/write/point.py b/influxdb_client/client/write/point.py index 53ac1c14..2083afc8 100644 --- a/influxdb_client/client/write/point.py +++ b/influxdb_client/client/write/point.py @@ -4,7 +4,7 @@ from decimal import Decimal from numbers import Integral -import dateutil.parser +import ciso8601 from pytz import UTC from six import iteritems @@ -12,6 +12,8 @@ EPOCH = UTC.localize(datetime.utcfromtimestamp(0)) DEFAULT_WRITE_PRECISION = WritePrecision.NS +_ESCAPE_KEY = str.maketrans({'\\': '\\\\', ',': r'\,', ' ': r'\ ', '=': r'\=', '\n': ''}) +_ESCAPE_STRING = str.maketrans({'\"': r"\"", "\\": r"\\"}) class Point(object): @@ -56,32 +58,33 @@ def field(self, field, value): return self def to_line_protocol(self): - ret = _escape_tag(self._name) - ret += _append_tags(self._tags) + _measurement = _escape_key(self._name) + _tags = _append_tags(self._tags) _fields = _append_fields(self._fields) if not _fields: return "" - ret += _fields - ret += _append_time(self._time, self._write_precision) - return ret + _time = _append_time(self._time, self._write_precision) + + return f"{_measurement}{_tags}{_fields}{_time}" def _append_tags(tags): - _ret = "" + _return = [] for tag_key, tag_value in sorted(iteritems(tags)): if tag_value is None: continue - tag = _escape_tag(tag_key) + tag = _escape_key(tag_key) value = _escape_tag_value(tag_value) if tag != '' and value != '': - _ret += ',' + tag_key + '=' + value - return _ret + ' ' + _return.append(f'{tag}={value}') + + return f"{',' if _return else ''}{','.join(_return)} " def _append_fields(fields): - _ret = "" + _return = [] for field, value in sorted(iteritems(fields)): if value is None: @@ -90,43 +93,38 @@ def _append_fields(fields): if isinstance(value, float) or isinstance(value, Decimal): if not math.isfinite(value): continue - _ret += _escape_tag(field) + '=' + str(value) + ',' + _return.append(f'{_escape_key(field)}={str(value)}') elif isinstance(value, int) and not isinstance(value, bool): - _ret += _escape_tag(field) + '=' + str(value) + 'i' + ',' + _return.append(f'{_escape_key(field)}={str(value)}i') elif isinstance(value, bool): - _ret += _escape_tag(field) + '=' + str(value).lower() + ',' + _return.append(f'{_escape_key(field)}={str(value).lower()}') elif isinstance(value, str): - value = escape_value(str(value)) - _ret += _escape_tag(field) + "=" + '"' + value + '"' + "," + _return.append(f'{_escape_key(field)}="{_escape_string(value)}"') else: raise ValueError() - if _ret.endswith(","): - return _ret[:-1] - else: - return _ret + return f"{','.join(_return)}" -def _append_time(time, write_precission): +def _append_time(time, write_precision): if time is None: return '' - _ret = " " + str(int(_convert_timestamp(time, write_precission))) - return _ret + return f" {int(_convert_timestamp(time, write_precision))}" -def _escape_tag(tag): - return str(tag).replace("\\", "\\\\").replace(" ", "\\ ").replace(",", "\\,").replace("=", "\\=") +def _escape_key(tag): + return str(tag).translate(_ESCAPE_KEY) def _escape_tag_value(value): - ret = _escape_tag(value) + ret = _escape_key(value) if ret.endswith('\\'): ret += ' ' return ret -def escape_value(value): - return value.translate(str.maketrans({'\"': r"\"", "\\": r"\\"})) +def _escape_string(value): + return str(value).translate(_ESCAPE_STRING) def _convert_timestamp(timestamp, precision=DEFAULT_WRITE_PRECISION): @@ -134,7 +132,7 @@ def 
@@ -134,7 +132,7 @@ def _convert_timestamp(timestamp, precision=DEFAULT_WRITE_PRECISION):
         return timestamp  # assume precision is correct if timestamp is int

     if isinstance(timestamp, str):
-        timestamp = dateutil.parser.parse(timestamp)
+        timestamp = ciso8601.parse_datetime(timestamp)

     if isinstance(timestamp, timedelta) or isinstance(timestamp, datetime):

diff --git a/influxdb_client/client/write_api.py b/influxdb_client/client/write_api.py
index 2bbd988e..74fc678d 100644
--- a/influxdb_client/client/write_api.py
+++ b/influxdb_client/client/write_api.py
@@ -1,5 +1,6 @@
 # coding: utf-8
 import logging
+import threading
 from datetime import timedelta
 from enum import Enum
 from random import random
@@ -8,7 +9,6 @@

 import rx
 from rx import operators as ops, Observable
-from rx.core import GroupedObservable
 from rx.scheduler import ThreadPoolScheduler
 from rx.subject import Subject

@@ -57,14 +57,15 @@ def __init__(self, write_type: WriteType = WriteType.batching,


 class _BatchItem(object):
-    def __init__(self, key, data) -> None:
+    def __init__(self, key, data, size=1) -> None:
         self.key = key
         self.data = data
+        self.size = size
         pass

     def __str__(self) -> str:
         return '_BatchItem[key:\'{}\', \'{}\']' \
-            .format(str(self.key), str(self.data))
+            .format(str(self.key), str(self.size))


 class _BatchItemKey(object):
@@ -101,31 +102,6 @@ def _body_reduce(batch_items):
     return b'\n'.join(map(lambda batch_item: batch_item.data, batch_items))


-def _create_batch(group: GroupedObservable):
-    return lambda xs: _BatchItem(key=group.key, data=_body_reduce(xs))
-
-
-def _group_by(batch_item: _BatchItem):
-    return batch_item.key
-
-
-def _group_to_batch(group: GroupedObservable):
-    return group.pipe(ops.to_iterable(),
-                      ops.map(list),
-                      ops.map(_create_batch(group)))
-
-
-def _window_to_group(value):
-    return value.pipe(
-        ops.to_iterable(),
-        ops.map(lambda x: rx.from_iterable(x).pipe(
-            # Group window by 'organization', 'bucket' and 'precision'
-            ops.group_by(_group_by),
-            # Create batch (concatenation line protocols by \n)
-            ops.map(_group_to_batch),
-            ops.merge_all())), ops.merge_all())
-
-
 class WriteApi(AbstractClient):

     def __init__(self, influxdb_client, write_options: WriteOptions = WriteOptions()) -> None:
@@ -136,18 +112,24 @@ def __init__(self, influxdb_client, write_options: WriteOptions = WriteOptions()

         # Define Subject that listen incoming data and produces writes into InfluxDB
         self._subject = Subject()

-        # Define a scheduler that is used for processing incoming data - default singleton
-        observable = self._subject.pipe(ops.observe_on(self._write_options.write_scheduler))
-        self._disposable = observable \
-            .pipe(  # Split incoming data to windows by batch_size or flush_interval
-                ops.window_with_time_or_count(count=write_options.batch_size,
-                                              timespan=timedelta(milliseconds=write_options.flush_interval)),
-                # Map incoming batch window in groups defined by 'organization', 'bucket' and 'precision'
-                ops.flat_map(lambda v: _window_to_group(v)),
-                # Write data into InfluxDB (possibility to retry if its fail)
-                ops.map(mapper=lambda batch: self._retryable(data=batch, delay=self._jitter_delay())),  #
-                ops.merge_all()) \
+        self._disposable = self._subject.pipe(
+            # Split incoming data to windows by batch_size or flush_interval
+            ops.window_with_time_or_count(count=write_options.batch_size,
+                                          timespan=timedelta(milliseconds=write_options.flush_interval)),
+            # Map window into groups defined by 'organization', 'bucket' and 'precision'
+            ops.flat_map(lambda window: window.pipe(
+                # Group window by 'organization', 'bucket' and 'precision'
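+                # (the group key combines organization, bucket and precision, so each batch targets a single destination)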
+                ops.group_by(lambda batch_item: batch_item.key),
+                # Create batch (concatenation line protocols by \n)
+                ops.map(lambda group: group.pipe(
+                    ops.to_iterable(),
+                    ops.map(lambda xs: _BatchItem(key=group.key, data=_body_reduce(xs), size=len(xs))))),
+                ops.merge_all())),
+            # Write data into InfluxDB (possibility to retry if it fails)
+            ops.map(mapper=lambda batch: self._retryable(data=batch, delay=self._jitter_delay())),  #
+            ops.merge_all())\
             .subscribe(self._on_next, self._on_error, self._on_complete)
+
         else:
             self._subject = None
             self._disposable = None
@@ -238,11 +220,13 @@ def _write_batching(self, bucket, org, data, precision=DEFAULT_WRITE_PRECISION):

     def _http(self, batch_item: _BatchItem):

-        logger.debug("http post to: %s", batch_item)
+        logger.debug("Write time series data into InfluxDB: %s", batch_item)

         self._post_write(False, batch_item.key.bucket, batch_item.key.org, batch_item.data,
                          batch_item.key.precision)

+        logger.debug("Write request finished %s", batch_item)
+
         return _BatchResponse(data=batch_item)

     def _post_write(self, _async_req, bucket, org, body, precision):
@@ -253,6 +237,7 @@ def _post_write(self, _async_req, bucket, org, body, precision):

     def _retryable(self, data: str, delay: timedelta):
         return rx.of(data).pipe(
+            ops.subscribe_on(self._write_options.write_scheduler),
             # use delay if its specified
             ops.delay(duetime=delay, scheduler=self._write_options.write_scheduler),
             # invoke http call
diff --git a/notebooks/stock_predictions_import_data.py b/notebooks/stock_predictions_import_data.py
index c1d6026d..d7bbbd1f 100644
--- a/notebooks/stock_predictions_import_data.py
+++ b/notebooks/stock_predictions_import_data.py
@@ -5,21 +5,21 @@
 """
 from collections import OrderedDict
 from csv import DictReader
-from datetime import datetime

-import pandas as pd
+import ciso8601
 import requests
 import rx
-import urllib3
+from pytz import UTC
 from rx import operators as ops

-from influxdb_client import Point, InfluxDBClient, WriteOptions
+from influxdb_client import InfluxDBClient, WriteOptions
+from influxdb_client.client.write.point import EPOCH

 _progress = 0


 def parse_row(row: OrderedDict):
-    """Parse row of CSV file into Point with structure:
+    """Parse row of CSV file into LineProtocol with structure:

     CSV format:
     date,symbol,open,close,low,high,volume
     2016-01-05,WLTW,123.43,125.839996,122.309998,126.25,2163600
     ...
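+
+    Returning a plain LineProtocol string (instead of building a Point) avoids
+    the Point serialization overhead, which matters when importing large files.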
     :param row: the row of CSV file
-    :return: Parsed csv row to [Point]
+    :return: Parsed csv row to LineProtocol
     """
     global _progress
     _progress += 1

-    if _progress % 1000 == 0:
+    if _progress % 10000 == 0:
         print(_progress)

-    return Point("financial-analysis") \
-        .tag("symbol", row["symbol"]) \
-        .field("open", float(row['open'])) \
-        .field("high", float(row['high'])) \
-        .field("low", float(row['low'])) \
-        .field("close", float(row['close'])) \
-        .time(datetime.strptime(row['date'], '%Y-%m-%d'))
+    time = (UTC.localize(ciso8601.parse_datetime(row["date"])) - EPOCH).total_seconds() * 1e9
+
+    return f'financial-analysis,symbol={row["symbol"]} ' \
+           f'close={row["close"]},high={row["high"]},low={row["low"]},open={row["open"]} ' \
+           f'{int(time)}'


 def main():
@@ -60,7 +58,7 @@ def main():
         .pipe(ops.map(lambda row: parse_row(row)))

     client = InfluxDBClient(url="http://localhost:9999", token="my-token", org="my-org", debug=False)
-    write_api = client.write_api(write_options=WriteOptions(batch_size=50000))
+    write_api = client.write_api(write_options=WriteOptions(batch_size=50_000, flush_interval=10_000))

     write_api.write(org="my-org", bucket="my-bucket", record=data)
     write_api.__del__()

diff --git a/tests/test_point.py b/tests/test_point.py
index d9cff9be..1987e728 100644
--- a/tests/test_point.py
+++ b/tests/test_point.py
@@ -1,18 +1,12 @@
 # -*- coding: utf-8 -*-

-from datetime import datetime
-from pytz import UTC
-
-from influxdb_client import Point, WritePrecision
-
-EPOCH = UTC.localize(datetime.utcfromtimestamp(0))
-
 import unittest
-
 from datetime import datetime, timezone, timedelta
 from decimal import Decimal
+
 from pytz import UTC, timezone

+from influxdb_client import Point, WritePrecision
 from tests.base_test import BaseTest
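
A minimal usage sketch of the batching options introduced in this patch (assuming a local InfluxDB 2.0 instance at http://localhost:9999 and the same token, org and bucket as in the examples above):

    from influxdb_client import InfluxDBClient, WriteOptions

    client = InfluxDBClient(url="http://localhost:9999", token="my-token", org="my-org")
    # batches of up to 50_000 points, flushed at the latest every 10 seconds
    write_api = client.write_api(write_options=WriteOptions(batch_size=50_000, flush_interval=10_000))
    write_api.write(org="my-org", bucket="my-bucket",
                    record="financial-analysis,type=vix-daily close=18.76 1546300800000000000")
    write_api.__del__()
    client.__del__()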