HEX
Server: Apache/2.4.65 (Debian)
System: Linux web6 5.10.0-36-amd64 #1 SMP Debian 5.10.244-1 (2025-09-29) x86_64
User: innocamp (1028)
PHP: 7.4.33
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
Upload Files
File: //usr/lib/python3/dist-packages/rdiff_backup/eas_acls.py
# Copyright 2003 Ben Escoto
#
# This file is part of rdiff-backup.
#
# rdiff-backup is free software; you can redistribute it and/or modify
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# rdiff-backup is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with rdiff-backup; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA
"""Store and retrieve extended attributes and access control lists

Not all file systems will have EAs and ACLs, but if they do, store
this information in separate files in the rdiff-backup-data directory,
called extended_attributes.<time>.snapshot and
access_control_lists.<time>.snapshot.

"""

import base64
import errno
import re
import io
import os
try:
    import posix1e
except ImportError:
    pass
from . import Globals, connection, metadata, rorpiter, log, C, rpath, user_group  # noqa: F401

# When an ACL gets dropped, put name in dropped_acl_names.  This is
# only used so that only the first dropped ACL for any given name
# triggers a warning.
dropped_acl_names = {}


class ExtendedAttributes:
    """Hold a file's extended attribute information"""

    def __init__(self, index, attr_dict=None):
        """Initialize EA object with no attributes"""
        self.index = index
        if attr_dict is None:
            self.attr_dict = {}
        else:
            self.attr_dict = attr_dict

    def __eq__(self, ea):
        """Equal if all attributes are equal"""
        assert isinstance(ea, ExtendedAttributes)
        return ea.attr_dict == self.attr_dict

    def __ne__(self, ea):
        return not self.__eq__(ea)

    def get_indexpath(self):
        return self.index and b'/'.join(self.index) or b'.'

    def read_from_rp(self, rp):
        """Set the extended attributes from an rpath"""
        try:
            attr_list = rp.conn.xattr.list(rp.path, rp.issym())
        except IOError as exc:
            if exc.errno in (errno.EOPNOTSUPP, errno.EPERM, errno.ETXTBSY):
                return  # if not supported, consider empty
            if exc.errno in (errno.EACCES, errno.ENOENT, errno.ELOOP):
                log.Log("Warning: listattr(%s): %s" % (rp.get_safepath(), exc),
                        4)
                return
            raise
        for attr in attr_list:
            if attr.startswith(b'system.'):
                # Do not preserve system extended attributes
                continue
            if not rp.isdir() and attr == b'com.apple.ResourceFork':
                # Resource Fork handled elsewhere, except for directories
                continue
            try:
                self.attr_dict[attr] = \
                    rp.conn.xattr.get(rp.path, attr, rp.issym())
            except IOError as exc:
                # File probably modified while reading, just continue
                if exc.errno == errno.ENODATA:
                    continue
                elif exc.errno == errno.ENOENT:
                    break
                    # Handle bug in pyxattr < 0.2.2
                elif exc.errno == errno.ERANGE:
                    continue
                else:
                    raise

    def clear_rp(self, rp):
        """Delete all the extended attributes in rpath"""
        try:
            for name in rp.conn.xattr.list(rp.path, rp.issym()):
                try:
                    rp.conn.xattr.remove(rp.path, name, rp.issym())
                except PermissionError:  # errno.EACCES
                    # SELinux attributes cannot be removed, and we don't want
                    # to bail out or be too noisy at low log levels.
                    log.Log(
                        "Warning: unable to remove xattr %s from %s" %
                        (name, rp.get_safepath()), 7)
                    continue
                except OSError as exc:
                    # can happen because trusted.SGI_ACL_FILE is deleted together with
                    # system.posix_acl_access on XFS file systems.
                    if exc.errno == errno.ENODATA:
                        continue
                    else:  # can be anything, just fail
                        raise
        except io.UnsupportedOperation:  # errno.EOPNOTSUPP or errno.EPERM
            return  # if not supported, consider empty
        except FileNotFoundError as exc:
            log.Log(
                "Warning: unable to clear xattrs on %s: %s" %
                (rp.get_safepath(), exc), 3)
            return

    def write_to_rp(self, rp):
        """Write extended attributes to rpath rp"""
        self.clear_rp(rp)
        for (name, value) in self.attr_dict.items():
            try:
                rp.conn.xattr.set(rp.path, name, value, 0, rp.issym())
            except IOError as exc:
                # Mac and Linux attributes have different namespaces, so
                # fail gracefully if can't call xattr.set
                if exc.errno in (errno.EOPNOTSUPP, errno.EPERM, errno.EACCES,
                                 errno.ENOENT, errno.EINVAL):
                    log.Log(
                        "Warning: unable to write xattr %s to %s" %
                        (name, rp.get_safepath()), 6)
                    continue
                else:
                    raise

    def get(self, name):
        """Return attribute attached to given name"""
        return self.attr_dict[name]

    def set(self, name, value=b""):
        """Set given name to given value.  Does not write to disk"""
        self.attr_dict[name] = value

    def delete(self, name):
        """Delete value associated with given name"""
        del self.attr_dict[name]

    def empty(self):
        """Return true if no extended attributes are set"""
        return not self.attr_dict


