-
Notifications
You must be signed in to change notification settings - Fork 12
Expand file tree
/
Copy pathvip.py
More file actions
486 lines (427 loc) · 16.3 KB
/
vip.py
File metadata and controls
486 lines (427 loc) · 16.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
"""
This package is used to communicate with the VIP RESTful API.
It implements the most basic elements of this Python client.
NB: This is a synchronous implementation.
"""
# Author: Timothée Chabat
# Maintainer: Gaël Vila
# Built-in libraries
from concurrent.futures import ThreadPoolExecutor
from os.path import exists
from pathlib import *
import threading
# Third-Party
import requests
########################### VARIABLES & ERRORS ################################
# -----------------------------------------------------------------------------
# API URL
__PREFIX = "https://vip.creatis.insa-lyon.fr/rest/"
def set_vip_url(vip_portal_url: str) -> None:
    """
    Change the API prefix to `<vip_portal_url>/rest/`.

    Trailing slashes of `vip_portal_url` are ignored, so "https://host" and
    "https://host/" give the same prefix (and not "https://host//rest/").

    NOTE: sessions built by `new_session()` mount their retry adapter on the
    prefix current at build time, so call this before `setApiKey()`.
    """
    global __PREFIX
    __PREFIX = vip_portal_url.rstrip('/') + "/rest/"
# API key (installed by `setApiKey()`)
__apikey = None
__headers = {'apikey': __apikey}
# Placeholder `requests` sessions, without API key nor retry strategy:
# `setApiKey()` replaces them once a valid key is known.
SESSION = requests.Session() # gets the retry strategy from `setApiKey()`
SESSION_NO_RETRY = requests.Session() # never has a retry strategy
# Strategy for retrying requests (mounted by `new_session()`)
retry_strategy = requests.adapters.Retry(
    total=4,  # 4 retries at most (i.e. up to 5 attempts)
    # HTTP statuses that trigger a retry. Connection-level failures such as
    # ConnectionResetError (errno 104) are not HTTP statuses: they are handled
    # by urllib3's connect/read retry logic, not by `status_forcelist`.
    status_forcelist=[
        500,  # Internal Server Error
        502,  # Bad Gateway or Proxy Error
        503,  # Service Unavailable
        504,  # Gateway Time-out
    ],
    backoff_factor=8,  # retries after 0s, 16s, 32s, 64s
)
# Build a `requests` Session with the API key and the retry strategy
def new_session() -> requests.Session:
    """
    Return a fresh `requests` Session carrying the current API key header.

    An `HTTPAdapter` using `retry_strategy` is mounted on the current API
    prefix, so requests to VIP API URLs are retried.
    """
    retrying_adapter = requests.adapters.HTTPAdapter(max_retries=retry_strategy)
    session = requests.Session()
    session.mount(__PREFIX, retrying_adapter)
    session.headers.update(__headers)
    return session
# Build a `requests` Session with the API key but no retry strategy
def new_session_no_retry() -> requests.Session:
    """
    Return a fresh `requests` Session carrying the current API key header.

    No retrying adapter is mounted (only `requests`' default adapters).
    """
    plain_session = requests.Session()
    plain_session.headers.update(__headers)
    return plain_session
# Parallel downloads are implemented with a multithreading
# strategy for IO-bound operations.
# Maximum number of threads to parallelize
MAX_THREADS = 10
# The `requests` Session is not thread-safe:
# it must be local to each thread when parallelized.
# Local object to gather per-thread variables
thread_local = threading.local()
# Function to create a new Session object when initializing the current thread
def init_thread() -> requests.Session:
    """
    Create the `requests` Session of the current thread (with retry strategy).

    Meant to be used as a thread-pool `initializer`: the new session is stored
    in `thread_local.session`, where `download_thread()` reads it. It is also
    returned, as the annotation states.
    """
    # Each worker thread must be initialized exactly once
    assert not hasattr(thread_local, "session")
    thread_local.session = new_session()
    return thread_local.session
