From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from lists.gentoo.org (pigeon.gentoo.org [208.92.234.80]) by finch.gentoo.org (Postfix) with ESMTP id A58071381F3 for ; Tue, 10 Sep 2013 14:41:14 +0000 (UTC) Received: from pigeon.gentoo.org (localhost [127.0.0.1]) by pigeon.gentoo.org (Postfix) with SMTP id F34BAE0B15; Tue, 10 Sep 2013 14:40:49 +0000 (UTC) Received: from smtp.gentoo.org (smtp.gentoo.org [140.211.166.183]) (using TLSv1 with cipher AECDH-AES256-SHA (256/256 bits)) (No client certificate requested) by pigeon.gentoo.org (Postfix) with ESMTPS id 5FAE2E0B15 for ; Tue, 10 Sep 2013 14:40:44 +0000 (UTC) Received: from hornbill.gentoo.org (hornbill.gentoo.org [94.100.119.163]) (using TLSv1 with cipher AECDH-AES256-SHA (256/256 bits)) (No client certificate requested) by smtp.gentoo.org (Postfix) with ESMTPS id 4C1D533EC11 for ; Tue, 10 Sep 2013 14:40:43 +0000 (UTC) Received: from localhost.localdomain (localhost [127.0.0.1]) by hornbill.gentoo.org (Postfix) with ESMTP id E8999E5469 for ; Tue, 10 Sep 2013 14:40:40 +0000 (UTC) From: "André Erdmann" To: gentoo-commits@lists.gentoo.org Content-Transfer-Encoding: 8bit Content-type: text/plain; charset=UTF-8 Reply-To: gentoo-dev@lists.gentoo.org, "André Erdmann" Message-ID: <1378823377.bed92a84ba5816f9db6320d335c08db2a47f73e4.dywi@gentoo> Subject: [gentoo-commits] proj/R_overlay:master commit in: roverlay/ X-VCS-Repository: proj/R_overlay X-VCS-Files: roverlay/fsutil.py X-VCS-Directories: roverlay/ X-VCS-Committer: dywi X-VCS-Committer-Name: André Erdmann X-VCS-Revision: bed92a84ba5816f9db6320d335c08db2a47f73e4 X-VCS-Branch: master Date: Tue, 10 Sep 2013 14:40:40 +0000 (UTC) Precedence: bulk List-Post: List-Help: List-Unsubscribe: List-Subscribe: List-Id: Gentoo Linux mail X-BeenThere: gentoo-commits@lists.gentoo.org X-Archives-Salt: 4f31347d-dab9-4a16-b386-db022d068754 X-Archives-Hash: 417171514d95099121c5a97b0b646945 commit: bed92a84ba5816f9db6320d335c08db2a47f73e4 Author: André Erdmann mailerd de> 
# AuthorDate: Tue Sep 10 14:28:24 2013 +0000
# Commit: André Erdmann mailerd de>
# CommitDate: Tue Sep 10 14:29:37 2013 +0000
# URL: http://git.overlays.gentoo.org/gitweb/?p=proj/R_overlay.git;a=commit;h=bed92a84
#
# fsutil: perform/simulate filesystem operations
#
# rudimentary support for common fs operations.
#
# The VirtualFsOps class can be used to print operations to a stream (stdout).
#
# ---
#  roverlay/fsutil.py | 416 +++++++++++++++++++++++++++++++++++++++++++++++++++++
#  1 file changed, 416 insertions(+)
#
# diff --git a/roverlay/fsutil.py b/roverlay/fsutil.py
# new file mode 100644
# index 0000000..5efac20
# --- /dev/null
# +++ b/roverlay/fsutil.py
# @@ -0,0 +1,416 @@

# R overlay --
# -*- coding: utf-8 -*-
# Copyright (C) 2013 André Erdmann
# Distributed under the terms of the GNU General Public License;
# either version 2 of the License, or (at your option) any later version.

import errno
import os
import pwd
import stat
import sys

import roverlay.util.common
import roverlay.util.objects

