HEX
Server: Apache/2.4.65 (Debian)
System: Linux web6 5.10.0-36-amd64 #1 SMP Debian 5.10.244-1 (2025-09-29) x86_64
User: innocamp (1028)
PHP: 7.4.33
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
Upload Files
File: //usr/lib/python3/dist-packages/rdiff_backup/win_acls.py
# Copyright 2008 Fred Gansevles <fred@betterbe.com>
#
# This file is part of rdiff-backup.
#
# rdiff-backup is free software; you can redistribute it and/or modify
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# rdiff-backup is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with rdiff-backup; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA

import re
import os
from . import C, metadata, rorpiter, rpath, log

try:
    from win32security import (
        AdjustTokenPrivileges,
        DACL_SECURITY_INFORMATION,
        GetTokenInformation,
        GROUP_SECURITY_INFORMATION,
        INHERIT_ONLY_ACE,
        LookupPrivilegeValue,
        OpenProcessToken,
        OWNER_SECURITY_INFORMATION,
        SACL_SECURITY_INFORMATION,
        SDDL_REVISION_1,
        SE_BACKUP_NAME,
        SE_DACL_PROTECTED,
        SE_FILE_OBJECT,
        SE_PRIVILEGE_ENABLED,
        SE_RESTORE_NAME,
        SE_SECURITY_NAME,
        TOKEN_ADJUST_PRIVILEGES,
        TokenPrivileges,
        TOKEN_QUERY,
    )
    import pywintypes
except ImportError:
    GROUP_SECURITY_INFORMATION = 0
    OWNER_SECURITY_INFORMATION = 0
    DACL_SECURITY_INFORMATION = 0

    pywintypes = None