# -----------------------------------------------------------------------------
def setApiKey(value) -> bool:
    """
    Check `value` against VIP and, if accepted, install it as the API key.

    A test PUT request is sent to the 'platform' endpoint with `value` as
    'apikey' header (outside of the module sessions).
    Returns False if VIP answers with error code 40101.
    Returns True if VIP answers without an error message. The key is then
    stored and both module sessions are rebuilt with it.
    Raises RuntimeError for any other VIP error.
    """
    global __apikey, __headers, SESSION, SESSION_NO_RETRY
    probe = requests.put(__PREFIX + 'platform', headers={'apikey': value})
    error = detect_errors(probe)
    if error[0] and error[1] == 40101:
        return False
    if error[0]:
        raise RuntimeError("Error {} from VIP : {}".format(error[1], error[2]))
    # Valid key: store it, then rebuild the sessions so they carry it
    __apikey = value
    __headers['apikey'] = __apikey
    SESSION = new_session()
    SESSION_NO_RETRY = new_session_no_retry()
    return True
# -----------------------------------------------------------------------------
def detect_errors(req) -> tuple:
    """
    Tell whether the response `req` carries a VIP error message.

    A VIP error message has a JSON 'content-type' header and a body that is a
    JSON object whose keys are exactly 'errorCode' and 'errorMessage', in any
    order.

    Returns:
    - `(True, errorCode, errorMessage)` for a VIP error message;
    - `(False,)` otherwise (non-JSON content, undecodable body, other payload).
    """
    if ('content-type' not in req.headers
            or not req.headers['content-type'].startswith("application/json")):
        return (False,)
    try:
        res = req.json()
    except ValueError:
        # Undecodable body: JSON decoding errors derive from ValueError
        return (False,)
    # Compare key sets: the key order of a JSON object is not significant
    if isinstance(res, dict) and set(res) == {'errorCode', 'errorMessage'}:
        return (True, res['errorCode'], res['errorMessage'])
    return (False,)
# -----------------------------------------------------------------------------
def manage_errors(req) -> None:
    """
    Raise a RuntimeError if the response `req` carries a VIP error message.

    Detection is delegated to `detect_errors()`. Responses without a VIP error
    payload pass silently, whatever their HTTP status.
    # TODO: implement better management based on `req.status_code`
    """
    is_error, *details = detect_errors(req)
    if is_error:
        code, message = details
        raise RuntimeError(f"Error {code} from VIP : {message}")
################################### PATH ######################################
# -----------------------------------------------------------------------------
def create_dir(path) -> bool:
    """
    Create the directory `path` on VIP.

    A trailing '/' is appended to `path` when missing.
    Returns True if VIP answered without an error message, False otherwise.
    Raises ValueError if `path` is empty.
    """
    if not path:
        raise ValueError("create_dir() needs a non-empty VIP path")
    if not path.endswith('/'):
        path += '/'
    url = __PREFIX + 'path' + path
    rq = SESSION.put(url, headers=__headers)
    try:
        manage_errors(rq)
    except RuntimeError:
        return False
    return True
# -----------------------------------------------------------------------------
def create_dir_smart(path) -> str:
    """
    Create a new VIP directory named after `path`, avoiding existing paths.

    If `path` already exists, a numeric suffix is appended (path1, path2, ...)
    until a free name is found. That directory is then created.
    `path` should NOT end with '/'. Returns the path actually used, with the
    same syntax as `path`. The result of `create_dir()` is not checked.
    """
    candidate, suffix = path, 1
    # `exists` resolves to this module's VIP `exists()`, which is defined
    # later and shadows the `os.path.exists` imported at the top of the file
    while exists(candidate):
        candidate = path + str(suffix)
        suffix += 1
    create_dir(candidate)
    return candidate
# -----------------------------------------------------------------------------
def _path_action(path, action) -> requests.models.Response:
    """
    Send GET `<path>?action=<action>` to the VIP 'path' endpoint.

    `action` must be one of 'list', 'exists', 'properties' or 'md5'.
    Be careful: 'md5' does not seem to work.
    'content' is not accepted here: use the download() function instead.

    Returns the `requests` response.
    Raises ValueError for an unsupported `action`, and RuntimeError if VIP
    answers with an error message.
    """
    # Explicit check: unlike `assert`, it is not stripped under `python -O`
    if action not in ('list', 'exists', 'properties', 'md5'):
        raise ValueError("Unsupported path action: {!r}".format(action))
    url = __PREFIX + 'path' + path + '?action=' + action
    rq = SESSION.get(url, headers=__headers)
    manage_errors(rq)
    return rq
