#!/usr/bin/env python
"""
This script iterates through all the digital objects in every repository in SI's ArchivesSpace
instance - except Test, Training, and NMAH-AF - parses them for any data in the following fields:
agents, dates, extents, languages, notes, and subjects, then deletes all data within those fields
except the digitized date and posts the updated digital object back to ArchivesSpace.
"""
from collections import namedtuple
from copy import deepcopy
from http.client import HTTPException
from pathlib import Path

import jsonlines
from asnake.client import ASnakeClient
from asnake.client.web_client import ASnakeAuthError
from jsonlines import InvalidLineError
from loguru import logger

from secrets import *  # local credentials module supplying as_api_stag, as_un, and as_pw
logger.remove()
log_path = Path('../logs', 'delete_dometadata_{time:YYYY-MM-DD}.log')
logger.add(str(log_path), format="{time}-{level}: {message}")
class ArchivesSpace:
def __init__(self, aspace_api, aspace_un, aspace_pw):
"""
Establishes connection to ASnakeClient and runs queries to the ArchivesSpace API
Args:
aspace_api (str): ArchivesSpace API URL
aspace_un (str): ArchivesSpace username - admin rights preferred
aspace_pw (str): ArchivesSpace password
"""
try:
self.aspace_client = ASnakeClient(baseurl=aspace_api, username=aspace_un, password=aspace_pw)
self.aspace_client.authorize()
except ASnakeAuthError as e:
record_error('ArchivesSpace __init__() - Failed to authorize ASnake client', e)
            raise
self.repo_info = []
def get_repo_info(self):
"""
Gets all the repository information for an ArchivesSpace instance in a list and assigns it to self.repo_info
Returns:
self.repo_info (list): a list of dictionaries containing all the repository information for an ArchivesSpace
instance
"""
self.repo_info = self.aspace_client.get('repositories').json()
if self.repo_info:
return self.repo_info
        print(f'get_repo_info() - There are no repositories in the ArchivesSpace instance: {self.repo_info}')
        logger.info(f'get_repo_info() - There are no repositories in the ArchivesSpace instance: {self.repo_info}')
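
    # A minimal sketch of the structure self.repo_info holds (only the keys this script reads are
    # shown; the repo_code value here is hypothetical and real responses contain more fields):
    #   [{'uri': '/repositories/2', 'repo_code': 'NMAI', ...}, ...]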
    def get_objects(self, repository_uri, record_type, parameters=('all_ids', True)):
        """
        Takes a repository URI and returns all the object IDs of the given record type as a list for that repository
        Args:
            repository_uri (str): the repository URI
            record_type (str): the type of record object you want to get (resources, archival_objects, digital_objects,
                accessions, etc.)
            parameters (tuple): selected parameter and value: ('all_ids', True), ('page', <int>), or
                ('id_set', '1,2,3,etc.'). Default is ('all_ids', True)
        Returns:
            digital_objects (list): all the object IDs for the given record type
        """
parameter_options = ['all_ids', 'page', 'id_set']
if parameters[0] not in parameter_options:
record_error('get_objects() - parameter not valid', parameters)
raise ValueError
if parameters[0] == 'all_ids' and not isinstance(parameters[1], bool):
record_error('get_objects() - parameter not valid', parameters)
raise ValueError
if parameters[0] == 'page' and not isinstance(parameters[1], int):
record_error('get_objects() - parameter not valid', parameters)
raise ValueError
if parameters[0] == 'id_set' and not isinstance(parameters[1], str): # TODO: how to handle id_set validation and multiple inputs
record_error('get_objects() - parameter not valid', parameters)
raise ValueError
digital_objects = self.aspace_client.get(f'{repository_uri}/{record_type}?{parameters[0]}={parameters[1]}').json()
return digital_objects
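
    # A minimal usage sketch (hypothetical repository URI and page number; assumes an already
    # authorized ArchivesSpace instance named aspace):
    #   all_ids = aspace.get_objects('/repositories/2', 'digital_objects')
    #   one_page = aspace.get_objects('/repositories/2', 'resources', ('page', 1))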
    def get_object(self, record_type, object_id, repo_uri=''):
        """
        Gets and returns an object's JSON metadata from its record type and ArchivesSpace ID
        Args:
            record_type (str): the type of record object you want to get (resources, archival_objects, digital_objects,
                accessions, etc.)
            object_id (int): the original object ArchivesSpace ID
            repo_uri (str): the repository ArchivesSpace URI without a trailing forward slash; default is an empty
                string
        Returns:
            object_json (dict): the JSON metadata for the object, or None if an error was encountered and logged
        """
try:
object_json = self.aspace_client.get(f'{repo_uri}/{record_type}/{object_id}').json()
except HTTPException as get_error:
record_error('get_object() - Unable to retrieve object', get_error)
else:
if 'error' in object_json:
record_error('get_object() - Unable to retrieve object with provided URI', object_json)
else:
return object_json
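
    # For example (hypothetical IDs; assumes an already authorized instance named aspace):
    #   do_json = aspace.get_object('digital_objects', 1234, '/repositories/2')
    # returns the object's JSON dict, or None if the request failed and was logged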
def update_object(self, object_uri, updated_json):
"""
Posts the updated JSON metadata for the given object_uri to ArchivesSpace
Args:
object_uri (str): the original object's URI for posting to the client
updated_json (dict): the updated metadata for the object
Returns:
update_message (dict): ArchivesSpace response or None if an error was encountered and logged
"""
        update_message = self.aspace_client.post(object_uri, json=updated_json).json()
if 'error' in update_message:
record_error('update_object() - Update failed due to following error', update_message)
return None
return update_message
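
    # For example (hypothetical URI; assumes do_json is an edited digital object dict):
    #   response = aspace.update_object('/repositories/2/digital_objects/1234', do_json)
    #   if response:
    #       ...  # on success, ArchivesSpace returns a confirmation dict rather than an error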
def record_error(message, status_input):
"""
Prints and logs an error message and the code/parameters causing the error
Args:
message (str): message to prefix the error code
        status_input (str, tuple, Exception): error code, exception, or input parameters producing the error
"""
try:
print(f'{message}: {status_input}')
logger.error(f'{message}: {status_input}')
except TypeError as input_error:
print(f'record_error() - Input is invalid for recording error: {input_error}')
logger.error(f'record_error() - Input is invalid for recording error: {input_error}')
# def read_csv(delete_domd_csv):
# """
# Takes a csv input of ASpace digital objects - ran from SQL query - and returns a list of dictionaries of all the
# digital objects metadata
#
# Args:
# delete_domd_csv (str): filepath for the delete digital object metadata csv containing metadata for all digital
# objects to edit
#
# Returns:
# digital_objects (list): a list of dictionaries for each column name (key) and row values (value)
# """
# digital_objects = []
# try:
# open_csv = open(delete_domd_csv, 'r', encoding='UTF-8')
# digital_objects = csv.DictReader(open_csv)
# except IOError as csverror:
# logger.error(f'ERROR reading csv file: {csverror}')
# print(f'ERROR reading csv file: {csverror}')
# else:
# return digital_objects
def write_to_file(filepath, write_data):
"""
Writes or appends JSON data to a specified file using jsonlines
Args:
filepath (str): the path of the file being written to
        write_data (dict): the JSON data to be written to the given filepath
"""
try:
with jsonlines.open(filepath, mode='a') as org_data_file:
try:
org_data_file.write(write_data)
except InvalidLineError as bad_write_error:
record_error('write_to_file() - Unable to write data to file', bad_write_error)
except (FileNotFoundError, PermissionError, OSError) as write_file_error:
record_error('write_to_file() - Unable to open or access jsonl file', write_file_error)
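
# For example (hypothetical path and data; write_data should be a JSON-serializable dict):
#   write_to_file('../test_data/original_data.jsonl', {'uri': '/repositories/2/digital_objects/1234'})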
def parse_delete_fields(object_json):
"""
Iterate through digital object JSON for specific fields and if the field is found, add it to a fields_to_delete list
as a named tuple, with Field and Subrecord, returning the list
Args:
object_json (dict): metadata for the specific object in JSON
Returns:
fields_to_delete (list): list of named tuples (named DeleteField), with Field being the name of the field
parsed (ex. dates, extents, etc.) and Subrecord being the dictionary subrecord of the field, with multiple
subrecords added
"""
fields_to_check = ['linked_agents', 'dates', 'extents', 'lang_materials', 'notes', 'subjects']
fields_to_delete = []
DeleteField = namedtuple('DeleteField', 'Field Subrecord')
    for field in fields_to_check:
        if field in object_json and object_json[field]:
            for subrecord in object_json[field]:
                # Keep dates labeled 'digitized'; mark every other subrecord for deletion
                if field == 'dates' and subrecord.get('label') == 'digitized':
                    continue
                fields_to_delete.append(DeleteField(field, subrecord))
return fields_to_delete
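
# A sketch of the return shape (subrecord values here are hypothetical):
#   parse_delete_fields(do_json) might return
#   [DeleteField(Field='dates', Subrecord={'label': 'creation', 'expression': '1995', ...}),
#    DeleteField(Field='subjects', Subrecord={'ref': '/subjects/12'})]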
def delete_field_info(object_json, field, subrecord):
"""
Take the digital object JSON metadata and remove the given subrecord from the given field, returning the updated
JSON
Args:
object_json (dict): metadata for the specific object in JSON
field (str): the name of the field (key) to remove its data (value)
subrecord (any): the subrecord for the field, ex. a single subrecord for dates if more than one subrecord exists
Returns:
updated_json (dict): updated metadata for the object in JSON with the data for the selected field deleted
"""
updated_json = deepcopy(object_json)
updated_json[field].remove(subrecord)
return updated_json
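
# For example (hypothetical subrecord; it must match the list entry exactly, since list.remove()
# compares by equality):
#   updated = delete_field_info(do_json, 'subjects', {'ref': '/subjects/12'})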
def main():
"""
Runs the functions of the script by creating an ArchivesSpacec class instance, getting the repository info for every
repository, then all the digital object IDs per each repository, gets the JSON data for the digital object, deletes
all information not contained within the Basic Information or File Version sections and posts the updated JSON to
ArchivesSpace, saving the old JSON data in a separate file.
"""
donotrun_repos = ['Test', 'TRAINING', 'NMAH-AF']
original_do_json_data = str(Path('../test_data', 'delete_dometadata_original_data.jsonl'))
archivesspace_instance = ArchivesSpace(as_api_stag, as_un, as_pw) # TODO: replace as_api_stag with as_api_prod
archivesspace_instance.get_repo_info()
for repo in archivesspace_instance.repo_info:
if repo['repo_code'] not in donotrun_repos:
all_digital_object_ids = archivesspace_instance.get_objects(repo['uri'], 'digital_objects')
if len(all_digital_object_ids) > 0:
for do_id in all_digital_object_ids:
                digital_object_json = archivesspace_instance.get_object('digital_objects', do_id,
                                                                        repo['uri'])
                if digital_object_json is None:  # error already logged by get_object()
                    continue
                delete_fields = parse_delete_fields(digital_object_json)
if delete_fields:
updated_digital_object_json = deepcopy(digital_object_json)
for field in delete_fields:
updated_digital_object_json = delete_field_info(updated_digital_object_json,
field.Field,
field.Subrecord)
write_to_file(original_do_json_data, digital_object_json)
update_response = archivesspace_instance.update_object(updated_digital_object_json['uri'],
updated_digital_object_json)
if update_response:
print(f'Updated {updated_digital_object_json["uri"]}: {update_response}')
logger.info(f'Updated {updated_digital_object_json["uri"]}: {update_response}')
if __name__ == "__main__":
main()