def ea_compare_rps(rp1, rp2):
    """Return true if rp1 and rp2 have same extended attributes"""
    ea1 = ExtendedAttributes(rp1.index)
    ea1.read_from_rp(rp1)
    ea2 = ExtendedAttributes(rp2.index)
    ea2.read_from_rp(rp2)
    return ea1 == ea2


def EA2Record(ea):
    """Convert ExtendedAttributes object to text record"""
    str_list = [b'# file: %s' % C.acl_quote(ea.get_indexpath())]

    for (name, val) in ea.attr_dict.items():
        if not val:
            str_list.append(name)
        else:
            encoded_val = base64.b64encode(val)
            str_list.append(b'%s=0s%s' % (C.acl_quote(name), encoded_val))
    return b'\n'.join(str_list) + b'\n'


def Record2EA(record):
    """Convert text record to ExtendedAttributes object"""
    lines = record.split(b'\n')
    first = lines.pop(0)
    if not first[:8] == b'# file: ':
        raise metadata.ParsingError("Bad record beginning: %r" % first[:8])
    filename = first[8:]
    if filename == b'.':
        index = ()
    else:
        unquoted_filename = C.acl_unquote(filename)
        index = tuple(unquoted_filename.split(b'/'))
    ea = ExtendedAttributes(index)

    for line in lines:
        line = line.strip()
        if not line:
            continue
        assert line[0] != b'#', line
        eq_pos = line.find(b'=')
        if eq_pos == -1:
            ea.set(line)
        else:
            name = line[:eq_pos]
            assert line[eq_pos + 1:eq_pos + 3] == b'0s', \
                "Currently only base64 encoding supported"
            encoded_val = line[eq_pos + 3:]
            ea.set(name, base64.b64decode(encoded_val))
    return ea


class EAExtractor(metadata.FlatExtractor):
    """Iterate ExtendedAttributes objects from the EA information file"""
    record_boundary_regexp = re.compile(b'(?:\\n|^)(# file: (.*?))\\n')
    record_to_object = staticmethod(Record2EA)

    def filename_to_index(self, filename):
        """Convert possibly quoted filename to index tuple"""
        if filename == b'.':
            return ()
        else:
            return tuple(C.acl_unquote(filename).split(b'/'))


class ExtendedAttributesFile(metadata.FlatFile):
    """Store/retrieve EAs from extended_attributes file"""
    _prefix = b"extended_attributes"
    _extractor = EAExtractor
    _object_to_record = staticmethod(EA2Record)


def join_ea_iter(rorp_iter, ea_iter):
    """Update a rorp iter by adding the information from ea_iter"""

    def _safe_str(cmd):
        """Transform bytes into string without risk of conversion error"""
        if isinstance(cmd, str):
            return cmd
        else:
            return str(cmd, errors='replace')

    for rorp, ea in rorpiter.CollateIterators(rorp_iter, ea_iter):
        assert rorp, "Missing rorp for index '%s'." % _safe_str(ea.index)
        if not ea:
            ea = ExtendedAttributes(rorp.index)
        rorp.set_ea(ea)
        yield rorp