# -----------------------------------------------------------------------------
def list_content(path) -> list:
    """Return the decoded JSON answer of the 'list' action on VIP `path`."""
    response = _path_action(path, 'list')
    return response.json()
# -----------------------------------------------------------------------------
def list_directory(path) -> list:
    """Return the entries listed under VIP `path` whose 'isDirectory' is true."""
    return [entry for entry in list_content(path) if entry['isDirectory']]
# -----------------------------------------------------------------------------
def list_elements(path) -> list:
    """Return the entries listed under VIP `path` whose 'isDirectory' is false."""
    return [entry for entry in list_content(path) if not entry['isDirectory']]
# -----------------------------------------------------------------------------
def exists(path) -> bool:
    """
    Return the 'exists' field of the 'exists' action on VIP `path`.

    NOTE: this module-level name shadows the `os.path.exists` imported at the
    top of the file.
    """
    answer = _path_action(path, 'exists').json()
    return answer['exists']
# -----------------------------------------------------------------------------
def get_path_properties(path) -> dict:
    """Return the decoded JSON answer of the 'properties' action on VIP `path`."""
    response = _path_action(path, 'properties')
    return response.json()
# -----------------------------------------------------------------------------
def is_dir(path) -> bool:
    """Return the 'isDirectory' property of VIP `path`."""
    properties = get_path_properties(path)
    return properties['isDirectory']
# -----------------------------------------------------------------------------
def delete_path(path) -> bool:
    """
    Delete a file or a directory (with all its content) on VIP.

    Returns True if VIP answered without an error message, False otherwise.
    """
    response = SESSION.delete(__PREFIX + 'path' + path, headers=__headers)
    try:
        manage_errors(response)
    except RuntimeError:
        return False
    return True
# -----------------------------------------------------------------------------
def upload(path, where_to_save) -> bool:
    """
    Upload a local file to VIP.

    - `path`: on local computer, the file to upload (read fully in memory)
    - `where_to_save`: on VIP, something like "/vip/Home/RandomName.ext"
    Returns True if VIP answered without an error message, False otherwise.
    """
    target_url = __PREFIX + 'path' + where_to_save
    request_headers = {
        'apikey': __apikey,
        'Content-Type': 'application/octet-stream',
    }
    with open(path, 'rb') as local_file:
        payload = local_file.read()
    response = SESSION.put(target_url, headers=request_headers, data=payload)
    try:
        manage_errors(response)
    except RuntimeError:
        return False
    return True
# -----------------------------------------------------------------------------
def download(path, where_to_save) -> bool:
    """
    Download a single file from VIP.

    - `path`: on VIP, something like "/vip/Home/RandomName.ext", content to dl
    - `where_to_save`: on local computer
    Returns True once the file is written. Returns False if VIP does not answer
    with status 200; no local file is created in that case.
    """
    url = __PREFIX + 'path' + path + '?action=content'
    # `with` releases the streamed connection even when the body is not read
    with SESSION.get(url, headers=__headers, stream=True) as rq:
        if rq.status_code != 200:
            return False
        with open(where_to_save, 'wb') as out_file:
            # Write in chunks so that large files are not held fully in memory
            for chunk in rq.iter_content(chunk_size=1024 * 1024):
                out_file.write(chunk)
    return True
# Methods for parallel downloads
# Method to download data in a thread-safe session
def download_thread(file: tuple) -> tuple:
    """
    Download a single file from VIP with the session of the current thread.

    - `file` must be in format: (`vip_filename`, `local_filename`)
    - `vip_filename`, `local_filename` can be strings or os.PathLike objects.
    Requires `thread_local.session` to be set (see `init_thread()`).
    Returns `(file, success)`, where `file` is the input tuple unchanged.
    `success` is False if VIP does not answer with status 200; no local file
    is created (or truncated) in that case.
    """
    path, where_to_save = map(str, file)
    url = __PREFIX + 'path' + path + '?action=content'
    # `with` releases the streamed connection even when the body is not read
    with thread_local.session.get(url, headers=__headers, stream=True) as rq:
        # TODO: manage HTTP return codes more finely
        if rq.status_code != 200:
            return file, False
        # Open the local file only once the download is known to succeed
        with open(where_to_save, 'wb') as out_file:
            for chunk in rq.iter_content(chunk_size=1024 * 1024):
                out_file.write(chunk)
    return file, True
