X7ROOT File Manager
Current Path:
/opt/cloudlinux/venv/lib/python3.11/site-packages/lvestats/lib/commons
opt
/
cloudlinux
/
venv
/
lib
/
python3.11
/
site-packages
/
lvestats
/
lib
/
commons
/
??
..
??
__init__.py
(219 B)
??
__pycache__
??
argparse_utils.py
(11.25 KB)
??
dateutil.py
(5.6 KB)
??
decorators.py
(893 B)
??
func.py
(15.89 KB)
??
htpasswd.py
(2.25 KB)
??
litespeed.py
(6.67 KB)
??
logsetup.py
(4.5 KB)
??
proctitle.py
(2.9 KB)
??
profiler.py
(575 B)
??
progress.py
(1016 B)
??
sentry.py
(6.17 KB)
??
server_status.py
(1.31 KB)
??
sizeutil.py
(2.59 KB)
??
users_manager.py
(2.97 KB)
Editing: litespeed.py
# coding=utf-8 # # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved # # Licensed under CLOUD LINUX LICENSE AGREEMENT # http://cloudlinux.com/docs/LICENSE.TXT import base64 import urllib.request import urllib.error import urllib.parse import ssl import os from lxml import etree from lvestats.lib.commons.func import get_all_user_domains, normalize_domain class LiteSpeedException(Exception): pass class LiteSpeedDisabledException(LiteSpeedException): pass class LiteSpeedInvalidCredentials(LiteSpeedException): pass class LiteSpeedDataMapping(object): TIME = 3 HOST = 8 REQUEST = 14 TOTAL_LEN = 15 class LiteSpeed(object): IGNORE_HOSTS = [b'_AdminVHost'] PID_FILE_PATH = '/tmp/lshttpd/lshttpd.pid' HTPASSWD_PATH = '/usr/local/lsws/admin/htpasswds/status' HTTP_TIMEOUT = 2 LS_ADMIN_CONFIG = "/usr/local/lsws/admin/conf/admin_config.xml" def __init__(self, login, password): self.login = login self.password = password @staticmethod def _get_litespeed_pid(): """ Returns pid that is stored in litespeed's pidfile :return: str """ if os.path.isfile(LiteSpeed.PID_FILE_PATH) and os.path.isfile(LiteSpeed.HTPASSWD_PATH): with open(LiteSpeed.PID_FILE_PATH, encoding='utf-8') as f: return f.readline().rstrip(os.linesep) else: return None @staticmethod def is_litespeed_running(): """ Checks whether pid is not None. :return: bool """ return LiteSpeed._get_litespeed_pid() is not None def _get_litespeed_webadmin_port(self): """ Retrives current LiteSpeed webadmin console port :return: LiteSpeed webadmin console port as string """ try: # Part of Litespeed config, containing console port: # <?xml version="1.0" encoding="UTF-8"?> # <adminConfig> # <listenerList> # <listener> # <name>adminListener</name> # <address>*:7080</address> # <secure>0</secure> # </listener> # </listenerList> with open(self.LS_ADMIN_CONFIG, 'r', encoding='utf-8') as f: ls_adm_cfg = etree.parse(f).getroot() data = ls_adm_cfg.xpath("listenerList/listener/address")[0] return data.text.split(':')[1] except (AttributeError, IndexError, ValueError, OSError, IOError) as e: raise LiteSpeedException( "Can't determine current LiteSpeed webadmin console " f"port from config {self.LS_ADMIN_CONFIG}: {e}" ) from e def _get_requests(self): """ Get info about connections from litespeed and returns array of rows with data :return: list :raise: [LiteSpeedInvalidCredentials, LiteSpeedDisabledException] """ status_url = f'http://localhost:{self._get_litespeed_webadmin_port()}/status?rpt=details' request = urllib.request.Request(status_url) base64string = base64.b64encode(b'%s:%s' % (self.login.encode(), self.password.encode())) request.add_header(b"Authorization", b"Basic %s" % base64string) # get data from litespeed, check whether http code is 200 try: context = ssl._create_unverified_context() # pylint: disable=protected-access with urllib.request.urlopen( request, timeout=self.HTTP_TIMEOUT, context=context, ) as response: data = response.read() except urllib.error.HTTPError as e: if e.code in [401, 403]: raise LiteSpeedInvalidCredentials( "Litespeed login / password invalid. " "Please, try restart lvestats service." ) from e raise LiteSpeedDisabledException(str(e)) from e except Exception as e: # not good, but urllib raises lot of exceptions raise LiteSpeedDisabledException(str(e)) from e # remove empty lines result = [row for row in data.split(os.linesep.encode()) if row.strip() != b''] return result def __is_host_valid(self, host): """ Check whether host is not empty. :type host: str :return: bool """ host = host.strip() if host and host not in self.IGNORE_HOSTS: return True return False def _parse_request_info(self, request: bytes): """ :return: method, url, http_version """ request_info = request.strip(b'"').split() if len(request_info) == 3: method, url, http_version = request_info elif len(request_info) == 2: method, url = request_info http_version = b'' else: return None return method, url, http_version def get_user_data(self, username): """ Returns information about processed by user pages. :param username: :return list[list]: list of the lists [[Pid, Domain, Http type, Path, Http version, Time],...] :raises: LiteSpeedDownException """ data_delimiter = b'\t' pid = self._get_litespeed_pid() all_domains = get_all_user_domains(username) normalized_domains = set(map(normalize_domain, all_domains)) requests = self._get_requests() litespeed_requests = [] for request in requests: request_info = request.split(data_delimiter) if len(request_info) < LiteSpeedDataMapping.TOTAL_LEN: # that is not valid request info, skip it... continue host = request_info[LiteSpeedDataMapping.HOST] request = request_info[LiteSpeedDataMapping.REQUEST] # time since first request, seconds request_time = self.to_float(request_info[LiteSpeedDataMapping.TIME]) if self.__is_host_valid(host) and \ normalize_domain(host.decode()) in normalized_domains: request_data = self._parse_request_info(request) if request_data is not None: method, url, http_version = request_data litespeed_requests.append((pid, host, method, url, http_version, request_time)) return litespeed_requests @staticmethod def to_float(string): """ Converts str to float, if can't return -1. :type string: str :rtype: float """ try: return float(string) except ValueError: return -1.
Upload File
Create Folder