X7ROOT File Manager
Current Path:
/opt/cloudlinux/venv/lib/python3.11/site-packages/clwizard/modules
opt
/
cloudlinux
/
venv
/
lib
/
python3.11
/
site-packages
/
clwizard
/
modules
/
??
..
??
__init__.py
(5 KB)
??
__pycache__
??
base.py
(7.96 KB)
??
cagefs.py
(3.93 KB)
??
governor.py
(6.18 KB)
??
lsapi.py
(8.33 KB)
??
nodejs.py
(5.31 KB)
??
php.py
(7.7 KB)
??
python.py
(5.56 KB)
??
ruby.py
(3.99 KB)
Editing: governor.py
# coding=utf-8
#
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2021 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENCE.TXT
#
import os
from typing import Dict, List  # NOQA

from clcommon.const import Feature
from clcommon.lib.mysql_governor_lib import MySQLGovernor
from clcommon.utils import ExternalProgramFailed, run_command
from clconfig.db_info_lib import MysqlInfo
from cldetectlib import getCPName

from clwizard.constants import MODULES_LOGS_DIR
from clwizard.exceptions import InstallationFailedException, UserInterventionNeededError

from .base import WizardInstaller


class GovernorInstaller(WizardInstaller):
    """
    Wizard module that installs and initializes MySQL Governor.
    """

    # Log file of this module, located inside the wizard modules log directory.
    LOG_FILE = os.path.join(MODULES_LOGS_DIR, 'governor.log')
    # Marker compared against the 'vendor' and 'version' fields of db_info.
    UNKNOWN = 'unknown'
    # Governor utility that is executed to perform the installation.
    UTILITY_PATH = '/usr/share/lve/dbgovernor/mysqlgovernor.py'
    _REQUIRED_CL_COMPONENT_SUPPORT = Feature.GOVERNOR

    def __init__(self):
        super().__init__()
        # dict(
        #   'vendor': MySQL|MariaDB|Percona,
        #   'version': server version in the form of {major}.{minor}
        #              or {major}.{minor}-{release} for percona,
        #   'cll-lve': patches from CL applied or not (True|False)
        # )
        self.db_info = MysqlInfo().get()

    def _install_governor_package(self):
        """
        Install the 'governor-mysql' package unless it is already installed.

        Raises InstallationFailedException when the package installation
        fails with ExternalProgramFailed.
        """
        # NOTE(review): _is_package_installed, _install_package and app_logger
        # are not defined in this file; presumably provided by WizardInstaller.
        if not self._is_package_installed('governor-mysql'):
            try:
                out = self._install_package('governor-mysql')
            except ExternalProgramFailed as err:
                self.app_logger.error("Package installation failed with error: %s", str(err))
                raise InstallationFailedException() from err
            self.app_logger.info("Package was installed successfully: %s", out)
        else:
            self.app_logger.info("Skip the governor-mysql installation, it is already installed")

    def _prepare_db_options(self):
        """
        Build a DB identifier made of the lower-cased vendor followed by the
        major and minor version numbers without separators,
        e.g. vendor 'MariaDB' and version '10.3' give 'mariadb103'.

        Returns None when the vendor or the version is missing (None)
        or equals UNKNOWN.
        """
        vendor = self.db_info["vendor"]
        version = self.db_info["version"]
        # The vendor is None when no MySQL server was detected (the same
        # condition is checked in run_installation); without this guard
        # .lower() fails with AttributeError instead of returning None.
        # NOTE(review): a None version is guarded as well - confirm that
        # MysqlInfo can actually report it.
        if vendor is None or version is None:
            return None
        if GovernorInstaller.UNKNOWN in (vendor, version):
            return None
        # split('-')[0] drops the release
        # split('.')[0:2] uses only major and minor versions
        # Neither operation can raise IndexError, so no handler is needed.
        major_minor = version.split('-')[0].split('.')[0:2]
        return vendor.lower() + ''.join(major_minor)

    def _initialize_governor(self):
        """
        Trying to install governor with --wizard key, it detects the DB
        automatically (on DA and cPanel), for other panels we will have
        blockers for the governor module.

        Raises InstallationFailedException when the utility fails;
        UserInterventionNeededError raised for exit code 3 is not caught here.
        """
        try:
            self.app_logger.info("Install MySQL Governor")
            # for governor there are errors that could be handled only manually by user
            # these errors have exit code 3
            self._run_cmd_and_check_special_status(
                [GovernorInstaller.UTILITY_PATH, '--install', '--wizard'], exit_status=3
            )
        except ExternalProgramFailed as err:
            raise InstallationFailedException() from err

    def _run_cmd_and_check_special_status(self, cmd, exit_status):
        """
        There are cases when some commands can't be executed without
        the user's intervention. Such scripts/commands return a special
        exit code that must be checked for.

        :param cmd: command (list of arguments) passed to run_command
        :param exit_status: exit code that means user intervention is needed
        :raises UserInterventionNeededError: the command returned exit_status
        :raises ExternalProgramFailed: the command returned any other
            non-zero exit code
        """
        retcode, out, _ = run_command(cmd, return_full_output=True)
        if retcode == exit_status:
            self.app_logger.warning('Can`t install governor automatically')
            self.app_logger.warning('Reason: %s', out)
            raise UserInterventionNeededError()
        if retcode != 0:
            self.app_logger.error('Error occurred during running "%s"\nReason is: "%s"', cmd, out)
            raise ExternalProgramFailed(out)

    def run_installation(self, options):
        """
        Install the governor package and then initialize the governor.

        :param options: not used by this module
        :raises InstallationFailedException: no DB vendor was detected
            or one of the installation steps failed
        """
        if self.db_info["vendor"] is None:
            raise InstallationFailedException("Please, install a MySQL server first.")
        self._install_governor_package()
        self._initialize_governor()

    def _get_warnings(self):
        # type: () -> List
        """
        Get list of warnings that should be shown in wizard
        before module installation
        """
        # The backup warning is always present.
        warnings = [
            {
                "message": (
                    "Please create a full database backup (including system tables)."
                    " MySQL/MariaDB/Percona server will be updated from CloudLinux repositories."
                )
            }
        ]  # type: List[Dict]
        # An extra warning is added when no DB vendor was detected.
        if self.db_info["vendor"] is None:
            warnings.append(
                {
                    "message": (
                        "Could not detect a MySQL server."
                        " For a list of compatible options please see %(url)s."
                    ),
                    "context": {
                        "url": "https://docs.cloudlinux.com/mysql_governor_installation.html"
                    },
                }
            )
        return warnings

    def _get_blockers(self):
        # type: () -> List
        """
        Get a list of possible blockers to disable Governor module in Wizard UI.
        """
        blockers = []
        # Automatic installation is offered for cPanel and DirectAdmin only.
        if getCPName() not in ['cPanel', 'DirectAdmin']:
            blockers.append(
                {
                    'message': 'MySQL Governor can\'t be automatically installed.'
                    ' Please install it through the command-line interface.'
                    ' Learn more here: %(url)s',
                    'context': {
                        'url': 'https://docs.cloudlinux.com/installation-wizard.html',
                    },
                }
            )
        return blockers

    def _is_already_configured(self):
        """
        Check if the governor is ready for work.

        The result is truthy only when the governor is present and
        the 'cll-lve' field of db_info is truthy.
        """
        mysql_gov = MySQLGovernor()
        return mysql_gov.is_governor_present() and self.db_info['cll-lve']

    def initial_status(self):
        """
        Collect the module status as a dict.

        The dict always has the 'already_configured' and 'db_version' keys;
        'warnings' and 'blockers' are added only when the lists are not empty.
        """
        result = {
            'already_configured': self._is_already_configured(),
            'db_version': self.db_info['version'],
        }
        warnings = self._get_warnings()
        if warnings:
            result.update({'warnings': warnings})
        blockers = self._get_blockers()
        if blockers:
            result.update({'blockers': blockers})
        return result
Upload File
Create Folder