[pypy-svn] r42648 - pypy/dist/pypy/module/select/test
afa at codespeak.net
afa at codespeak.net
Thu May 3 22:38:23 CEST 2007
Author: afa
Date: Thu May 3 22:38:23 2007
New Revision: 42648
Modified:
pypy/dist/pypy/module/select/test/test_select.py
Log:
Refactor test_select.py, to allow multiple kinds of selectable objects:
- pipes created with os.pipe()
- connected sockets, but without using socketpair(), to have a chance to make it work on win32.
Modified: pypy/dist/pypy/module/select/test/test_select.py
==============================================================================
--- pypy/dist/pypy/module/select/test/test_select.py (original)
+++ pypy/dist/pypy/module/select/test/test_select.py Thu May 3 22:38:23 2007
@@ -1,14 +1,7 @@
import py, sys
from pypy.conftest import gettestobjspace
-class AppTestSelect:
- def setup_class(cls):
- if sys.platform == 'win':
- py.test.skip("select() doesn't work with pipes, "
- "we would need tests using sockets")
- space = gettestobjspace(usemodules=('select',))
- cls.space = space
-
+class _AppTestSelect:
def test_sleep(self):
import time, select
start = time.time()
@@ -18,22 +11,22 @@
assert end - start > 0.25
def test_readable(self):
- import os, select
- readend, writeend = os.pipe()
+ import select
+ readend, writeend = getpair()
try:
iwtd, owtd, ewtd = select.select([readend], [], [], 0)
assert iwtd == owtd == ewtd == []
- os.write(writeend, 'X')
+ writeend.send('X')
iwtd, owtd, ewtd = select.select([readend], [], [])
assert iwtd == [readend]
assert owtd == ewtd == []
finally:
- os.close(writeend)
- os.close(readend)
+ writeend.close()
+ readend.close()
def test_write_read(self):
- import os, select
- readend, writeend = os.pipe()
+ import select
+ readend, writeend = getpair()
try:
total_out = 0
while True:
@@ -42,7 +35,7 @@
if owtd == []:
break
assert owtd == [writeend]
- total_out += os.write(writeend, 'x' * 512)
+ total_out += writeend.send('x' * 512)
total_in = 0
while True:
iwtd, owtd, ewtd = select.select([readend], [], [], 0)
@@ -50,45 +43,45 @@
if iwtd == []:
break
assert iwtd == [readend]
- data = os.read(readend, 4096)
+ data = readend.recv(4096)
assert len(data) > 0
assert data == 'x' * len(data)
total_in += len(data)
assert total_in == total_out
finally:
- os.close(writeend)
- os.close(readend)
+ writeend.close()
+ readend.close()
def test_close(self):
- import os, select
- readend, writeend = os.pipe()
+ import select
+ readend, writeend = getpair()
try:
try:
- total_out = os.write(writeend, 'x' * 512)
+ total_out = writeend.send('x' * 512)
finally:
- os.close(writeend)
+ writeend.close()
assert 1 <= total_out <= 512
total_in = 0
while True:
iwtd, owtd, ewtd = select.select([readend], [], [])
assert iwtd == [readend]
assert owtd == ewtd == []
- data = os.read(readend, 4096)
+ data = readend.recv(4096)
if len(data) == 0:
break
assert data == 'x' * len(data)
total_in += len(data)
assert total_in == total_out
finally:
- os.close(readend)
+ readend.close()
def test_read_many(self):
- import os, select
+ import select
readends = []
writeends = []
try:
for i in range(10):
- fd1, fd2 = os.pipe()
+ fd1, fd2 = getpair()
readends.append(fd1)
writeends.append(fd2)
iwtd, owtd, ewtd = select.select(readends, [], [], 0)
@@ -96,24 +89,91 @@
for i in range(50):
n = (i*3) % 10
- os.write(writeends[n], 'X')
+ writeends[n].send('X')
iwtd, owtd, ewtd = select.select(readends, [], [])
assert iwtd == [readends[n]]
assert owtd == ewtd == []
- data = os.read(readends[n], 1)
+ data = readends[n].recv(1)
assert data == 'X'
finally:
for fd in readends + writeends:
- os.close(fd)
+ fd.close()
def test_read_end_closed(self):
- import os, select
- readend, writeend = os.pipe()
- os.close(readend)
+ import select
+ readend, writeend = getpair()
+ readend.close()
try:
iwtd, owtd, ewtd = select.select([], [writeend], [])
assert owtd == [writeend]
assert iwtd == ewtd == []
finally:
- os.close(writeend)
+ writeend.close()
+
+class AppTestSelectWithPipes(_AppTestSelect):
+ "Use a pipe to get pairs of file descriptors"
+ def setup_class(cls):
+ if sys.platform == 'win32':
+ py.test.skip("select() doesn't work with pipes on win32")
+ space = gettestobjspace(usemodules=('select',))
+ cls.space = space
+
+ # Wraps a file descriptor in an socket-like object
+ space.exec_('''if 1:
+ import os
+ class FileAsSocket:
+ def __init__(self, fd):
+ self.fd = fd
+ def fileno(self):
+ return self.fd
+ def send(self, data):
+ return os.write(self.fd, data)
+ def recv(self, length):
+ return os.read(self.fd, length)
+ def close(self):
+ return os.close(self.fd)
+ def getpair():
+ s1, s2 = os.pipe()
+ return FileAsSocket(s1), FileAsSocket(s2)''',
+ space.builtin.w_dict, space.builtin.w_dict)
+
+class AppTestSelectWithSockets(_AppTestSelect):
+ """Same tests with connected sockets.
+ socket.socketpair() does not exists on win32,
+ so we start our own server."""
+ def setup_class(cls):
+ space = gettestobjspace(usemodules=('select',))
+ cls.space = space
+
+ space.setitem(space.builtin.w_dict, space.wrap('getpair'),
+ space.wrap(cls.getsocketpair))
+
+ import socket
+ cls.sock = socket.socket()
+
+ try_ports = [1023] + range(20000, 30000, 437)
+ for port in try_ports:
+ print 'binding to port %d:' % (port,),
+ cls.sockaddress = ('127.0.0.1', port)
+ try:
+ cls.sock.bind(cls.sockaddress)
+ print 'works'
+ break
+ except socket.error, e: # should get a "Permission denied"
+ print e
+ else:
+ raise e
+
+ @classmethod
+ def getsocketpair(cls):
+ """Helper method which returns a pair of connected sockets.
+ Note that they become faked objects at AppLevel"""
+ import thread, socket
+
+ cls.sock.listen(1)
+ s2 = socket.socket()
+ thread.start_new_thread(s2.connect, (cls.sockaddress,))
+ s1, addr2 = cls.sock.accept()
+
+ return s1, s2
More information about the pypy-svn
mailing list