import py
import sys

# Module-level sink shared by every run: the channel receiver callback puts
# each item it is handed here, and the test drains it afterwards.
queue = py.std.Queue.Queue()


def test_gateway():
    """Yield the receiver-callback check ten times (generator-style test)."""
    for _ in range(10):
        yield assert_receiver_callback_is_safe


def receiver_callback(data):
    """Put ``data`` on the module-level ``queue``."""
    queue.put(data)


def assert_receiver_callback_is_safe():
    """Check that a channel receiver callback sees every item, in order.

    Opens a ``PopenGateway``, creates a channel whose receiver is
    ``queue.put``, and runs remote code that reads a count from the channel
    and sends back the integers ``0 .. count - 1``.  After the channel has
    closed, everything collected in ``queue`` must equal that sequence.
    """
    gw = py.execnet.PopenGateway()
    channel = gw.newchannel(receiver=queue.put)
    # The remote source is kept at column 0 so it is valid Python as-is.
    gw.remote_exec(channel=channel, source='''
import py
num = channel.receive()
for i in range(num):
    channel.send(i)
    #py.std.time.sleep(0.2)
''')
    number = 100
    channel.send(number)
    wait_finish(channel)
    assert channel.isclosed()

    # Drain whatever the receiver callback collected for this run.
    received = []
    while not queue.empty():
        received.append(queue.get(False))

    # Compare against a real list: ``range()`` is not a list on every
    # interpreter, and a list never compares equal to a lazy range object.
    assert received == list(range(number))

    # Drop the references explicitly before the function returns.
    del channel
    del gw


def wait_finish(channel):
    """Block until ``channel`` is closed, then shut its gateway down.

    ``channel.waitclose(0.1)`` is retried for as long as it raises
    ``IOError`` or ``py.error.Error``; the first call that returns normally
    ends the wait.  The gateway is exited in all cases, including when
    another exception propagates out of the wait loop.
    """
    try:
        while True:
            try:
                channel.waitclose(0.1)
            except (IOError, py.error.Error):
                # Not closed yet (or the wait was interrupted): poll again.
                continue
            return
    finally:
        try:
            channel.gateway.exit()
        except EOFError:
            # the gateway receiver callback will get woken up
            # and see an EOFError exception
            pass