-
Notifications
You must be signed in to change notification settings - Fork 0
/
update_subjects.py
150 lines (129 loc) · 4.99 KB
/
update_subjects.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
# This script updates existing subjects in bulk from a csv
import csv
import os
import sys
from asnake.client import ASnakeClient
from asnake.client.web_client import ASnakeAuthError
from dotenv import load_dotenv, find_dotenv
from loguru import logger
from pathlib import Path
# Reset loguru's handlers and log to a per-day file under ./logs.
# The {time:YYYY-MM-DD} placeholder is deliberately NOT an f-string field: it is passed through
# verbatim as part of the sink path given to logger.add().
logger.remove()
log_path = Path('./logs') / 'update_subjects_{time:YYYY-MM-DD}.log'
logger.add(str(log_path), format="{time}-{level}: {message}")

# Locate and load the environment-specific .env file: ".env.<ENV>", with ENV defaulting to "dev"
env_file = find_dotenv('.env.' + os.getenv("ENV", "dev"))
load_dotenv(env_file)
def client_login(as_api, as_un, as_pw):
    """
    Create an ArchivesSnake client for an ArchivesSpace instance and authorize it.

    Args:
        as_api (str): ArchivesSpace API URL
        as_un (str): ArchivesSpace username - admin rights preferred
        as_pw (str): ArchivesSpace password

    Returns:
        client (ASnake.client object): the authorized client used to connect to the ASpace API.
        NOTE: when authorization fails, the error is printed and logged and the ASnakeAuthError
        class itself is returned (it is not raised) - callers need to check for that value.
    """
    client = ASnakeClient(baseurl=as_api, username=as_un, password=as_pw)
    try:
        client.authorize()
    except ASnakeAuthError as auth_error:
        failure = f'ERROR authorizing ASnake client: {auth_error}'
        print(failure)
        logger.error(failure)
        return ASnakeAuthError
    return client
def read_csv(updated_subjects_csv):
    """
    Read the subjects csv fully into memory, one dict per data row keyed by the header row.

    Args:
        updated_subjects_csv (str): filepath for the subjects csv

    Returns:
        updated_subjects (list): a list of subjects to update and their metadata based on the csv contents;
            an empty list if the file could not be opened or read (the error is logged and printed)
    """
    try:
        # newline='' is what the csv module expects so quoted fields containing line breaks parse correctly;
        # utf-8-sig reads plain UTF-8 unchanged but strips a leading BOM, which would otherwise end up
        # glued to the first column name.
        with open(updated_subjects_csv, 'r', encoding='utf-8-sig', newline='') as open_csv:
            # Materialize the rows before the file is closed - DictReader reads lazily from the handle.
            return list(csv.DictReader(open_csv))
    except IOError as csverror:
        logger.error(f'ERROR reading csv file: {csverror}')
        print(f'ERROR reading csv file: {csverror}')
        return []
def get_subject(client, existing_subject_id):
    """
    Fetch one subject record from ArchivesSpace by id.

    Args:
        client (ASnake.client object): client object from ASnake.client
        existing_subject_id (str): id of the subject to be updated

    Returns:
        existing_subj (dict): the existing subject returned from ArchivesSpace's API
        None (NoneType): if the response contains an 'error' key (the problem is logged and printed)
    """
    response = client.get(f'subjects/{existing_subject_id}')
    subject_json = response.json()
    if 'error' not in subject_json:
        return subject_json
    problem = f'ERROR getting existing subject {existing_subject_id}: {subject_json}'
    logger.error(problem)
    print(problem)
    return None
def build_subject(existing_subj, subj):
    """
    Overlay the csv values onto an existing subject record.

    The record's 'terms' and 'external_ids' lists are each replaced with a single entry and its
    'scope_note' is overwritten; every other key is left as it was. The passed-in dict is modified
    in place and is also the return value.

    Args:
        existing_subj (dict): Existing ArchivesSpace subject
        subj (dict): subject metadata from csv (needs 'new_title', 'new_scope_note' and 'new_EMu_ID')

    Returns:
        existing_subj (dict): updated subject ready to post to ArchivesSpace
    """
    replacement_term = {
        'term': subj['new_title'],
        'term_type': 'cultural_context',
        'vocabulary': '/vocabularies/1',
    }
    existing_subj['terms'] = [replacement_term]

    existing_subj['scope_note'] = subj['new_scope_note']

    emu_identifier = {
        'external_id': subj['new_EMu_ID'],
        'source': 'EMu_ID',
    }
    existing_subj['external_ids'] = [emu_identifier]

    return existing_subj
def update_subject(client, existing_subj_id, data):
    """
    Post an updated subject record back to ArchivesSpace and report the outcome.

    The API response is logged and printed either way: as an error when it contains an 'error'
    key, otherwise as a successful update.

    Args:
        client (ASnake.client object): client object from ASnake.client
        existing_subj_id (str): id of the subject to be updated
        data (dict): updated subject ready to post to ArchivesSpace

    Returns:
        update_message (dict): ArchivesSpace API response
    """
    response = client.post(f'subjects/{existing_subj_id}', json=data)
    update_message = response.json()

    if 'error' in update_message:
        logger.error(update_message)
        print(f'ERROR: {update_message}')
        return update_message

    logger.info(f'{update_message}')
    print(f'Updated object data: {update_message}')
    return update_message
def main(updated_subjects_csv):
    """
    Runs the functions of the script, collecting, building, then updating subject metadata in ArchivesSpace, printing
    error messages if they occur.

    Takes a csv input of existing ASpace subjects to update with the following columns, at minimum:
    - aspace_subject_id
    - new_title
    - new_scope_note
    - new_EMu_ID

    Connection settings are read from the environment variables as_api, as_un and as_pw.
    The run stops early, without touching any subject, if the login fails or no csv rows are available.
    Subjects that cannot be retrieved are skipped; the remaining rows are still processed.

    Args:
        updated_subjects_csv (str): filepath for the subjects csv
    """
    client = client_login(os.getenv('as_api'), os.getenv('as_un'), os.getenv('as_pw'))
    # A failed login is signalled by a returned ASnakeAuthError class rather than a usable client;
    # continuing would only fail later with an unrelated AttributeError on the first request.
    if client is None or client is ASnakeAuthError:
        return

    updated_subjects = read_csv(updated_subjects_csv)
    # Guard against a missing result so an unreadable csv does not crash the loop below.
    if updated_subjects is None:
        return

    for subj in updated_subjects:
        existing_subj_id = subj['aspace_subject_id']
        existing_subj = get_subject(client, existing_subj_id)
        if existing_subj is None:
            # get_subject has already reported the problem; move on to the next row.
            continue
        data = build_subject(existing_subj, subj)
        update_subject(client, existing_subj_id, data)
# Call with `python update_subjects.py <filename>.csv`
if __name__ == "__main__":
    # Fail with a usage hint instead of a bare IndexError when the csv path is missing.
    if len(sys.argv) < 2:
        sys.exit('Usage: python update_subjects.py <filename>.csv')
    # Path() normalizes the argument (e.g. redundant separators) before it is handed on as a string.
    main(str(Path(sys.argv[1])))