# Prefer the variants that operate on a symlink itself where the platform
# offers them; fall back to the link-following functions otherwise.
_OS_CHOWN = getattr(os, 'lchown', os.chown)
_OS_CHMOD = getattr(os, 'lchmod', os.chmod)


def readlink_f(fspath):
    """Return the absolute path the symlink *fspath* points to.

    Only one level of symlinks is resolved.  If os.readlink() fails with
    ENOENT or EINVAL (path missing / not a symlink), the absolute form of
    *fspath* itself is returned.

    Raises:
        OSError: any other os.readlink() failure.
    """
    try:
        target = os.readlink(fspath)
    except OSError as oserr:
        if oserr.errno in {errno.ENOENT, errno.EINVAL}:
            return os.path.abspath(fspath)
        raise

    # A relative link target is relative to the directory that contains the
    # link, not to the current working directory.  os.path.join() drops the
    # dirname again if the target is already absolute.
    return os.path.abspath(os.path.join(os.path.dirname(fspath), target))
# --- end of readlink_f (...) ---


def create_subdir_check(parent, fs_sep=os.sep):
    """Create a predicate that tells whether a path is *parent* or below it.

    The comparison is purely lexical (component by component after
    stripping trailing separators); the filesystem is not consulted.

    Arguments:
        parent -- directory path that acts as the root
        fs_sep -- path separator, defaults to os.sep

    Returns a function is_subdir(dirpath) -> bool.
    """
    parent_path = parent.rstrip(fs_sep).split(fs_sep)
    parent_len = len(parent_path)

    def is_subdir(dirpath):
        # Compare the leading components as a whole: a path with fewer
        # components than the parent can never be located below it.
        components = dirpath.rstrip(fs_sep).split(fs_sep)
        return components[:parent_len] == parent_path
    # --- end of is_subdir (...) ---

    return is_subdir
# --- end of create_subdir_check (...) ---


def pwd_expanduser(fspath, uid):
    """Expand a leading "~" in *fspath* to the home directory of *uid*.

    The home directory is looked up in the password database.  Paths that
    are empty, do not start with "~", or use the "~name" form are returned
    unchanged.
    """
    if not fspath or fspath[0] != '~':
        return fspath

    if len(fspath) == 1:
        # just "~"
        return pwd.getpwuid(uid).pw_dir

    if fspath[1] == os.sep:
        # "~/..."
        return pwd.getpwuid(uid).pw_dir + fspath[1:]

    # "~name...": not handled here
    return fspath
# --- end of pwd_expanduser (...) ---
def get_bitwise_sum(iterable, initial_value=None):
    """Return the bitwise OR of all items in *iterable*.

    Arguments:
        iterable      -- items supporting the "|" operator
        initial_value -- start value, 0 if None

    The initial value is never modified in place, even if it is a mutable
    object such as a set.
    """
    ret = 0 if initial_value is None else initial_value
    for item in iterable:
        # "ret | item" instead of "ret |= item": the latter would mutate a
        # mutable initial_value that is owned by the caller.
        ret = ret | item
    return ret
# --- end of get_bitwise_sum (...) ---


# (expected char, (user bit, group bit, others bit)) indexed by pos % 3
_STAT_MODE_BITS = (
    ('r', (stat.S_IRUSR, stat.S_IRGRP, stat.S_IROTH)),
    ('w', (stat.S_IWUSR, stat.S_IWGRP, stat.S_IWOTH)),
    ('x', (stat.S_IXUSR, stat.S_IXGRP, stat.S_IXOTH)),
)


def get_stat_mode(mode_str):
    """Convert a permission string such as "rwxr-xr-x" to a stat mode int.

    The string is read position by position (user, group, others; each
    "r", "w", "x"); "-" leaves the corresponding bit unset.  Strings
    shorter than 9 chars are accepted and describe the leading positions
    only.

    COULDFIX: parse sticky bit etc. (not necessary, currently)

    Raises:
        ValueError: if mode_str is longer than 9 chars or contains a
                    char that is not valid at its position.
    """
    if len(mode_str) > 9:
        raise ValueError(mode_str)

    mode = 0
    for pos, char in enumerate(mode_str):
        if char == '-':
            continue

        block, subpos = divmod(pos, 3)
        expected_char, bits = _STAT_MODE_BITS[subpos]
        if char != expected_char:
            # explicit check, assert statements vanish when running with -O
            raise ValueError(mode_str)

        mode |= bits[block]

    return mode
