[pypy-svn] r42648 - pypy/dist/pypy/module/select/test

afa at codespeak.net afa at codespeak.net
Thu May 3 22:38:23 CEST 2007


Author: afa
Date: Thu May  3 22:38:23 2007
New Revision: 42648

Modified:
   pypy/dist/pypy/module/select/test/test_select.py
Log:
Refactor test_select.py, to allow multiple kinds of selectable objects:
- pipes created with os.pipe()
- connected sockets, but without using socketpair(), to have a chance to make it work on win32.


Modified: pypy/dist/pypy/module/select/test/test_select.py
==============================================================================
--- pypy/dist/pypy/module/select/test/test_select.py	(original)
+++ pypy/dist/pypy/module/select/test/test_select.py	Thu May  3 22:38:23 2007
@@ -1,14 +1,7 @@
 import py, sys
 from pypy.conftest import gettestobjspace
 
-class AppTestSelect:
-    def setup_class(cls):
-        if sys.platform == 'win':
-            py.test.skip("select() doesn't work with pipes, "
-                         "we would need tests using sockets")
-        space = gettestobjspace(usemodules=('select',))
-        cls.space = space
-
+class _AppTestSelect:
     def test_sleep(self):
         import time, select
         start = time.time()
@@ -18,22 +11,22 @@
         assert end - start > 0.25
 
     def test_readable(self):
-        import os, select
-        readend, writeend = os.pipe()
+        import select
+        readend, writeend = getpair()
         try:
             iwtd, owtd, ewtd = select.select([readend], [], [], 0)
             assert iwtd == owtd == ewtd == []
-            os.write(writeend, 'X')
+            writeend.send('X')
             iwtd, owtd, ewtd = select.select([readend], [], [])
             assert iwtd == [readend]
             assert owtd == ewtd == []
         finally:
-            os.close(writeend)
-            os.close(readend)
+            writeend.close()
+            readend.close()
 
     def test_write_read(self):
-        import os, select
-        readend, writeend = os.pipe()
+        import select
+        readend, writeend = getpair()
         try:
             total_out = 0
             while True:
@@ -42,7 +35,7 @@
                 if owtd == []:
                     break
                 assert owtd == [writeend]
-                total_out += os.write(writeend, 'x' * 512)
+                total_out += writeend.send('x' * 512)
             total_in = 0
             while True:
                 iwtd, owtd, ewtd = select.select([readend], [], [], 0)
@@ -50,45 +43,45 @@
                 if iwtd == []:
                     break
                 assert iwtd == [readend]
-                data = os.read(readend, 4096)
+                data = readend.recv(4096)
                 assert len(data) > 0
                 assert data == 'x' * len(data)
                 total_in += len(data)
             assert total_in == total_out
         finally:
-            os.close(writeend)
-            os.close(readend)
+            writeend.close()
+            readend.close()
 
     def test_close(self):
-        import os, select
-        readend, writeend = os.pipe()
+        import select
+        readend, writeend = getpair()
         try:
             try:
-                total_out = os.write(writeend, 'x' * 512)
+                total_out = writeend.send('x' * 512)
             finally:
-                os.close(writeend)
+                writeend.close()
             assert 1 <= total_out <= 512
             total_in = 0
             while True:
                 iwtd, owtd, ewtd = select.select([readend], [], [])
                 assert iwtd == [readend]
                 assert owtd == ewtd == []
-                data = os.read(readend, 4096)
+                data = readend.recv(4096)
                 if len(data) == 0:
                     break
                 assert data == 'x' * len(data)
                 total_in += len(data)
             assert total_in == total_out
         finally:
-            os.close(readend)
+            readend.close()
 
     def test_read_many(self):
-        import os, select
+        import select
         readends = []
         writeends = []
         try:
             for i in range(10):
-                fd1, fd2 = os.pipe()
+                fd1, fd2 = getpair()
                 readends.append(fd1)
                 writeends.append(fd2)
             iwtd, owtd, ewtd = select.select(readends, [], [], 0)
@@ -96,24 +89,91 @@
 
             for i in range(50):
                 n = (i*3) % 10
-                os.write(writeends[n], 'X')
+                writeends[n].send('X')
                 iwtd, owtd, ewtd = select.select(readends, [], [])
                 assert iwtd == [readends[n]]
                 assert owtd == ewtd == []
-                data = os.read(readends[n], 1)
+                data = readends[n].recv(1)
                 assert data == 'X'
 
         finally:
             for fd in readends + writeends:
-                os.close(fd)
+                fd.close()
 
     def test_read_end_closed(self):
-        import os, select
-        readend, writeend = os.pipe()
-        os.close(readend)
+        import select
+        readend, writeend = getpair()
+        readend.close()
         try:
             iwtd, owtd, ewtd = select.select([], [writeend], [])
             assert owtd == [writeend]
             assert iwtd == ewtd == []
         finally:
-            os.close(writeend)
+            writeend.close()
+
+class AppTestSelectWithPipes(_AppTestSelect):
+    "Use a pipe to get pairs of file descriptors"
+    def setup_class(cls):
+        if sys.platform == 'win32':
+            py.test.skip("select() doesn't work with pipes on win32")
+        space = gettestobjspace(usemodules=('select',))
+        cls.space = space
+
+        # Wraps a file descriptor in a socket-like object
+        space.exec_('''if 1:
+        import os
+        class FileAsSocket:
+            def __init__(self, fd):
+                self.fd = fd
+            def fileno(self):
+                return self.fd
+            def send(self, data):
+                return os.write(self.fd, data)
+            def recv(self, length):
+                return os.read(self.fd, length)
+            def close(self):
+                return os.close(self.fd)
+        def getpair():
+            s1, s2 = os.pipe()
+            return FileAsSocket(s1), FileAsSocket(s2)''',
+                    space.builtin.w_dict, space.builtin.w_dict)
+
+class AppTestSelectWithSockets(_AppTestSelect):
+    """Same tests with connected sockets.
+    socket.socketpair() does not exist on win32,
+    so we start our own server."""
+    def setup_class(cls):
+        space = gettestobjspace(usemodules=('select',))
+        cls.space = space
+
+        space.setitem(space.builtin.w_dict, space.wrap('getpair'),
+                      space.wrap(cls.getsocketpair))
+
+        import socket
+        cls.sock = socket.socket()
+
+        try_ports = [1023] + range(20000, 30000, 437)
+        for port in try_ports:
+            print 'binding to port %d:' % (port,),
+            cls.sockaddress = ('127.0.0.1', port)
+            try:
+                cls.sock.bind(cls.sockaddress)
+                print 'works'
+                break
+            except socket.error, e:   # should get a "Permission denied"
+                print e
+            else:
+                raise e
+
+    @classmethod
+    def getsocketpair(cls):
+        """Helper method which returns a pair of connected sockets.
+        Note that they become faked objects at AppLevel"""
+        import thread, socket
+
+        cls.sock.listen(1)
+        s2 = socket.socket()
+        thread.start_new_thread(s2.connect, (cls.sockaddress,))
+        s1, addr2 = cls.sock.accept()
+
+        return s1, s2


More information about the pypy-svn mailing list