class SvnCommandPath(SvnPathBase):
path implementation that offers access to (possibly remote) subversion
repositories.
class attributes and properties:
basename: <property object (dynamically calculated value)>
ext: <property object (dynamically calculated value)>
purebasename: <property object (dynamically calculated value)>
sep: /
url: <property object (dynamically calculated value)>
methods:
def check(self, **kw):
check a path for existence, or query its properties
without arguments, this returns True if the path exists (on the
filesystem), False if not
with (keyword only) arguments, the object compares the value
of the argument with the value of a property with the same name
(if it has one, else it raises a TypeError)
when for example the keyword argument 'ext' is '.py', this will
return True if self.ext == '.py', False otherwise
arguments:
return value:
<Boolean>
source: path/common.py
| 96 |
| 97 |
| 98 |
| 99 |
| 100 |
| 101 |
| 102 |
| 103 |
| 104 |
| 105 |
| 106 |
| 107 |
| 108 |
| 109 |
| 110 |
| 111 |
| 112 |
| 113 |
| 114 |
| 115 | |
def check(self, **kw):
    """ test this path for existence, or compare its properties.

        called without arguments, the path is evaluated against the
        single criterion 'exists' (True if the path exists, False if not).

        called with keyword arguments, each argument is compared with
        the property of the same name (a TypeError is raised if there
        is no such property); e.g. check(ext='.py') is True exactly
        when self.ext == '.py'.
    """
    if not kw:
        # no criteria given: fall back to a plain existence test
        criteria = {'exists' : 1}
    else:
        criteria = kw.copy()
        # checktype() gets the copied criteria and may veto the check
        if not checktype(self, criteria):
            return False
    return self.Checkers(self)._evaluate(criteria)
def common(self, other):
return the common part shared with the other path
or None if there is no common part.
source: path/common.py
| 170 |
| 171 |
| 172 |
| 173 |
| 174 |
| 175 |
| 176 |
| 177 |
| 178 |
| 179 | |
def common(self, other):
    """ return the deepest leading part that this path and 'other'
        have in common, or None if their first parts already differ.
    """
    shared = None
    for mine, theirs in zip(self.parts(), other.parts()):
        if mine != theirs:
            # first divergence: everything after this point differs
            break
        shared = mine
    return shared
def copy(self, target, msg='copied by py lib invocation'):
copy path to target with checkin message msg.
arguments:
return value:
<None>
source: path/svn/urlcommand.py
| 139 |
| 140 |
| 141 |
| 142 |
| 143 |
| 144 |
| 145 | |
def copy(self, target, msg='copied by py lib invocation'):
    """ copy this path to target, using msg as the checkin message.

        raises py.error.EINVAL if target has a non-None 'rev' attribute.
    """
    target_rev = getattr(target, 'rev', None)
    if target_rev is not None:
        raise py.error.EINVAL(target, "revisions are immutable")
    source_url = self._escape(self)
    target_url = self._escape(target)
    # NOTE(review): msg is placed between double quotes without any
    # escaping, so a message containing '"' will break the command line.
    command = 'svn copy -m "%s" "%s" "%s"' % (msg, source_url, target_url)
    self._svncmdexecauth(command)
    # presumably drops a cached listing of the target's parent -- TODO confirm
    self._norev_delentry(target.dirpath())
def dirpath(self, *args, **kwargs):
return the directory path of the current path joined
with any given path arguments.
source: path/svn/urlcommand.py
| 117 |
| 118 |
| 119 |
| 120 |
| 121 |
| 122 |
| 123 |
| 124 |
| 125 |
| 126 |
| 127 | |
def dirpath(self, *args, **kwargs):
    """ return the directory path of this path, joined with any
        given path arguments.

        raises py.error.EINVAL if strpath splits into fewer than four
        sep-separated components.
    """
    components = self.strpath.split(self.sep)
    depth = len(components)
    if depth < 4:
        raise py.error.EINVAL(self, "base is not valid")
    if depth == 4:
        # exactly four components (e.g. 'http:', '', host, name): this
        # path is treated as its own directory path
        return self.join(*args, **kwargs)
    parent = self.new(basename='')
    return parent.join(*args, **kwargs)
