X7ROOT File Manager
Current Path:
/opt/cloudlinux/venv/lib/python3.11/site-packages/clselect
opt
/
cloudlinux
/
venv
/
lib
/
python3.11
/
site-packages
/
clselect
/
??
..
??
__init__.py
(536 B)
??
__pycache__
??
baseclselect
??
clextselect.py
(19.71 KB)
??
clpassenger.py
(26.53 KB)
??
clselect.py
(21.77 KB)
??
clselectctl.py
(9.15 KB)
??
clselectctlnodejsuser.py
(20.66 KB)
??
clselectctlphp.py
(43.82 KB)
??
clselectctlpython.py
(44.75 KB)
??
clselectctlruby.py
(18.59 KB)
??
clselectexcept.py
(10.22 KB)
??
clselectnodejs
??
clselectnodejsuser
??
clselectphp
??
clselectphpuser
??
clselectprint.py
(5.39 KB)
??
clselectpython
??
clselectpythonuser
??
clselectruby
??
clselectstatistics.py
(6.51 KB)
??
cluserextselect.py
(15.19 KB)
??
cluseroptselect.py
(23.65 KB)
??
cluserselect.py
(28.22 KB)
??
locked_extensions.ini
(1.2 KB)
??
utils.py
(16.22 KB)
Editing: cluserselect.py
# -*- coding: utf-8 -*- # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved # # Licensed under CLOUD LINUX LICENSE AGREEMENT # http://cloudlinux.com/docs/LICENSE.TXT from __future__ import print_function from __future__ import absolute_import from __future__ import division import json import os import sys import uuid import signal import secureio from future.moves import configparser as ConfigParser from stat import S_IRUSR, S_IWUSR, S_IRGRP, S_IROTH from future.utils import iteritems from pathlib import Path import psutil from .clselect import ClSelect from .clselectexcept import ClSelectExcept from clcommon import ClPwd, clcaptain from .clselectprint import clprint from . import utils from clcommon import clcagefs from clcommon.utils import ExternalProgramFailed try: from clcagefslib.const import BASEDIR from clcagefslib.fs import get_user_prefix from clcagefslib.selector.configure import is_ea4_enabled, read_cpanel_ea4_php_conf, configure_alt_php from clcagefslib.selector.panel.da import da_change_user_php_ini from clcagefslib.selector.panel.isp import ispmanager_create_user_wrapper except ImportError: pass class ClUserSelect(ClSelect): CAGEFS_PATH = "/var/cagefs" SELECTOR_PATH = "/usr/selector" NATIVE_PATH = SELECTOR_PATH if clcagefs.in_cagefs() else "/usr/share/cagefs-skeleton/usr/selector" CAGEFS_EXCLUDE = "/etc/cagefs/exclude" SELECTOR2_DIR = ".cl.selector/selector.path" def clean_crui_images(self, users=None): """ Creates flags mod_lsapi_reset_me in users' home directories in order to recreate CRIU images when php version/extensions/options have changed For details see LVEMAN-1210 :param users: list of usernames (strings) """ # There is not reliable way to check if CRIU is enabled inside CageFS # So let's always create the "mod_lsapi_reset_me" flag if not clcagefs.in_cagefs() and not os.path.isfile("/var/run/mod_lsapi/criu.enabled"): return for user in users: pw = self._clpwd.get_pw_by_name(user) path = os.path.join(pw.pw_dir, "mod_lsapi_reset_me") if not os.path.isfile(path): previous_user_data = self._change_uid(user) try: clcaptain.write(path) except (OSError, ExternalProgramFailed) as e: raise ClSelectExcept.UnableToSaveData(path, e) finally: ClUserSelect._restore_uid(previous_user_data) @staticmethod def switch_symlink_for_alt_php(version, pw, exit_on_error=True, configure_multiphp=True): """ Switch symlink for alt php. Create .cagefs directory if not created Rerurn True if error has occured """ if not os.path.isdir(BASEDIR) and not clcagefs.in_cagefs(): print("ERROR: CageFS not installed.") if exit_on_error: sys.exit(1) else: return True if configure_multiphp and is_ea4_enabled(): conf = read_cpanel_ea4_php_conf() if conf: try: # get default system php version selected via MultiPHP Manager in cPanel WHM default_php = conf["default"] # LVEMAN-1170: do not configure PHP Selector when system default version is alt-php if not default_php.startswith("ea-php"): print( "ERROR: system default PHP version is alt-php. " "PHP Selector is disabled. Use cPanel MultiPHP manager instead." ) if exit_on_error: sys.exit(1) else: return True except KeyError: pass # configure alt php - create .cagefs dir and create symlink error = configure_alt_php( pw, version, write_log=False, drop_perm=(os.geteuid() == 0), configure_multiphp=configure_multiphp ) if error and exit_on_error: sys.exit(1) return error def apply_symlinks_rules(self): if self.without_cagefs: print('ERROR: this option does not work in "single user" mode (when CageFS is disabled)') sys.exit(1) if os.geteuid() != 0: print("ERROR: root privileges required") sys.exit(1) users_vers_dict = self.get_user_version_map() for user, version in iteritems(users_vers_dict): print("Processing user", user) pw = self._clpwd.get_pw_by_name(user) ClUserSelect.switch_symlink_for_alt_php(version, pw, exit_on_error=False, configure_multiphp=False) def __init__(self, item="php", exclude_pid_list=None): ClSelect.__init__(self, item) self._clpwd = ClPwd() self._user_excludes = set() if exclude_pid_list: self.exclude_pid_list = exclude_pid_list else: self.exclude_pid_list = [] def get_version(self, user, show_native_version=False): """ Returns alternative version for a user @param user: string @return: string """ self._check_user_in_cagefs(user) alt_path = self._compose_user_alt_path(user) native = self._compose_native_info(show_native_version) if not os.path.isdir(alt_path): return native alternatives = self.get_all_alternatives_data() full_path = os.path.join(alt_path, self._item) if not os.path.islink(full_path): return native link_dst = os.readlink(full_path) if self.without_cagefs: if not self._native_contents: self._load_native_contents(self._item) if link_dst == self._native_contents[self._item]: return native if os.path.dirname(link_dst) == self.SELECTOR_PATH: return native try: version = list( filter((lambda i: alternatives[i]["data"][self._item] == link_dst), list(alternatives.keys())) )[0] return (version, alternatives[version]["version"], alternatives[version]["data"][self._item]) except (IndexError, KeyError): return native def create_dir(self, path, user): if not os.path.isdir(path): previous_user_data = self._change_uid(user) try: clcaptain.mkdir(path) except (OSError, ExternalProgramFailed) as e: raise ClSelectExcept.UnableToSaveData(path, e) finally: ClUserSelect._restore_uid(previous_user_data) def create_selector_symlinks(self, user): """ Creates additional directory and symlinks for use in "without CageFS" mode """ homedir = self._clpwd.get_homedir(user) path_in_home = os.path.join(homedir, self.SELECTOR2_DIR) cur_user = self._change_uid(user) self.create_dir(path_in_home, user) self._create_symlink("../php-cli", path_in_home + "/php", check_existence=True) self._create_symlink("../php", path_in_home + "/php-cgi", check_existence=True) self._restore_uid(cur_user) def get_default_version(self): if os.path.isfile(ClSelect.DEFAULTS_PATH): try: return self._dh.get("versions", self._item) except (ConfigParser.Error, IOError, KeyError): return "native" return "native" def set_version_from_backup(self, user): user_backup_path = os.path.join(self._clpwd.get_homedir(user), ".cl.selector", "defaults.cfg") if not os.path.isfile(user_backup_path): self.set_version(user, self.get_default_version()) else: try: dh = self._get_default_config_handler(user_backup_path) self.set_version(user, dh.get("versions", self._item)) except (ConfigParser.Error, IOError, KeyError) as e: print("Error while restoring settings from backup", str(e)) sys.exit(1) def _version_enabled(self, version): """" Checks if version is enabled, a version is considered "enabled" if there is no "state" option in the config. The presence of the "state" option always means it is disabled, regardless of its value. @param version: string @return: bool - True if version is enabled, False otherwise """ if not self._dh.has_option("%s%s" % (self._item, version), "state"): return True else: return False def set_version(self, user, version, return_summary=False, show_native_version=False, exit_on_error=True): """ Sets alternative version for a users with the same uid @param user: string @return: None """ if os.geteuid() != 0 and (message := self.get_version_selection_disabled_msg(user)): raise ClSelectExcept.VersionModificationBlocked(message) # This check is needed to prevent setting a disabled version as selected version for a user if not self._version_enabled(version): raise ClSelectExcept.VersionModificationBlocked('Version %s is disabled by administrator' % version) data = utils.apply_for_at_least_one_user( self._set_version, self._clpwd.get_names(self._clpwd.get_uid(user)), ClSelectExcept.NoUserSelector, version, return_summary, show_native_version, exit_on_error, ) if return_summary: return data def _set_version(self, user, version, return_summary=False, show_native_version=False, exit_on_error=True): """ Sets alternative version for a user @param user: string @return: None """ if self.without_cagefs: previous_user_data = self._change_uid(user) self._check_user_in_cagefs(user) alt_path = self._compose_user_alt_path(user) if not os.path.isdir(alt_path): if self.without_cagefs: self.create_dir(alt_path, user) else: raise ClSelectExcept.NoUserSelector(user) alternatives = self.get_all_alternatives_data() if version not in alternatives and version != "native": raise ClSelectExcept.NoSuchAlternativeVersion(version) self._remove_alternatives_links(alt_path) pw = self._clpwd.get_pw_by_name(user) if version == "native": if self.without_cagefs: if not self._native_contents: self._load_native_contents(self._item) for item, native_path in iteritems(self._native_contents): self._create_symlink(native_path, alt_path + "/" + item, user, version) else: ini = "php.ini" new_ini_created = False new_ini_path = os.path.join("%s.etc" % (self.NATIVE_PATH,), ini) if os.path.exists(new_ini_path): src = os.path.join("%s.etc" % self.SELECTOR_PATH, ini) dst = os.path.join(alt_path, ini) self._create_symlink(src, dst, user, version) new_ini_created = True for filename in os.listdir(self.NATIVE_PATH): if self._item not in filename: continue if filename.endswith(".ini") and new_ini_created: continue dst = os.path.join(alt_path, filename) src = os.path.join(self.SELECTOR_PATH, filename) self._create_symlink(src, dst, user, version) else: for item, path in iteritems(alternatives[version]["data"]): self._create_symlink(path, os.path.join(alt_path, item), user, version) if self.without_cagefs: ClUserSelect._restore_uid(previous_user_data) else: ClUserSelect.switch_symlink_for_alt_php(version, pw, exit_on_error=exit_on_error) self._switch_php_da_isp(user, version) self._reload_processes(user) self._backup_settings(user) if return_summary: return self.get_summary(user, show_native_version) def get_version_selection_disabled_msg(self, username: str) -> str: """ Returns a message indicating that the selection of the PHP version is disabled for the user, based on the configuration file. Args: username (str): The username for which to check the configuration. Returns: str: The message indicating that version selection is disabled, or an empty string if the configuration file does not exist or does not contain the message. """ uid = self._clpwd.get_uid(username) config_file = Path(f"/var/cloudlinux/cl.selector/uids/{uid}/version_selection_conf.json") if not config_file.exists(): return "" try: with config_file.open(encoding="utf-8") as f: config_data = json.load(f) return config_data.get("version_selection_disabled_msg", "") except (OSError, ValueError): return "" def get_summary(self, user, show_native_version=False): """ Returns a summary of available alternative versions for the given user. Only versions that are enabled, default, or selected are included. The label for the "native" version is replaced if show_native_version is True. @param user: str - Username inside CageFS. @param show_native_version: bool - Whether to replace the 'native' version label. @return: tuple of (str, (bool, bool, bool)) - Each entry contains the version name and a tuple with flags: (enabled, default, selected). """ self._check_user_in_cagefs(user) # Retrieve all alternatives data and "native" alternatives = self.get_all_alternatives_data() native_info = self._compose_native_info(show_native_version) summary = {"native": {"enabled": True, "default": False, "selected": False}} alt_versions = sorted(alternatives.keys()) alt_versions.append("native") selected_version = self.get_version(user)[0] # Populate summary with alternative versions for version in alt_versions: if version not in summary: summary[version] = {} summary[version]["enabled"] = self._version_enabled(version) summary[version]["default"] = False summary[version]["selected"] = False try: default_version = self._dh.get("versions", self._item) except (ConfigParser.NoSectionError, ConfigParser.NoOptionError): default_version = "native" try: summary[default_version]["default"] = True summary[selected_version]["selected"] = True except KeyError: raise ClSelectExcept.NoSuchAlternativeVersion(default_version) summary[native_info[0]] = summary.pop("native") alt_versions.remove("native") alt_versions.append(native_info[0]) result = [] # Final list for summary output # Include only versions where at least one flag is True for idx, v in enumerate(alt_versions): if not any((summary[v]["enabled"], summary[v]["default"], summary[v]["selected"])): continue result.append((v, (summary[v]["enabled"], summary[v]["default"], summary[v]["selected"]))) return tuple(result) def change_to_version(self, new_version, current_version): """ Changes users of a supplied version to specified_version @param version: string @param current_version: string """ users = self.list_users(current_version) for user in users: try: self.set_version(user, new_version, exit_on_error=False) except Exception as e: # catch every errors, print it and go to the next user clprint.print_diag("text", {"status": "ERROR", "message": str(e)}) pass self.clean_crui_images(users) def list_users(self, version): """ Returns users of a certain alternative """ data = self.get_version_user_map() if version in data: return data[version] return [] def list_all_users(self): """ Returns all valid system users @return: list """ if self.without_cagefs: from .clselectctlphp import get_cpanel_user return [get_cpanel_user()] return list(self._get_system_users().difference(self._get_user_excludes())) def cagefs_copy_etc(self, user): config = dict() config["init"] = 0 config["reinit"] = 0 config["verbose"] = 0 LIBDIR = "/usr/share/cagefs" sys.path.append(LIBDIR) try: import cagefsctl except ImportError: print("ERROR: CageFS not installed.") sys.exit(1) cagefs_etc_path = os.path.join(BASEDIR, get_user_prefix(user), user, "etc") if not os.path.exists(cagefs_etc_path + "/cl.selector") or not os.path.exists(cagefs_etc_path + "/cl.php.d"): cagefsctl.cpetc_for_user(user, config) def get_user_version_map(self): """ Returns user version map as dict @return: dict """ actual_users = self.list_all_users() data = {} for user in actual_users: try: data[user] = self.get_version(user, False)[0] except ClSelectExcept.NotCageFSUser: continue return data def get_version_user_map(self, user_names=None): """ Returns users grouped by version @return: dict """ actual_users = user_names or self.list_all_users() data = {} for user in actual_users: try: version = self.get_version(user, False)[0] if not version in data: data[version] = [] data[version].append(user) except ClSelectExcept.NotCageFSUser: continue return data def _create_symlink(src, dst, user=None, version=None, check_existence=False): """ Creates symlink from src to dst @param src: string @param dst: string @param user: string @param version: string @param check_existence: bool @return: None """ try: if check_existence: if os.path.islink(dst): if os.readlink(dst) != src: os.unlink(dst) else: return else: utils.remove_file_or_dir(dst) clcaptain.symlink(src, dst) except Exception as e: if user is not None and version is not None: raise ClSelectExcept.UnableToSetAlternative(user, version, e) raise ClSelectExcept.SelectorException("Cannot create symlink from %s to %s (%s)" % (src, dst, e)) _create_symlink = staticmethod(_create_symlink) def _get_user_excludes(self): """ Returns list of user excludes @return: list """ if self._user_excludes: return self._user_excludes if not os.path.isdir(self.CAGEFS_EXCLUDE): return set() for item in os.listdir(self.CAGEFS_EXCLUDE): full_item_path = os.path.join(self.CAGEFS_EXCLUDE, item) self._user_excludes.update( set(map((lambda x: x.strip()), utils.read_file_as_string(full_item_path).splitlines())) ) return self._user_excludes def _check_user_in_cagefs(self, user): """ Check that cagefs enabled for user """ if self.without_cagefs: return if clcagefs.in_cagefs(): return LIBDIR = "/usr/share/cagefs" sys.path.append(LIBDIR) try: import cagefsctl except ImportError: print("ERROR: CageFS not installed.") sys.exit(1) try: if not cagefsctl.is_user_enabled(user): raise ClSelectExcept.NotCageFSUser(user) except AttributeError: print("ERROR: CageFS version is unsupported. Please update CageFS.") sys.exit(1) def _remove_alternatives_links(self, path): """ Removes all symlinks from directory @param path: string @return: None """ for filename in os.listdir(path): if self._item not in filename: continue full_path = os.path.join(path, filename) if not os.path.islink(full_path): continue os.unlink(full_path) def _compose_user_alt_path(self, user): """ Composes and returns user alternative directory path @param user: string @return: string """ if self.without_cagefs: homedir = self._clpwd.get_homedir(user) return homedir + "/.cl.selector" uid = str(self._clpwd.get_uid(user)) return ( "/etc/cl.selector" if clcagefs.in_cagefs() else os.path.join(self.CAGEFS_PATH, uid[-2:], user, "etc", "cl.selector") ) def _get_system_users(self): """ Returns set of system users @return: set """ users_dict = self._clpwd.get_user_dict() return set(users_dict.keys()) def _delete_if_symlink(file_path): """ Deletes file to be written if it is a symlink """ if os.path.islink(file_path): try: os.unlink(file_path) except OSError: raise ClSelectExcept.UnableToSaveData(file_path, "Cannot delete symlink while saving data") _delete_if_symlink = staticmethod(_delete_if_symlink) def _change_uid(self, user): """ Changes to another uid and returns tuple of previous euid and egid @param user: string @return: tuple """ entry = self._clpwd.get_pw_by_name(user) new_uid = entry.pw_uid new_gid = entry.pw_gid cur_euid = os.geteuid() cur_egid = os.getegid() if cur_euid == new_uid: return cur_euid, cur_egid try: os.setegid(new_gid) os.seteuid(new_uid) secureio.set_capability() return cur_euid, cur_egid except OSError as e: raise ClSelectExcept.UnableToChangeToAnotherUser(user, e) def _restore_uid(uid_and_gid): """ Restores changed uid and gid to original ones @param uid_and_gid: tuple @return: None """ if uid_and_gid[0] != os.geteuid(): secureio.set_capability(clear=True) try: os.setegid(uid_and_gid[1]) os.seteuid(uid_and_gid[0]) except OSError as e: raise ClSelectExcept.UnableToChangeToAnotherUser(str(uid_and_gid[0]), e) _restore_uid = staticmethod(_restore_uid) def _write_to_file(self, user, file_contents, file_path, create=True): """ Saves data to file @param user: string @param file_contents: string @param file_path: string @return: None """ if not create and not os.path.exists(file_path): return self._delete_if_symlink(file_path) previous_user_data = self._change_uid(user) file_directory = os.path.dirname(file_path) try: # Replace tempfile.mkstemp with str(uuid.uuid4()) dirname = "clseltmp_%s" % str(uuid.uuid4()) temp_path = os.path.join(file_directory, dirname) clcaptain.write(temp_path, "%s\n" % (file_contents,)) except (IOError, OSError, ExternalProgramFailed) as e: try: if os.path.exists(temp_path): os.unlink(temp_path) except: pass ClUserSelect._restore_uid(previous_user_data) raise ClSelectExcept.UnableToSaveData(file_path, e) else: try: mask = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH os.rename(temp_path, file_path) os.chmod(file_path, mask) except OSError: pass ClUserSelect._restore_uid(previous_user_data) def _reload_processes(self, user): """ Reloads user process """ try: next_parent = psutil.Process() for i in range(2): next_parent = next_parent.parent() if next_parent is not None: self.exclude_pid_list.append(next_parent.pid) else: break except psutil.NoSuchProcess: pass try: uid = ClPwd().get_uid(user) except (ClPwd.NoSuchUserException,): # no such user return try: for proc in psutil.process_iter(): try: if uid not in [proc.uids().real, proc.uids().effective] or proc.name().find(self._item) == -1: continue pid = proc.pid except psutil.NoSuchProcess: continue try: if pid not in self.exclude_pid_list: os.kill(pid, signal.SIGHUP) except (OSError,): continue except (OSError, IOError): # psutil reads /proc FS as usual FS, skip read errors pass def _skim_over_extensions(path): """ Get extension names from user extensions file comments """ extensions = [] try: ini = open(path) for line in ini: if line.startswith(";---"): ext = line[4 : line.rfind("---")] extensions.append(ext) ini.close() except (OSError, IOError): pass return extensions _skim_over_extensions = staticmethod(_skim_over_extensions) def _backup_settings(self, user): """ Scans all user settings and backups'em in homedir as INI file @param user: string """ self._check_user_in_cagefs(user) backup_contents = [] user_alt_path = self._compose_user_alt_path(user) user_ext_path = os.path.join(os.path.dirname(user_alt_path), "cl.php.d") alternatives = self.get_all_alternatives_data() user_backup_path = os.path.join(self._clpwd.get_homedir(user), ".cl.selector") if not os.path.isdir(user_backup_path): previous_user_data = self._change_uid(user) try: clcaptain.mkdir(user_backup_path) except (OSError, ExternalProgramFailed) as e: ClUserSelect._restore_uid(previous_user_data) raise ClSelectExcept.UnableToSaveData(user_backup_path, e) ClUserSelect._restore_uid(previous_user_data) user_backup_file = os.path.join(user_backup_path, "defaults.cfg") if os.path.isdir(user_alt_path): version = "[versions]\n%s = %s\n" % (self._item, self.get_version(user)[0]) else: version = "[versions]\n%s = native\n" % (self._item,) backup_contents.append(version) for alt in sorted(alternatives.keys()): if self.without_cagefs: curr_ext_path = user_alt_path + "/alt_php" + alt.replace(".", "") + ".ini" else: curr_ext_path = os.path.join(user_ext_path, "alt-php%s" % ((alt.replace(".", ""),)), "alt_php.ini") extensions = self._skim_over_extensions(curr_ext_path) backup_contents.append("[%s%s]\nmodules = %s\n" % (self._item, alt, ",".join(sorted(extensions)))) self._write_to_file(user, "\n".join(backup_contents), user_backup_file) def _switch_php_da_isp(self, user, version): if self.without_cagefs: return da_change_user_php_ini(user, version) ispmanager_create_user_wrapper(user, version)
Upload File
Create Folder