def download_parallel(files):
    """
    Download files from VIP in parallel (generator).

    - `files`: iterable of tuples in format (`vip_file`, `local_file`),
      where file paths can be `str` or `os.PathLike` objects. Any iterable is
      accepted, including generators; it is consumed up front.
    - Yields the `(file, success)` tuples returned by `download_thread()`, in
      the order of `files`.
    Nothing is yielded, and no thread is started, when `files` is empty.
    """
    # Materialize to support generators (no len()) and to size the pool
    files = list(files)
    if not files:
        # ThreadPoolExecutor rejects max_workers=0
        return
    # Threads are run in a context manager to secure their closing
    with ThreadPoolExecutor(
        max_workers=min(MAX_THREADS, len(files)),  # Number of threads
        thread_name_prefix="vip_requests",
        initializer=init_thread,  # Creates a thread-local `requests` Session
    ) as executor:
        # Transparent connection between executor.map() and the caller
        yield from executor.map(download_thread, files)
def generic_get(endpoint) -> list:
    """
    Send GET to the API `endpoint` (relative to the API prefix).

    Returns the decoded JSON answer.
    Raises RuntimeError if VIP answers with an error message.
    """
    response = SESSION.get(__PREFIX + endpoint, headers=__headers)
    manage_errors(response)
    return response.json()
def generic_put(endpoint, data) -> list:
    """
    Send PUT to the API `endpoint` with `data` serialized as a JSON body.

    Returns the decoded JSON answer.
    Raises RuntimeError if VIP answers with an error message.
    """
    response = SESSION.put(__PREFIX + endpoint, headers=__headers, json=data)
    manage_errors(response)
    return response.json()
################################ EXECUTIONS ###################################
# -----------------------------------------------------------------------------
def list_executions() -> list:
    """Return the decoded JSON answer of the 'executions' endpoint."""
    return generic_get('executions')
# -----------------------------------------------------------------------------
def list_examples() -> list:
    """Return the decoded JSON answer of the 'executions/examples' endpoint."""
    return generic_get("executions/examples")
# -----------------------------------------------------------------------------
def example_info(id_exec) -> dict:
    """Return the decoded JSON answer of 'executions/examples/<id_exec>'."""
    return generic_get("executions/examples/" + id_exec)
# -----------------------------------------------------------------------------
def count_executions() -> int:
    """
    Return the integer read from the 'executions/count' endpoint (plain text).

    Raises RuntimeError if VIP answers with an error message.
    """
    response = SESSION.get(__PREFIX + 'executions/count', headers=__headers)
    manage_errors(response)
    return int(response.text)
# -----------------------------------------------------------------------------
def init_exec(pipeline, name="default", inputValues=None, resultsLocation="/vip/Home") -> str:
    """
    Launch an execution of `pipeline` on VIP and return its identifier.

    - `pipeline`: pipeline identifier
    - `name`: execution name
    - `inputValues`: pipeline inputs (defaults to an empty dict)
    - `resultsLocation`: VIP directory for the results
    Raises RuntimeError if VIP answers with an error message.
    """
    # Fresh dict per call instead of a shared mutable default
    if inputValues is None:
        inputValues = {}
    url = __PREFIX + 'executions'
    headers = {
        'apikey': __apikey,
        'Content-Type': 'application/json'
    }
    data_ = {
        "name": name,
        'pipelineIdentifier': pipeline,
        "inputValues": inputValues,
        "resultsLocation": resultsLocation
    }
    rq = SESSION.post(url, headers=headers, json=data_)
    manage_errors(rq)
    return rq.json()["identifier"]
# -----------------------------------------------------------------------------
def init_exec_without_resultsLocation(pipeline, name="default", inputValues=None) -> str:
    """
    Launch an execution of `pipeline` on VIP and return its identifier.

    Unlike `init_exec()`, no "resultsLocation" is sent: the results directory
    is expected as "results-directory" in `inputValues`.
    `inputValues` defaults to an empty dict.
    Raises RuntimeError if VIP answers with an error message.
    """
    # Fresh dict per call instead of a shared mutable default
    if inputValues is None:
        inputValues = {}
    url = __PREFIX + 'executions'
    headers = {
        'apikey': __apikey,
        'Content-Type': 'application/json'
    }
    data_ = {
        "name": name,
        'pipelineIdentifier': pipeline,
        "inputValues": inputValues
    }
    # Same module session as `init_exec()` (instead of a one-off request)
    rq = SESSION.post(url, headers=headers, json=data_)
    manage_errors(rq)
    return rq.json()["identifier"]