def ensure(self, *args, **kwargs):
ensure that an args-joined path exists (by default as
a file). If you specify a keyword argument 'dir=True'
then the path is forced to be a directory path.
source: path/svn/urlcommand.py
| 178 |
| 179 |
| 180 |
| 181 |
| 182 |
| 183 |
| 184 |
| 185 |
| 186 |
| 187 |
| 188 |
| 189 |
| 190 |
| 191 |
| 192 |
| 193 |
| 194 |
| 195 |
| 196 |
| 197 |
| 198 |
| 199 |
| 200 |
| 201 |
| 202 |
| 203 |
| 204 |
| 205 |
| 206 |
| 207 |
| 208 |
| 209 | |
def ensure(self, *args, **kwargs):
    """ make sure the args-joined path exists (as a file by default)
        and return it. With the keyword argument 'dir=True' the path
        is required to be a directory path.

        raises py.error.EINVAL if this path has a revision,
        py.error.ENOENT if no part of the target exists, and
        py.error.ENOTDIR / py.error.EISDIR if the target exists but
        does not pass check(dir=...).
    """
    if getattr(self, 'rev', None) is not None:
        raise py.error.EINVAL(self, "revisions are immutable")
    target = self.join(*args)
    want_dir = kwargs.get('dir', 0)
    # walk from the target towards the root until an existing part is found
    for existing in target.parts(reverse=True):
        if existing.check():
            break
    else:
        raise py.error.ENOENT(target, "has not any valid base!")
    if existing == target:
        # nothing to create; only verify the kind of the existing path
        if existing.check(dir=want_dir):
            return existing
        if want_dir:
            raise py.error.ENOTDIR(existing)
        raise py.error.EISDIR(existing)
    missing = target.relto(existing)
    first_missing = missing.split(self.sep, 1)[0]
    # build the missing structure in a local temp dir, then 'svn import'
    # its topmost entry below the deepest existing path
    scratch = py.path.local.mkdtemp()
    try:
        scratch.ensure(missing, dir=want_dir)
        message = "ensure %s" % self._escape(missing)
        import_source = self._escape(scratch.join(first_missing))
        import_url = existing.join(first_missing)._encodedurl()
        command = 'svn import -m "%s" "%s" "%s"' % (
            message, import_source, import_url)
        self._svncmdexecauth(command)
        self._norev_delentry(existing)
    finally:
        scratch.remove()
    return target
def export(self, topath):
export to a local path
topath should not exist prior to calling this, returns a
py.path.local instance
source: path/svn/urlcommand.py
| 164 |
| 165 |
| 166 |
| 167 |
| 168 |
| 169 |
| 170 |
| 171 |
| 172 |
| 173 |
| 174 |
| 175 |
| 176 | |
def export(self, topath):
    """ export this path to a local path via 'svn export'.

        topath should not exist prior to calling this; the return
        value is topath converted to a py.path.local instance.
    """
    topath = py.path.local(topath)
    source_arg = '"%s"' % (self._escape(self),)
    dest_arg = '"%s"' % (self._escape(topath),)
    cmdline = [source_arg, dest_arg]
    if self.rev is not None:
        # pin the export to this path's revision
        cmdline[:0] = ['-r', str(self.rev)]
    self._svncmdexecauth('svn export %s' % (' '.join(cmdline),))
    return topath
def info(self):
return an Info structure with svn-provided information.
arguments:
return value:
AnyOf(<Instance of Class InfoSvnCommand>, <None>)
source: path/svn/svncommon.py
| 179 |
| 180 |
| 181 |
| 182 |
| 183 |
| 184 |
| 185 |
| 186 |
| 187 | |
def info(self):
    """ return the info object that the parent directory's listing
        holds for this path's basename.

        raises py.error.ENOENT if the listing has no such entry.
    """
    parent = self.dirpath()
    listing = parent._listdir_nameinfo()
    wanted = self.basename
    for entry_name, entry_info in listing:
        if entry_name != wanted:
            continue
        return entry_info
    raise py.error.ENOENT(self)
