import sys try: import thread except: import dummy_thread as thread from ctypes import * import ctypes.util __doc__ = """This module performs file control and I/O control on file descriptors. It is an interface to the fcntl() and ioctl() Unix routines. File descriptors can be obtained with the fileno() method of a file or socket object.""" _libc = cdll.LoadLibrary(ctypes.util.find_library("c")) # constants, look in fcntl.h and platform docs for the meaning LOCK_SH = 1 LOCK_EX = 2 LOCK_NB = 4 LOCK_UN = 8 F_DUPFD = 0 F_GETFD = 1 F_SETFD = 2 F_GETFL = 3 F_SETFL = 4 F_UNLCK = 2 FD_CLOEXEC = 1 if "linux" in sys.platform: LOCK_MAND = 32 LOCK_READ = 64 LOCK_WRITE = 128 LOCK_RW = 192 F_GETSIG = 11 F_SETSIG = 10 F_GETLK64 = 12 F_SETLK64 = 13 F_SETLKW64 = 14 # in CPython they are 12, 13 and 14 # in plain C they are 5, 6 and 7. Look at bits/fcntl.h F_GETLK = F_GETLK64 F_SETLK = F_SETLK64 F_SETLKW = F_SETLKW64 F_GETOWN = 9 F_SETOWN = 8 F_RDLCK = 0 F_WRLCK = 1 F_SETLEASE = 1024 F_GETLEASE = 1025 F_NOTIFY = 1026 F_EXLCK = 4 F_SHLCK = 8 DN_ACCESS = 0x00000001 DN_MODIFY = 0x00000002 DN_CREATE = 0x00000004 DN_DELETE = 0x00000008 DN_RENAME = 0x00000010 DN_ATTRIB = 0x00000020 # Python turns it and long integer, we need the C's signed integer DN_MULTISHOT = c_int(0x80000000).value __SID = ord('S') << 8 I_NREAD = __SID | 1 I_PUSH = __SID | 2 I_POP = __SID | 3 I_LOOK = __SID | 4 I_FLUSH = __SID | 5 I_SRDOPT = __SID | 6 I_GRDOPT = __SID | 7 I_STR = __SID | 8 I_SETSIG = __SID | 9 I_GETSIG = __SID | 10 I_FIND = __SID | 11 I_LINK = __SID | 12 I_UNLINK = __SID | 13 I_PEEK = __SID | 15 I_FDINSERT = __SID | 16 I_SENDFD = __SID | 17 I_RECVFD = __SID | 14 I_SWROPT = __SID | 19 I_LIST = __SID | 21 I_PLINK = __SID | 22 I_PUNLINK = __SID | 23 I_FLUSHBAND = __SID | 28 I_CKBAND = __SID | 29 I_GETBAND = __SID | 30 I_ATMARK = __SID | 31 I_SETCLTIME = __SID | 32 I_GETCLTIME = __SID | 33 I_CANPUT = __SID | 34 del __SID else: F_GETLK = 7 F_SETLK = 8 F_SETLKW = 9 F_GETOWN = 5 F_SETOWN = 6 F_RDLCK = 1 F_WRLCK = 3 class _flock(Structure): _fields_ = [("l_type", c_short), ("l_whence", c_short), ("l_start", c_long), ("l_len", c_long), ("l_pid", c_int)] def _conv_descriptor(f): if not (isinstance(f, (int, long)) or hasattr(f, "fileno")): raise TypeError, "argument must be an int, or have a fileno() method." try: return f.fileno() except AttributeError: return f def _get_error_msg(): errno = c_int.in_dll(_libc, "errno") _libc.strerror.restype = c_char_p msg = _libc.strerror(errno) return msg def _call_func(f_name, fd, code, arg): func = getattr(_libc, f_name) lock = thread.allocate_lock() lock.acquire() rv = func(fd, code, arg) lock.release() if rv < 0: raise IOError, _get_error_msg() return rv def _check_flock_op(op): l = _flock() if op == LOCK_UN: l.l_type = F_UNLCK elif op & LOCK_SH: l.l_type = F_RDLCK elif op & LOCK_EX: l.l_type = F_WRLCK else: raise ValueError, "unrecognized flock argument" return l def _is_mutable(arg): try: arg[0] = arg[0] except TypeError: return False else: return True def fcntl(fd, op, arg=0): """fcntl(fd, op, [arg]) Perform the requested operation on file descriptor fd. The operation is defined by op and is operating system dependent. These constants are available from the fcntl module. The argument arg is optional, and defaults to 0; it may be an int or a string. If arg is given as a string, the return value of fcntl is a string of that length, containing the resulting value put in the arg buffer by the operating system. The length of the arg string is not allowed to exceed 1024 bytes. If the arg given is an integer or if none is specified, the result value is an integer corresponding to the return value of the fcntl call in the C code.""" fd = _conv_descriptor(fd) if not isinstance(op, (int, long)): raise TypeError, "an integer is required" if isinstance(arg, (int, long)): rv = _call_func("fcntl", fd, op, arg) return rv elif isinstance(arg, str): if len(arg) > 1024: raise ValueError, "fcntl string arg too long" _call_func("fcntl", fd, op, arg) return arg else: raise TypeError, "int or string required" def flock(fd, op): """flock(fd, operation) Perform the lock operation op on file descriptor fd. See the Unix manual flock(3) for details. (On some systems, this function is emulated using fcntl().)""" fd = _conv_descriptor(fd) if not isinstance(op, (int, long)): raise TypeError, "an integer is required" if hasattr(_libc, "flock"): lock = thread.allocate_lock() lock.acquire() rv = _libc.flock(fd, op) lock.release() if rv < 0: raise IOError, _get_error_msg() else: l = _check_flock_op(op) l.l_whence = l.l_start = l.l_len = 0 op = (F_SETLKW, F_SETLK)[op & LOCK_NB] _libc.fcntl.argtypes = [c_int, c_int, POINTER(_flock)] rv = _call_func("fcntl", fd, op, byref(l)) def lockf(fd, op, length=0, start=0, whence=0): """lockf (fd, operation, length=0, start=0, whence=0) This is essentially a wrapper around the fcntl() locking calls. fd is the file descriptor of the file to lock or unlock, and operation is one of the following values: LOCK_UN - unlock LOCK_SH - acquire a shared lock LOCK_EX - acquire an exclusive lock When operation is LOCK_SH or LOCK_EX, it can also be bit-wise OR'd with LOCK_NB to avoid blocking on lock acquisition. If LOCK_NB is used and the lock cannot be acquired, an IOError will be raised and the exception will have an errno attribute set to EACCES or EAGAIN (depending on the operating system -- for portability, check for either value). length is the number of bytes to lock, with the default meaning to lock to EOF. start is the byte offset, relative to whence, to that the lock starts. whence is as with fileobj.seek(), specifically: 0 - relative to the start of the file (SEEK_SET) 1 - relative to the current buffer position (SEEK_CUR) 2 - relative to the end of the file (SEEK_END)""" fd = _conv_descriptor(fd) for v in [op, length, start, whence]: if not isinstance(v, (int, long)): raise TypeError, "an integer is required" l = _check_flock_op(op) l.l_start = l.l_len = 0 if start: l.l_start = long(start) if len: l.l_len = long(length) l.l_whence = whence op = (F_SETLKW, F_SETLK)[op & LOCK_NB] _libc.fcntl.argtypes = [c_int, c_int, POINTER(_flock)] _call_func("fcntl", fd, op, byref(l)) def ioctl(fd, op, arg=0, mutate_flag=True): """ioctl(fd, opt[, arg[, mutate_flag]]) Perform the requested operation on file descriptor fd. The operation is defined by opt and is operating system dependent. Typically these codes are retrieved from the fcntl or termios library modules. The argument arg is optional, and defaults to 0; it may be an int or a buffer containing character data (most likely a string or an array). If the argument is a mutable buffer (such as an array) and if the mutate_flag argument (which is only allowed in this case) is true then the buffer is (in effect) passed to the operating system and changes made by the OS will be reflected in the contents of the buffer after the call has returned. The return value is the integer returned by the ioctl system call. If the argument is a mutable buffer and the mutable_flag argument is not passed or is false, the behavior is as if a string had been passed. This behavior will change in future releases of Python. If the argument is an immutable buffer (most likely a string) then a copy of the buffer is passed to the operating system and the return value is a string of the same length containing whatever the operating system put in the buffer. The length of the arg buffer in this case is not allowed to exceed 1024 bytes. If the arg given is an integer or if none is specified, the result value is an integer corresponding to the return value of the ioctl call in the C code.""" fd = _conv_descriptor(fd) # Python turns number > sys.maxint into long, we need the signed C value op = c_int(op).value IOCTL_BUFSZ = 1024 if "linux" in sys.platform: _libc.ioctl.argtypes = [c_int, c_int, c_char_p] else: _libc.ioctl.argtypes = [c_int, c_ulong, c_char_p] if not isinstance(op, (int, long)): raise TypeError, "an integer is required" if not isinstance(mutate_flag, int): raise TypeError, "a boolean is required" if isinstance(arg, (int, long)): rv = _call_func("ioctl", fd, op, arg) return int(rv) elif _is_mutable(arg): # AFAIK array.array is the only mutable buffer in Python try: buf = create_string_buffer(arg.tostring()) except AttributeError: buf = create_string_buffer(str(arg)) if not mutate_flag: if len(arg) > IOCTL_BUFSZ: raise ValueError, "ioctl string arg too long" rv = _call_func("ioctl", fd, op, buf) if mutate_flag: return rv else: return buf.value elif not _is_mutable(arg): if len(arg) > IOCTL_BUFSZ: raise ValueError, "ioctl string arg too long" buf = create_string_buffer(str(arg)) rv = _call_func("ioctl", fd, op, buf) return buf.value else: raise TypeError, "an integer or a buffer required" __all__ = ["fcntl", "flock", "lockf", "ioctl"] if __name__ == "__main__": import struct import os import array import termios f = open("temp", "w") print "fcntl(f, F_SETFL, os.O_NDELAY):", fcntl(f, F_SETFL, os.O_NDELAY) lockdata = struct.pack('hhllhh', F_WRLCK, 0, 0, 0, 0, 0) print "lockdata:", repr(lockdata) print "fcntl(f, F_SETLKW, lockdata):", repr(fcntl(f, F_SETLKW, lockdata)) os.unlink("temp") buf = array.array('h', [0]) ret = ioctl(0, termios.TIOCGPGRP, buf, False) print "buf:", buf print "ioctl(0, termios.TIOCGPGRP, buf, False):", repr(ret)