class AccessControlLists:
    """Hold a file's access control list information

    Since posix1e.ACL objects cannot be pickled, and because they lack
    user/group name information, store everything in self.entry_list
    and self.default_entry_list.

    """

    def __init__(self, index, acl_text=None):
        """Initialize object with index and possibly acl_text"""
        self.index = index
        if acl_text:
            self.set_from_text(acl_text)
        else:
            self.entry_list = self.default_entry_list = None

    def set_from_text(self, text):
        """Set self.entry_list and self.default_entry_list from text"""
        self.entry_list, self.default_entry_list = [], []
        for line in text.split('\n'):
            comment_pos = line.find('#')
            if comment_pos >= 0:
                line = line[:comment_pos]
            line = line.strip()
            if not line:
                continue

            if line.startswith('default:'):
                entrytuple = self.text_to_entrytuple(line[8:])
                self.default_entry_list.append(entrytuple)
            else:
                self.entry_list.append(self.text_to_entrytuple(line))

    def __str__(self):
        """Return text version of acls"""
        if not self.entry_list:
            return ""
        slist = list(map(self.entrytuple_to_text, self.entry_list))
        if self.default_entry_list:
            slist.extend([
                "default:" + self.entrytuple_to_text(e)
                for e in self.default_entry_list
            ])
        return "\n".join(slist)

    def entrytuple_to_text(self, entrytuple):
        """Return text version of entrytuple, as in getfacl"""
        tagchar, name_pair, perms = entrytuple
        if tagchar == "U":
            text = 'user::'
        elif tagchar == "u":
            uid, uname = name_pair
            text = 'user:%s:' % (uname or uid)
        elif tagchar == "G":
            text = 'group::'
        elif tagchar == "g":
            gid, gname = name_pair
            text = 'group:%s:' % (gname or gid)
        elif tagchar == "M":
            text = 'mask::'
        else:
            assert tagchar == "O", tagchar
            text = 'other::'

        permstring = '%s%s%s' % (perms & 4 and 'r' or '-', perms & 2 and 'w'
                                 or '-', perms & 1 and 'x' or '-')
        return text + permstring

    def text_to_entrytuple(self, text):
        """Return entrytuple given text like 'user:foo:r--'

        See the acl_to_list function for entrytuple documentation.

        """
        typetext, qualifier, permtext = text.split(':')
        if qualifier:
            try:
                uid = int(qualifier)
            except ValueError:
                namepair = (None, qualifier)
            else:
                namepair = (uid, None)

            if typetext == 'user':
                typechar = "u"
            else:
                assert typetext == 'group', (typetext, text)
                typechar = "g"
        else:
            namepair = None
            if typetext == 'user':
                typechar = "U"
            elif typetext == 'group':
                typechar = "G"
            elif typetext == 'mask':
                typechar = "M"
            else:
                assert typetext == 'other', (typetext, text)
                typechar = "O"

        assert len(permtext) == 3, (permtext, text)
        read, write, execute = permtext
        perms = ((read == 'r') << 2 | (write == 'w') << 1 | (execute == 'x'))
        return (typechar, namepair, perms)

    def cmp_entry_list(self, l1, l2):
        """True if the lists have same entries.  Assume preordered"""
        if not l1:
            return not l2
        if not l2 or len(l1) != len(l2):
            return 0
        for i in range(len(l1)):
            type1, namepair1, perms1 = l1[i]
            type2, namepair2, perms2 = l2[i]
            if type1 != type2 or perms1 != perms2:
                return 0
            if namepair1 == namepair2:
                continue
            if not namepair1 or not namepair2:
                return 0
            (id1, name1), (id2, name2) = namepair1, namepair2
            if name1:
                if name1 == name2:
                    continue
                else:
                    return 0
            if name2:
                return 0
            if id1 != id2:
                return 0
        return 1

    def __eq__(self, acl):
        """Compare self and other access control list

        Basic acl permissions are considered equal to an empty acl
        object.

        """
        assert isinstance(acl, self.__class__)
        if self.is_basic():
            return acl.is_basic()
        return (self.cmp_entry_list(self.entry_list, acl.entry_list)
                and self.cmp_entry_list(self.default_entry_list,
                                        acl.default_entry_list))

    def __ne__(self, acl):
        return not self.__eq__(acl)

    def eq_verbose(self, acl):
        """Returns same as __eq__ but print explanation if not equal"""
        if not self.cmp_entry_list(self.entry_list, acl.entry_list):
            print("ACL entries for %s compare differently" % (self.index, ))
            return 0
        if not self.cmp_entry_list(self.default_entry_list,
                                   acl.default_entry_list):
            print("Default ACL entries for %s do not compare" % (self.index, ))
            return 0
        return 1

    def get_indexpath(self):
        return self.index and b'/'.join(self.index) or b'.'

    def is_basic(self):
        """True if acl can be reduced to standard unix permissions

        Assume that if they are only three entries, they correspond to
        user, group, and other, and thus don't use any special ACL
        features.

        """
        if not self.entry_list and not self.default_entry_list:
            return 1
        assert len(self.entry_list) >= 3, self.entry_list
        return len(self.entry_list) == 3 and not self.default_entry_list

    def read_from_rp(self, rp):
        """Set self.ACL from an rpath, or None if not supported"""
        self.entry_list, self.default_entry_list = \
            rp.conn.eas_acls.get_acl_lists_from_rp(rp)

    def write_to_rp(self, rp, map_names=1):
        """Write current access control list to RPath rp"""
        rp.conn.eas_acls.set_rp_acl(rp, self.entry_list,
                                    self.default_entry_list, map_names)


