Skip to content

Commit

Permalink
fix(elastic): inconsistent offset on bulk index error and improve bulk performance (#422)
Browse files Browse the repository at this point in the history
  • Loading branch information
alphinside authored Jul 1, 2022
1 parent ad9aa77 commit bd56f52
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 8 deletions.
13 changes: 9 additions & 4 deletions docarray/array/storage/elastic/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

import numpy as np
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
from elasticsearch.helpers import parallel_bulk

from ..base.backend import BaseBackendMixin, TypeMap
from .... import Document
Expand Down Expand Up @@ -159,7 +159,12 @@ def _build_client(self):
return client

def _send_requests(self, request):
    """Send a batch of bulk actions to Elasticsearch.

    Uses ``parallel_bulk`` with ``raise_on_error=False`` so that one bad
    document does not abort the whole batch; instead, per-item error info
    is collected and returned for the caller to reconcile.

    :param request: iterable of bulk actions to send.
    :return: list of error-info dicts (``info['index']``) for every action
        that failed to index.
    """
    failed_index = []
    # parallel_bulk is a lazy generator: it must be fully consumed for the
    # requests to actually be sent.
    for success, info in parallel_bulk(
        self._client, request, raise_on_error=False
    ):
        if not success:
            # NOTE(review): assumes every action uses the 'index' op type,
            # so error info is keyed under 'index' — confirm against callers.
            failed_index.append(info['index'])

    return failed_index

def _refresh(self, index_name):
    # Force an index refresh so documents just written become visible to
    # subsequent searches/counts immediately (Elasticsearch is only
    # near-real-time by default).
    self._client.indices.refresh(index=index_name)
Expand All @@ -179,7 +184,7 @@ def _update_offset2ids_meta(self):
} # id here
for offset_, id_ in enumerate(self._offset2ids.ids)
]
r = bulk(self._client, requests)
self._send_requests(requests)
self._client.indices.refresh(index=self._index_name_offset2id)

# Clean trailing unused offsets
Expand All @@ -195,7 +200,7 @@ def _update_offset2ids_meta(self):
}
for offset_ in unused_offsets
]
r = bulk(self._client, requests)
self._send_requests(requests)
self._client.indices.refresh(index=self._index_name_offset2id)

def _get_offset2ids_meta(self) -> List:
Expand Down
22 changes: 18 additions & 4 deletions docarray/array/storage/elastic/seqlike.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from typing import Union, Iterable, Dict
import warnings

from ..base.seqlike import BaseSequenceLikeMixin
from .... import Document
Expand Down Expand Up @@ -59,19 +60,32 @@ def __repr__(self):

def _upload_batch(self, docs: Iterable['Document']):
batch = []
failed_index = []
for doc in docs:
batch.append(self._document_to_elastic(doc))
if len(batch) > self._config.batch_size:
self._send_requests(batch)
failed_index.extend(self._send_requests(batch))
self._refresh(self._config.index_name)
batch = []
if len(batch) > 0:
self._send_requests(batch)
failed_index.extend(self._send_requests(batch))
self._refresh(self._config.index_name)

return failed_index

def extend(self, docs: Iterable['Document']):
    """Add *docs* to the array, skipping documents that fail to index.

    Successfully indexed, previously unseen document ids are appended to
    ``offset2ids`` so offsets stay consistent with what Elasticsearch
    actually holds.

    :param docs: documents to add.
    :raises IndexError: if any document failed to bulk-index (after the
        successful ones have already been added and recorded).
    """
    docs = list(docs)
    failed_index = self._upload_batch(docs)
    failed_ids = [info['_id'] for info in failed_index]
    # Snapshot membership as sets: the list-based `in` tests made this
    # loop O(n^2) over the number of documents.
    failed_id_set = set(failed_ids)
    existing_ids = set(self._offset2ids.ids)
    self._offset2ids.extend(
        [
            doc.id
            for doc in docs
            if doc.id not in existing_ids and doc.id not in failed_id_set
        ]
    )

    if failed_ids:
        # Warn (for visibility in logs) and raise so callers know the
        # extend was only partially applied.
        err_msg = f'fail to add Documents with ids: {failed_ids}'
        warnings.warn(err_msg)
        raise IndexError(err_msg)
43 changes: 43 additions & 0 deletions tests/unit/array/storage/elastic/test_add.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from docarray import Document, DocumentArray

import pytest


def test_add_ignore_existing_doc_id(start_storage):
elastic_doc = DocumentArray(
Expand Down Expand Up @@ -41,3 +43,44 @@ def test_add_ignore_existing_doc_id(start_storage):
assert len(elastic_doc) == len(elastic_doc[:, 'embedding'])
assert len(elastic_doc) == indexed_offset_count
assert len(elastic_doc[:, 'embedding']) == 7


def test_add_skip_wrong_data_type_and_fix_offset(start_storage):
    # Documents whose column values violate the declared mapping
    # ('price' is declared as an 'int' column) must be skipped during bulk
    # indexing, while the valid documents in the same batch are still
    # added and offset2ids stays consistent with what was indexed.
    elastic_doc = DocumentArray(
        storage='elasticsearch',
        config={
            'n_dim': 3,
            'columns': [('price', 'int')],
            'index_name': 'test_add_skip_wrong_data_type_and_fix_offset',
        },
    )

    # Seed the index with three valid documents.
    with elastic_doc:
        elastic_doc.extend(
            [
                Document(id='0', price=1000),
                Document(id='1', price=20000),
                Document(id='2', price=103000),
            ]
        )

    # extend() raises IndexError when any document in the batch fails,
    # but the failure must not corrupt offsets for the documents that
    # indexed successfully.
    with pytest.raises(IndexError):
        with elastic_doc:
            elastic_doc.extend(
                [
                    Document(id='0', price=10000),
                    Document(id='1', price=20000),
                    Document(id='3', price=30000),
                    Document(id='4', price=100000000000),  # overflow int32
                    Document(id='5', price=2000),
                    Document(id='6', price=100000000000),  # overflow int32
                    Document(id='7', price=30000),
                ]
            )

    # Failed docs ('4', '6') are absent everywhere; duplicate ids
    # ('0', '1') are not re-added to offset2ids.
    expected_ids = ['0', '1', '2', '3', '5', '7']

    assert len(elastic_doc) == 6
    assert len(elastic_doc[:, 'id']) == 6
    assert elastic_doc[:, 'id'] == expected_ids
    assert elastic_doc._offset2ids.ids == expected_ids

0 comments on commit bd56f52

Please sign in to comment.