class ACL:
    """Windows security descriptor of one file, kept as an SDDL byte string.

    ``index`` is the tuple of path components (bytes) identifying the file;
    the descriptor itself is stored privately and exposed through
    ``bytes(acl)`` / ``str(acl)`` as a two line record::

        # file: <quoted index path>
        <SDDL string>
    """

    # Parts of the security descriptor that are read and written.
    # init_acls() adds SACL_SECURITY_INFORMATION to this mask once the
    # required privilege has been enabled.
    flags = (GROUP_SECURITY_INFORMATION | OWNER_SECURITY_INFORMATION
             | DACL_SECURITY_INFORMATION)

    def __init__(self, index=()):
        """Create an ACL with an empty descriptor for the given index."""
        self.__acl = b""
        self.index = index

    def get_indexpath(self):
        """Return the index joined with b'/', or b'.' for an empty index."""
        if not self.index:
            return b'.'
        return b'/'.join(self.index) or b'.'

    @staticmethod
    def _delete_aces(acl, inherit_only=False):
        """Delete ACEs from acl in place.

        With inherit_only set, only the ACEs carrying the INHERIT_ONLY_ACE
        flag are removed; otherwise every ACE is removed.
        """
        # traverse the ACL in reverse, so the indices stay correct
        for pos in reversed(range(acl.GetAceCount())):
            if not inherit_only or acl.GetAce(pos)[0][1] & INHERIT_ONLY_ACE:
                acl.DeleteAce(pos)

    @staticmethod
    def _apply_descriptor(rp, sd):
        """Write owner, group, DACL (and SACL if enabled in flags) to rp.path.

        Exceptions raised by the win32security call are left to the caller,
        which logs them with a message specific to its own operation.
        """
        rp.conn.win32security.SetNamedSecurityInfo(
            os.fsdecode(rp.path),
            SE_FILE_OBJECT,
            ACL.flags,
            sd.GetSecurityDescriptorOwner(),
            sd.GetSecurityDescriptorGroup(),
            sd.GetSecurityDescriptorDacl(),
            (sd.GetSecurityDescriptorSacl() or None)
            if ACL.flags & SACL_SECURITY_INFORMATION else None)

    def load_from_rp(self, rp, skip_inherit_only=True):
        """Read the security descriptor of rp.path into this object.

        The index is always taken from rp. If pywintypes could not be
        imported, or the descriptor cannot be read, the stored descriptor is
        left unchanged (a warning is logged in the latter case). If the
        descriptor cannot be converted to SDDL, an empty descriptor is stored.

        With skip_inherit_only (the default), ACEs flagged INHERIT_ONLY_ACE
        are dropped before the descriptor is converted.
        """
        self.index = rp.index

        # Sometimes, we are asked to load from an rpath when ACL's
        # are not supported. Ignore the request in this case.
        if not pywintypes:
            return

        try:
            sd = rp.conn.win32security.GetNamedSecurityInfo(
                os.fsdecode(rp.path), SE_FILE_OBJECT, ACL.flags)
        except (OSError, IOError, pywintypes.error) as exc:
            log.Log(
                "Warning: unable to read ACL from %s: %s" % (repr(rp.path),
                                                             exc), 4)
            return

        if skip_inherit_only:
            # skip the inherit_only aces
            dacl = sd.GetSecurityDescriptorDacl()
            if dacl:
                self._delete_aces(dacl, inherit_only=True)
            # NOTE(review): unlike the SACL below, the DACL is written back
            # even when it is missing/empty; the check further down then
            # marks it as not present. Kept as is - confirm this is intended.
            sd.SetSecurityDescriptorDacl(1, dacl, 0)

            if ACL.flags & SACL_SECURITY_INFORMATION:
                sacl = sd.GetSecurityDescriptorSacl()
                if sacl:
                    self._delete_aces(sacl, inherit_only=True)
                    sd.SetSecurityDescriptorSacl(1, sacl, 0)

        if not sd.GetSecurityDescriptorDacl():
            sd.SetSecurityDescriptorDacl(0, None, 0)
        if (ACL.flags & SACL_SECURITY_INFORMATION) and not sd.GetSecurityDescriptorSacl():
            sd.SetSecurityDescriptorSacl(0, None, 0)

        try:
            sddl = rp.conn.win32security. \
                ConvertSecurityDescriptorToStringSecurityDescriptor(
                    sd, SDDL_REVISION_1, ACL.flags)
        except (OSError, IOError, pywintypes.error) as exc:
            log.Log(
                "Warning: unable to convert ACL from %s to string: %s" % (repr(
                    rp.path), exc), 4)
            sddl = ''
        self.__acl = os.fsencode(sddl)

    def clear_rp(self, rp):
        """Remove every ACE from the DACL (and SACL if enabled) of rp.path.

        Failures to read or write the descriptor are logged as warnings and
        otherwise ignored.
        """
        # not sure how to interpret this
        # I'll just clear all acl-s from rp.path
        try:
            sd = rp.conn.win32security.GetNamedSecurityInfo(
                os.fsdecode(rp.path), SE_FILE_OBJECT, ACL.flags)
        except (OSError, IOError, pywintypes.error) as exc:
            log.Log(
                "Warning: unable to read ACL from %s for clearing: %s" % (repr(
                    rp.path), exc), 4)
            return

        dacl = sd.GetSecurityDescriptorDacl()
        if dacl:
            self._delete_aces(dacl)
            sd.SetSecurityDescriptorDacl(0, dacl, 0)

        if ACL.flags & SACL_SECURITY_INFORMATION:
            sacl = sd.GetSecurityDescriptorSacl()
            if sacl:
                self._delete_aces(sacl)
                sd.SetSecurityDescriptorSacl(0, sacl, 0)

        try:
            self._apply_descriptor(rp, sd)
        except (OSError, IOError, pywintypes.error) as exc:
            log.Log(
                "Warning: unable to set ACL on %s after clearing: %s" % (repr(
                    rp.path), exc), 4)

    def write_to_rp(self, rp):
        """Apply the stored descriptor to rp.path.

        Does nothing if no descriptor is stored. If the stored SDDL string
        cannot be converted, or the descriptor cannot be set, a warning is
        logged and the file is left as it is.
        """
        if not self.__acl:
            return

        try:
            sd = rp.conn.win32security. \
                ConvertStringSecurityDescriptorToSecurityDescriptor(
                    os.fsdecode(self.__acl),
                    SDDL_REVISION_1)
        except (OSError, IOError, pywintypes.error) as exc:
            log.Log(
                "Warning: unable to convert string %s to ACL: %s" % (repr(
                    self.__acl), exc), 4)
            # without a descriptor there is nothing to apply; carrying on
            # would only fail on the unbound name 'sd'
            return

        # Enable next block of code for dirs after we have a mechanism in
        # backup.py (and similar) to do a first pass to see if a directory
        # has SE_DACL_PROTECTED. In that case, we will need to
        #       1) dest_rorp.write_win_acl(source_rorp.get_win_acl())
        #               --> And clear existing dest_rorp one while doing so
        #       2) Check if backup user has Admin privs to write dest_rorp
        #               --> May need to use Win32 AccessCheck() API
        #       3) If not, add Admin write privs to dest_rorp and add dir
        #               to dir_perms_list-equivalent
        #       4) THEN, allow the pre_process() function to finish and the
        #               files be copied over. Those files which wish to
        #               will now inherit the correct ACE objects.
        #       5) If dir was on dir_perms_list-equivalent, drop the
        #               write permission we added.
        #       6) When copy_attribs is called in end_process, make sure
        #               that the write_win_acl() call isn't made this time
        # The reason we will need to do this is because otherwise, the files
        # which are created during step 4 will reference the ACE entries
        # which we clear during step 6. We need to clear them *before* the
        # children files/subdirs are created and generate the appropriate
        # DACL so the inheritance magic can happen during step 4.

        (control_flags, revision) = sd.GetSecurityDescriptorControl()
        if (not rp.isdir() and control_flags & SE_DACL_PROTECTED):
            self.clear_rp(rp)

        try:
            self._apply_descriptor(rp, sd)
        except (OSError, IOError, pywintypes.error) as exc:
            log.Log(
                "Warning: unable to set ACL on %s: %s" % (repr(rp.path), exc),
                4)

    def __bytes__(self):
        """Return the two line record: quoted index path, then descriptor."""
        return b'# file: %b\n%b\n' % \
            (C.acl_quote(self.get_indexpath()), self.__acl)

    def __str__(self):
        """Return the record produced by __bytes__ decoded with os.fsdecode."""
        return os.fsdecode(self.__bytes__())

    def from_string(self, acl_str):
        """Set index and descriptor from a record as produced by __bytes__.

        Raises metadata.ParsingError if acl_str does not consist of exactly
        two lines, the first one starting with b"# file: ".
        """

        def _safe_str(cmd):
            """Transform bytes into string without risk of conversion error"""
            if isinstance(cmd, str):
                return cmd
            else:
                return str(cmd, errors='replace')

        lines = acl_str.splitlines()
        if len(lines) != 2 or lines[0][:8] != b"# file: ":
            # an empty record has no first line to quote in the message
            beginning = lines[0][:8] if lines else b""
            raise metadata.ParsingError(
                "Bad record beginning: %s" % _safe_str(beginning))
        filename = lines[0][8:]
        if filename == b'.':
            self.index = ()
        else:
            self.index = tuple(C.acl_unquote(filename).split(b'/'))
        self.__acl = lines[1]


