[pypy-svn] r42651 - in pypy/dist/pypy: module/select module/select/test rlib

afa at codespeak.net afa at codespeak.net
Thu May 3 23:56:27 CEST 2007


Author: afa
Date: Thu May  3 23:56:26 2007
New Revision: 42651

Modified:
   pypy/dist/pypy/module/select/interp_select.py
   pypy/dist/pypy/module/select/test/test_select.py
   pypy/dist/pypy/rlib/_rsocket_ctypes.py
Log:
win32 implementation of poll().

Had to change some tests:
- select.select() requires at least one socket, even on CPython
- different behaviour when closing a socket with unread data.


Modified: pypy/dist/pypy/module/select/interp_select.py
==============================================================================
--- pypy/dist/pypy/module/select/interp_select.py	(original)
+++ pypy/dist/pypy/module/select/interp_select.py	Thu May  3 23:56:26 2007
@@ -2,6 +2,7 @@
 from pypy.interpreter.baseobjspace import Wrappable
 from pypy.interpreter.gateway import W_Root, ObjSpace, interp2app
 from pypy.rlib import _rsocket_ctypes as _c
+from ctypes import POINTER, byref
 from pypy.rpython.rctypes.aerrno import geterrno
 from pypy.interpreter.error import OperationError
 
@@ -50,43 +51,136 @@
                                  space.wrap(fd))
     unregister.unwrap_spec = ['self', ObjSpace, W_Root]
 
