-
Notifications
You must be signed in to change notification settings - Fork 0
/
departements.py
121 lines (99 loc) · 5.52 KB
/
departements.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
import csv
import logging
import time
from datetime import datetime
from wikibaseintegrator import WikibaseIntegrator, wbi_fastrun, wbi_helpers, wbi_login
from wikibaseintegrator.datatypes import ExternalID, Item, Quantity, Time
from wikibaseintegrator.entities import ItemEntity
from wikibaseintegrator.wbi_config import config as wbi_config
from wikibaseintegrator.wbi_enums import ActionIfExists, WikibaseRank
from wikibaseintegrator.wbi_exceptions import MWApiError
# Import local config for user and password
import config
wbi_config['USER_AGENT'] = 'Update French Population'
# login object
login_instance = wbi_login.Login(user=config.user, password=config.password)
wbi = WikibaseIntegrator(login=login_instance, is_bot=True)
logging.basicConfig(level=logging.DEBUG)
qualifiers = [
Time(prop_nr='P585', time=config.point_in_time), # point in time
Item(prop_nr='P459', value='Q39825') # determination method: census
]
references = [
[
Item(value=config.stated_in, prop_nr='P248') # stated in: Populations légales 2020
]
]
base_filter = [
Item(prop_nr='P31', value='Q6465'), # instance of department of France
Item(prop_nr='P17', value='Q142'), # country France
ExternalID(prop_nr='P2586') # INSEE department code
]
print('Creating fastrun container')
frc = wbi_fastrun.get_fastrun_container(base_filter=base_filter)
skip_to_insee = 0
print('Start parsing CSV')
with open('annees/' + config.year + '/donnees_departements.csv', newline='', encoding='utf-8') as csvfile:
spamreader = csv.reader(csvfile, delimiter=';')
start_time = time.time()
for row in spamreader:
if row[0].isnumeric():
code_insee = row[2]
if int(code_insee.replace('A', '0').replace('B', '0')) > skip_to_insee:
population = int(row[7]) # PMUN
# Search the Wikidata Element for this commune
id_items = frc.get_entities(claims=[ExternalID(prop_nr='P2586', value=str(code_insee))], cache=True)
if not id_items:
continue
id_item = None
final_items = id_items.copy()
if not len(id_items) == 1:
for item in id_items:
test_item = ItemEntity(api=wbi).get(item)
claims = test_item.claims.get('P31') # instance of
for claim in claims:
if claim.mainsnak.datavalue['value']['id'] == 'Q484170': # commune of France (Q484170)
if 'P580' in claim.qualifiers_order and 'P582' not in claim.qualifiers_order: # start time (P580) and end time (P582)
d = datetime.strptime(claim.qualifiers.get('P580')[0].datavalue['value']['time'].replace('-00-00T', '-01-01T'), '+%Y-%m-%dT00:00:00Z')
census = datetime.strptime(config.point_in_time, '+%Y-%m-%dT00:00:00Z')
if d.time() >= census.time():
id_item = item
break
if 'P582' in claim.qualifiers_order: # end time (P582)
final_items.discard(item) # If the item have an end time, we remove it from the list
else:
continue
break
if not id_item and len(final_items) == 1: # if only one item remains, we take it
id_item = final_items.pop()
if id_item:
wb_item = ItemEntity(api=wbi).get(id_item)
# Check if a write is needed or not
write_needed = True
for claim in wb_item.claims.get('P1082'):
if claim.rank == WikibaseRank.PREFERRED:
for qualifier in claim.qualifiers.get('P585'):
if qualifier == qualifiers[0].mainsnak and claim.mainsnak.datavalue['value']['amount'] == wbi_helpers.format_amount(population):
write_needed = False
break
else:
continue
break
# Set the preferred rank of others claims to normal
for claim in wb_item.claims.get('P1082'):
claim.rank = WikibaseRank.NORMAL
# Create the claim to add with references, qualifiers and preferred rank
wb_item.claims.add(claims=Quantity(amount=population, prop_nr='P1082', references=references, qualifiers=qualifiers, rank=WikibaseRank.PREFERRED),
action_if_exists=ActionIfExists.APPEND_OR_REPLACE)
if write_needed:
logging.debug(f'Write to Wikidata for {row[3]} ({row[2]})')
try:
logging.debug('write')
wb_item.write(summary='Update population for ' + config.year, limit_claims=['P1082'])
except MWApiError as e:
logging.debug(e)
# finally:
# exit(0)
else:
logging.debug(f'Skipping {row[3]} ({row[2]})')
print("--- %s seconds ---" % (time.time() - start_time))