import thread, os from collections import deque import greensock class ThreadChannel(object): # a Channel usable to communicate between a greenlet and # a non-greenlet-based OS thread. Assumes that greenlets run # in the main thread. def __init__(self): self.localchannel = greensock.Channel() def send(self, obj, block=True): res = do_in_main(lambda: self.localchannel.send(obj, block=block)) assert type(res) is bool return res def receive(self): return do_in_main(self.localchannel.receive) def wait_sendable(self): do_in_main(self.localchannel.wait_sendable) def do_in_main(operation): if thread.get_ident() == ID_MAIN: #print 'DIRECT:', operation return operation() else: #print 'INDIRECT:', operation lock = thread.allocate_lock() lock.acquire() answer = [] QUEUE.append((operation, lock, answer)) os.write(PIPE_WRITE, '\x00') # wake up _proxy() #print 'wait for answer...' lock.acquire() # wait for answer result = answer[0] #print 'got answer:', result return result def _proxy(): while True: #print 'proxy waiting' data = greensock.read(PIPE_READ, 1) # wait for signal if not data: return operation, lock, answer = QUEUE.popleft() #print 'proxy woke up:', operation result = operation() #print 'proxy result:', result answer.append(result) lock.release() # signal we are done ID_MAIN = thread.get_ident() def startup(): global PIPE_READ, PIPE_WRITE, QUEUE assert thread.get_ident() == ID_MAIN PIPE_READ, PIPE_WRITE = os.pipe() QUEUE = deque() greensock.autogreenlet(_proxy) def shutdown(): global PIPE_READ, PIPE_WRITE, QUEUE os.close(PIPE_WRITE) os.close(PIPE_READ) del PIPE_READ, PIPE_WRITE, QUEUE