def set_rp_acl(rp, entry_list=None, default_entry_list=None, map_names=1):
    """Set given rp with ACL that acl_text defines.  rp should be local"""
    assert rp.conn is Globals.local_connection
    if entry_list:
        acl = list_to_acl(entry_list, map_names)
    else:
        acl = posix1e.ACL()

    try:
        acl.applyto(rp.path)
    except IOError as exc:
        if exc.errno == errno.EOPNOTSUPP:
            log.Log(
                "Warning: unable to set ACL on %s: %s" % (rp.get_safepath(),
                                                          exc), 4)
            return
        else:
            raise

    if rp.isdir():
        if default_entry_list:
            def_acl = list_to_acl(default_entry_list, map_names)
        else:
            def_acl = posix1e.ACL()
        def_acl.applyto(rp.path, posix1e.ACL_TYPE_DEFAULT)


def get_acl_lists_from_rp(rp):
    """Returns (acl_list, def_acl_list) from an rpath.  Call locally"""
    assert rp.conn is Globals.local_connection
    try:
        acl = posix1e.ACL(file=rp.path)
    except (FileNotFoundError, UnicodeEncodeError) as exc:
        log.Log(
            "Warning: unable to read ACL from %s: %s" % (rp.get_safepath(),
                                                         exc), 3)
        acl = None
    except IOError as exc:
        if exc.errno == errno.EOPNOTSUPP:
            acl = None
        else:
            raise
    if rp.isdir():
        try:
            def_acl = posix1e.ACL(filedef=os.fsdecode(rp.path))
        except (FileNotFoundError, UnicodeEncodeError) as exc:
            log.Log(
                "Warning: unable to read default ACL from %s: %s" %
                (rp.get_safepath(), exc), 3)
            def_acl = None
        except IOError as exc:
            if exc.errno == errno.EOPNOTSUPP:
                def_acl = None
            else:
                raise
    else:
        def_acl = None
    return (acl and acl_to_list(acl), def_acl and acl_to_list(def_acl))


def acl_to_list(acl):
    """Return list representation of posix1e.ACL object

    ACL objects cannot be pickled, so this representation keeps
    the structure while adding that option.  Also we insert the
    username along with the id, because that information will be
    lost when moved to another system.

    The result will be a list of tuples.  Each tuple will have the
    form (acltype, (uid or gid, uname or gname) or None, permissions
    as an int).  acltype is encoded as a single character:

    U - ACL_USER_OBJ
    u - ACL_USER
    G - ACL_GROUP_OBJ
    g - ACL_GROUP
    M - ACL_MASK
    O - ACL_OTHER

    """

    def acltag_to_char(tag):
        if tag == posix1e.ACL_USER_OBJ:
            return "U"
        elif tag == posix1e.ACL_USER:
            return "u"
        elif tag == posix1e.ACL_GROUP_OBJ:
            return "G"
        elif tag == posix1e.ACL_GROUP:
            return "g"
        elif tag == posix1e.ACL_MASK:
            return "M"
        else:
            assert tag == posix1e.ACL_OTHER, tag
            return "O"

    def entry_to_tuple(entry):
        tagchar = acltag_to_char(entry.tag_type)
        if tagchar == "u":
            uid = entry.qualifier
            owner_pair = (uid, user_group.uid2uname(uid))
        elif tagchar == "g":
            gid = entry.qualifier
            owner_pair = (gid, user_group.gid2gname(gid))
        else:
            owner_pair = None

        perms = (entry.permset.read << 2 | entry.permset.write << 1
                 | entry.permset.execute)
        return (tagchar, owner_pair, perms)

    return list(map(entry_to_tuple, acl))


