X7ROOT File Manager
Current Path:
/opt/cloudlinux/venv/lib/python3.11/site-packages/clselect
opt
/
cloudlinux
/
venv
/
lib
/
python3.11
/
site-packages
/
clselect
/
??
..
??
__init__.py
(536 B)
??
__pycache__
??
baseclselect
??
clextselect.py
(19.71 KB)
??
clpassenger.py
(26.53 KB)
??
clselect.py
(21.77 KB)
??
clselectctl.py
(9.15 KB)
??
clselectctlnodejsuser.py
(20.66 KB)
??
clselectctlphp.py
(43.82 KB)
??
clselectctlpython.py
(44.75 KB)
??
clselectctlruby.py
(18.59 KB)
??
clselectexcept.py
(10.22 KB)
??
clselectnodejs
??
clselectnodejsuser
??
clselectphp
??
clselectphpuser
??
clselectprint.py
(5.39 KB)
??
clselectpython
??
clselectpythonuser
??
clselectruby
??
clselectstatistics.py
(6.51 KB)
??
cluserextselect.py
(15.19 KB)
??
cluseroptselect.py
(23.65 KB)
??
cluserselect.py
(28.22 KB)
??
locked_extensions.ini
(1.2 KB)
??
utils.py
(16.22 KB)
Editing: utils.py
# coding:utf-8 # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved # # Licensed under CLOUD LINUX LICENSE AGREEMENT # http://cloudlinux.com/docs/LICENSE.TXT from __future__ import print_function from __future__ import absolute_import from __future__ import division import errno import os import pwd import re import subprocess import sys import json import shutil from time import time from typing import AnyStr, List, Dict, Optional # NOQA from pathlib import Path from urllib.parse import urlparse, urlunparse from clcommon.clpwd import resolve_username_and_doc_root from clcommon.cpapi import CP_NAME from clcommon.cpapi.cpapiexceptions import NoDomain, NotSupported, IncorrectData from clcommon.utils import mod_makedirs from clsentry import init_sentry_client from clsentry.utils import get_pkg_version from .clselectprint import clprint from .clselectexcept import ClSelectExcept CAGEFS_ENTER_USER_BIN = "/sbin/cagefs_enter_user" LVEMANDSN = "https://9713d1296f804031b058b8f2d789d7ac:8ddacae32d8246cf8b25cf826bf3fc0a@cl.sentry.cloudlinux.com/12" def run_command(cmd, env_data=None): """ Runs external process and returns output :param cmd: command and arguments as a list :param env_data :return string """ try: output = subprocess.Popen( cmd, stdin=open("/dev/null"), stdout=subprocess.PIPE, stderr=subprocess.STDOUT, close_fds=True, env=env_data, text=True, ) std_out, std_err = output.communicate() except (OSError, IOError) as e: raise ClSelectExcept.FileProcessError(cmd[0], str(e)) if output.returncode != 0: raise ClSelectExcept.ExternalProgramFailed( std_err or "output of the command: %s\n%s" % (" ".join(cmd), std_out) ) return std_out def run_command_full(cmd, env_data=None, preexec_fn=None, cwd=None): """ Runs external process and returns output. Differs from subprocess.check_output, run_command above, and check_output below in that it does not throw an exception if process's return code != 0 :param cmd: command and arguments as a list :param env_data :param preexec_fn: Pre-exec function. None if don't need :param cwd: Directory name to set as current :return Cortege: (ret_code, stdout, stderr) """ try: process = subprocess.Popen( cmd, stdin=open("/dev/null"), stdout=subprocess.PIPE, stderr=subprocess.PIPE, preexec_fn=preexec_fn, close_fds=True, cwd=cwd, env=env_data, ) std_out, std_err = process.communicate() except (OSError, IOError) as e: raise ClSelectExcept.FileProcessError(cmd[0], str(e)) return process.returncode, std_out, std_err def read_file_as_string(filename): """ Reads file contents and returns it as a string """ defaults_contents = "" try: f = open(filename) defaults_contents = f.read() f.close() except (OSError, IOError): pass return defaults_contents def check_call(*args, **kwargs): check_output(*args, **kwargs) def check_output(*args, **kwargs): """ check_output(val1, val2, val3, arg4=val4) equivalent check_output(args=(val1, val2, val3), arg4=val4) or equivalent check_output(val1, args=(val2, val3), arg4=val4) DON'T USE check_output((val1, val2, val3), arg4=val4) :param tuple *args: arguments for command line :param dict **kwargs: parameters for subprocess.Popen :return str: """ error = kwargs.pop("error", None) args_from_kwargs = kwargs.get("args") output = kwargs.pop("output", None) wait = kwargs.pop("wait", None) if args_from_kwargs: args += args_from_kwargs del kwargs["args"] try: p = subprocess.Popen(args=args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, **kwargs) except (OSError, IOError) as e: raise ClSelectExcept.FileProcessError(args[0], str(e)) stdout, stderr = p.communicate() if output: output.write(stdout) output.close() if wait is not None: p.wait() if p.returncode: raise ClSelectExcept.ExternalProgramFailed(error or stderr or stdout) return stdout def list_dirs(path): return [d for d in os.listdir(path) if os.path.isdir(os.path.join(path, d))] def versioning(path, versions): yield path for major, minors in versions: yield path + str(major) for minor in minors: yield path + ".".join(map(str, (major, minor))) def mkdir_p(path): try: os.makedirs(path) except OSError as exc: if exc.errno == errno.EEXIST and os.path.isdir(path): pass else: raise def get_abs_rel(user, directory): """ Returns absolute and relative to user's home directories paths, after eliminating all symlinks :param user: Unix user whose home contains the directory :param directory: The directory to obtain absolute and relative paths for """ user_dir = os.path.realpath(pwd.getpwnam(user).pw_dir) abs_dir = os.path.realpath(os.path.join(user_dir, directory)) rel_dir = s_partition(abs_dir, user_dir)[2].lstrip(os.sep) if not all((abs_dir, rel_dir)): raise ClSelectExcept.WrongData("Directory '%s' not in user home ('%s')" % (directory, user_dir)) return abs_dir, rel_dir # TODO: need move to clcommon # safely open/close files in python2.4 # http://stackoverflow.com/questions/3770348/how-to-safely-open-close-files-in-python-2-4 # TODO: consider to remove this method and use builtin def file_read(path, mode="r", errors=None): """ read file and close :param path: file path :param mode: reading mode :return str: """ data = None stream = open(path, mode=mode, errors=errors) try: data = stream.read() finally: stream.close() return data def file_readlines(path, errors=None): """ read litle file and close :param path: :return str: """ data = [] stream = open(path, errors=errors) try: data = stream.readlines() finally: stream.close() return data def file_write(path, line, mode="w", errors=None): """ write litle data to file and close :param path: :return str: """ stream = open(path, mode, errors=errors) try: stream.write(line) finally: stream.close() def file_writelines(path, lines, mode="w", errors=None): """ write litle data to file and close :param path: :return str: """ stream = open(path, mode, errors=errors) try: stream.writelines(lines) finally: stream.close() def s_partition(s, sep): """ str.partition for all python versions :param s: string to parse :param sep: separator to split by :return: cortege - see str.partition function for python 2.5+ """ if sys.version_info[0] >= 2 and sys.version_info[1] >= 5: return s.partition(sep) # Python v2.4 and lower does not support str.partition, so emulating it s_parts = s.split(sep, 1) if len(s_parts) == 2: ret_cortege = (s_parts[0], sep, s_parts[1]) else: ret_cortege = (s_parts[0], "", "") return ret_cortege def grep(pattern, path): """ find regex count in file """ try: f = open(path, "r") except IOError: return None counter = 0 regex = re.compile(pattern) for line in f: if regex.search(line): counter += 1 f.close() return counter def user_should_be_skipped(username): """ Determines is user should be skipped in selectorctl work :param username: user name to check :return: True - user should be skipped, False - not """ from clcommon.cpapi import is_admin # user is DA admin, pass it return is_admin(username) def make_dir(dest_dir): if not os.path.isdir(dest_dir): try: mod_makedirs(dest_dir, 0o755) except (IOError, OSError) as e: print("Error: failed to create", dest_dir, ":", str(e)) sys.exit(1) def remove_file_or_dir(path): try: os.unlink(path) except OSError: pass if os.path.isdir(path): shutil.rmtree(path, True) def make_symlink(src, dst): try: if os.path.islink(dst): if os.readlink(dst) != src: os.unlink(dst) os.symlink(src, dst) else: remove_file_or_dir(dst) os.symlink(src, dst) except OSError as e: print("Error: failed to create symlink", dst, "->", src, ":", str(e)) sys.exit(1) def pretty_json(data): return json.dumps(data, indent=4, separators=(",", ": "), sort_keys=True) # TODO: we have the same safely_resolve_username_and_doc_root in selectorlib, need some refactoring def safely_resolve_username_and_doc_root(user=None, domain=None, msg_format="text", pass_not_supported_panel=False): """ Safely resolve username and doc_root by domain, or resolve document root by username, or resolve document root and username by effective uid :param pass_not_supported_panel: pass not supported panel for php selector :param msg_format: error messages' format (json or text) :param user: str -> name of unix user :param domain: str -> domain of panel user :return: tuple -> user, doc_root """ result_user = user result_doc_root = None try: result_user, result_doc_root = resolve_username_and_doc_root( user=user, domain=domain, ) except NoDomain: clprint.print_diag( msg_format, {"status": "ERROR", "message": "No domain found for user: {}, domain: {}".format(user, domain)} ) sys.exit(1) except NotSupported: if pass_not_supported_panel: pass else: clprint.print_diag( msg_format, {"status": "ERROR", "message": "NodeJs/Ruby/Python selector not supported for %s panel" % CP_NAME}, ) sys.exit(1) except IncorrectData: clprint.print_diag( msg_format, {"status": "ERROR", "message": "Domain %s is not owned by the user %s" % (domain, user)} ) sys.exit(1) return result_user, result_doc_root def get_using_realpath_keys(user, directory, config): """ Transform all directories to realpath, compare and return config value :param user: str -> unix user - owner of the config :param directory: str -> wanted directory key :param config: dict -> config to get value from, should have directories as keys """ abs_directory = get_abs_rel(user, directory)[0] for conf_dir in config: abs_conf_dir = get_abs_rel(user, conf_dir)[0] if abs_directory == abs_conf_dir: return config[conf_dir] raise KeyError("Config does not contain directory: {}".format(directory)) def delete_using_realpath_keys(user, directory, config): """ Transform all directories to realpath, compare and remove directory key :param user: str -> unix user - owner of the config :param directory: str -> wanted directory key :param config: dict -> config to remove key from, should have directories as keys """ abs_directory = get_abs_rel(user, directory)[0] removed_at_least_one = False config_keys = list(config.keys()) for conf_dir in config_keys: abs_conf_dir = get_abs_rel(user, conf_dir)[0] if abs_directory == abs_conf_dir: del config[conf_dir] removed_at_least_one = True if not removed_at_least_one: raise KeyError("Config does not contain directory: {}".format(directory)) def realpaths_are_equal(user, path1, path2): """ Checks that two paths in user home directory are the same after stripping symlinks :param user: Unix user whose home directory contains both paths :type path1: str :type path2: str :return: True if paths are canonically the same otherwise False :rtype: bool """ return get_abs_rel(user, path1)[0] == get_abs_rel(user, path2)[0] def run_process_in_cagefs(user, path_to_utility, args_for_utility, env_vars=None): # type: (AnyStr, AnyStr, List[AnyStr], Optional[Dict[AnyStr, AnyStr]]) -> Dict[AnyStr, AnyStr, int] """ Run any process using the utility cagefs_enter_user """ result = {} # there are cases when we need to store some env vars # (e.g HOME when calling pip during import) if not env_vars: env_vars = {} cmd = [CAGEFS_ENTER_USER_BIN, user, path_to_utility] + args_for_utility process = subprocess.Popen(cmd, env=env_vars, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) stdout, _ = process.communicate() retcode = process.returncode # contition `retcode > 128` because in lve-wrappers: # if (WIFSIGNALED(status)) { # exit(WTERMSIG(status) + 128); # } if not stdout and retcode > 128: stdout = { "details": "User's process \"{}\" failed and didn't return anything. It could mean that user " "has too small limit of PMEM for running of cloudlinux-selector. Please, " "increase the limit to user and try run process again".format(path_to_utility), "result": "error", "timestamp": time(), } else: stdout = stdout.strip() result["output"] = stdout result["returncode"] = retcode result["failed"] = True if retcode else False return result def is_imunify_using_python(version: str) -> bool: """ Check if python version is used by imunify packages The algorithm is as follows: - Verify if imunify services are installed - Check if shebang contains the path to the specified Python version being checked - If shebang contains the path to the imunify virtual environment, determine the Python version it is based on by reading a symlink """ # Init sentry for this function sentry_client = init_sentry_client("lvemanager", release=get_pkg_version("lvemanager"), dsn=LVEMANDSN, handle=False) alt_path = f"/opt/alt/python{version.replace('.', '')}" imunify_venv_path = "/opt/imunify360/venv" imunify_service = Path("/usr/bin/imunify-service") if not imunify_service.is_file(): return False try: with open(imunify_service) as f: first_line = f.readline() if not first_line.startswith("#!"): raise Exception(f'ERROR: "{imunify_service}" doesn\'t have a shebang') if alt_path in first_line: return True if imunify_venv_path in first_line: venv_python_bin = Path(first_line.lstrip("#!").strip()) python_bin = venv_python_bin.readlink() if alt_path in str(python_bin): return True except: sentry_client.captureException() return False def demote(user: str): """ Drops to user """ user_pwd = pwd.getpwnam(user) def func(): os.setgid(user_pwd.pw_gid) os.setuid(user_pwd.pw_uid) os.environ["USER"] = user os.environ["HOME"] = user_pwd.pw_dir return func def apply_for_at_least_one_user(func, users, exception_type, *args, **kwargs): caught_exception = None is_handled = False for user_alias in users: try: result = func(user_alias, *args, **kwargs) except exception_type as e: caught_exception = e else: is_handled |= True if not is_handled: raise caught_exception return result def transform_wildcard_url(url): """ Transforms a URL containing a wildcard ('*') in the netloc. If the netloc of the URL contains a '*', it replaces the '*' with 'any'. This is necessary because URLs with wildcards are not valid for HTTP requests. Replacing '*' with 'any' ensures compatibility with systems or libraries that do not support wildcards in URLs. :param url: str - The URL to transform. :return: str - The transformed URL with the wildcard replaced. """ parsed = urlparse(url) if "*" in parsed.netloc: netloc = parsed.netloc.replace("*", "any") parsed = parsed._replace(netloc=netloc) return urlunparse(parsed)
Upload File
Create Folder