def Record2WACL(record):
    """Parse one flat-file record and return the resulting ACL object."""
    wacl = ACL()
    wacl.from_string(record)
    return wacl


def WACL2Record(wacl):
    """Return the flat-file record for wacl, i.e. its bytes() form."""
    record = bytes(wacl)
    return record


class WACLExtractor(metadata.FlatExtractor):
    """Iterate ACL objects parsed from the Windows ACL information file"""
    # a record starts at a "# file: <name>" line
    record_boundary_regexp = re.compile(b'(?:\\n|^)(# file: (.*?))\\n')
    record_to_object = staticmethod(Record2WACL)

    def filename_to_index(self, filename):
        """Return the index tuple for a (possibly quoted) record filename"""
        # b'.' stands for the empty index
        if filename == b'.':
            return ()
        unquoted = C.acl_unquote(filename)
        return tuple(unquoted.split(b'/'))


class WinAccessControlListFile(metadata.FlatFile):
    """Store/retrieve Windows ACLs in the win_access_control_lists file"""
    # prefix identifying this kind of flat file
    _prefix = b"win_access_control_lists"
    # extractor class used to read records back as ACL objects
    _extractor = WACLExtractor
    # converts an ACL object into the record that gets stored
    _object_to_record = staticmethod(WACL2Record)