# --- end of get_stat_mode (...) ---
class ChownChmod(object):
    """Applies (or, in pretend mode, only describes) a fixed owner/mode.

    After construction, self.chown and self.chmod are callables that take
    a filesystem path and return either a command-like description string
    ("chown <uid>:<gid> <path>" / "chmod <mode> <path>") or None if the
    respective operation is disabled.
    """

    def __init__(self, uid=None, gid=None, mode=None, pretend=False):
        """
        Arguments:
            uid     -- owner to set, None: keep
            gid     -- group to set, None: keep
            mode    -- permission bits as int or as "rwxr-xr-x"-like str,
                       None: no chmod
            pretend -- if true, do not touch the filesystem and only
                       return the description strings
        """
        super(ChownChmod, self).__init__()

        self.pretend = bool(pretend)

        # chown is enabled as soon as one of uid/gid is given; the other
        # one is passed as -1, which os.chown() treats as "leave unchanged"
        do_chown = uid is not None or gid is not None
        self.uid = -1 if uid is None else uid
        self.gid = -1 if gid is None else gid

        if do_chown:
            # "{{}}" survives as "{}", the placeholder for the path
            self.chown_str = "chown {uid:d}:{gid:d} {{}}".format(
                uid=self.uid, gid=self.gid
            )
            self.chown = (
                self.chown_str.format if self.pretend else self._do_chown
            )
        else:
            self.chown_str = "NO CHOWN {}"
            self.chown = self._nullfunc

        if mode is None:
            self.mode = None
        elif isinstance(mode, str):
            self.mode = get_stat_mode(mode)
        else:
            self.mode = mode

        if self.mode is None:
            self.chmod_str = "NO CHMOD {}"
            self.chmod = self._nullfunc
        else:
            self.chmod_str = "chmod {mode:o} {{}}".format(mode=self.mode)
            self.chmod = (
                self.chmod_str.format if self.pretend else self._do_chmod
            )
    # --- end of __init__ (...) ---

    def _nullfunc(self, fspath):
        """Placeholder for a disabled operation, always returns None."""
        return None

    def _do_chown(self, fspath, _chown=_OS_CHOWN):
        """Change the owner of fspath and return the description string."""
        _chown(fspath, self.uid, self.gid)
        return self.chown_str.format(fspath)

    def _do_chmod(self, fspath, _chmod=_OS_CHMOD):
        """Change the mode of fspath and return the description string."""
        _chmod(fspath, self.mode)
        return self.chmod_str.format(fspath)

    def chown_chmod(self, fspath):
        """Run chmod, then chown on fspath.

        Returns a 2-tuple (chmod result, chown result).
        """
        # historical name, see chmod_chown()
        return (
            self.chmod(fspath),
            self.chown(fspath)
        )
    # --- end of chown_chmod (...) ---

    def chmod_chown(self, fspath):
        """Same as chown_chmod(), named after the actual order."""
        return self.chown_chmod(fspath)
    # --- end of chmod_chown (...) ---

    def chown_chmod_recursive(self, root):
        """Generator that runs chmod, then chown on root and everything
        os.walk() finds below it.

        Yields the chmod/chown result (str or None) of each operation.
        Nothing happens until the generator is consumed.
        """
        chown = self.chown
        chmod = self.chmod

        if os.path.isfile(root):
            yield chmod(root)
            yield chown(root)

        else:
            for current_root, dirnames, filenames in os.walk(root):
                yield chmod(current_root)
                yield chown(current_root)

                for filename in filenames:
                    fpath = os.path.join(current_root, filename)
                    yield chmod(fpath)
                    yield chown(fpath)
    # --- end of chown_chmod_recursive (...) ---

    def chmod_chown_recursive(self, root):
        """Same as chown_chmod_recursive(), named after the actual order."""
        return self.chown_chmod_recursive(root)
    # --- end of chmod_chown_recursive (...) ---
