This repository was archived by the owner on Nov 22, 2022. It is now read-only.
forked from bslatkin/ringbuffer
-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathexample_numpy.py
More file actions
executable file
·67 lines (50 loc) · 1.45 KB
/
example_numpy.py
File metadata and controls
executable file
·67 lines (50 loc) · 1.45 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
#!/usr/bin/env python3
"""Simple example with numpy arrays."""
import ctypes
import multiprocessing
import numpy
import numpy.matlib
import ringbuffer
def writer(ring):
    """Write 10000 random 25 x 100 blocks of doubles into ``ring``.

    Each block is offered with ``ring.try_write``. When that call raises
    ``ringbuffer.WaitingForReaderError`` the block is reported and dropped
    rather than retried. A progress line is printed every 100 iterations,
    and ``ring.writer_done()`` is called once the loop has finished.

    Args:
        ring: Ring buffer object providing ``try_write`` and
            ``writer_done``.
    """
    for i in range(10000):
        # numpy.random.randn produces the same standard-normal samples as
        # numpy.matlib.randn, but as a plain ndarray instead of the
        # pending-deprecated numpy.matrix class. as_ctypes yields the same
        # (c_double * 100) * 25 object either way.
        block = numpy.random.randn(25, 100)
        x = numpy.ctypeslib.as_ctypes(block)
        try:
            ring.try_write(x)
        except ringbuffer.WaitingForReaderError:
            print('Reader is too slow, dropping %r' % x)
            continue

        # `i and ...` skips i == 0 so no progress line is printed on the
        # very first iteration.
        if i and i % 100 == 0:
            print('Wrote %d so far' % i)

    ring.writer_done()
    print('Writer is done')
def reader(ring, pointer):
    """Consume entries from ``ring`` until the writer has finished.

    Repeatedly calls ``ring.blocking_read(pointer)``; the loop ends when
    that call raises ``ringbuffer.WriterFinishedError``. The first item of
    every successful read is copied into a numpy array, given the shape
    (25, 100) and written to once.

    Args:
        ring: Ring buffer object providing ``blocking_read``.
        pointer: Reader handle passed through to ``blocking_read``.
    """
    writer_finished = False
    while not writer_finished:
        try:
            slots = ring.blocking_read(pointer)
        except ringbuffer.WriterFinishedError:
            writer_finished = True
        else:
            frame = numpy.array(slots[0])
            frame.shape = (25, 100)
            frame[1, 1] = 1.1  # Verify it's mutable

    print('Reader %r is done' % pointer)
def main():
    """Run the demo: one writer process and ten reader processes.

    Creates a ``ringbuffer.RingBuffer`` with 100 slots whose element type
    is a 25 x 100 block of C doubles, calls ``new_writer()`` once and
    ``new_reader()`` once per reader, starts one process per participant,
    then joins each process and asserts that it stopped with exit code 0.
    """
    row_type = ctypes.c_double * 100
    slot_type = row_type * 25
    ring = ringbuffer.RingBuffer(c_type=slot_type, slot_count=100)
    ring.new_writer()

    writer_proc = multiprocessing.Process(target=writer, args=(ring, ))
    # Every reader process receives its own value from ring.new_reader().
    reader_procs = [
        multiprocessing.Process(
            target=reader, args=(ring, ring.new_reader()))
        for _ in range(10)
    ]
    workers = [writer_proc] + reader_procs

    for worker in workers:
        worker.start()

    for worker in workers:
        print('join')
        worker.join(timeout=50)
        # join() also returns when the timeout expires, so check that the
        # child really stopped before looking at its exit code.
        assert not worker.is_alive()
        assert worker.exitcode == 0
# Run the demo only when executed as a script, not when imported.
if __name__ == "__main__":
    main()