diff --git a/.gitignore b/.gitignore index c87b4e16..4fcf06fc 100644 --- a/.gitignore +++ b/.gitignore @@ -87,6 +87,7 @@ celerybeat-schedule .venv env/ venv/ +venv-*/ ENV/ env.bak/ venv.bak/ @@ -112,3 +113,4 @@ sandbox # OpenAPI-generator /.openapi-generator* +/tests/writer.pickle diff --git a/CHANGELOG.md b/CHANGELOG.md index 61908986..c2cb5f3b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ ### Bugs 1. [#61](https://github.com/influxdata/influxdb-client-python/issues/61): Correctly parse CSV where multiple results include multiple tables 1. [#66](https://github.com/influxdata/influxdb-client-python/issues/66): Correctly close connection pool manager at exit +1. [#69](https://github.com/influxdata/influxdb-client-python/issues/69): `InfluxDBClient` and `WriteApi` could serialized by [pickle](https://docs.python.org/3/library/pickle.html#object.__getstate__) (python3.7 or higher) ## 1.4.0 [2020-02-14] diff --git a/influxdb_client/client/write_api.py b/influxdb_client/client/write_api.py index 6dfd6a6e..5c401e21 100644 --- a/influxdb_client/client/write_api.py +++ b/influxdb_client/client/write_api.py @@ -52,6 +52,17 @@ def __init__(self, write_type: WriteType = WriteType.batching, self.retry_interval = retry_interval self.write_scheduler = write_scheduler + def __getstate__(self): + state = self.__dict__.copy() + # Remove write scheduler + del state['write_scheduler'] + return state + + def __setstate__(self, state): + self.__dict__.update(state) + # Init default write Scheduler + self.write_scheduler = ThreadPoolScheduler(max_workers=1) + SYNCHRONOUS = WriteOptions(write_type=WriteType.synchronous) ASYNCHRONOUS = WriteOptions(write_type=WriteType.asynchronous) @@ -322,3 +333,16 @@ def _on_error(ex): def _on_complete(self): self._disposable.dispose() logger.info("the batching processor was disposed") + + def __getstate__(self): + state = self.__dict__.copy() + # Remove rx + del state['_subject'] + del state['_disposable'] + del state['_write_service'] + return state + + def __setstate__(self, state): + self.__dict__.update(state) + # Init Rx + self.__init__(self._influxdb_client, self._write_options, self._point_settings) diff --git a/influxdb_client/rest.py b/influxdb_client/rest.py index 7073a35e..7dc3eacd 100644 --- a/influxdb_client/rest.py +++ b/influxdb_client/rest.py @@ -58,6 +58,10 @@ def __init__(self, configuration, pools_size=4, maxsize=None): # maxsize is the number of requests to host that are allowed in parallel # noqa: E501 # Custom SSL certificates and client certificates: http://urllib3.readthedocs.io/en/latest/advanced-usage.html # noqa: E501 + self.configuration = configuration + self.pools_size = pools_size + self.maxsize = maxsize + # cert_reqs if configuration.verify_ssl: cert_reqs = ssl.CERT_REQUIRED @@ -293,6 +297,17 @@ def PATCH(self, url, headers=None, query_params=None, post_params=None, _request_timeout=_request_timeout, body=body) + def __getstate__(self): + state = self.__dict__.copy() + # Remove Pool managaer + del state['pool_manager'] + return state + + def __setstate__(self, state): + self.__dict__.update(state) + # Init Pool manager + self.__init__(self.configuration, self.pools_size, self.maxsize) + class ApiException(Exception): diff --git a/openapi-generator/src/main/resources/python/rest.mustache b/openapi-generator/src/main/resources/python/rest.mustache index f7979064..ba261ae2 100644 --- a/openapi-generator/src/main/resources/python/rest.mustache +++ b/openapi-generator/src/main/resources/python/rest.mustache @@ -50,6 +50,10 @@ class RESTClientObject(object): # maxsize is the number of requests to host that are allowed in parallel # noqa: E501 # Custom SSL certificates and client certificates: http://urllib3.readthedocs.io/en/latest/advanced-usage.html # noqa: E501 + self.configuration = configuration + self.pools_size = pools_size + self.maxsize = maxsize + # cert_reqs if configuration.verify_ssl: cert_reqs = ssl.CERT_REQUIRED @@ -285,6 +289,17 @@ class RESTClientObject(object): _request_timeout=_request_timeout, body=body) + def __getstate__(self): + state = self.__dict__.copy() + # Remove Pool managaer + del state['pool_manager'] + return state + + def __setstate__(self, state): + self.__dict__.update(state) + # Init Pool manager + self.__init__(self.configuration, self.pools_size, self.maxsize) + class ApiException(Exception): diff --git a/tests/test_WriteApiPickle.py b/tests/test_WriteApiPickle.py new file mode 100644 index 00000000..b226f7ac --- /dev/null +++ b/tests/test_WriteApiPickle.py @@ -0,0 +1,55 @@ +import pickle +import sys + +import pytest + +from influxdb_client import InfluxDBClient, WriteOptions +from influxdb_client.client.write_api import WriteType +from tests.base_test import current_milli_time, BaseTest + + +class InfluxDBWriterToPickle: + + def __init__(self): + self.client = InfluxDBClient(url="http://localhost:9999", token="my-token", org="my-org", debug=False) + self.write_api = self.client.write_api( + write_options=WriteOptions(write_type=WriteType.batching, batch_size=50_000, flush_interval=10_000)) + + def write(self, record): + self.write_api.write(bucket="my-bucket", record=record) + + def terminate(self) -> None: + self.write_api.__del__() + self.client.__del__() + + +class WriteApiPickle(BaseTest): + + def setUp(self) -> None: + super().setUp() + + def tearDown(self) -> None: + super().tearDown() + + @pytest.mark.skipif(sys.version_info < (3, 7), reason="requires python3.7 or higher") + def test_write_line_protocol(self): + writer = InfluxDBWriterToPickle() + + pickle_out = open("writer.pickle", "wb") + pickle.dump(writer, pickle_out) + pickle_out.close() + + writer = pickle.load(open("writer.pickle", "rb")) + + measurement = "h2o_feet_" + str(current_milli_time()) + writer.write(record=f"{measurement},location=coyote_creek water_level=1.0") + writer.terminate() + + tables = self.query_api.query( + f'from(bucket: "my-bucket") |> range(start: 0) |> filter(fn: (r) => r._measurement == "{measurement}")') + + self.assertEqual(len(tables), 1) + self.assertEqual(len(tables[0].records), 1) + self.assertEqual(tables[0].records[0].get_measurement(), measurement) + self.assertEqual(tables[0].records[0].get_value(), 1.0) + self.assertEqual(tables[0].records[0].get_field(), "water_level")