-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
95 lines (69 loc) · 2.77 KB
/
main.py
File metadata and controls
95 lines (69 loc) · 2.77 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
from streamsx.topology.topology import Topology
from streamsx.topology import schema
from streamsx.topology.context import *
from streams.observations import observation_stream # Cannot use "as X"
import hashlib
from collections import deque
import threading
from streamsx_health.ingest.Observation import getReadingValue
# Name of the Streaming Analytics service instance to use.
service_name = "Streaming Analytics-no" # Service name
# Credentials for that service; empty placeholder to be filled in by the user.
credentials = {} # Put credentials here
def anonymize(tuple_):
    """Replace the patient and location identifiers with SHA-256 digests.

    The tuple is modified in place and also returned, so the function can be
    passed directly to a stream ``map``.

    Args:
        tuple_: Mapping with a string ``'patientId'`` entry and a nested
            ``'device'`` mapping that holds a string ``'locationId'`` entry.

    Returns:
        The same ``tuple_`` object, with both identifiers replaced by the raw
        SHA-256 digest (``bytes``) of their UTF-8 encoding.

    Raises:
        KeyError: If ``'patientId'``, ``'device'`` or ``'locationId'`` is
            missing.
    """
    patient_id = tuple_['patientId']
    tuple_['patientId'] = hashlib.sha256(patient_id.encode('utf-8')).digest()

    device = tuple_['device']
    location_id = device['locationId']
    device['locationId'] = hashlib.sha256(location_id.encode('utf-8')).digest()

    return tuple_
class Avg(object):
    """Callable that returns the mean of the most recent reading values.

    One instance keeps a single sliding window, so every tuple passed to the
    instance contributes to the same average.

    Args:
        max_: Maximum number of readings kept in the window. Must be a
            non-negative integer; ``deque`` raises ``ValueError`` otherwise.
    """

    def __init__(self, max_):
        # The maximum length of the window used to calculate the average
        self.history_max = max_
        # A bounded deque drops the oldest reading automatically once full,
        # replacing the manual append / pop(0) bookkeeping on a list.
        self.last_n = deque(maxlen=max_)

    # Object is callable
    def __call__(self, tuple_):
        """Add the tuple's reading to the window and return the current mean.

        Args:
            tuple_: Observation passed to ``getReadingValue`` (presumably
                yielding a numeric reading -- confirm against that helper).

        Returns:
            ``sum(window) / len(window)`` over the readings currently kept.

        Raises:
            ZeroDivisionError: If the window size is 0, since no reading is
                ever retained.
        """
        self.last_n.append(getReadingValue(tuple_))
        return sum(self.last_n) / len(self.last_n)
def data_collecter(view):
    """Copy items from ``view`` into the module-level ``plot_queue``.

    Calls ``view.get()`` repeatedly and appends each result, converted to
    ``float``, to ``plot_queue``. The loop ends when ``view.get()`` returns
    ``None``.

    Args:
        view: Object exposing a zero-argument ``get()`` method (presumably the
            queue returned by the view's ``start_data_fetch()`` -- confirm
            against the caller).

    Raises:
        TypeError, ValueError: If an item cannot be converted by ``float``.
    """
    # iter(callable, sentinel) keeps calling view.get until it yields None.
    for item in iter(view.get, None):
        plot_queue.append(float(item))
# Describe the Streaming Analytics service in VCAP-services form and build the
# submission configuration that points at it.
vs = {
    'streaming-analytics': [
        {'name': service_name, 'credentials': credentials},
    ],
}
cfg = {}
cfg[ConfigParams.SERVICE_NAME] = service_name
cfg[ConfigParams.VCAP_SERVICES] = vs
# Define the data source.
# Create the topology and subscribe to the 'ingest-beacon' topic, reading
# tuples with the JSON common schema.
topo = Topology()
patient_data_stream = topo.subscribe('ingest-beacon', schema.CommonSchema.Json)
# Anonymize each tuple: anonymize() hashes the patient and location identifiers.
anonymous_patient_stream = patient_data_stream.map(anonymize)
# Keep only tuples whose reading type code is '8867-4'
# (presumably the code for heart rate, given the stream name -- confirm).
heart_rate_stream = anonymous_patient_stream.filter(lambda tuple_: (tuple_['reading']['readingType']['code'] == '8867-4'))
# Calculate the heart rate average over the latest 10 tuples. A single Avg
# instance is used, so the window covers every tuple on the stream.
heart_average_stream = heart_rate_stream.map(Avg(10))
# Create a view on the averaged stream so its data can be fetched later.
heart_average_view = heart_average_stream.view()
# Print each average produced by the stream.
heart_average_stream.sink(print)
# Submit the topology to the Streaming Analytics service on Bluemix
submit(ContextTypes.ANALYTICS_SERVICE, topo, cfg)

# Start fetching data from the view; the returned object is handed to
# data_collecter, which reads from it through its get() method.
view = heart_average_view.start_data_fetch()

# Bounded buffer holding at most the 2000 most recent values
plot_queue = deque(maxlen=2000)

# Run the collector as a daemon thread: it blocks in view.get() until a None
# arrives, and a non-daemon thread would keep the interpreter alive after the
# main thread has stopped (for example on Ctrl-C).
data_pull_thread = threading.Thread(
    target=data_collecter,
    args=(view,),
    daemon=True,
)
data_pull_thread.start()
import time
from IPython import display
# For matplotlib to work properly on macOS, Python has to be installed as a framework; with pyenv run:
# env PYTHON_CONFIGURE_OPTS="--enable-framework CC=clang" pyenv install 3.5.5
from matplotlib import pylab as pl
# Default figure size for the plot
pl.rcParams['figure.figsize'] = (14.0, 8.0)

# Redraw the collected values once per second until interrupted.
while True:
    # Take one snapshot per frame so the plotted data and the printed length
    # agree, and so the deque is not iterated by the plotting code while the
    # collector thread is still appending to it.
    samples = list(plot_queue)

    pl.clf()
    ax = pl.gca()
    # Fixed axes: x spans the 2000-slot buffer, y spans 50-120.
    ax.set_autoscale_on(False)
    ax.plot(samples)
    ax.axis([0, 2000, 50, 120])
    display.display(pl.gcf())
    print(len(samples))
    # wait=True defers clearing until new output is available to replace it.
    display.clear_output(wait=True)
    time.sleep(1.0)