def join(self, *args):
return a new Path (with the same revision) which is composed
of the self Path followed by 'args' path components.
source: path/svn/svncommon.py
| 134 |
| 135 |
| 136 |
| 137 |
| 138 |
| 139 |
| 140 |
| 141 |
| 142 |
| 143 |
| 144 | |
def join(self, *args):
    """ return a new path of the same class, revision and auth that
        consists of this path followed by the 'args' path components.

        without arguments, self is returned unchanged.
    """
    if not args:
        return self
    # leading/trailing separators on the components would double up
    stripped = tuple(component.strip(self.sep) for component in args)
    joined = self.sep.join((self.strpath,) + stripped)
    return self.__class__(joined, self.rev, self.auth)
def listdir(self, fil=None, sort=None):
list directory contents, possibly filter by the given fil func
and possibly sorted.
arguments:
- self: <Instance of Class SvnCommandPath>
- fil: AnyOf(<String>, <None>, <Instance of Class checker>)
- sort: AnyOf(<Boolean>, <None>)
return value:
AnyOf(<None>, <Tuple>, <List>)
source: path/svn/svncommon.py
| 156 |
| 157 |
| 158 |
| 159 |
| 160 |
| 161 |
| 162 |
| 163 |
| 164 |
| 165 |
| 166 |
| 167 |
| 168 |
| 169 |
| 170 |
| 171 |
| 172 |
| 173 |
| 174 |
| 175 |
| 176 |
| 177 | |
def listdir(self, fil=None, sort=None):
    """ list directory contents, possibly filtered by the given fil
        func and possibly sorted.

        fil:  None, a callable applied to each path, or a string that
              is turned into a matcher via common.fnmatch().
        sort: None/false for no sorting, a true value for the default
              ordering, or a two-argument comparison function.

        returns the result of _make_path_tuple() unchanged when neither
        fil nor sort is given, otherwise a list.

        raises py.error.ENOTDIR if the listing consists of a single
        'file' entry whose name equals this path's basename.
    """
    if isinstance(fil, str):
        fil = common.fnmatch(fil)
    nameinfo_seq = self._listdir_nameinfo()
    if len(nameinfo_seq) == 1:
        name, info = nameinfo_seq[0]
        if name == self.basename and info.kind == 'file':
            raise py.error.ENOTDIR(self)
    paths = self._make_path_tuple(nameinfo_seq)
    if fil or sort:
        # filter() yields an iterator on Python 3, so always materialize
        paths = list(filter(fil, paths))
        if callable(sort):
            # list.sort() takes a positional comparison function only on
            # Python 2; wrap it as a key function where that is possible
            try:
                from functools import cmp_to_key
            except ImportError:
                paths.sort(sort)
            else:
                paths.sort(key=cmp_to_key(sort))
        elif sort:
            paths.sort()
    return paths
def load(self):
return object unpickled from self.read()
arguments:
return value:
<Dict>
source: path/common.py
| 359 |
| 360 |
| 361 |
| 362 |
| 363 |
| 364 |
| 365 |
| 366 | |
def load(self):
    """ return the object unpickled from the file obtained via
        self.open('rb'); the file is closed afterwards in any case.

        NOTE(review): unpickling can execute arbitrary code, so this
        must only be used on content from a trusted source.
    """
    try:
        from cPickle import load as unpickle
    except ImportError:
        # cPickle is not available on every interpreter (e.g. Python 3)
        from pickle import load as unpickle
    f = self.open('rb')
    try:
        return self._callex(unpickle, f)
    finally:
        f.close()