def list_to_acl(entry_list, map_names=1):
    """Return posix1e.ACL object from list representation

    If map_names is true, use user_group to update the names for the
    current system, and drop if not available.  Otherwise just use the
    same id.

    See the acl_to_list function for the format of an acllist.

    """

    def char_to_acltag(typechar):
        """Given typechar, query posix1e module for appropriate constant"""
        if typechar == "U":
            return posix1e.ACL_USER_OBJ
        elif typechar == "u":
            return posix1e.ACL_USER
        elif typechar == "G":
            return posix1e.ACL_GROUP_OBJ
        elif typechar == "g":
            return posix1e.ACL_GROUP
        elif typechar == "M":
            return posix1e.ACL_MASK
        else:
            assert typechar == "O", typechar
            return posix1e.ACL_OTHER

    def warn_drop(name):
        """Warn about acl with name getting dropped"""
        global dropped_acl_names
        if Globals.never_drop_acls:
            log.Log.FatalError(
                "--never-drop-acls specified but cannot map name\n"
                "%s occurring inside an ACL." % (name, ))
        if name in dropped_acl_names:
            return
        log.Log(
            "Warning: name %s not found on system, dropping ACL entry.\n"
            "Further ACL entries dropped with this name will not "
            "trigger further warnings" % (name, ), 2)
        dropped_acl_names[name] = name

    acl = posix1e.ACL()
    for typechar, owner_pair, perms in entry_list:
        id = None
        if owner_pair:
            if map_names:
                if typechar == "u":
                    id = user_group.acl_user_map(*owner_pair)
                else:
                    assert typechar == "g", (typechar, owner_pair, perms)
                    id = user_group.acl_group_map(*owner_pair)
                if id is None:
                    warn_drop(owner_pair[1])
                    continue
            else:
                assert owner_pair[0] is not None, (typechar, owner_pair, perms)
                id = owner_pair[0]

        entry = posix1e.Entry(acl)
        entry.tag_type = char_to_acltag(typechar)
        if id is not None:
            entry.qualifier = id
        entry.permset.read = perms >> 2
        entry.permset.write = perms >> 1 & 1
        entry.permset.execute = perms & 1
    return acl


def acl_compare_rps(rp1, rp2):
    """Return true if rp1 and rp2 have same acl information"""
    acl1 = AccessControlLists(rp1.index)
    acl1.read_from_rp(rp1)
    acl2 = AccessControlLists(rp2.index)
    acl2.read_from_rp(rp2)
    return acl1 == acl2


def ACL2Record(acl):
    """Convert an AccessControlLists object into a text record"""
    return b'# file: %b\n%b\n' % (C.acl_quote(acl.get_indexpath()), os.fsencode(str(acl)))


def Record2ACL(record):
    """Convert text record to an AccessControlLists object"""
    newline_pos = record.find(b'\n')
    first_line = record[:newline_pos]
    if not first_line.startswith(b'# file: '):
        raise metadata.ParsingError("Bad record beginning: %r" % first_line)
    filename = first_line[8:]
    if filename == b'.':
        index = ()
    else:
        unquoted_filename = C.acl_unquote(filename)
        index = tuple(unquoted_filename.split(b'/'))
    return AccessControlLists(index, os.fsdecode(record[newline_pos:]))


class ACLExtractor(EAExtractor):
    """Iterate AccessControlLists objects from the ACL information file

    Except for the record_to_object method, we can reuse everything in
    the EAExtractor class because the file formats are so similar.

    """
    record_to_object = staticmethod(Record2ACL)


class AccessControlListFile(metadata.FlatFile):
    """Store/retrieve ACLs from extended attributes file"""
    _prefix = b'access_control_lists'
    _extractor = ACLExtractor
    _object_to_record = staticmethod(ACL2Record)


def join_acl_iter(rorp_iter, acl_iter):
    """Update a rorp iter by adding the information from acl_iter"""
    for rorp, acl in rorpiter.CollateIterators(rorp_iter, acl_iter):
        assert rorp, "Missing rorp for index %s" % (acl.index, )
        if not acl:
            acl = AccessControlLists(rorp.index)
        rorp.set_acl(acl)
        yield rorp


def rpath_acl_get(rp):
    """Get acls of given rpath rp.

    This overrides a function in the rpath module.

    """
    acl = AccessControlLists(rp.index)
    if not rp.issym():
        acl.read_from_rp(rp)
    return acl


rpath.acl_get = rpath_acl_get


def rpath_get_blank_acl(index):
    """Get a blank AccessControlLists object (override rpath function)"""
    return AccessControlLists(index)


rpath.get_blank_acl = rpath_get_blank_acl


def rpath_ea_get(rp):
    """Get extended attributes of given rpath

    This overrides a function in the rpath module.

    """
    ea = ExtendedAttributes(rp.index)
    if not rp.issock() and not rp.isfifo():
        ea.read_from_rp(rp)
    return ea


rpath.ea_get = rpath_ea_get


def rpath_get_blank_ea(index):
    """Get a blank ExtendedAttributes object (override rpath function)"""
    return ExtendedAttributes(index)


rpath.get_blank_ea = rpath_get_blank_ea