# --- end of ChownChmod ---


def chown_chmod(root, uid=None, gid=None, mode=None, pretend=False):
    """Run chmod and chown on a single path.

    Arguments are passed to ChownChmod; returns the 2-tuple produced by
    ChownChmod.chown_chmod().
    """
    return ChownChmod(uid, gid, mode, pretend).chown_chmod(root)
# --- end of chown_chmod (...) ---


def chown_chmod_recursive(
    root, uid=None, gid=None, mode=None, pretend=False
):
    """Return the generator created by ChownChmod.chown_chmod_recursive().

    The generator has to be consumed for the operations to take place.
    """
    return ChownChmod(
        uid, gid, mode, pretend
    ).chown_chmod_recursive(root)
# --- end of chown_chmod_recursive (...) ---


class AbstractFsOperations(object):
    """Base class for objects that perform or simulate fs operations.

    Derived classes have to set PRETEND (to a value other than None) and
    implement the methods marked as abstract.
    """

    PRETEND = None

    def __init__(
        self, stdout=None, stderr=None, uid=None, gid=None, mode=None
    ):
        """
        Arguments:
            stdout -- stream for info messages, defaults to sys.stdout
            stderr -- stream for error messages, defaults to sys.stderr
            uid, gid, mode -- passed to ChownChmod
        """
        if self.__class__.PRETEND is None:
            raise AssertionError("derived classes have to set PRETEND.")

        super(AbstractFsOperations, self).__init__()
        self.perm_env = ChownChmod(
            uid=uid, gid=gid, mode=mode, pretend=self.__class__.PRETEND
        )
        self._stdout = sys.stdout if stdout is None else stdout
        self._stderr = sys.stderr if stderr is None else stderr

        self.info = self._stdout.write
        self.error = self._stderr.write
    # --- end of __init__ (...) ---

    @roverlay.util.objects.abstractmethod
    def _dodir(self, dirpath, mkdir_p):
        pass

    @roverlay.util.objects.abstractmethod
    def do_touch(self, fspath):
        pass

    @roverlay.util.objects.abstractmethod
    def chown(self, fspath):
        pass

    @roverlay.util.objects.abstractmethod
    def chmod(self, fspath):
        pass

    def chmod_chown(self, fspath):
        """Run chmod, then chown on fspath."""
        self.chmod(fspath)
        self.chown(fspath)

    @roverlay.util.objects.abstractmethod
    def chmod_chown_recursive(self, root):
        pass

    def dodir(self, dirpath, mkdir_p=True, chown=True, chmod=True):
        """Create a directory via _dodir() and, if that reports success,
        apply chmod/chown as requested.

        Returns True if _dodir() returned a true value, else False.
        """
        if not self._dodir(dirpath, mkdir_p=mkdir_p):
            return False

        if chmod:
            self.chmod(dirpath)
        if chown:
            self.chown(dirpath)

        return True
    # --- end of dodir (...) ---

    def dodirs(self, *dirs, **kwargs):
        """Call dodir() for each path in dirs, passing kwargs through."""
        for dirpath in dirs:
            self.dodir(dirpath, **kwargs)
    # --- end of dodirs (...) ---

    @roverlay.util.objects.abstractmethod
    def rmdir(self, dirpath):
        pass

    @roverlay.util.objects.abstractmethod
    def unlink(self, fspath):
        pass

    def wipe(self, fspath):
        """Try rmdir() first and fall back to unlink() if it fails."""
        return self.rmdir(fspath) or self.unlink(fspath)

    @roverlay.util.objects.abstractmethod
    def symlink(self, source, link_name):
        pass

    def check_writable(
        self, fspath, mkdir_chown=False, mkdir_chmod=False, mkdir_p=True
    ):
        """Checks whether fspath can be written, using do_touch().

        If the first attempt fails with ENOENT, the parent directory is
        created with dodir() and the touch is retried once.

        Returns True on success.  False is returned if the first attempt
        fails with an IOError other than ENOENT, or if the retry fails
        with EPERM/EACCES.  Other errors from the retry are re-raised.
        """
        success = False

        try:
            if self.do_touch(fspath):
                success = True

        except IOError as ioerr:
            if ioerr.errno == errno.ENOENT:
                try:
                    if self.dodir(
                        os.path.dirname(fspath),
                        chown=mkdir_chown, chmod=mkdir_chmod,
                        mkdir_p=mkdir_p
                    ) and self.do_touch(fspath):
                        success = True

                except (OSError, IOError) as err:
                    # missing permissions are usually reported as EACCES,
                    # both codes mean "not writable" here
                    if err.errno not in (errno.EPERM, errno.EACCES):
                        raise

        return success
    # --- end of check_writable (...) ---

