-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathconvert_to_raw.py
More file actions
70 lines (63 loc) · 1.9 KB
/
convert_to_raw.py
File metadata and controls
70 lines (63 loc) · 1.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
from test_doubles import (
create_f144_message_double,
create_ep01_message_double,
create_al00_message_double,
create_ad00_message_uint16,
create_da00_message_int32s
)
def convert_to_raw_flatbuffer(item):
    """Build the raw message for ``item`` using the builder for its schema.

    ``item`` is indexed by string keys. ``item["schema"]`` selects which of
    the ``create_*`` builders imported from ``test_doubles`` is called; the
    other keys read depend on the schema:

    * ``"f144"``: ``source_name``, ``value``, ``timestamp``
    * ``"ep01"``: ``connection_status``, ``source_name``, ``timestamp``
    * ``"al00"``: ``severity``, ``source_name``, ``timestamp``, ``message``
    * ``"ad00"``: ``data``, ``source_name``, ``timestamp``
    * ``"da00"``: ``data``, ``source_name``, ``name``, ``axis_name``,
      ``timestamp``

    Returns:
        Whatever the selected builder returns.

    Raises:
        ValueError: If the schema is not one of the five listed above.
        KeyError: If ``item`` is a plain dict and a key read for the
            selected schema is missing.
    """
    schema = item["schema"]

    if schema == "f144":
        return create_f144_message_double(
            item["source_name"],
            item["value"],
            item["timestamp"],
        )

    if schema == "ep01":
        # Any status other than "CONNECTED" is reported as disconnected.
        # NOTE(review): the purpose of the "_unquoted" suffix is not visible
        # in this function; the strings are handed to the builder unchanged.
        is_connected = item["connection_status"] == "CONNECTED"
        status = "CONNECTED_unquoted" if is_connected else "DISCONNECTED_unquoted"
        return create_ep01_message_double(
            item["source_name"],
            status,
            item["timestamp"],
        )

    if schema == "al00":
        reported = item["severity"]
        # Any severity not listed below falls back to the OK marker.
        severity = "OK_unquoted"
        for known, marker in (
            ("INVALID", "INVALID_unquoted"),
            ("MAJOR", "MAJOR_unquoted"),
            ("MINOR", "MINOR_unquoted"),
        ):
            if reported == known:
                severity = marker
                break
        return create_al00_message_double(
            item["source_name"],
            item["timestamp"],
            severity,
            item["message"],
        )

    if schema == "ad00":
        # The payload is read before the other keys, as in the original.
        payload = item["data"]
        return create_ad00_message_uint16(
            item["source_name"],
            payload,
            item["timestamp"],
        )

    if schema == "da00":
        # The payload is read first but passed as the last argument.
        payload = item["data"]
        return create_da00_message_int32s(
            item["source_name"],
            item["name"],
            item["axis_name"],
            item["timestamp"],
            payload,
        )

    raise ValueError(f"Unknown schema {schema}")