from socket import * from bluesock import * def test_channel(): ev1 = Channel() ev2 = Channel() lst = [] def g1(): lst.append(0) x = ev2.receive() assert x == 'hello' lst.append(2) ev1.send('world') lst.append(5) def g2(): lst.append(1) ev2.send('hello') lst.append(3) y = ev1.receive() assert y == 'world' lst.append(4) autogreenlet(g1) autogreenlet(g2) mainloop() assert lst == [0, 1, 2, 3, 4, 5] def test_timer(): lst = [] def g1(): sleep(0.1) lst.append(1) sleep(0.2) lst.append(3) def g2(): lst.append(0) sleep(0.2) lst.append(2) sleep(0.2) lst.append(4) autogreenlet(g1) autogreenlet(g2) mainloop() assert lst == [0, 1, 2, 3, 4] def test_socket(): s1 = socket(AF_INET, SOCK_DGRAM) s2 = socket(AF_INET, SOCK_DGRAM) s1.bind(('', INADDR_ANY)) s2.bind(('', INADDR_ANY)) s1.connect(s2.getsockname()) s2.connect(s1.getsockname()) lst = [] def g1(): lst.append(0) x = recv(s1, 5) assert x == 'hello' lst.append(3) sendall(s1, 'world') lst.append(4) def g2(): lst.append(1) sendall(s2, 'hello') lst.append(2) y = recv(s2, 5) assert y == 'world' lst.append(5) autogreenlet(g1) autogreenlet(g2) mainloop() assert lst == [0, 1, 2, 3, 4, 5]