def log(self, rev_start=None, rev_end=1, verbose=False):
return a list of LogEntry instances for this path.
rev_start is the starting revision (defaulting to HEAD).
rev_end is the last revision (defaulting to revision 1).
if verbose is True, then the LogEntry instances also know which files changed.
arguments:
return value:
<List>
source: path/svn/urlcommand.py
| 256 |
| 257 |
| 258 |
| 259 |
| 260 |
| 261 |
| 262 |
| 263 |
| 264 |
| 265 |
| 266 |
| 267 |
| 268 |
| 269 |
| 270 |
| 271 |
| 272 |
| 273 |
| 274 |
| 275 |
| 276 |
| 277 |
| 278 |
| 279 | |
def log(self, rev_start=None, rev_end=1, verbose=False):
    """ return a list of LogEntry instances for this path.

        rev_start is the starting revision; None (the default) selects _Head.
        rev_end is the last revision; it defaults to 1, None selects _Head.
        if verbose is True, 'svn log' is invoked with -v so that the
        LogEntry instances also know which files changed.
    """
    assert self.check()
    rev_start = _Head if rev_start is None else rev_start
    rev_end = _Head if rev_end is None else rev_end

    if rev_start is _Head and rev_end == 1:
        # the default range: no -r option is passed at all
        rev_opt = ""
    else:
        rev_opt = "-r %s:%s" % (rev_start, rev_end)
    verbose_opt = "-v" if verbose else ""
    # escape the url the same way open() does before quoting it
    xmlpipe = self._svnpopenauth('svn log --xml %s %s "%s"' %
                                 (rev_opt, verbose_opt,
                                  self._escape(self.strpath)))
    from xml.dom import minidom
    tree = minidom.parse(xmlpipe)
    # only element children of the document root become log entries
    return [LogEntry(node)
            for node in tree.firstChild.childNodes
            if node and node.nodeType == node.ELEMENT_NODE]
def mkdir(self, *args, **kwargs):
create & return the directory joined with args. You can provide
a checkin message by giving a keyword argument 'msg'
source: path/svn/urlcommand.py
| 130 |
| 131 |
| 132 |
| 133 |
| 134 |
| 135 |
| 136 |
| 137 | |
def mkdir(self, *args, **kwargs):
    """ create and return the directory obtained by joining this
        path with args. A checkin message can be supplied through the
        keyword argument 'msg'.
    """
    message = kwargs.get('msg', "mkdir by py lib invocation")
    newdir = self.join(*args)
    newdir._svnwrite('mkdir', '-m', message)
    # presumably drops a cached listing of the new directory's parent
    # -- TODO confirm
    self._norev_delentry(newdir.dirpath())
    return newdir
def move(self, target):
move this path to target.
arguments:
return value:
<None>
source: path/common.py
| 368 |
| 369 |
| 370 |
| 371 |
| 372 |
| 373 |
| 374 |
| 375 |
| 376 | |
def move(self, target):
    """ move this path to target.

        raises py.error.EINVAL if target lies below this path. If
        rename() fails with py.error.EXDEV, the path is copied to
        target and then removed instead.
    """
    inside_self = target.relto(self)
    if inside_self:
        raise py.error.EINVAL(target, "cannot move path into a subdirectory of itself")
    try:
        self.rename(target)
    except py.error.EXDEV:
        # rename is not possible here: emulate the move
        self.copy(target)
        self.remove()