-    def poll(self, space, w_timeout=None):
-        if space.is_w(w_timeout, space.w_None):
-            timeout = -1
-        else:
-            timeout = space.int_w(w_timeout)
-            
-        numfd = len(self.fddict)
-        buf = _c.create_string_buffer(_c.sizeof(_c.pollfd) * numfd)
-        pollfds = _c.cast(buf, _c.POINTER(_c.pollfd))
-        i = 0
-        for fd, events in self.fddict.iteritems():
-            pollfds[i].fd = fd
-            pollfds[i].events = events
-            i += 1
-
-        # XXX Temporary hack for releasing the GIL
-        GIL = space.threadlocals.getGIL()
-        if GIL is not None: GIL.release()
-        ret = _c.poll(pollfds, numfd, timeout)
-        if GIL is not None: GIL.acquire(True)
-
-        if ret < 0:
-            errno = geterrno()
-            w_module = space.getbuiltinmodule('select')
-            w_errortype = space.getattr(w_module, space.wrap('error'))
-            message = _c.strerror(errno)
-            raise OperationError(w_errortype,
-                                 space.newtuple([space.wrap(errno),
-                                                 space.wrap(message)]))
-        
-        retval_w = []
-        for i in range(numfd):
-            pollfd = pollfds[i]
-            if pollfd.revents:
-                retval_w.append(space.newtuple([space.wrap(pollfd.fd),
-                                                space.wrap(pollfd.revents)]))
-        return space.newlist(retval_w)
+    if hasattr(_c, 'poll'):
+        def poll(self, space, w_timeout=None):
+            if space.is_w(w_timeout, space.w_None):
+                timeout = -1
+            else:
+                timeout = space.int_w(w_timeout)
+
+            numfd = len(self.fddict)
+            buf = _c.create_string_buffer(_c.sizeof(_c.pollfd) * numfd)
+            pollfds = _c.cast(buf, POINTER(_c.pollfd))
+            i = 0
+            for fd, events in self.fddict.iteritems():
+                pollfds[i].fd = fd
+                pollfds[i].events = events
+                i += 1
+
+            # XXX Temporary hack for releasing the GIL
+            GIL = space.threadlocals.getGIL()
+            if GIL is not None: GIL.release()
+            ret = _c.poll(pollfds, numfd, timeout)
+            if GIL is not None: GIL.acquire(True)
+
+            if ret < 0:
+                errno = geterrno()
+                w_module = space.getbuiltinmodule('select')
+                w_errortype = space.getattr(w_module, space.wrap('error'))
+                message = _c.strerror(errno)
+                raise OperationError(w_errortype,
+                                     space.newtuple([space.wrap(errno),
+                                                     space.wrap(message)]))
+
+            retval_w = []
+            for i in range(numfd):
+                pollfd = pollfds[i]
+                if pollfd.revents:
+                    retval_w.append(space.newtuple([space.wrap(pollfd.fd),
+                                                    space.wrap(pollfd.revents)]))
+            return space.newlist(retval_w)
+
+    elif hasattr(_c, 'WSAEventSelect'):
+        # win32 implementation
+        def poll(self, space, w_timeout=None):
+            numfd = len(self.fddict)
+
+            socketevents = _c.ARRAY(_c.WSAEVENT, numfd)()
+
+            numevents = 0
+            eventdict = {}
+
+            for fd, events in self.fddict.iteritems():
+                # select desired events
+                wsaEvents = 0
+                if events & _c.POLLIN:
+                    wsaEvents |= _c.FD_READ | _c.FD_ACCEPT | _c.FD_CLOSE
+                if events & _c.POLLOUT:
+                    wsaEvents |= _c.FD_WRITE | _c.FD_CONNECT | _c.FD_CLOSE
+
+                # if no events then ignore socket
+                if wsaEvents == 0:
+                    continue
+
+ 		# select socket for desired events
+                event = _c.WSACreateEvent()
+                _c.WSAEventSelect(fd, event, wsaEvents)
+
+                eventdict[fd] = event
+ 		socketevents[numevents] = event
+                numevents += 1
+
+            # if no sockets then return immediately
+            if numevents == 0:
+                return space.newlist([])
+
+            # prepare timeout
+            if space.is_w(w_timeout, space.w_None):
+                timeout = -1
+            else:
+                timeout = space.int_w(w_timeout)
+            if timeout < 0:
+                timeout = _c.INFINITE
+
+            # XXX Temporary hack for releasing the GIL
+            GIL = space.threadlocals.getGIL()
+            if GIL is not None: GIL.release()
+            ret = _c.WSAWaitForMultipleEvents(numevents, socketevents,
+                                              False, timeout, False)
+            if GIL is not None: GIL.acquire(True)
+
+            if ret == _c.WSA_WAIT_TIMEOUT:
+                return space.newlist([])
+
+            if ret < 0: # WSA_WAIT_FAILED is unsigned...
+                from pypy.rlib._rsocket_ctypes import socket_strerror, geterrno
+                errno = geterrno()
+                w_module = space.getbuiltinmodule('select')
+                w_errortype = space.getattr(w_module, space.wrap('error'))
+                message = socket_strerror(errno)
+                raise OperationError(w_errortype,
+                                     space.newtuple([space.wrap(errno),
+                                                     space.wrap(message)]))
+
+            retval_w = []
+            info = _c.WSANETWORKEVENTS()
+            for fd, event in eventdict.iteritems():
+                if _c.WSAEnumNetworkEvents(fd, event, byref(info)) < 0:
+                    continue
+                revents = 0
+                if info.lNetworkEvents & _c.FD_READ:
+                    revents |= _c.POLLIN
+                if info.lNetworkEvents & _c.FD_ACCEPT:
+                    revents |= _c.POLLIN
+                if info.lNetworkEvents & _c.FD_WRITE:
+                    revents |= _c.POLLOUT
+                if info.lNetworkEvents & _c.FD_CONNECT:
+                    if info.iErrorCode[_c.FD_CONNECT_BIT]:
+                        revents |= _c.POLLERR
+                    else:
+                        revents |= _c.POLLOUT
+                if info.lNetworkEvents & _c.FD_CLOSE:
+                    if info.iErrorCode[_c.FD_CLOSE_BIT]:
+                        revents |= _c.POLLERR
+                    else:
+                        if self.fddict[fd] & _c.POLLIN:
+                            revents |= _c.POLLIN
+                        if self.fddict[fd] & _c.POLLOUT:
+                            revents |= _c.POLLOUT
+                if revents:
+                    retval_w.append(space.newtuple([space.wrap(fd),
+                                                    space.wrap(revents)]))
+            return space.newlist(retval_w)
     poll.unwrap_spec = ['self', ObjSpace, W_Root]
 
 pollmethods = {}

Modified: pypy/dist/pypy/module/select/test/test_select.py
==============================================================================
--- pypy/dist/pypy/module/select/test/test_select.py	(original)
+++ pypy/dist/pypy/module/select/test/test_select.py	Thu May  3 23:56:26 2007
@@ -4,8 +4,9 @@
 class _AppTestSelect:
     def test_sleep(self):
         import time, select
+        readend, writeend = getpair()
         start = time.time()
-        iwtd, owtd, ewtd = select.select([], [], [], 0.3)
+        iwtd, owtd, ewtd = select.select([readend], [], [], 0.3)
         end = time.time()
         assert iwtd == owtd == ewtd == []
         assert end - start > 0.25
@@ -53,13 +54,16 @@
             readend.close()
 
     def test_close(self):
-        import select
+        import select, sys
         readend, writeend = getpair()
         try:
             try:
                 total_out = writeend.send('x' * 512)
             finally:
