class LocalPath(FSPathBase, PosixMixin):
Local path implementation offering access/modification
methods similar to os.path.
class attributes and properties:
basename: <property object (dynamically calculated value)>
ext: <property object (dynamically calculated value)>
purebasename: <property object (dynamically calculated value)>
sep: /
methods:
def atime(self):
return last access time of the path.
arguments:
return value:
<UNKNOWN>
def chdir(self):
change directory to self and return old current directory
source: path/local/local.py
|
|
def chdir(self): |
""" change directory to self and return old current directory """ |
old = self.__class__() |
self._callex(os.chdir, self.strpath) |
return old | |
def check(self, **kw):
check a path for existence, or query its properties
without arguments, this returns True if the path exists (on the
filesystem), False if not
with (keyword only) arguments, the object compares the value
of the argument with the value of a property with the same name
(if it has one, else it raises a TypeError)
when for example the keyword argument 'ext' is '.py', this will
return True if self.ext == '.py', False otherwise
arguments:
return value:
AnyOf(<None>, <Boolean>)
source: path/common.py
| 96 |
| 97 |
| 98 |
| 99 |
| 100 |
| 101 |
| 102 |
| 103 |
| 104 |
| 105 |
| 106 |
| 107 |
| 108 |
| 109 |
| 110 |
| 111 |
| 112 |
| 113 |
| 114 |
| 115 | |
def check(self, **kw): |
""" check a path for existence, or query its properties |
|
without arguments, this returns True if the path exists (on the |
filesystem), False if not |
|
with (keyword only) arguments, the object compares the value |
of the argument with the value of a property with the same name |
(if it has one, else it raises a TypeError) |
|
when for example the keyword argument 'ext' is '.py', this will |
return True if self.ext == '.py', False otherwise |
""" |
if kw: |
kw = kw.copy() |
if not checktype(self, kw): |
return False |
else: |
kw = {'exists' : 1} |
return self.Checkers(self)._evaluate(kw) | |
def chmod(self, mode, rec=0):
change permissions to the given mode. If mode is an
integer it directly encodes the os-specific modes.
if rec is True perform recursively.
(xxx if mode is a string then it specifies access rights
in '/bin/chmod' style, e.g. a+r).
arguments:
- self: <Instance of Class LocalPath>
- mode: <Int>
- rec: AnyOf(<Function>, <Int>)
return value:
<None>
source: path/local/posix.py
| 53 |
| 54 |
| 55 |
| 56 |
| 57 |
| 58 |
| 59 |
| 60 |
| 61 |
| 62 |
| 63 |
| 64 |
| 65 |
| 66 | |
def chmod(self, mode, rec=0): |
""" change permissions to the given mode. If mode is an |
integer it directly encodes the os-specific modes. |
if rec is True perform recursively. |
|
(xxx if mode is a string then it specifies access rights |
in '/bin/chmod' style, e.g. a+r). |
""" |
if not isinstance(mode, int): |
raise TypeError("mode %r must be an integer" % (mode,)) |
if rec: |
for x in self.visit(rec=rec): |
self._callex(os.chmod, str(x), mode) |
self._callex(os.chmod, str(self), mode) | |
def chown(self, user, group, rec=0):
change ownership to the given user and group.
user and group may be specified by a number or
by a name. if rec is True change ownership
recursively.
arguments:
- self: <Instance of Class LocalPath>
- user: <String>
- group: <String>
- rec: <Int>
return value:
<None>
source: path/local/posix.py
| 68 |
| 69 |
| 70 |
| 71 |
| 72 |
| 73 |
| 74 |
| 75 |
| 76 |
| 77 |
| 78 |
| 79 |
| 80 | |
def chown(self, user, group, rec=0): |
""" change ownership to the given user and group. |
user and group may be specified by a number or |
by a name. if rec is True change ownership |
recursively. |
""" |
uid = getuserid(user) |
gid = getgroupid(group) |
if rec: |
for x in self.visit(rec=lambda x: x.check(link=0)): |
if x.check(link=0): |
self._callex(os.chown, str(x), uid, gid) |
self._callex(os.chown, str(self), uid, gid) | |
def common(self, other):
return the common part shared with the other path
or None if there is no common part.
source: path/common.py
| 170 |
| 171 |
| 172 |
| 173 |
| 174 |
| 175 |
| 176 |
| 177 |
| 178 |
| 179 | |
def common(self, other): |
""" return the common part shared with the other path |
or None if there is no common part. |
""" |
last = None |
for x, y in zip(self.parts(), other.parts()): |
if x != y: |
return last |
last = x |
return last | |
def computehash(self, hashtype='md5', chunksize=524288):
return hexdigest of hashvalue for this file.
arguments:
return value:
AnyOf(<String>, <None>)
source: path/local/local.py
| 80 |
| 81 |
| 82 |
| 83 |
| 84 |
| 85 |
| 86 |
| 87 |
| 88 |
| 89 |
| 90 |
| 91 | |
def computehash(self, hashtype="md5", chunksize=524288): |
""" return hexdigest of hashvalue for this file. """ |
hash = self._gethashinstance(hashtype) |
f = self.open('rb') |
try: |
while 1: |
buf = f.read(chunksize) |
if not buf: |
return hash.hexdigest() |
hash.update(buf) |
finally: |
f.close() | |
def copy(self, target, archive=False):
copy path to target.
arguments:
return value:
<None>
source: path/local/local.py
| 234 |
| 235 |
| 236 |
| 237 |
| 238 |
| 239 |
| 240 |
| 241 |
| 242 |
| 243 |
| 244 |
| 245 |
| 246 |
| 247 |
| 248 |
| 249 |
| 250 |
| 251 |
| 252 |
| 253 |
| 254 | |
def copy(self, target, archive=False): |
""" copy path to target.""" |
assert not archive, "XXX archive-mode not supported" |
if self.check(file=1): |
if target.check(dir=1): |
target = target.join(self.basename) |
assert self!=target |
copychunked(self, target) |
else: |
target.ensure(dir=1) |
def rec(p): |
return p.check(link=0) |
for x in self.visit(rec=rec): |
relpath = x.relto(self) |
newx = target.join(relpath) |
if x.check(link=1): |
newx.mksymlinkto(x.readlink()) |
elif x.check(file=1): |
copychunked(x, newx) |
elif x.check(dir=1): |
newx.ensure(dir=1) | |
def dirpath(self, *args, **kwargs):
return the directory Path of the current Path joined
with any given path arguments.
source: path/common.py
|
|
def dirpath(self, *args, **kwargs): |
""" return the directory Path of the current Path joined |
with any given path arguments. |
""" |
return self.new(basename='').join(*args, **kwargs) | |
def dump(self, obj, bin=1):
pickle object into path location
arguments:
return value:
<None>
source: path/local/local.py
| 260 |
| 261 |
| 262 |
| 263 |
| 264 |
| 265 |
| 266 | |
def dump(self, obj, bin=1): |
""" pickle object into path location""" |
f = self.open('wb') |
try: |
self._callex(py.std.cPickle.dump, obj, f, bin) |
finally: |
f.close() | |
def ensure(self, *args, **kwargs):
ensure that an args-joined path exists (by default as
a file). if you specify a keyword argument 'dir=True'
then the path is forced to be a directory path.
source: path/local/local.py
| 299 |
| 300 |
| 301 |
| 302 |
| 303 |
| 304 |
| 305 |
| 306 |
| 307 |
| 308 |
| 309 |
| 310 |
| 311 | |
def ensure(self, *args, **kwargs): |
""" ensure that an args-joined path exists (by default as |
a file). if you specify a keyword argument 'dir=True' |
then the path is forced to be a directory path. |
""" |
p = self.join(*args) |
if kwargs.get('dir', 0): |
return p._ensuredirs() |
else: |
p.dirpath()._ensuredirs() |
if not p.check(file=1): |
p.write("") |
return p | |
def get_temproot(cls):
return the system's temporary directory
(where tempfiles are usually created in)
source: path/local/local.py
|
|
def get_temproot(cls): |
""" return the system's temporary directory |
(where tempfiles are usually created in) |
""" |
return py.path.local(py.std.tempfile.gettempdir()) | |
def group(self):
return group name of file.
arguments:
return value:
<String>
def join(self, *args, **kwargs):
return a new path by appending all 'args' as path
components. if abs=1 is used restart from root if any
of the args is an absolute path.
source: path/local/local.py
| 167 |
| 168 |
| 169 |
| 170 |
| 171 |
| 172 |
| 173 |
| 174 |
| 175 |
| 176 |
| 177 |
| 178 |
| 179 |
| 180 |
| 181 |
| 182 |
| 183 |
| 184 |
| 185 |
| 186 |
| 187 |
| 188 |
| 189 |
| 190 |
| 191 |
| 192 |
| 193 |
| 194 |
| 195 | |
def join(self, *args, **kwargs): |
""" return a new path by appending all 'args' as path |
components. if abs=1 is used restart from root if any |
of the args is an absolute path. |
""" |
if not args: |
return self |
strpath = self.strpath |
sep = self.sep |
strargs = [str(x) for x in args] |
if kwargs.get('abs', 0): |
for i in range(len(strargs)-1, -1, -1): |
if os.path.isabs(strargs[i]): |
strpath = strargs[i] |
strargs = strargs[i+1:] |
break |
for arg in strargs: |
arg = arg.strip(sep) |
if py.std.sys.platform == 'win32': |
|
arg = arg.strip('/') |
arg = arg.replace('/', sep) |
if arg: |
if not strpath.endswith(sep): |
strpath += sep |
strpath += arg |
obj = self.new() |
obj.strpath = os.path.normpath(strpath) |
return obj | |
def listdir(self, fil=None, sort=None):
list directory contents, possibly filter by the given fil func
and possibly sorted.
arguments:
- self: <Instance of Class LocalPath>
- fil: AnyOf(<String>, <None>, <Function>, <Instance of Class checker>)
- sort: AnyOf(<Boolean>, <None>)
return value:
AnyOf(<List>, <None>)
source: path/local/local.py
| 209 |
| 210 |
| 211 |
| 212 |
| 213 |
| 214 |
| 215 |
| 216 |
| 217 |
| 218 |
| 219 |
| 220 |
| 221 |
| 222 |
| 223 |
| 224 | |
def listdir(self, fil=None, sort=None): |
""" list directory contents, possibly filter by the given fil func |
and possibly sorted. |
""" |
if isinstance(fil, str): |
fil = common.fnmatch(fil) |
res = [] |
for name in self._callex(os.listdir, self.strpath): |
childurl = self.join(name) |
if fil is None or fil(childurl): |
res.append(childurl) |
if callable(sort): |
res.sort(sort) |
elif sort: |
res.sort() |
return res | |
def load(self):
return object unpickled from self.read()
arguments:
return value:
<Dict>
source: path/common.py
| 359 |
| 360 |
| 361 |
| 362 |
| 363 |
| 364 |
| 365 |
| 366 | |
def load(self): |
""" return object unpickled from self.read() """ |
f = self.open('rb') |
try: |
from cPickle import load |
return self._callex(load, f) |
finally: |
f.close() | |
def lstat(self):
Return an os.lstat() tuple.
arguments:
return value:
AnyOf(<Instance of Class PosixStat>, <None>)
source: path/local/local.py
|
|
def lstat(self): |
""" Return an os.lstat() tuple. """ |
return self._makestat(self._callex(os.lstat, self.strpath)) | |
def make_numbered_dir(cls, prefix='session-', rootdir=None, keep=3, lock_timeout=172800):
return unique directory with a number greater than the current
maximum one. The number is assumed to start directly after prefix.
if keep is true directories with a number less than (maxnum-keep)
will be removed.
source: path/local/local.py
| 579 |
| 580 |
| 581 |
| 582 |
| 583 |
| 584 |
| 585 |
| 586 |
| 587 |
| 588 |
| 589 |
| 590 |
| 591 |
| 592 |
| 593 |
| 594 |
| 595 |
| 596 |
| 597 |
| 598 |
| 599 |
| 600 |
| 601 |
| 602 |
| 603 |
| 604 |
| 605 |
| 606 |
| 607 |
| 608 |
| 609 |
| 610 |
| 611 |
| 612 |
| 613 |
| 614 |
| 615 |
| 616 |
| 617 |
| 618 |
| 619 |
| 620 |
| 621 |
| 622 |
| 623 |
| 624 |
| 625 |
| 626 |
| 627 |
| 628 |
| 629 |
| 630 |
| 631 |
| 632 |
| 633 |
| 634 |
| 635 |
| 636 |
| 637 |
| 638 |
| 639 |
| 640 |
| 641 |
| 642 |
| 643 |
| 644 |
| 645 |
| 646 |
| 647 |
| 648 |
| 649 |
| 650 |
| 651 |
| 652 |
| 653 |
| 654 |
| 655 |
| 656 |
| 657 |
| 658 |
| 659 |
| 660 |
| 661 |
| 662 |
| 663 |
| 664 |
| 665 |
| 666 |
| 667 |
| 668 |
| 669 |
| 670 |
| 671 |
| 672 |
| 673 |
| 674 |
| 675 |
| 676 |
| 677 |
| 678 |
| 679 |
| 680 |
| 681 |
| 682 | |
def make_numbered_dir(cls, prefix='session-', rootdir=None, keep=3, |
lock_timeout = 172800): |
""" return unique directory with a number greater than the current |
maximum one. The number is assumed to start directly after prefix. |
if keep is true directories with a number less than (maxnum-keep) |
will be removed. |
""" |
if rootdir is None: |
rootdir = cls.get_temproot() |
|
|
def parse_num(path): |
""" parse the number out of a path (if it matches the prefix) """ |
bn = path.basename |
if bn.startswith(prefix): |
try: |
return int(bn[len(prefix):]) |
except ValueError: |
pass |
|
|
|
|
lastmax = None |
while True: |
maxnum = -1 |
for path in rootdir.listdir(): |
num = parse_num(path) |
if num is not None: |
maxnum = max(maxnum, num) |
|
|
|
try: |
udir = rootdir.mkdir(prefix + str(maxnum+1)) |
except py.error.EEXIST: |
|
|
if lastmax == maxnum: |
raise |
lastmax = maxnum |
continue |
break |
|
|
|
|
lockfile = udir.join('.lock') |
mypid = os.getpid() |
if hasattr(lockfile, 'mksymlinkto'): |
lockfile.mksymlinkto(str(mypid)) |
else: |
lockfile.write(str(mypid)) |
def try_remove_lockfile(): |
|
|
|
|
|
if os.getpid() != mypid: |
return |
try: |
lockfile.remove() |
except py.error.Error: |
pass |
atexit.register(try_remove_lockfile) |
|
|
|
if keep: |
for path in rootdir.listdir(): |
num = parse_num(path) |
if num is not None and num <= (maxnum - keep): |
lf = path.join('.lock') |
try: |
t1 = lf.lstat().mtime |
t2 = lockfile.lstat().mtime |
if abs(t2-t1) < lock_timeout: |
continue |
except py.error.Error: |
pass | |