-
Notifications
You must be signed in to change notification settings - Fork 17
/
claims.py
376 lines (302 loc) · 13.9 KB
/
claims.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
from __future__ import annotations
import copy
from abc import abstractmethod
from typing import Any, Callable
from wikibaseintegrator.models.basemodel import BaseModel
from wikibaseintegrator.models.qualifiers import Qualifiers
from wikibaseintegrator.models.references import Reference, References
from wikibaseintegrator.models.snaks import Snak, Snaks
from wikibaseintegrator.wbi_enums import ActionIfExists, WikibaseRank, WikibaseSnakType
class Claims(BaseModel):
def __init__(self) -> None:
self.claims: dict[str, list[Claim]] = {}
@property
def claims(self) -> dict[str, list[Claim]]:
return self.__claims
@claims.setter
def claims(self, claims: dict[str, list[Claim]]):
self.__claims = claims
def get(self, property: str) -> list[Claim]:
return self.claims[property]
def remove(self, property: str | None = None) -> None:
if property in self.claims:
for prop in self.claims[property]:
if prop.id:
prop.remove()
else:
self.claims[property].remove(prop)
if len(self.claims[property]) == 0:
del self.claims[property]
def add(self, claims: Claims | list[Claim] | Claim, action_if_exists: ActionIfExists = ActionIfExists.REPLACE_ALL) -> Claims:
"""
:param claims: A Claim, list of Claim or just a Claims object to add to this Claims object.
:param action_if_exists: Replace or append the statement. You can force an addition if the declaration already exists. Defaults to REPLACE_ALL.
KEEP: The original claim will be kept and the new one will not be added (because there is already one with this property number)
APPEND_OR_REPLACE: The new claim will be added only if the new one is different (by comparing values)
FORCE_APPEND: The new claim will be added even if already exists
REPLACE_ALL: The new claim will replace the old one
:return: Return the updated Claims object.
"""
if action_if_exists not in ActionIfExists:
raise ValueError(f'{action_if_exists} is not a valid action_if_exists value. Use the enum ActionIfExists')
if isinstance(claims, Claim):
claims = [claims]
elif claims is None or ((not isinstance(claims, list) or not all(isinstance(n, Claim) for n in claims)) and not isinstance(claims, Claims)):
raise TypeError("claims must be an instance of Claim or Claims or a list of Claim")
# TODO: Don't replace if claim is the same
# This code is separated from the rest to avoid looping multiple over `self.claims`.
if action_if_exists == ActionIfExists.REPLACE_ALL:
for claim in claims:
if claim is not None:
assert isinstance(claim, Claim)
property = claim.mainsnak.property_number
if property in self.claims:
for claim_to_remove in self.claims[property]:
if claim_to_remove not in claims:
claim_to_remove.remove()
for claim in claims:
if claim is not None:
assert isinstance(claim, Claim)
property = claim.mainsnak.property_number
if property not in self.claims:
self.claims[property] = []
if action_if_exists == ActionIfExists.KEEP:
if len(self.claims[property]) == 0:
self.claims[property].append(claim)
elif action_if_exists == ActionIfExists.FORCE_APPEND:
self.claims[property].append(claim)
elif action_if_exists == ActionIfExists.APPEND_OR_REPLACE:
if claim not in self.claims[property]:
self.claims[property].append(claim)
else:
# Force update the claim if already present
self.claims[property][self.claims[property].index(claim)].update(claim)
elif action_if_exists == ActionIfExists.REPLACE_ALL:
if claim not in self.claims[property]:
self.claims[property].append(claim)
return self
def from_json(self, json_data: dict[str, Any]) -> Claims:
for property in json_data:
for claim in json_data[property]:
from wikibaseintegrator.datatypes import BaseDataType
if 'datatype' in claim['mainsnak']:
data_type = [x for x in BaseDataType.subclasses if x.DTYPE == claim['mainsnak']['datatype']][0]
else:
data_type = BaseDataType
self.add(claims=data_type().from_json(claim), action_if_exists=ActionIfExists.FORCE_APPEND)
return self
def get_json(self) -> dict[str, list]:
json_data: dict[str, list] = {}
for property, claims in self.claims.items():
if property not in json_data:
json_data[property] = []
for claim in claims:
if not claim.removed or claim.id:
json_data[property].append(claim.get_json())
if len(json_data[property]) == 0:
del json_data[property]
return json_data
def __iter__(self):
iterate = []
for claim in self.claims.values():
iterate.extend(claim)
return iter(iterate)
def __len__(self):
return len(self.claims)
class Claim(BaseModel):
DTYPE = 'claim'
def __init__(self, qualifiers: Qualifiers | None = None, rank: WikibaseRank | None = None, references: References | list[Claim | list[Claim]] | None = None,
snaktype: WikibaseSnakType = WikibaseSnakType.KNOWN_VALUE) -> None:
"""
:param qualifiers:
:param rank:
:param references: A References object, a list of Claim object or a list of list of Claim object
:param snaktype:
"""
self.mainsnak = Snak(datatype=self.DTYPE, snaktype=snaktype)
self.type = 'statement'
self.qualifiers = qualifiers or Qualifiers()
self.qualifiers_order = []
self.id = None
self.rank = rank or WikibaseRank.NORMAL
self.removed = False
self.references = References()
if isinstance(references, References):
self.references = references
elif isinstance(references, list):
for ref_list in references:
ref = Reference()
if isinstance(ref_list, list):
snaks = Snaks()
for ref_claim in ref_list:
if isinstance(ref_claim, Claim):
snaks.add(Snak().from_json(ref_claim.get_json()['mainsnak']))
else:
raise ValueError("The references must be a References object or a list of Claim object")
ref.snaks = snaks
elif isinstance(ref_list, Claim):
ref.snaks = Snaks().add(Snak().from_json(ref_list.get_json()['mainsnak']))
elif isinstance(ref_list, Reference):
ref = ref_list
self.references.add(reference=ref)
elif references is not None:
raise ValueError("The references must be a References object or a list of Claim object")
@property
def mainsnak(self) -> Snak:
return self.__mainsnak
@mainsnak.setter
def mainsnak(self, value: Snak):
self.__mainsnak = value
@property
def type(self) -> str | dict:
return self.__type
@type.setter
def type(self, value: str | dict):
self.__type = value
@property
def qualifiers(self) -> Qualifiers:
return self.__qualifiers
@qualifiers.setter
def qualifiers(self, value: Qualifiers) -> None:
assert isinstance(value, (Qualifiers, list))
self.__qualifiers: Qualifiers = Qualifiers().set(value) if isinstance(value, list) else value
@property
def qualifiers_order(self) -> list[str]:
return self.__qualifiers_order
@qualifiers_order.setter
def qualifiers_order(self, value: list[str]):
self.__qualifiers_order = value
@property
def id(self) -> str | None:
return self.__id
@id.setter
def id(self, value: str | None):
self.__id = value
@property
def rank(self) -> WikibaseRank:
return self.__rank
@rank.setter
def rank(self, value: WikibaseRank):
"""Parse the rank. The enum throws an error if it is not one of the recognized values"""
self.__rank = WikibaseRank(value)
@property
def references(self) -> References:
return self.__references
@references.setter
def references(self, value: References):
self.__references = value
@property
def removed(self) -> bool:
return self.__removed
@removed.setter
def removed(self, value: bool):
self.__removed = value
def remove(self, remove=True) -> None:
self.removed = remove
def update(self, claim: Claim) -> None:
self.mainsnak = claim.mainsnak
self.qualifiers = claim.qualifiers
self.qualifiers_order = claim.qualifiers_order
self.rank = claim.rank
self.references = claim.references
def from_json(self, json_data: dict[str, Any]) -> Claim:
"""
:param json_data: a JSON representation of a Claim
"""
self.mainsnak = Snak().from_json(json_data['mainsnak'])
self.type = str(json_data['type'])
if 'qualifiers' in json_data:
self.qualifiers = Qualifiers().from_json(json_data['qualifiers'])
if 'qualifiers-order' in json_data:
self.qualifiers_order = list(json_data['qualifiers-order'])
self.id = str(json_data['id'])
self.rank: WikibaseRank = WikibaseRank(json_data['rank'])
if 'references' in json_data:
self.references = References().from_json(json_data['references'])
return self
def get_json(self) -> dict[str, Any]:
json_data: dict[str, str | list[dict] | list[str] | dict[str, str] | dict[str, list] | None] = {
'mainsnak': self.mainsnak.get_json(),
'type': self.type,
'id': self.id,
'rank': self.rank.value
}
# Remove id if it's a temporary one
if not self.id:
del json_data['id']
if len(self.qualifiers) > 0:
json_data['qualifiers'] = self.qualifiers.get_json()
json_data['qualifiers-order'] = list(self.qualifiers_order)
if len(self.references) > 0:
json_data['references'] = self.references.get_json()
if self.removed:
if self.id:
json_data['remove'] = ''
return json_data
def has_equal_qualifiers(self, other: Claim) -> bool:
# check if the qualifiers are equal with the 'other' object
self_qualifiers = copy.deepcopy(self.qualifiers)
other_qualifiers = copy.deepcopy(other.qualifiers)
if len(self_qualifiers) != len(other_qualifiers):
return False
for property_number in self_qualifiers.qualifiers:
if property_number not in other_qualifiers.qualifiers:
return False
if len(self_qualifiers.qualifiers[property_number]) != len(other_qualifiers.qualifiers[property_number]):
return False
flg = [False for _ in range(len(self_qualifiers.qualifiers[property_number]))]
for count, i in enumerate(self_qualifiers.qualifiers[property_number]):
for q in other_qualifiers:
if i == q:
flg[count] = True
if not all(flg):
return False
return True
def reset_id(self):
"""
Reset the ID of the current claim
"""
self.id = None
# TODO: rewrite this?
def __contains__(self, item):
if isinstance(item, Claim):
return self == item
if isinstance(item, str):
return self.mainsnak.datavalue == item
return super().__contains__(item)
def __eq__(self, other):
if isinstance(other, Claim):
return self.mainsnak.datavalue == other.mainsnak.datavalue and self.mainsnak.property_number == other.mainsnak.property_number and self.has_equal_qualifiers(other)
if isinstance(other, str):
return self.mainsnak.property_number == other
raise super().__eq__(other)
def equals(self, that: Claim, include_ref: bool = False, fref: Callable | None = None) -> bool:
"""
Tests for equality of two statements.
If comparing references, the order of the arguments matters!!!
self is the current statement, the next argument is the new statement.
Allows passing in a function to use to compare the references 'fref'. Default is equality.
fref accepts two arguments 'oldrefs' and 'newrefs', each of which are a list of references,
where each reference is a list of statements
"""
if not include_ref:
# return the result of BaseDataType.__eq__, which is testing for equality of value and qualifiers
return self == that
if self != that:
return False
if fref is None:
return Claim.refs_equal(self, that)
return fref(self, that)
@staticmethod
def refs_equal(olditem: Claim, newitem: Claim) -> bool:
"""
tests for exactly identical references
"""
oldrefs = olditem.references
newrefs = newitem.references
def ref_equal(oldref: References, newref: References) -> bool:
return (len(oldref) == len(newref)) and all(x in oldref for x in newref)
return len(oldrefs) == len(newrefs) and all(any(ref_equal(oldref, newref) for oldref in oldrefs) for newref in newrefs)
@abstractmethod
def get_sparql_value(self) -> str:
pass