X7ROOT File Manager
Current Path:
/opt/cloudlinux/venv/lib/python3.11/site-packages/clcommon
opt
/
cloudlinux
/
venv
/
lib
/
python3.11
/
site-packages
/
clcommon
/
??
..
??
__init__.py
(1.37 KB)
??
__pycache__
??
clcagefs.py
(10.99 KB)
??
clcaptain.py
(1.96 KB)
??
clconfig.py
(1.68 KB)
??
clconfpars.py
(12.09 KB)
??
clcustomscript.py
(1.16 KB)
??
cldebug.py
(905 B)
??
clemail.py
(1.65 KB)
??
clexception.py
(1.14 KB)
??
clfunc.py
(6.47 KB)
??
clhook.py
(3.86 KB)
??
cllog.py
(1.45 KB)
??
cloutput.py
(471 B)
??
clproc.py
(4.05 KB)
??
clpwd.py
(7.74 KB)
??
clquota.py
(1.27 KB)
??
clsec.py
(657 B)
??
clwpos_lib.py
(16.6 KB)
??
const.py
(277 B)
??
cpapi
??
evr_utils.py
(3.55 KB)
??
features.py
(5.01 KB)
??
group_info_reader.py
(5.29 KB)
??
lib
??
lock.py
(1.02 KB)
??
mail_helper.py
(4.45 KB)
??
mysql_lib.py
(5.84 KB)
??
php_conf_reader.py
(9.77 KB)
??
public_hooks
??
sysctl.py
(7.61 KB)
??
ui_config.py
(3.12 KB)
??
utils.py
(32.91 KB)
??
utils_cmd.py
(2.71 KB)
Editing: clcagefs.py
# -*- coding: utf-8 -*-

# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2018 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT
#
"""Helpers for managing CageFS mount points listed in /etc/cagefs/cagefs.mp."""

import logging
import os
import re
import subprocess
import sys

CAGEFS_MP_FILENAME = "/etc/cagefs/cagefs.mp"
CAGEFSCTL_TOOL = "/usr/sbin/cagefsctl"
CAGEFS_SKELETON_PATH = "/usr/share/cagefs-skeleton"
CAGEFS_DIR_PATH = "/var/.cagefs"

logger = logging.getLogger(__name__)


class CagefsMpConflict(Exception):
    """Raised when a new cagefs.mp entry contradicts an entry already in the file."""

    def __init__(self, new_item, existing_item):
        self._msg = (
            f"Conflict in adding '{new_item}' to {CAGEFS_MP_FILENAME} "
            f"because of pre-existing alternative specification: '{existing_item}'"
        )
        # Populate Exception.args so repr(), logging and pickling carry the message.
        super().__init__(self._msg)

    def __str__(self):
        return self._msg


class CagefsNotSupportedError(Exception):
    """Cagefs Not Supported Exception"""

    def __init__(self, message):
        Exception.__init__(self, message)


