""" |
specialized local path implementation. |
|
This Path implementation offers some methods like chmod(), owner() |
and so on that may only make sense on unix. |
|
""" |
from __future__ import generators |
import sys, os, stat, re, atexit |
import py |
from py.__.path import common |
|
|
iswin32 = sys.platform == "win32" |
|
|
if iswin32: |
from py.__.path.local.win import WinMixin as PlatformMixin |
else: |
from py.__.path.local.posix import PosixMixin as PlatformMixin |
|
|
class LocalPath(common.FSPathBase, PlatformMixin): |
""" Local path implementation offering access/modification |
methods similar to os.path. |
""" |
_path_cache = {} |
|
sep = os.sep |
class Checkers(common.FSCheckers): |
def _stat(self): |
try: |
return self._statcache |
except AttributeError: |
try: |
self._statcache = self.path.stat() |
except py.error.ELOOP: |
self._statcache = self.path.lstat() |
return self._statcache |
|
|
def dir(self): |
return stat.S_ISDIR(self._stat().mode) |
|
|
def file(self): |
return stat.S_ISREG(self._stat().mode) |
|
|
def exists(self): |
return self._stat() |
|
|
def link(self): |
st = self.path.lstat() |
return stat.S_ISLNK(st.mode) |
|
|
def __new__(cls, path=None): |
""" Initialize and return a local Path instance. |
|
Path can be relative to the current directory. |
If it is None then the current working directory is taken. |
Note that Path instances always carry an absolute path. |
Note also that passing in a local path object will simply return |
the exact same path object. Use new() to get a new copy. |
""" |
if isinstance(path, common.FSPathBase): |
if path.__class__ == cls: |
return path |
path = path.strpath |
|
self = object.__new__(cls) |
if not path: |
self.strpath = os.getcwd() |
elif isinstance(path, str): |
self.strpath = os.path.abspath(os.path.normpath(str(path))) |
else: |
raise ValueError("can only pass None, Path instances " |
"or non-empty strings to LocalPath") |
assert isinstance(self.strpath, str) |
return self |
|
|
def __hash__(self): |
return hash(self.strpath) |
|
|
def computehash(self, hashtype="md5", chunksize=524288): |
""" return hexdigest of hashvalue for this file. """ |
hash = self._gethashinstance(hashtype) |
f = self.open('rb') |
try: |
while 1: |
buf = f.read(chunksize) |
if not buf: |
return hash.hexdigest() |
hash.update(buf) |
finally: |
f.close() |
|
|
def new(self, **kw): |
""" create a modified version of this path. |
the following keyword arguments modify various path parts: |
|
a:/some/path/to/a/file.ext |
|| drive |
|-------------| dirname |
|------| basename |
|--| purebasename |
|--| ext |
""" |
obj = object.__new__(self.__class__) |
drive, dirname, basename, purebasename,ext = self._getbyspec( |
"drive,dirname,basename,purebasename,ext") |
if 'basename' in kw: |
if 'purebasename' in kw or 'ext' in kw: |
raise ValueError("invalid specification %r" % kw) |
else: |
pb = kw.setdefault('purebasename', purebasename) |
try: |
ext = kw['ext'] |
except KeyError: |
pass |
else: |
if ext and not ext.startswith('.'): |
ext = '.' + ext |
kw['basename'] = pb + ext |
|
|
kw.setdefault('drive', drive) |
kw.setdefault('dirname', dirname) |
kw.setdefault('sep', self.sep) |
obj.strpath = os.path.normpath( |
"%(drive)s%(dirname)s%(sep)s%(basename)s" % kw) |
return obj |
|
def _getbyspec(self, spec): |
""" return a sequence of specified path parts. 'spec' is |
a comma separated string containing path part names. |
according to the following convention: |
a:/some/path/to/a/file.ext |
|| drive |
|-------------| dirname |
|------| basename |
|--| purebasename |
|--| ext |
""" |
res = [] |
parts = self.strpath.split(self.sep) |
|
|
args = filter(None, spec.split(',') ) |
append = res.append |
for name in args: |
if name == 'drive': |
append(parts[0]) |
elif name == 'dirname': |
append(self.sep.join(['']+parts[1:-1])) |
else: |
basename = parts[-1] |
if name == 'basename': |
append(basename) |
else: |
i = basename.rfind('.') |
if i == -1: |
purebasename, ext = basename, '' |
else: |
purebasename, ext = basename[:i], basename[i:] |
if name == 'purebasename': |
append(purebasename) |
elif name == 'ext': |
append(ext) |
else: |
raise ValueError, "invalid part specification %r" % name |
return res |
|
|
def join(self, *args, **kwargs): |
""" return a new path by appending all 'args' as path |
components. if abs=1 is used restart from root if any |
of the args is an absolute path. |
""" |
if not args: |
return self |
strpath = self.strpath |
sep = self.sep |
strargs = [str(x) for x in args] |
if kwargs.get('abs', 0): |
for i in range(len(strargs)-1, -1, -1): |
if os.path.isabs(strargs[i]): |
strpath = strargs[i] |
strargs = strargs[i+1:] |
break |
for arg in strargs: |
arg = arg.strip(sep) |
if py.std.sys.platform == 'win32': |
|
arg = arg.strip('/') |
arg = arg.replace('/', sep) |
if arg: |
if not strpath.endswith(sep): |
strpath += sep |
strpath += arg |
obj = self.new() |
obj.strpath = os.path.normpath(strpath) |
return obj |
|
|
def __eq__(self, other): |
s1 = str(self) |
s2 = str(other) |
if iswin32: |
s1 = s1.lower() |
s2 = s2.lower() |
return s1 == s2 |
|
|
def open(self, mode='r'): |
""" return an opened file with the given mode. """ |
return self._callex(open, self.strpath, mode) |
|
|
def listdir(self, fil=None, sort=None): |
""" list directory contents, possibly filter by the given fil func |
and possibly sorted. |
""" |
if isinstance(fil, str): |
fil = common.fnmatch(fil) |
res = [] |
for name in self._callex(os.listdir, self.strpath): |
childurl = self.join(name) |
if fil is None or fil(childurl): |
res.append(childurl) |
if callable(sort): |
res.sort(sort) |
elif sort: |
res.sort() |
return res |
|
|
def size(self): |
""" return size of the underlying file object """ |
return self.stat().size |
|
|
def mtime(self): |
""" return last modification time of the path. """ |
return self.stat().mtime |
|
|
def copy(self, target, archive=False): |
""" copy path to target.""" |
assert not archive, "XXX archive-mode not supported" |
if self.check(file=1): |
if target.check(dir=1): |
target = target.join(self.basename) |
assert self!=target |
copychunked(self, target) |
else: |
target.ensure(dir=1) |
def rec(p): |
return p.check(link=0) |
for x in self.visit(rec=rec): |
relpath = x.relto(self) |
newx = target.join(relpath) |
if x.check(link=1): |
newx.mksymlinkto(x.readlink()) |
elif x.check(file=1): |
copychunked(x, newx) |
elif x.check(dir=1): |
newx.ensure(dir=1) |
|
|
def rename(self, target): |
""" rename this path to target. """ |
return self._callex(os.rename, str(self), str(target)) |
|
|
def dump(self, obj, bin=1): |
""" pickle object into path location""" |
f = self.open('wb') |
try: |
self._callex(py.std.cPickle.dump, obj, f, bin) |
finally: |
f.close() |
|
|
def mkdir(self, *args): |
""" create & return the directory joined with args. """ |
p = self.join(*args) |
self._callex(os.mkdir, str(p)) |
return p |
|
|
def write(self, content, mode='wb'): |
""" write string content into path. """ |
s = str(content) |
f = self.open(mode) |
try: |
f.write(s) |
finally: |
f.close() |
|
|
def _ensuredirs(self): |
parent = self.dirpath() |
if parent == self: |
return self |
if parent.check(dir=0): |
parent._ensuredirs() |
if self.check(dir=0): |
try: |
self.mkdir() |
except py.error.EEXIST: |
|
|
if self.check(dir=0): |
raise |
return self |
|
|
def ensure(self, *args, **kwargs): |
""" ensure that an args-joined path exists (by default as |
a file). if you specify a keyword argument 'dir=True' |
then the path is forced to be a directory path. |
""" |
p = self.join(*args) |
if kwargs.get('dir', 0): |
return p._ensuredirs() |
else: |
p.dirpath()._ensuredirs() |
if not p.check(file=1): |
p.write("") |
return p |
|
|
def stat(self): |
""" Return an os.stat() tuple. """ |
stat = self._callex(os.stat, self.strpath) |
return self._makestat(stat) |
|
|
def lstat(self): |
""" Return an os.lstat() tuple. """ |
return self._makestat(self._callex(os.lstat, self.strpath)) |
|
|
|
def setmtime(self, mtime=None): |
""" set modification time for the given path. if 'mtime' is None |
(the default) then the file's mtime is set to current time. |
|
Note that the resolution for 'mtime' is platform dependent. |
""" |
if mtime is None: |
return self._callex(os.utime, self.strpath, mtime) |
try: |
return self._callex(os.utime, self.strpath, (-1, mtime)) |
except py.error.EINVAL: |
return self._callex(os.utime, self.strpath, (self.atime(), mtime)) |
|
|
def chdir(self): |
""" change directory to self and return old current directory """ |
old = self.__class__() |
self._callex(os.chdir, self.strpath) |
return old |
|
|
def realpath(self): |
""" return a new path which contains no symbolic links.""" |
return self.__class__(os.path.realpath(self.strpath)) |
|
|
def atime(self): |
""" return last access time of the path. """ |
return self.stat().atime |
|
|
def __repr__(self): |
return 'local(%r)' % self.strpath |
|
|
def __str__(self): |
""" return string representation of the Path. """ |
return self.strpath |
|
|
def pypkgpath(self, pkgname=None): |
""&qu |