def join_wacl_iter(rorp_iter, wacl_iter):
    """Yield the rorps of rorp_iter with the ACL from wacl_iter set on each

    A rorp without a matching entry in wacl_iter gets a blank ACL for its
    own index.
    """
    collated = rorpiter.CollateIterators(rorp_iter, wacl_iter)
    for rorp, wacl in collated:
        assert rorp, "Missing rorp for index %s" % (wacl.index, )
        effective_wacl = wacl if wacl else ACL(rorp.index)
        rorp.set_win_acl(bytes(effective_wacl))
        yield rorp


def rpath_acl_win_get(rpath):
    """Load the ACL of the given rpath and return it as a bytes record."""
    # note: the parameter shadows the rpath module inside this function
    loaded = ACL()
    loaded.load_from_rp(rpath)
    record = bytes(loaded)
    return record


rpath.win_acl_get = rpath_acl_win_get


def rpath_get_blank_win_acl(index):
    """Return the bytes record of an ACL with no descriptor for index."""
    return bytes(ACL(index))


rpath.get_blank_win_acl = rpath_get_blank_win_acl


def rpath_set_win_acl(rp, acl_str):
    """Parse the record acl_str and apply the resulting ACL to rp."""
    parsed = ACL()
    parsed.from_string(acl_str)
    parsed.write_to_rp(rp)


rpath.write_win_acl = rpath_set_win_acl


def init_acls():
    """Enable the token privileges used for ACL handling in this process.

    Tries to enable SE_SECURITY_NAME, SE_BACKUP_NAME and SE_RESTORE_NAME on
    the current process token. If SE_SECURITY_NAME then shows up as enabled,
    SACL_SECURITY_INFORMATION is added to ACL.flags. Failures to open the
    token or to adjust the privileges are logged and the function returns
    without changing ACL.flags.
    """
    # A process that tries to read or write a SACL needs
    # to have and enable the SE_SECURITY_NAME privilege.
    # And in order to backup/restore, the SE_BACKUP_NAME and
    # SE_RESTORE_NAME privileges are needed.
    import win32api
    try:
        token = OpenProcessToken(win32api.GetCurrentProcess(),
                                 TOKEN_ADJUST_PRIVILEGES | TOKEN_QUERY)
    except win32api.error as exc:
        log.Log("Warning: unable to open Windows process token: %s" % exc, 5)
        return
    try:
        try:
            # enable the SE_*_NAME privileges
            wanted = [
                (LookupPrivilegeValue(None, priv_name), SE_PRIVILEGE_ENABLED)
                for priv_name in (SE_SECURITY_NAME, SE_BACKUP_NAME,
                                  SE_RESTORE_NAME)
            ]
            security_priv = wanted[0][0]
            AdjustTokenPrivileges(token, False, wanted)
        except win32api.error as exc:
            log.Log("Warning: unable to enable SE_*_NAME privileges: %s" % exc,
                    5)
            return
        # the adjustment may not have taken effect, so check what the token
        # actually reports before relying on it
        if any(priv == security_priv and attrs
               for priv, attrs in GetTokenInformation(token, TokenPrivileges)):
            # now we *may* access the SACL (sigh)
            ACL.flags |= SACL_SECURITY_INFORMATION
    finally:
        # the token handle is closed on every path once it was opened
        win32api.CloseHandle(token)