class CagefsMpItem:
    """One entry of cagefs.mp: an optional prefix character, a path, and an optional mode.

    Comment lines and blank lines are represented as "dummy" items
    (see :meth:`is_dummy`), which never match or conflict with anything.
    """

    # Characters that may precede the path in a cagefs.mp line.
    PREFIX_LIST = "@!%*"
    _PREFIX_MOUNT_RW = ""
    _PREFIX_MOUNT_RO = "!"

    def __init__(self, arg):
        """Constructor

        :param arg: Is either path to add to cagefs.mp (optionally preceded
            by a prefix character such as '!') or a raw line read from
            cagefs.mp
        """
        if arg[:1] == "#" or arg.strip() == "":
            # Comments and empty lines carry no mount point: init as dummy.
            self._path_spec = None
        else:
            self._path_spec = arg

    def mode(self, mode):
        """Specify mode as in fluent constructor

        The mode is appended (as 3-digit octal after a comma) only for items
        with the '@' prefix; for any other item the call is a no-op.

        :param mode: integer permission bits, or None to leave the item as is
        :returns: self
        """
        if self.prefix() == "@" and mode is not None:
            self._path_spec = f"{self._path_spec},{mode:03o}"
        return self

    def __str__(self):
        # __str__ must return a string; a dummy item renders as an empty line
        # instead of making str() raise TypeError.
        return "" if self._path_spec is None else self._path_spec

    @staticmethod
    def _add_slash(path):
        """Return path with a trailing slash so prefix checks respect directory boundaries."""
        if path == "":
            return "/"
        if path[-1] != "/":
            return path + "/"
        return path

    def pre_exist_in(self, another):
        """Tell whether this item's path equals or lies under the path of ``another``.

        :param another: CagefsMpItem or a raw cagefs.mp line
        :returns: False if either item is a dummy
        """
        adopted = CagefsMpItem._adopt(another)

        # overkill: just to keep strictly to comparing NULL objects principle
        if self.is_dummy() or adopted.is_dummy():
            return False

        this_path = CagefsMpItem._add_slash(self.path())
        test_preexist_in_path = CagefsMpItem._add_slash(adopted.path())
        return this_path.startswith(test_preexist_in_path)

    def is_compatible_by_prefix_with(self, existing):
        """Tell whether this item may coexist with the already present ``existing`` item.

        Items are compatible when their prefixes are equal, or when the
        existing item has no prefix and this one has the '!' prefix.

        :param existing: CagefsMpItem or a raw cagefs.mp line
        :returns: False if either item is a dummy
        """
        adopted = CagefsMpItem._adopt(existing)

        # overkill: just to keep strictly to comparing NULL objects principle
        if self.is_dummy() or adopted.is_dummy():
            return False

        if self.prefix() == adopted.prefix():
            return True

        # Maps the prefix of the existing item to the prefixes of new items it tolerates.
        prefix_compatibility_map = {CagefsMpItem._PREFIX_MOUNT_RW: [CagefsMpItem._PREFIX_MOUNT_RO]}
        null_options = []
        return self.prefix() in prefix_compatibility_map.get(adopted.prefix(), null_options)

    def is_dummy(self):
        """Return True for items built from a comment or an empty line."""
        return self._path_spec is None

    @staticmethod
    def _adopt(x):
        """Return x itself if it is a CagefsMpItem, otherwise wrap it into one."""
        if isinstance(x, CagefsMpItem):
            return x
        return CagefsMpItem(x)

    @staticmethod
    def _cut_off_mode(path_spec):
        """Cut off mode from path spec like @/var/run/screen,777
        Only one comma per path spec is allowed ;-)"""
        return path_spec.split(",")[0]

    @staticmethod
    def _cut_off_prefix(path_spec):
        """Strip leading prefix characters from path spec."""
        return path_spec.lstrip(CagefsMpItem.PREFIX_LIST)

    def path(self):
        """Return the bare path, without prefix and mode. Not valid for dummy items."""
        return CagefsMpItem._cut_off_prefix(CagefsMpItem._cut_off_mode(self._path_spec))

    def prefix(self):
        """Return the leading prefix character, or '' if there is none.

        Not valid for dummy items.
        """
        # Look at the first character itself rather than comparing the raw
        # spec with path(): the latter differs for an unprefixed spec that
        # merely carries a ",mode" tail, which would report "/" as a prefix.
        spec = CagefsMpItem._cut_off_mode(self._path_spec)
        first_char = spec[:1]
        if first_char and first_char in CagefsMpItem.PREFIX_LIST:
            return first_char
        return ""


def is_cagefs_present():
    """Return True if the cagefsctl tool exists on this system."""
    return os.path.exists(CAGEFSCTL_TOOL)


def _mk_mount_dir_setup_perm(path, mode=0o755, owner_id=None, group_id=None):
    """Create directory ``path`` if it is missing and apply mode and ownership.

    :param mode: permission bits to set, or None to leave permissions alone
    :param owner_id: uid to set, or None to keep the current owner
    :param group_id: gid to set, or None to keep the current group
    """
    # -1 means 'unchanged'
    if group_id is None:
        group_id = -1
    if owner_id is None:
        owner_id = -1
    if not os.path.isdir(path):
        os.mkdir(path)
    if mode is not None:
        os.chmod(path, mode)
    os.chown(path, owner_id, group_id)