-                writeend.close()
+                # win32 sends the 'closed' event immediately, even when
+                # more data is available
+                if sys.platform != 'win32':
+                    writeend.close()
             assert 1 <= total_out <= 512
             total_in = 0
             while True:
@@ -71,6 +75,9 @@
                     break
                 assert data == 'x' * len(data)
                 total_in += len(data)
+                # win32: check that closing the socket exits the loop
+                if sys.platform == 'win32' and total_in == total_out:
+                    writeend.close()
             assert total_in == total_out
         finally:
             readend.close()

Modified: pypy/dist/pypy/rlib/_rsocket_ctypes.py
==============================================================================
--- pypy/dist/pypy/rlib/_rsocket_ctypes.py	(original)
+++ pypy/dist/pypy/rlib/_rsocket_ctypes.py	Thu May  3 23:56:26 2007
@@ -136,6 +136,10 @@
 
 POLLIN POLLPRI POLLOUT POLLERR POLLHUP POLLNVAL
 POLLRDNORM POLLRDBAND POLLWRNORM POLLWEBAND POLLMSG
+
+FD_READ FD_WRITE FD_ACCEPT FD_CONNECT FD_CLOSE
+WSA_WAIT_TIMEOUT WSA_WAIT_FAILED INFINITE
+FD_CONNECT_BIT FD_CLOSE_BIT
 '''.split()
 
 for name in constant_names:
@@ -164,7 +168,13 @@
                         ('INADDR_NONE', 0xffffffff),
                         ('SHUT_RD', 0),
                         ('SHUT_WR', 1),
-                        ('SHUT_RDWR', 2)]
+                        ('SHUT_RDWR', 2),
+                        ('POLLIN', 1),
+                        ('POLLPRI', 2),
+                        ('POLLOUT', 4),
+                        ('POLLERR', 8),
+                        ('POLLHUP', 16),
+                        ]
 for name, default in constants_w_defaults:
     setattr(CConfig, name, ctypes_platform.DefinedConstantInteger(name))
     
@@ -249,6 +259,13 @@
                                             [('fd', socketfd_type),
                                              ('events', c_short),
                                              ('revents', c_short)])
+if _MS_WINDOWS:
+    CConfig.WSAEVENT = ctypes_platform.SimpleType('WSAEVENT', c_void_p)
+    CConfig.WSANETWORKEVENTS = ctypes_platform.Struct('WSANETWORKEVENTS',
+                                  [('lNetworkEvents', c_long),
+                                   ('iErrorCode', c_int * 10), #FD_MAX_EVENTS
+                                   ])
+    
 
 CConfig.timeval = ctypes_platform.Struct('struct timeval',
                                          [('tv_sec', c_long),
@@ -351,6 +368,9 @@
 if _POSIX:
     nfds_t = cConfig.nfds_t
     pollfd = cConfig.pollfd
+if MS_WINDOWS:
+    WSAEVENT = cConfig.WSAEVENT
+    WSANETWORKEVENTS = cConfig.WSANETWORKEVENTS
 timeval = cConfig.timeval
 if MS_WINDOWS:
     fd_set = cConfig.fd_set
@@ -363,7 +383,7 @@
 # functions
 if MS_WINDOWS:
     from ctypes import windll
-    dllname = util.find_library('wsock32')
+    dllname = util.find_library('ws2_32')
     assert dllname is not None
     socketdll = windll.LoadLibrary(dllname)
 else:
@@ -573,6 +593,24 @@
                        POINTER(timeval)]
     select.restype = c_int
 
+    WSACreateEvent = socketdll.WSACreateEvent
+    WSACreateEvent.argtypes = []
+    WSACreateEvent.restype = WSAEVENT
+
+    WSAEventSelect = socketdll.WSAEventSelect
+    WSAEventSelect.argtypes = [socketfd_type, WSAEVENT, c_long]
+    WSAEventSelect.restype = c_int
+
+    WSAWaitForMultipleEvents = socketdll.WSAWaitForMultipleEvents
+    WSAWaitForMultipleEvents.argtypes = [c_long, POINTER(WSAEVENT),
+                                         c_int, c_long, c_int]
+    WSAWaitForMultipleEvents.restype = c_long
+
+    WSAEnumNetworkEvents = socketdll.WSAEnumNetworkEvents
+    WSAEnumNetworkEvents.argtypes = [socketfd_type, WSAEVENT,
+                                     POINTER(WSANETWORKEVENTS)]
+    WSAEnumNetworkEvents.restype = c_int
+
 if MS_WINDOWS:
     WSAData = cConfig.WSAData
     WSAStartup = socketdll.WSAStartup


More information about the pypy-svn mailing list