# -----------------------------------------------------------------------------
def execution_info(id_exec) -> dict:
    """Return the decoded JSON answer of 'executions/<id_exec>'."""
    return generic_get('executions/' + id_exec)
# -----------------------------------------------------------------------------
def is_running(id_exec) -> bool:
    """Return True if VIP reports the 'status' of `id_exec` as 'Running'."""
    status = execution_info(id_exec)['status']
    return status == 'Running'
# -----------------------------------------------------------------------------
def get_exec_stderr(exec_id) -> str:
    """
    Return the raw text of 'executions/<exec_id>/stderr'.

    Raises RuntimeError if VIP answers with an error message.
    """
    response = SESSION.get(__PREFIX + 'executions/' + exec_id + '/stderr', headers=__headers)
    manage_errors(response)
    return response.text
# -----------------------------------------------------------------------------
def get_exec_stdout(exec_id) -> str:
    """
    Return the raw text of 'executions/<exec_id>/stdout'.

    Raises RuntimeError if VIP answers with an error message.
    """
    stdout_url = __PREFIX + 'executions/' + exec_id + '/stdout'
    response = SESSION.get(stdout_url, headers=__headers)
    manage_errors(response)
    return response.text
# -----------------------------------------------------------------------------
def get_exec_results(exec_id, timeout: int=None) -> str:
    """
    Return the decoded JSON answer of 'executions/<exec_id>/results'.

    The request always goes through `SESSION_NO_RETRY`, i.e. a single attempt
    without retry strategy. `timeout` (seconds) is passed to `requests`;
    `None` means no timeout.

    Raises TimeoutError (builtin) if the request times out, either while
    connecting or while waiting for the answer.
    Raises RuntimeError if VIP answers with an error message.
    """
    url = __PREFIX + 'executions/' + exec_id + '/results'
    try:
        rq = SESSION_NO_RETRY.get(url, headers=__headers, timeout=timeout)
    except requests.exceptions.Timeout as e:
        # Base class of both ConnectTimeout and ReadTimeout
        raise TimeoutError(e) from e
    manage_errors(rq)
    return rq.json()
# -----------------------------------------------------------------------------
def kill_execution(exec_id, deleteFiles=False) -> bool:
    """
    Ask VIP to kill execution `exec_id` (PUT 'executions/<exec_id>/kill').

    When `deleteFiles` is truthy, the query string '?deleteFiles=true' is added.
    Returns True if VIP answered without an error message, False otherwise.
    """
    query = '?deleteFiles=true' if deleteFiles else ''
    kill_url = __PREFIX + 'executions/' + exec_id + '/kill' + query
    response = SESSION.put(kill_url, headers=__headers)
    try:
        manage_errors(response)
    except RuntimeError:
        return False
    return True
################################ PIPELINES ####################################
# -----------------------------------------------------------------------------
def list_pipeline() -> list:
    """Return the decoded JSON answer of the 'pipelines' endpoint."""
    return generic_get('pipelines')
# -----------------------------------------------------------------------------
def pipeline_def(pip_id) -> dict:
    """Return the decoded JSON answer of 'pipelines/<pip_id>'."""
    return generic_get('pipelines/' + pip_id)
################################## OTHER ######################################
# -----------------------------------------------------------------------------
def platform_info() -> dict:
    """Return the decoded JSON answer of the 'platform' endpoint."""
    return generic_get('platform')
# -----------------------------------------------------------------------------
def get_apikey(username, password) -> str:
    """
    Ask VIP for the API key of an account and return it.

    `username` is the email account you used to create your VIP account.
    The credentials are POSTed as JSON to the 'authenticate' endpoint.
    The 'httpHeaderValue' field of the answer is returned.
    Raises RuntimeError if VIP answers with an error message.
    """
    request_headers = {'apikey': __apikey, 'Content-Type': 'application/json'}
    credentials = {"username": username, "password": password}
    response = SESSION.post(__PREFIX + 'authenticate', headers=request_headers, json=credentials)
    manage_errors(response)
    return response.json()['httpHeaderValue']
###############################################################################
if __name__ == '__main__':
    # Library module: nothing is run when executed as a script
    ...