# since the buffer object protocol is not exposed to the Python level # this module doesn't support it. You can't write a mmap directly to a file # use m.read() instead. You cannot also match it with regexp import os import stat import sys import platform from ctypes import * import ctypes.util if os.name == "posix": _libc = cdll.LoadLibrary(ctypes.util.find_library("c")) else: _libc = None _POSIX = os.name == "posix" _MS_WINDOWS = os.name == "nt" _FREEBSD = "freebsd" in sys.platform _64BIT = "64bit" in platform.architecture()[0] error = EnvironmentError _ACCESS_DEFAULT = 0 # constants, look in sys/mman.h and platform docs for the meaning ACCESS_READ = 0x1 ACCESS_WRITE = 0x2 ACCESS_COPY = 0x3 if _POSIX: MAP_SHARED = 0x01 MAP_PRIVATE = 0x02 if _FREEBSD: MAP_ANON = MAP_ANONYMOUS = 0x1000 _MS_SYNC = 0x0010 else: MAP_ANON = MAP_ANONYMOUS = 0x20 MAP_DENYWRITE = 0x00800 MAP_EXECUTABLE = 0x1000 _MS_SYNC = 4 PROT_READ = 0x1 PROT_WRITE = 0x2 PROT_EXEC = 0x4 def _get_page_size(): return _libc.getpagesize() def _get_error_msg(): errno = c_int.in_dll(_libc, "errno") _libc.strerror.restype = c_char_p msg = _libc.strerror(errno) return msg elif _MS_WINDOWS: from ctypes import wintypes _LPVOID = c_void_p _LPCVOID = _LPVOID _DWORD_PTR = wintypes.DWORD _INVALID_HANDLE_VALUE = wintypes.HANDLE(-1).value _PAGE_READONLY = 0x02 _PAGE_READWRITE = 0x04 _PAGE_WRITECOPY = 0x08 _FILE_MAP_READ = 0x0004 _FILE_MAP_WRITE = 0x0002 _FILE_MAP_COPY = 0x0001 class _STRUCT(Structure): _fields_ = [("wProcessorArchitecture", wintypes.WORD), ("wReserved", wintypes.WORD)] class _UNION(Union): _fields_ = [("dwOemId", wintypes.DWORD), ("struct", _STRUCT)] class _SYSTEM_INFO(Structure): _fields_ = [("union", _UNION), ("dwPageSize", wintypes.DWORD), ("lpMinimumApplicationAddress", _LPVOID), ("lpMaximumApplicationAddress", _LPVOID), ("dwActiveProcessorMask", _DWORD_PTR), ("dwNumberOfProcessors", wintypes.DWORD), ("dwProcessorType", wintypes.DWORD), ("dwAllocationGranularity", wintypes.DWORD), ("wProcessorLevel", wintypes.WORD), ("wProcessorRevision", wintypes.WORD)] def _get_page_size(): si = _SYSTEM_INFO() windll.kernel32.GetSystemInfo.argtypes = [POINTER(_SYSTEM_INFO)] windll.kernel32.GetSystemInfo(byref(si)) return int(si.dwPageSize) def _get_file_size(handle): windll.kernel32.GetFileSize.restype = wintypes.DWORD low = wintypes.DWORD() high = wintypes.DWORD() low = wintypes.DWORD(windll.kernel32.GetFileSize(handle, byref(high))) # low might just happen to have the value INVALID_FILE_SIZE # so we need to check the last error also INVALID_FILE_SIZE = wintypes.DWORD(0xFFFFFFFF).value NO_ERROR = 0 dwErr = GetLastError() if low.value == INVALID_FILE_SIZE and dwErr != NO_ERROR: raise WinError(dwErr) return low, high def _get_error_msg(): cdll.msvcrt.strerror.restype = c_char_p msg = cdll.msvcrt.strerror(GetLastError()) return msg windll.kernel32.CloseHandle.argtypes = [wintypes.HANDLE] windll.kernel32.CloseHandle.restype = wintypes.BOOL PAGESIZE = _get_page_size() class _mmap(object): def __init__(self): self._data = POINTER(c_char) self._size = 0 self._pos = 0 self._access = _ACCESS_DEFAULT if _MS_WINDOWS: self._map_handle = wintypes.HANDLE() self._file_handle = wintypes.HANDLE() self._tagname = None self._memcpy = getattr(cdll.msvcrt, "memcpy") elif _POSIX: self._fd = 0 self._memcpy = getattr(_libc, "memcpy") self._memcpy.argtypes = [POINTER(c_char), POINTER(c_char), c_int] def __del__(self): if _MS_WINDOWS: if self._data: self._unmapview() if self._map_handle.value != _INVALID_HANDLE_VALUE: windll.kernel32.CloseHandle(self._map_handle) if self._file_handle.value != _INVALID_HANDLE_VALUE: windll.kernel32.CloseHandle(self._file_handle) self.tagname = None elif _POSIX: if self._fd >= 0: _libc.close.argtypes = [c_int] _libc.close(self._fd) if self._data: self._data = cast(self._data, c_void_p) _libc.msync.argtypes = [c_void_p, c_int, c_int] _libc.msync(self._data, self._size, _MS_SYNC) _libc.munmap(self._data, self._size) def __len__(self): self._check_valid() return self._size def __getitem__(self, index): self._check_valid() if not isinstance(index, (int, long, slice)): raise TypeError, "sequence index must be integer" try: return self._to_str()[index] except IndexError: raise IndexError, "mmap index out of range" def __setitem__(self, index, value): self._check_valid() if not isinstance(index, (int, long, slice)): raise TypeError, "sequence index must be integer" if _is_int(index) and not len(value) == 1: raise IndexError, "mmap assignment must be single-character string" if isinstance(index, slice) and not isinstance(value, str): raise IndexError, "mmap slice assignment must be a string" if isinstance(index, slice): if not index.start: start = 0 index = slice(start, index.stop, index.step) if not index.stop: stop = self._size index = slice(index.start, stop, index.step) if len(value) != index.stop - index.start: raise IndexError, "mmap slice assignment is wrong size" try: str_data = self._to_str() lst = list(str_data) lst[index] = value str_data = "".join(lst) except IndexError: raise IndexError, "mmap index out of range" self._check_writeable() self._copy_mem(str_data) def __delitem__(self, index): self._check_valid() if not isinstance(index, (int, slice)): raise TypeError, "sequence index must be integer" if isinstance(index, int): raise TypeError, "mmap object doesn't support item deletion" elif isinstance(index, slice): raise TypeError, "mmap object doesn't support slice deletion" def __add__(self, other): self._check_valid() raise SystemError, "mmaps don't support concatenation" def __mul__(self, other): self._check_valid() raise SystemError, "mmaps don't support repeat operation" def _unmapview(self): self._data = cast(self._data, c_void_p) UnmapViewOfFile = windll.kernel32.UnmapViewOfFile UnmapViewOfFile.argtypes = [_LPCVOID] UnmapViewOfFile.restype = wintypes.BOOL UnmapViewOfFile(self._data) def _check_valid(self): if _MS_WINDOWS: if self._map_handle.value == _INVALID_HANDLE_VALUE: raise ValueError, "map closed or invalid" elif _POSIX: if not self._data: raise ValueError, "map closed or invalid" def _check_writeable(self): if not (self._access != ACCESS_READ): raise TypeError, "mmap can't modify a readonly memory map." def _check_resizeable(self): if not (self._access == ACCESS_WRITE or self._access == _ACCESS_DEFAULT): raise TypeError, "mmap can't resize a readonly or copy-on-write memory map." def _to_str(self): return "".join(self._data[i] for i in range(self._size)) def _copy_mem(self, str): self._memcpy(self._data, cast(str, POINTER(c_char)), len(str)) def close(self): if _MS_WINDOWS: if self._data: self._unmapview() self._data = None if self._map_handle.value != _INVALID_HANDLE_VALUE: windll.kernel32.CloseHandle(self._map_handle) self._map_handle.value = _INVALID_HANDLE_VALUE if self._file_handle.value != _INVALID_HANDLE_VALUE: windll.kernel32.CloseHandle(self._file_handle) self._file_handle.value = _INVALID_HANDLE_VALUE else: _libc.close.argtypes = [c_int] _libc.close(self._fd) self._fd = -1 if self._data: self._data = cast(self._data, c_void_p) _libc.munmap(self._data, self._size) self._data = None def read_byte(self): self._check_valid() if self._pos < self._size: value = self._data[self._pos] self._pos += 1 return value else: raise ValueError, "read byte out of range" def readline(self): self._check_valid() found = False for pos in range(self._pos, self._size): if self._data[pos] == '\n': found = True break if not found: eol = self._size else: eol = pos + 1 # we're interested in the position after new line res = self._data[self._pos:eol-self._pos] self._pos += eol - self._pos return res def read(self, num): self._check_valid() if not _is_int(num): raise TypeError, "an integer is required" num_bytes = num # silently adjust out of range requests if self._pos + num_bytes > self._size: num_bytes -= (self._pos + num_bytes) - self._size # due to slicing of python, the last char is not always returned if num_bytes < self._size - 1: res = self._data[self._pos:num_bytes] else: res = self._data[self._pos:self._size] self._pos += num_bytes return res def find(self, str, start=0): self._check_valid() if not _is_str(str): raise TypeError, "find() argument 1 must be string or read-only buffer" if not _is_int(start): raise TypeError, "an integer is required" # since we don't have to update positions we gain advantage of python strings :-) str_data = self._to_str() return str_data.find(str, start) def seek(self, pos, whence=0): self._check_valid() if not _is_int(pos): raise TypeError, "an integer is required" if not _is_int(whence): raise TypeError, "an integer is required" dist = pos how = whence if how == 0: # relative to start if dist < 0: raise ValueError, "seek out of range" where = dist elif how == 1: # relative to current position if self._pos + dist < 0: raise ValueError, "seek out of range" where = self._pos + dist elif how == 2: # relative to the end if self._size + dist < 0: raise ValueError, "seek out of range" where = self._size + dist else: raise ValueError, "unknown seek type" if where > self._size: raise ValueError, "seek out of range" self._pos = where def write(self, str): self._check_valid() if not _is_str(str): raise TypeError, "write() argument must be string or read-only buffer" self._check_writeable() if self._pos + len(str) > self._size: raise ValueError, "data out of range" self._copy_mem(str) self._pos += len(str) def write_byte(self, byte): self._check_valid() if not _is_str(byte) or len(byte) > 1: raise TypeError, "write_byte() argument must be char" self._check_writeable() str_data = self._to_str() str_data = list(str_data) str_data[self._pos] = byte str_data = "".join(str_data) self._copy_mem(str_data) self._pos += 1 def size(self): self._check_valid() if _MS_WINDOWS: if self._file_handle.value != _INVALID_HANDLE_VALUE: low, high = _get_file_size(self._file_handle) if not high and low.value < sys.maxint: return int(low.value) size = (long(high.value) << 32) + low.value return long(size) else: return self._size elif _POSIX: st = os.fstat(self._fd) return st.st_size def tell(self): self._check_valid() return self._pos def flush(self, *args): self._check_valid() if len(args) > 2: raise TypeError, "flush() take at most 2 arguments" res = map(_is_int, args) if False in res: raise TypeError, "an integer is required" if args: offset, size = args else: offset = 0 size = self._size if offset + size > self._size: raise ValueError, "flush values out of range" else: data = cast(self._data[offset:size], c_void_p) if _MS_WINDOWS: FlushViewOfFile = windll.kernel32.FlushViewOfFile FlushViewOfFile.argtypes = [c_void_p, c_int] res = FlushViewOfFile(data, size) return res elif _POSIX: _libc.msync.argtypes = [c_void_p, c_int, c_int] if _FREEBSD: res = _libc.msync(data, size, _MS_SYNC) else: # alignment size = size + data.value & (PAGESIZE - 1) res = _libc.msync(c_void_p(data.value & ~(PAGESIZE - 1)), size, _MS_SYNC) if res == -1: raise error, _get_error_msg() return 0 def move(self, dest, src, count): self._check_valid() res = map(_is_int, [dest, src, count]) if False in res: raise TypeError, "an integer is required" self._check_writeable() # check boundings if (src + count > self._size) or (dest + count > self._size): raise ValueError, "source or destination out of range" # memmove seems buggy so i rely on memcpy instead data = self._to_str() lst_src = list(data[src:src+count]) lst_dest = list(data[dest:]) for idx, c in enumerate(lst_src): lst_dest[idx] = lst_src[idx] data = "%s%s" % (data[0:dest], "".join(lst_dest)) self._copy_mem(data) def resize(self, newsize): self._check_valid() if not _is_int(newsize): raise TypeError, "an integer is required" self._check_resizeable() if _POSIX: if not hasattr(_libc, "mremap"): raise error, "mmap: resizing not available--no mremap()" # resize the underlying file first _libc.ftruncate.argtypes = [c_int, c_int] res = _libc.ftruncate(self._fd, newsize) if res == -1: raise error, _get_error_msg() # now resize the mmap _libc.mremap.argtypes = [c_void_p, c_int, c_int, c_int] _libc.mremap.restype = c_void_p MREMAP_MAYMOVE = 1 res = _libc.mremap(cast(self._data, c_void_p), self._size, newsize, MREMAP_MAYMOVE) res = c_void_p(res) # linux does not wrap c_void_p if res.value == -1: raise error, _get_error_msg() self._data = cast(res, POINTER(c_char)) self._size = newsize elif _MS_WINDOWS: # disconnect the mapping self._unmapview() windll.kernel32.CloseHandle(self._map_handle) # move to the desired EOF position if _64BIT: newsize_high = wintypes.DWORD(newsize >> 32) newsize_low = wintypes.DWORD(newsize & 0xFFFFFFFF) else: newsize_high = wintypes.DWORD(0) newsize_low = wintypes.DWORD(newsize) FILE_BEGIN = wintypes.DWORD(0) SetFilePointer = windll.kernel32.SetFilePointer SetFilePointer.argtypes = [wintypes.HANDLE, wintypes.LONG, POINTER(wintypes.LONG), wintypes.DWORD] SetFilePointer(self._file_handle, wintypes.LONG(newsize_low.value), byref(wintypes.LONG(newsize_high.value)), FILE_BEGIN) # resize the file SetEndOfFile = windll.kernel32.SetEndOfFile SetEndOfFile.argtypes = [wintypes.HANDLE] SetEndOfFile(self._file_handle) # create another mapping object and remap the file view CreateFileMapping = windll.kernel32.CreateFileMappingA CreateFileMapping.argtypes = [wintypes.HANDLE, c_void_p, wintypes.DWORD, wintypes.DWORD, wintypes.DWORD, c_char_p] CreateFileMapping.restype = wintypes.HANDLE self._map_handle = wintypes.HANDLE(CreateFileMapping(self._file_handle, None, _PAGE_READWRITE, newsize_high, newsize_low, self._tagname)) dwErrCode = wintypes.DWORD(0) if self._map_handle: MapViewOfFile = windll.kernel32.MapViewOfFile MapViewOfFile.argtypes = [wintypes.HANDLE, wintypes.DWORD, wintypes.DWORD, wintypes.DWORD, wintypes.DWORD] MapViewOfFile.restype = c_void_p self._data = MapViewOfFile(self._map_handle, _FILE_MAP_WRITE, 0, 0, 0) if self._data: self._data = cast(self._data, POINTER(c_char)) self._size = newsize return else: dwErrCode = GetLastError() else: dwErrCode = GetLastError() raise WinError(dwErrCode) def _check_map_size(size): if size < 0: raise TypeError, "memory mapped size must be positive" if size == sys.maxint: raise OverflowError, "memory mapped size is too large (limited by C int)" _is_int = lambda n: isinstance(n, (int, long)) _is_str = lambda s: isinstance(s, (str, unicode)) def _check_args(fileno, length, max_n_args, keywords, *args, **kwargs): if not _is_int(fileno): raise TypeError, "an integer is required" if not _is_int(length): raise TypeError, "map size must be an integral value" if len(args) + len(kwargs) > max_n_args: raise TypeError, "function takes at most %d arguments" % max_n_args if kwargs: for k in kwargs.keys(): if k not in keywords: err =\ "'%s' is an invalid keyword argument for this function" % k raise TypeError, err if _POSIX: def mmap(fileno, length, *args, **kwargs): flags = MAP_SHARED prot = PROT_WRITE | PROT_READ access = _ACCESS_DEFAULT keywords = ["flags", "prot", "access"] fd = fileno _check_args(fileno, length, 5, keywords, args, kwargs) args_to_check = [] args_to_check.extend(args) args_to_check.extend(kwargs.values()) res = map(_is_int, args_to_check) if False in res: raise TypeError, "an integer is required" # set the proper values for each argument params = [] for i, param in enumerate([flags, prot, access]): try: param = args[i] except IndexError: try: param = kwargs[keywords[i]] except KeyError: pass params.append(param) flags, prot, access = params # check size boundaries _check_map_size(length) map_size = length # check access is not there when flags and prot are there if access != _ACCESS_DEFAULT and ((flags != MAP_SHARED) or\ (prot != (PROT_WRITE | PROT_READ))): raise ValueError, "mmap can't specify both access and flags, prot." if access == ACCESS_READ: flags = MAP_SHARED prot = PROT_READ elif access == ACCESS_WRITE: flags = MAP_SHARED prot = PROT_READ | PROT_WRITE elif access == ACCESS_COPY: flags = MAP_PRIVATE prot = PROT_READ | PROT_WRITE elif access == _ACCESS_DEFAULT: pass else: raise ValueError, "mmap invalid access parameter." # check file size st = os.fstat(fd) if stat.S_ISREG(st.st_mode): if map_size == 0: map_size = st.st_size elif map_size > st.st_size: raise ValueError, "mmap length is greater than file size" m = _mmap() m._size = map_size if fd == -1: # Assume the caller wants to map anonymous memory. # This is the same behaviour as Windows. mmap.mmap(-1, size) # on both Windows and Unix map anonymous memory. m._fd = -1 flags |= MAP_ANONYMOUS else: m._fd = os.dup(fd) if m._fd == -1: raise error, _get_error_msg() _libc.mmap.restype = c_void_p # for an unknown reason mmap crashes under FreeBSD or OSX if # called with 6 parameters like man says. Call it with 7 and works well. # that's really odd but it works res = _libc.mmap(0, map_size, prot, flags, fd, 0, 0) if res == c_void_p(-1).value: raise error, _get_error_msg() m._data = cast(res, POINTER(c_char)) m._access = access return m elif _MS_WINDOWS: def mmap(fileno, length, *args, **kwargs): size_hi = wintypes.DWORD() # upper 32 bits of m._size size_lo = wintypes.DWORD() # lower 32 bits of m._size tagname = "" dwErr = wintypes.DWORD(0) fh = wintypes.HANDLE(0) access = _ACCESS_DEFAULT flProtect = wintypes.DWORD() dwDesiredAccess = wintypes.DWORD() keywords = ["tagname", "access"] _check_args(fileno, length, 4, keywords, args, kwargs) try: tagname = args[0] except IndexError: try: tagname = kwargs[keywords[0]] except KeyError: pass if not _is_str(tagname) or tagname: raise TypeError, "tagname must be string or None" try: access = args[1] except IndexError: try: access = kwargs[keywords[1]] except KeyError: pass if not _is_int(access): raise TypeError, "an integer is required" if access == ACCESS_READ: flProtect = wintypes.DWORD(_PAGE_READONLY) dwDesiredAccess = wintypes.DWORD(_FILE_MAP_READ) elif access == _ACCESS_DEFAULT or access == ACCESS_WRITE: flProtect = wintypes.DWORD(_PAGE_READWRITE) dwDesiredAccess = wintypes.DWORD(_FILE_MAP_WRITE) elif access == ACCESS_COPY: flProtect = wintypes.DWORD(_PAGE_WRITECOPY) dwDesiredAccess = wintypes.DWORD(_FILE_MAP_COPY) else: raise ValueError, "mmap invalid access parameter." # check size boundaries _check_map_size(length) map_size = length # assume -1 and 0 both mean invalid filedescriptor # to 'anonymously' map memory. if fileno != -1 and fileno != 0: fh = cdll.msvcr71._get_osfhandle(fileno) if fh == -1: raise error, _get_error_msg() # Win9x appears to need us seeked to zero SEEK_SET = 0 cdll.msvcrt._lseek(fileno, 0, SEEK_SET) m = _mmap() m._file_handle = wintypes.HANDLE(_INVALID_HANDLE_VALUE) m._map_handle = wintypes.HANDLE(_INVALID_HANDLE_VALUE) if fh: # it is necessary to duplicate the handle, so the # Python code can close it on us DUPLICATE_SAME_ACCESS = 0x00000002 GetCurrentProcess = windll.kernel32.GetCurrentProcess GetCurrentProcess.restype = wintypes.HANDLE DuplicateHandle = windll.kernel32.DuplicateHandle DuplicateHandle.argtypes = [wintypes.HANDLE, wintypes.HANDLE, wintypes.HANDLE, POINTER(wintypes.HANDLE), wintypes.DWORD, wintypes.BOOL, wintypes.DWORD] DuplicateHandle.restype = wintypes.BOOL res = DuplicateHandle(GetCurrentProcess(), # source process handle fh, # handle to be duplicated GetCurrentProcess(), # target process handle byref(m._file_handle), # result 0, # access - ignored due to options value wintypes.BOOL(False), # inherited by child procs? DUPLICATE_SAME_ACCESS) # options if not res: raise WinError() if not map_size: low, high = _get_file_size(fh) if _64BIT: m._size = (c_long(low.value).value << 32) + 1 else: if high: # file is too large to map completely m._size = -1L else: m._size = low.value else: m._size = map_size else: m._size = map_size # set the tag name if tagname: m._tagname = tagname m._access = access # DWORD is a 4-byte int. If int > 4-byte it must be divided if _64BIT: size_hi = wintypes.DWORD(m._size >> 32) size_lo = wintypes.DWORD(m._size & 0xFFFFFFFF) else: size_hi = wintypes.DWORD(0) size_lo = wintypes.DWORD(m._size) CreateFileMapping = windll.kernel32.CreateFileMappingA CreateFileMapping.argtypes = [wintypes.HANDLE, c_void_p, wintypes.DWORD, wintypes.DWORD, wintypes.DWORD, c_char_p] CreateFileMapping.restype = wintypes.HANDLE m._map_handle = wintypes.HANDLE(CreateFileMapping(m._file_handle, None, flProtect, size_hi, size_lo, m._tagname)) if m._map_handle: MapViewOfFile = windll.kernel32.MapViewOfFile MapViewOfFile.argtypes = [wintypes.HANDLE, wintypes.DWORD, wintypes.DWORD, wintypes.DWORD, wintypes.DWORD] MapViewOfFile.restype = c_void_p m._data = MapViewOfFile(m._map_handle, dwDesiredAccess, 0, 0, 0) if m._data: m._data = cast(m._data, POINTER(c_char)) return m else: dwErr = GetLastError() else: dwErr = GetLastError() raise WinError(dwErr)