# --- end of AbstractFsOperations ---


class FsOperations(AbstractFsOperations):
    """Performs the operations on the real filesystem."""

    PRETEND = False

    def _dodir(self, dirpath, mkdir_p):
        # delegates to the project helper; its result is returned as-is
        return roverlay.util.common.dodir(
            dirpath, mkdir_p=mkdir_p, log_exception=False
        )
    # --- end of _dodir (...) ---

    def chmod(self, fspath):
        self.perm_env.chmod(fspath)

    def chown(self, fspath):
        self.perm_env.chown(fspath)

    def chmod_chown(self, fspath):
        self.perm_env.chown_chmod(fspath)

    def chmod_chown_recursive(self, root):
        # the generator has to be consumed, its results are not needed
        for _ in self.perm_env.chown_chmod_recursive(root):
            pass

    def rmdir(self, dirpath):
        """Remove an empty directory.

        Returns True if it has been removed or did not exist (ENOENT),
        False on any other OSError.
        """
        try:
            os.rmdir(dirpath)
        except OSError as oserr:
            return oserr.errno == errno.ENOENT
        else:
            return True
    # --- end of rmdir (...) ---

    def unlink(self, fspath):
        """Remove a file.

        Returns True if it has been removed or did not exist (ENOENT),
        False on any other OSError.
        """
        try:
            os.unlink(fspath)
        except OSError as oserr:
            return oserr.errno == errno.ENOENT
        else:
            return True

    def symlink(self, source, link_name):
        """Create a symlink, returns False if os.symlink() raised OSError."""
        try:
            os.symlink(source, link_name)
        except OSError:
            return False
        else:
            return True

    def do_touch(self, fspath):
        """Create fspath as empty file unless something (including a
        dangling symlink) already exists there.  Always returns True;
        errors from open() propagate."""
        if not os.path.lexists(fspath):
            with open(fspath, 'a'):
                pass
        return True


class VirtualFsOperations(AbstractFsOperations):
    """Does not modify the filesystem, writes shell-like command lines
    describing the operations to the info stream instead."""

    PRETEND = True

    def _dodir(self, dirpath, mkdir_p):
        if mkdir_p:
            self.info("mkdir -p {!s}\n".format(dirpath))
        else:
            self.info("mkdir {!s}\n".format(dirpath))
        return True

    def chown(self, fspath):
        ret = self.perm_env.chown(fspath)
        if ret is not None:
            self.info(ret + "\n")

    def chmod(self, fspath):
        ret = self.perm_env.chmod(fspath)
        if ret is not None:
            self.info(ret + "\n")

    def chmod_chown_recursive(self, root):
        # ChownChmod names this method chown_chmod_recursive()
        for word in self.perm_env.chown_chmod_recursive(root):
            if word is not None:
                self.info(word + "\n")

    def unlink(self, fspath):
        self.info("rm {!r}\n".format(fspath))
        return True

    def rmdir(self, dirpath):
        self.info("rmdir {!r}\n".format(dirpath))
        return True

    def symlink(self, source, link_name):
        self.info("ln -s {} {}\n".format(source, link_name))
        return True

    def do_touch(self, fspath):
        self.info("touch {}\n".format(fspath))
        return True