def _remount_cagefs(user=None, remount_in_background=False):
    """Run cagefsctl to remount CageFS; do nothing if cagefsctl is absent.

    :param user: remount for this user only; if None, remount for all
    :param remount_in_background: If True, start cagefsctl and return
        without waiting for it to finish
    """
    if not is_cagefs_present():
        return
    if user is None:
        command = [CAGEFSCTL_TOOL, "--wait-lock", "--remount-all"]
    else:
        command = [CAGEFSCTL_TOOL, "--remount", user]
    if remount_in_background:
        subprocess.Popen(  # pylint: disable=consider-using-with
            command,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
    else:
        subprocess.run(
            command,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            check=False,
        )


def setup_mount_dir_cagefs(
    path,
    added_by=None,
    mode=0o755,
    owner_id=None,
    group_id=None,
    prefix="",
    remount_cagefs=False,
    remount_in_background=False,
):
    """
    Add mount point to /etc/cagefs/cagefs.mp

    :param path: Directory path to be added in cagefs.mp and mounted
                 from within setup_mount_dir_cagefs().
                 If this directory does not exist, then it is created.

    :param added_by: package or component, mount dir relates to, or
                     whatever will stay in cagefs.mp with "# added by..."
                     comment

    :param mode: If is not None: Regardless of whether directory exists or
                 not prior this call, it's permissions will be set to mode.

    :param owner_id: Regardless of whether directory exists or not prior
                     this call, it's owner id will be set to.
                     If None, the owner won't be changed.

    :param group_id: Regardless of whether directory exists or not prior
                     this call, it's group id will be set to.
                     If None, the group won't be changed.

    :param prefix: Mount point prefix. Default is mount as RW.
                   Pass '!' to add read-only mount point.
                   Refer CageFS section at http://docs.cloudlinux.com/
                   for more options.

    :param remount_cagefs: If True, cagefs skeleton will be automatically
                           remounted to apply changes.

    :param remount_in_background: If True, cagefs remount will be done in
                                  separate background process, without
                                  waiting for completion

    :returns: None

    Propagates native EnvironmentError if something goes wrong while
    creating the directory or accessing cagefs.mp.
    If CageFS is not installed, only the directory is prepared.

    Raises CagefsMpConflict if path is already specified in cagefs.mp,
    but with a prefix which is incompatible with the prefix param.
    """
    _mk_mount_dir_setup_perm(path, mode, owner_id, group_id)

    if not is_cagefs_present():
        return

    # Create cagefs.mp if absent. It will be merged when cagefsctl --init.
    if not os.path.exists(CAGEFS_MP_FILENAME):
        subprocess.run(
            [CAGEFSCTL_TOOL, "--create-mp"],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            check=False,
        )

    add_new_line_to_cagefs_mp()
    # ^^
    # Hereafter we will not care if there was
    # 'no newline at the end of file'

    with open(CAGEFS_MP_FILENAME, "r+", encoding="utf-8") as f:
        new_item = CagefsMpItem(prefix + path).mode(mode)

        trim_nl_iter = (file_line.rstrip() for file_line in f)
        pre_exist_option = [x for x in trim_nl_iter if new_item.pre_exist_in(x)]

        if not pre_exist_option:
            f.seek(0, 2)  # 2: seek to the end of file

            if added_by is not None:
                # no newline is allowed
                added_by = added_by.strip()
                print("# next line is added by ", added_by, file=f)
            print(new_item, file=f)

            if remount_cagefs:
                _remount_cagefs(remount_in_background=remount_in_background)

        elif not new_item.is_compatible_by_prefix_with(pre_exist_option[-1]):
            # The last matching line is the one checked for a conflict.
            raise CagefsMpConflict(new_item, pre_exist_option[-1])


def _get_cagefs_mp_lines():
    """Return all lines of cagefs.mp, line endings included."""
    with open(CAGEFS_MP_FILENAME, "r", encoding="utf-8") as f:
        return f.readlines()


def _write_cagefs_mp_lines(lines):
    """Overwrite cagefs.mp with the given lines (written as is)."""
    with open(CAGEFS_MP_FILENAME, "w", encoding="utf-8") as f:
        return f.writelines(lines)


def add_new_line_to_cagefs_mp():
    """
    Add new line to the end of /etc/cagefs/cagefs.mp file if it is not there
    """
    lines = _get_cagefs_mp_lines()
    # file is not empty and last line in the file does not end with new line symbol ?
    if lines and not lines[-1].endswith("\n"):
        lines[-1] += "\n"
        _write_cagefs_mp_lines(lines)


def remove_mount_dir_cagefs(path, remount_cagefs=False, remount_in_background=False):
    """
    Remove mount points matching given path from cagefs.mp file

    A line matches when it consists of an optional single prefix character,
    the exact path and an optional ",<digits>" mode.

    :param str path: Path that should be removed from file.
    :param bool remount_cagefs: Remount cagefs skeleton or not
    :param remount_in_background: If True, cagefs remount will be done in
                                  separate background process, without
                                  waiting for completion
    :return: Nothing
    """
    lines = _get_cagefs_mp_lines()

    r = re.compile(rf"^[{CagefsMpItem.PREFIX_LIST}]?{re.escape(path)}(,\d+)?$")
    lines_with_excluded_path = [line for line in lines if not r.match(line)]

    # nothing to do if nothing found by pattern
    if len(lines) == len(lines_with_excluded_path):
        return

    _write_cagefs_mp_lines(lines_with_excluded_path)

    if remount_cagefs:
        _remount_cagefs(remount_in_background=remount_in_background)


def _is_directory_mounted(path: str) -> bool:
    """Check the fact that directory is mounted i.e. listed in mountinfo"""
    with open("/proc/self/mountinfo", encoding="utf-8") as mountinfo_file:
        for line in mountinfo_file:
            mountinfo_entry_parts = line.split(maxsplit=5)
            # See the following link about /proc/<pid>/mountinfo format:
            # https://docs.kernel.org/filesystems/proc.html#proc-pid-mountinfo-information-about-mounts
            # The mount point is the 5th field; skip lines too short to have one
            # instead of failing with IndexError.
            if len(mountinfo_entry_parts) > 4 and mountinfo_entry_parts[4] == path:
                return True
    return False


def in_cagefs() -> bool:
    """Recommended way to check whether process is running inside CageFS

    Please avoid testing for /var/.cagefs existence.
    Replace with this function call when possible.
    """
    in_cagefs_ = _is_directory_mounted(CAGEFS_DIR_PATH)
    if not in_cagefs_ and os.path.isdir(CAGEFS_DIR_PATH):
        logger.warning("%s should not exist in the real file system", CAGEFS_DIR_PATH)
    return in_cagefs_


def _is_cagefs_enabled(user):
    """
    Check that cagefs enabled for user

    :returns: False if the cagefsctl module cannot be imported or reports
        the user as not enabled, True otherwise
    :raises CagefsNotSupportedError: if the cagefsctl module lacks
        is_user_enabled()
    """
    try:
        # cagefsctl is imported from the CageFS library directory.
        cagefs_lib_dir = "/usr/share/cagefs/"
        if cagefs_lib_dir not in sys.path:
            sys.path.append(cagefs_lib_dir)
        import cagefsctl  # pylint: disable=import-outside-toplevel
    except ImportError:
        return False
    try:
        if not cagefsctl.is_user_enabled(user):
            return False
    except AttributeError as e:
        raise CagefsNotSupportedError("ERROR: CageFS version is unsupported. Please update CageFS.") from e
    return True
Upload File
Create Folder