def mtime(self):
Return the last modification time of the file.
arguments:
return value:
<Int>
def new(self, **kw):
create a modified version of this path. A 'rev' argument
indicates a new revision.
the following keyword arguments modify various path parts:
http://host.com/repo/path/file.ext
|-----------------------| dirname
|------| basename
|--| purebasename
|--| ext
source: path/svn/svncommon.py
| 56 |
| 57 |
| 58 |
| 59 |
| 60 |
| 61 |
| 62 |
| 63 |
| 64 |
| 65 |
| 66 |
| 67 |
| 68 |
| 69 |
| 70 |
| 71 |
| 72 |
| 73 |
| 74 |
| 75 |
| 76 |
| 77 |
| 78 |
| 79 |
| 80 |
| 81 |
| 82 |
| 83 |
| 84 |
| 85 |
| 86 |
| 87 |
| 88 | |
def new(self, **kw):
    """ create a modified version of this path. A 'rev' argument
        indicates a new revision, an 'auth' argument a new auth value.
        the following keyword arguments modify various path parts:

          http://host.com/repo/path/file.ext
          |-----------------------|          dirname
                                    |------| basename
                                    |--|     purebasename
                                        |--| ext

        'basename' cannot be combined with 'purebasename' or 'ext';
        doing so raises a ValueError.
    """
    obj = object.__new__(self.__class__)
    obj.rev = kw.get('rev', self.rev)
    obj.auth = kw.get('auth', self.auth)
    cur_dirname, cur_basename, cur_pure, cur_ext = self._getbyspec(
        "dirname,basename,purebasename,ext")
    if 'basename' in kw:
        if 'purebasename' in kw or 'ext' in kw:
            raise ValueError("invalid specification %r" % kw)
        new_basename = kw['basename']
    else:
        stem = kw.get('purebasename', cur_pure)
        suffix = kw.get('ext', cur_ext)
        # an extension may be given with or without its leading dot
        if suffix and not suffix.startswith('.'):
            suffix = '.' + suffix
        new_basename = stem + suffix
    fields = {
        'dirname': kw.get('dirname', cur_dirname),
        'sep': kw.get('sep', self.sep),
        'basename': new_basename,
    }
    if new_basename:
        obj.strpath = "%(dirname)s%(sep)s%(basename)s" % fields
    else:
        # an empty basename yields the bare directory name
        obj.strpath = "%(dirname)s" % fields
    return obj
def open(self, mode='r'):
return an opened file with the given mode.
arguments:
return value:
<Instance of Class file>
source: path/svn/urlcommand.py
| 106 |
| 107 |
| 108 |
| 109 |
| 110 |
| 111 |
| 112 |
| 113 |
| 114 |
| 115 | |
def open(self, mode='r'):
    """ return an opened (read-only) file for this path, backed by
        'svn cat'; write and append modes are rejected, and the path
        must pass check(file=1).
    """
    assert 'w' not in mode and 'a' not in mode, "XXX not implemented for svn cmdline"
    assert self.check(file=1)
    if self.rev is not None:
        # read the content as of this path's revision
        command = 'svn cat -r %s "%s"' % (
            self.rev, self._escape(self.strpath))
    else:
        command = 'svn cat "%s"' % (self._escape(self.strpath), )
    return self._svnpopenauth(command)
def parts(self, reverse=False):
return a root-first list of all ancestor directories
plus the path itself.
arguments:
return value:
<List>
source: path/common.py
| 154 |
| 155 |
| 156 |
| 157 |
| 158 |
| 159 |
| 160 |
| 161 |
| 162 |
| 163 |
| 164 |
| 165 |
| 166 |
| 167 |
| 168 | |
def parts(self, reverse=False):
    """ return a list of all ancestor directories plus the path
        itself, root-first by default, or starting with the path
        itself if reverse is true.
    """
    chain = [self]
    node = self
    while True:
        parent = node.dirpath()
        # the top is reached once dirpath() yields an equal path
        if node == parent:
            break
        chain.append(parent)
        node = parent
    # the chain was collected starting from the path itself
    if not reverse:
        chain.reverse()
    return chain
def propget(self, name):
return the content of the given property.
arguments:
return value:
<String>
source: path/svn/svncommon.py
|
|
def propget(self, name):
    """ return the content of the property called 'name', as
        delivered by self._propget().
    """
    return self._propget(name)
def proplist(self):
list all property names.
arguments:
return value:
<Instance of Class PropListDict>
def read(self, mode='rb'):
read and return a bytestring from reading the path.
arguments:
return value:
<String>
source: path/common.py
| 334 |
| 335 |
| 336 |
| 337 |
| 338 |
| 339 |
| 340 |
| 341 |
| 342 |
| 343 |
| 344 | |
def read(self, mode='rb'):
    """ open the path with the given mode, read it completely and
        return the content; the file is closed in any case.
    """
    if py.std.sys.version_info < (2,3):
        # presumably interpreters before 2.3 do not accept the
        # 'u'/'U' mode flags, so they are removed -- TODO confirm
        for flag in ('u', 'U'):
            mode = mode.replace(flag, '')
    handle = self.open(mode)
    try:
        return handle.read()
    finally:
        handle.close()
def readlines(self, cr=