This repository has been archived by the owner on Sep 24, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
vocabulary.py
419 lines (397 loc) · 20 KB
/
vocabulary.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
from rdflib import Graph, URIRef, Namespace, RDF
from rdflib.namespace import SKOS, XSD, OWL, DC
import logging
import copy
import re
import unicodedata
import unidecode
class Vocabulary():
    """Label and URI lookup tables built from SKOS vocabulary graphs.

    One instance is filled by exactly one of the ``parse_*`` methods, and the
    shape of ``self.labels`` depends on which one was used:

    * ``parse_musa_vocabulary`` / ``parse_origin_vocabulary``:
      ``{label: set of YSO URIs}``; query with ``get_uris_with_concept``.
    * ``parse_yso_vocabulary``:
      ``{uri: {language: prefLabel}}``; query with ``get_concept_with_uri``.
    * ``parse_label_vocabulary``:
      ``{language: {label: {"pref_label": set, "uris": set}}}``; query with
      ``get_concept_with_label``.

    The lookup methods signal problems with ``ValueError`` whose message is a
    numeric code string ("2", "3" or "4"); see the individual methods.
    """

    # A parenthesised specifier such as "(music)" inside a label.
    _PAREN_SPECIFIER_RE = re.compile(r"[\(].*?[\)]")
    # Non-ASCII letters whose diacritics must be preserved.
    _KEPT_NON_ASCII = frozenset("åäöÅÄÖ")
    # Language pairs supported by translate_label.
    _OTHER_LANGUAGE = {"fi": "sv", "sv": "fi"}

    def __init__(self, vocabulary_code, language_codes):
        """Create an empty vocabulary.

        vocabulary_code: code of the vocabulary being loaded. Codes starting
            with "slm" map to the target code "slm", "seko" is kept as is and
            everything else maps to "yso".
        language_codes: iterable of language codes (e.g. "fi", "sv") whose
            labels are read from the graphs.
        """
        if vocabulary_code.startswith('slm'):
            self.target_vocabulary_code = "slm"
        elif vocabulary_code == 'seko':
            self.target_vocabulary_code = vocabulary_code
        else:
            self.target_vocabulary_code = "yso"
        self.language_codes = language_codes
        # YSO URIs matched from concepts typed as geographical in the source
        # vocabulary (filled by parse_origin_vocabulary)
        self.geographical_concepts = set()
        # chained ("--") labels of geographical concepts
        self.geographical_chained_labels = set()
        # deprecated concept URI -> list of its final (non-deprecated) replacers
        self.deprecated_concepts = {}
        # URIs of concepts that belong to the YSO aggregate concept scheme
        self.aggregate_concepts = set()
        # pref and alt labels; the shape depends on the parse method used
        self.labels = {}
        # same content keyed by lower-cased label, for case-insensitive lookup
        self.labels_lowercase = {}
        # same content keyed by lower-cased label without diacritics
        self.stripped_labels = {}
        # labels that exist both with and without a parenthesised specifier
        self.labels_with_and_without_specifiers = {}
        # labels that exist only with a parenthesised specifier
        self.labels_with_specifiers = {}
        # uri -> {language: prefLabel}; filled by parse_label_vocabulary when
        # more than one language is loaded (used for SLM translations)
        self.translations = {}
        self.dct = Namespace("http://purl.org/dc/terms/")
        self.namespace = 'http://www.yso.fi/onto/yso/'
        self.nodes = []  # for temporary use

    # ------------------------------------------------------------------
    # private helpers
    # ------------------------------------------------------------------

    def _yso_match_uris(self, graph, conc):
        """Return the YSO URIs that ``conc`` is matched to in ``graph``.

        skos:exactMatch targets are preferred; skos:closeMatch targets are
        used only when no exactMatch target contains the YSO namespace.
        """
        for match_property in (SKOS.exactMatch, SKOS.closeMatch):
            uris = {
                str(match[1])
                for match in graph.preferredLabel(conc, labelProperties=[match_property])
                if self.namespace in str(match[1])
            }
            if uris:
                return uris
        return set()

    def _add_label_uris(self, label, uris):
        """Merge ``uris`` into the flat ``{label: set of URIs}`` table."""
        self.labels.setdefault(label, set()).update(uris)

    def _add_label_entry(self, language, label, pref_label, uri):
        """Register ``label`` for ``uri`` in the per-language label table."""
        entry = self.labels[language].setdefault(
            label, {"pref_label": set(), "uris": set()})
        entry["pref_label"].add(pref_label)
        entry["uris"].add(uri)

    @staticmethod
    def _merge_label_info(target, key, label_info):
        """Merge a ``{"pref_label", "uris"}`` record into ``target[key]``.

        The sets are copied so that the tables never share set objects.
        """
        entry = target.setdefault(key, {"pref_label": set(), "uris": set()})
        entry["pref_label"].update(label_info["pref_label"])
        entry["uris"].update(label_info["uris"])

    def _build_language_lookup_dicts(self):
        """Derive the per-language search tables from ``self.labels``.

        Expects the per-language shape produced by parse_label_vocabulary and
        fills labels_lowercase, stripped_labels,
        labels_with_and_without_specifiers and labels_with_specifiers.
        """
        for lc in self.language_codes:
            lowercase = self.labels_lowercase.setdefault(lc, {})
            stripped = self.stripped_labels.setdefault(lc, {})
            # labels keyed by their form without parenthesised specifiers
            unspecified = {}
            for label, label_info in self.labels[lc].items():
                self._merge_label_info(lowercase, label.lower(), label_info)
                stripped_key = self.remove_diacritical_chars(label).lower()
                self._merge_label_info(stripped, stripped_key, label_info)
                bare_key = self._PAREN_SPECIFIER_RE.sub("", stripped_key).strip()
                self._merge_label_info(unspecified, bare_key, label_info)
            both_forms = self.labels_with_and_without_specifiers.setdefault(lc, {})
            only_specified = self.labels_with_specifiers.setdefault(lc, {})
            for key, label_info in unspecified.items():
                if key in stripped:
                    # Compare the URI sets: the records themselves are dicts
                    # that always have exactly two keys, so comparing their
                    # lengths can never detect additional specified forms.
                    if len(label_info["uris"]) > len(stripped[key]["uris"]):
                        both_forms[key] = label_info
                else:
                    only_specified[key] = label_info

    def _concept_code(self, language):
        """Return the "<vocabulary>/<ISO 639-2 language>" code string."""
        return self.target_vocabulary_code + "/" + self.convert_to_ISO_639_2(language)

    # ------------------------------------------------------------------
    # parsing
    # ------------------------------------------------------------------

    def parse_musa_vocabulary(self, g, secondary_graph):
        """Fill the flat label table from a Musa-style vocabulary graph.

        g: graph of the vocabulary being processed
        secondary_graph: graph of the YSA vocabulary; its exactMatch /
            closeMatch links supply the YSO URIs

        Each label of a concept in ``g`` is mapped to the YSO URIs matched by
        the concept's dct:isReplacedBy target in ``secondary_graph``.
        """
        # concept URI in the secondary graph -> set of matched YSO URIs
        exact_matches = {
            str(conc): self._yso_match_uris(secondary_graph, conc)
            for conc in secondary_graph.subjects(RDF.type, SKOS.Concept)
        }
        for conc in g.subjects(RDF.type, SKOS.Concept):
            replacer = None
            # NOTE: Musa concepts are assumed to have a single replacer; if
            # there are several, the last one returned wins.
            for rb in g.preferredLabel(conc, labelProperties=[self.dct.isReplacedBy]):
                replacer = str(rb[1])
            replacers = set(exact_matches.get(replacer, ()))
            for lc in self.language_codes:
                for al in g.preferredLabel(conc, lang=lc, labelProperties=[SKOS.altLabel]):
                    self._add_label_uris(str(al[1]), replacers)
                pref_label = g.preferredLabel(conc, lang=lc)
                if pref_label:
                    self._add_label_uris(str(pref_label[0][1]), replacers)
        self.create_additional_dicts()

    def parse_yso_vocabulary(self, g):
        """Fill ``{uri: {language: prefLabel}}`` from a YSO graph.

        Also records aggregate concepts and, for every deprecated concept that
        has dct:isReplacedBy links, the list of its final replacers.
        """
        aggregateconceptscheme = URIRef("http://www.yso.fi/onto/yso/aggregateconceptscheme")
        # deprecated concept URI -> list of its direct replacers
        deprecated_temp = {}
        for conc in g.subjects(RDF.type, SKOS.Concept):
            uri = str(conc)
            for scheme in g.preferredLabel(conc, labelProperties=[SKOS.inScheme]):
                if scheme[1] == aggregateconceptscheme:
                    self.aggregate_concepts.add(uri)
            # Any owl:deprecated statement marks the concept as deprecated;
            # the literal's value is not inspected.
            if g.preferredLabel(conc, labelProperties=[OWL.deprecated]):
                for rb in g.preferredLabel(conc, labelProperties=[self.dct.isReplacedBy]):
                    deprecated_temp.setdefault(uri, []).append(str(rb[1]))
            else:
                for lc in self.language_codes:
                    pref_label = g.preferredLabel(conc, lang=lc)
                    if pref_label:
                        self.labels.setdefault(uri, {})[lc] = str(pref_label[0][1])
        # resolve replacement chains down to the non-deprecated concepts
        for dc in deprecated_temp:
            replacers = []
            self.get_replacers(deprecated_temp, dc, replacers)
            self.deprecated_concepts[dc] = replacers

    def get_replacers(self, deprecated_dict, concept_uri, replacers, _visited=None):
        """Collect the final replacers of ``concept_uri`` into ``replacers``.

        deprecated_dict: deprecated concept URI -> list of direct replacers
        concept_uri: URI whose replacement chain is followed
        replacers: list that receives every replacer that is not itself a key
            of ``deprecated_dict``; each URI is appended at most once
        _visited: internal bookkeeping for the recursion; callers omit it

        Circular replacement chains are tolerated: a deprecated concept is
        expanded only once per top-level call.
        """
        if _visited is None:
            _visited = set()
        if concept_uri in _visited or concept_uri not in deprecated_dict:
            return
        _visited.add(concept_uri)
        for replacer in deprecated_dict[concept_uri]:
            if replacer in deprecated_dict:
                self.get_replacers(deprecated_dict, replacer, replacers, _visited)
            elif replacer not in replacers:
                replacers.append(replacer)

    def parse_origin_vocabulary(self, g):
        """Fill the flat label table from a source vocabulary graph (YSA/Allärs).

        Each pref and alt label is mapped to the YSO URIs that its concept is
        matched to. For concepts typed as geographical, the matched YSO URIs
        and any chained ("--") labels are recorded as well.
        """
        geographical_types = [URIRef("http://www.yso.fi/onto/ysa-meta/GeographicalConcept"),
                              URIRef("http://www.yso.fi/onto/allars-meta/GeographicalConcept")]
        for conc in g.subjects(RDF.type, SKOS.Concept):
            is_geographical = False
            for rdf_type in g.preferredLabel(conc, labelProperties=[RDF.type]):
                try:
                    if rdf_type[1] in geographical_types:
                        is_geographical = True
                except IndexError:
                    logging.info("viallinen käsite: %s", conc)
            # The matches do not depend on the language, so look them up once.
            uris = self._yso_match_uris(g, conc)
            if is_geographical:
                self.geographical_concepts.update(uris)
            for lc in self.language_codes:
                for al in g.preferredLabel(conc, lang=lc, labelProperties=[SKOS.altLabel]):
                    alt_label = str(al[1])
                    if "--" in alt_label and is_geographical:
                        self.geographical_chained_labels.add(alt_label)
                    self._add_label_uris(alt_label, uris)
                pref_label = g.preferredLabel(conc, lang=lc)
                if pref_label:
                    pref_label = str(pref_label[0][1])
                    self._add_label_uris(pref_label, uris)
                    if "--" in pref_label and is_geographical:
                        self.geographical_chained_labels.add(pref_label)
        self.create_additional_dicts()

    def parse_label_vocabulary(self, g):
        """Fill the per-language label tables from a label-only vocabulary.

        Every pref and alt label is stored under its language together with
        the prefLabel(s) and URI(s) of the concepts that carry it. When more
        than one language is loaded, the prefLabels are also stored in
        ``self.translations``.
        """
        for lc in self.language_codes:
            self.labels[lc] = {}
            self.labels_lowercase[lc] = {}
            self.stripped_labels[lc] = {}
            self.labels_with_and_without_specifiers[lc] = {}
            self.labels_with_specifiers[lc] = {}
        multilingual = len(self.language_codes) > 1
        for conc in g.subjects(RDF.type, SKOS.Concept):
            uri = str(conc)
            for lc in self.language_codes:
                pref = g.preferredLabel(conc, lang=lc)
                if not pref:
                    # Without a prefLabel there is nothing the alt labels
                    # could resolve to in this language.
                    continue
                pref_label = str(pref[0][1])
                if multilingual:
                    self.translations.setdefault(uri, {})[lc] = pref_label
                self._add_label_entry(lc, pref_label, pref_label, uri)
                for al in g.preferredLabel(conc, lang=lc, labelProperties=[SKOS.altLabel]):
                    self._add_label_entry(lc, str(al[1]), pref_label, uri)
        self._build_language_lookup_dicts()

    def create_additional_dicts(self):
        """Derive the search tables from the flat ``{label: URIs}`` table.

        Fills labels_lowercase and stripped_labels (lower-cased keys, the
        latter without diacritics) and sorts labels by whether they occur
        both with and without a parenthesised specifier or only with one.
        """
        # labels keyed by their form without diacritics and specifiers
        unspecified = {}
        for label, label_uris in self.labels.items():
            self.labels_lowercase.setdefault(label.lower(), set()).update(label_uris)
            stripped_key = self.remove_diacritical_chars(label).lower()
            self.stripped_labels.setdefault(stripped_key, set()).update(label_uris)
            bare_key = self._PAREN_SPECIFIER_RE.sub("", stripped_key).strip()
            unspecified.setdefault(bare_key, set()).update(label_uris)
        for key, key_uris in unspecified.items():
            if key in self.stripped_labels:
                if len(key_uris) > len(self.stripped_labels[key]):
                    self.labels_with_and_without_specifiers[key] = key_uris
            else:
                self.labels_with_specifiers[key] = key_uris

    # ------------------------------------------------------------------
    # lookups
    # ------------------------------------------------------------------

    def get_concept_with_uri(self, uri, language):
        """Return the valid concept for ``uri`` with its label in ``language``.

        Expects the tables built by parse_yso_vocabulary. A deprecated URI is
        resolved to its replacer.

        Returns a dict with keys "label", "uris" and "code", or None when the
        URI is unknown or has no label in ``language``.

        Raises ValueError("2") when a deprecated concept does not have exactly
        one valid replacer, and KeyError when the replacer has no label in
        ``language``.
        """
        if uri in self.deprecated_concepts:
            replacers = self.deprecated_concepts[uri] or []
            valid_uris = [r for r in replacers if r not in self.deprecated_concepts]
            if len(valid_uris) != 1:
                raise ValueError("2")
            label = self.labels[valid_uris[0]][language]
            return {"label": label, "uris": valid_uris, "code": self._concept_code(language)}
        if uri in self.labels and language in self.labels[uri]:
            label = self.labels[uri][language]
            return {"label": label, "uris": [uri], "code": self._concept_code(language)}
        return None

    def get_concept_with_label(self, label, language):
        """Look up a concept by pref or alt label.

        label: pref or alt label of the concept searched for
        language: language version to search in

        Expects the tables built by parse_label_vocabulary. The label is tried
        as given, then lower-cased against the lower-case table and against
        the table without diacritics.

        Returns a dict with keys "label" (one prefLabel of the match), "uris"
        (the non-deprecated URIs) and "code", or None when nothing valid is
        found. If several prefLabels share the matched label, which one is
        returned is not defined.

        Raises ValueError("3") when the label exists only in a form with a
        parenthesised specifier, and ValueError("4") when several such forms
        exist. Raises KeyError when ``language`` has not been loaded.
        """
        # Error "5" (the label exists both with and without a specifier) is
        # deliberately not checked any more.
        lowered = label.lower()
        uris = []
        labels = {}
        if label in self.labels[language]:
            entry = self.labels[language][label]
            uris, labels = entry["uris"], entry["pref_label"]
        elif lowered in self.labels_lowercase[language]:
            entry = self.labels_lowercase[language][lowered]
            uris, labels = entry["uris"], entry["pref_label"]
        elif lowered in self.stripped_labels[language]:
            entry = self.stripped_labels[language][lowered]
            uris, labels = entry["uris"], entry["pref_label"]
        elif lowered in self.labels_with_specifiers[language]:
            uris = self.labels_with_specifiers[language][lowered]["uris"]
            if len(uris) > 1:
                raise ValueError("4")
            elif len(uris) == 1:
                raise ValueError("3")
        valid_uris = [u for u in uris if u not in self.deprecated_concepts]
        pref_labels = list(labels)
        if valid_uris:
            return {"label": pref_labels[0], "uris": valid_uris, "code": self._concept_code(language)}
        return None

    def get_uris_with_concept(self, concept):
        """Return the YSO URIs that the label ``concept`` maps to.

        Expects the flat tables built by parse_musa_vocabulary or
        parse_origin_vocabulary. The label is tried as given, then lower-cased
        against the lower-case table and against the table without diacritics.

        Returns ``{"uris": [...]}`` with the non-deprecated URIs, or None when
        there are none.

        Raises ValueError("3") when the label exists only in a form with a
        parenthesised specifier, and ValueError("4") when that form maps to
        several URIs.
        """
        # Error "5" (the label exists both with and without a specifier) is
        # deliberately not checked any more.
        lowered = concept.lower()
        uris = []
        if concept in self.labels:
            uris = self.labels[concept]
        elif lowered in self.labels_lowercase:
            uris = self.labels_lowercase[lowered]
        elif lowered in self.stripped_labels:
            uris = self.stripped_labels[lowered]
        elif lowered in self.labels_with_specifiers:
            uris = self.labels_with_specifiers[lowered]
            if len(uris) > 1:
                raise ValueError("4")
            elif len(uris) == 1:
                raise ValueError("3")
        valid_uris = [u for u in uris if u not in self.deprecated_concepts]
        if valid_uris:
            return {"uris": valid_uris}
        return None

    def translate_label(self, uri, language):
        """Return the concept ``uri`` labelled in the other language.

        ``language`` "fi" translates to "sv" and vice versa. The label comes
        from ``self.translations`` for the "slm" target vocabulary and from
        ``self.labels`` for "yso".

        Returns a dict with keys "label", "uris" and "code", or None when
        ``language`` is neither "fi" nor "sv", when the target vocabulary is
        neither "slm" nor "yso", or when no translation exists.

        Raises KeyError when ``uri`` is not present in the table consulted.
        """
        other_language = self._OTHER_LANGUAGE.get(language)
        if other_language is None:
            # Only the fi <-> sv pair is supported.
            return None
        translated_label = None
        if self.target_vocabulary_code == "slm":
            if other_language in self.translations[uri]:
                translated_label = self.translations[uri][other_language]
        if self.target_vocabulary_code == "yso":
            if other_language in self.labels[uri]:
                translated_label = self.labels[uri][other_language]
        if translated_label:
            return {"label": translated_label, "uris": [uri], "code": self._concept_code(other_language)}
        return None

    # ------------------------------------------------------------------
    # utilities
    # ------------------------------------------------------------------

    def remove_diacritical_chars(self, word):
        """Transliterate ``word`` to ASCII, keeping å, ä and ö (either case).

        ASCII characters are passed through untouched (unidecode leaves them
        unchanged anyway); every other character goes through unidecode.
        """
        kept = self._KEPT_NON_ASCII
        return "".join(
            letter if (ord(letter) < 128 or letter in kept)
            else unidecode.unidecode(letter)
            for letter in word
        )

    def convert_to_ISO_639_2(self, code):
        """Map "fi" to "fin" and "sv" to "swe"; other codes are returned as is."""
        if code == "fi":
            return "fin"
        if code == "sv":
            return "swe"
        return code