X7ROOT File Manager
Current Path:
/opt/cloudlinux/venv/lib/python3.11/site-packages/clcommon
opt
/
cloudlinux
/
venv
/
lib
/
python3.11
/
site-packages
/
clcommon
/
??
..
??
__init__.py
(1.37 KB)
??
__pycache__
??
clcagefs.py
(10.99 KB)
??
clcaptain.py
(1.96 KB)
??
clconfig.py
(1.68 KB)
??
clconfpars.py
(12.09 KB)
??
clcustomscript.py
(1.16 KB)
??
cldebug.py
(905 B)
??
clemail.py
(1.65 KB)
??
clexception.py
(1.14 KB)
??
clfunc.py
(6.47 KB)
??
clhook.py
(3.86 KB)
??
cllog.py
(1.45 KB)
??
cloutput.py
(471 B)
??
clproc.py
(4.05 KB)
??
clpwd.py
(7.74 KB)
??
clquota.py
(1.27 KB)
??
clsec.py
(657 B)
??
clwpos_lib.py
(16.6 KB)
??
const.py
(277 B)
??
cpapi
??
evr_utils.py
(3.55 KB)
??
features.py
(5.01 KB)
??
group_info_reader.py
(5.29 KB)
??
lib
??
lock.py
(1.02 KB)
??
mail_helper.py
(4.45 KB)
??
mysql_lib.py
(5.84 KB)
??
php_conf_reader.py
(9.77 KB)
??
public_hooks
??
sysctl.py
(7.61 KB)
??
ui_config.py
(3.12 KB)
??
utils.py
(32.91 KB)
??
utils_cmd.py
(2.71 KB)
Editing: clconfpars.py
# -*- coding: utf-8 -*-

# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2018 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT
#
import configparser
import locale
import os
import re
import syslog
from collections import namedtuple


class WebConfigParsingError(Exception):
    """Error carrying a human-readable ``message`` about a web config parsing problem."""

    def __init__(self, message):
        self.message = message


class WebConfigMissing(Exception):
    """Raised by the web config parsers when the requested config file does not exist."""

    def __init__(self, message):
        self.message = message


# Name of the artificial section prepended to section-less "key=value" files
# so that ``configparser`` accepts them.
SECHEAD = 'asection'


def load(path, case_sensitive=False, ignore_bad_encoding=False):
    """
    Read a section-less ini-like file and return its content as dict.

    :param path: path to the file to read
    :param case_sensitive: keep option names as is instead of lower-casing them
    :param ignore_bad_encoding: decode the file with the preferred locale
        encoding, replacing undecodable bytes; otherwise the file is read
        strictly as utf-8
    :return: dict of option name -> value
    :raises: IOError, UnicodeDecodeError, configparser.Error
    """
    config = configparser.ConfigParser(allow_no_value=True,
                                       interpolation=None,
                                       strict=False)
    if case_sensitive:
        config.optionxform = str
    if ignore_bad_encoding:
        with open(path, 'rb') as f:
            raw = f.read().decode(locale.getpreferredencoding(), 'replace')
    else:
        with open(path, 'r', encoding='utf-8') as f:
            raw = f.read()
    config.read_string(f'[{SECHEAD}]\n' + raw, source=path)
    return dict(config.items(section=SECHEAD))


_QUOTES = "'", '"'


def _strip_escape_quotes_of_config_value(val: str) -> str:
    """
    Strips one leading and one trailing char when the value both starts
    and ends with a single or double quote char.
    """
    if val.startswith(_QUOTES) and val.endswith(_QUOTES):
        return val[1:-1]
    return val


def load_fast(path, delimiter="=", strip_quotes=False):
    """
    Read a "key<delimiter>value" file without ``configparser`` overhead.

    Lines that do not contain the delimiter are skipped. Keys and values
    are stripped of surrounding whitespace; the last occurrence of a key wins.

    :param path: path to the file to read
    :param delimiter: string separating key and value (first occurrence is used)
    :param strip_quotes: strip surrounding quotes from values
    :return: dict of key -> value
    """
    data = {}
    with open(path, "r", encoding="utf-8", errors="surrogateescape") as f:
        # Iterate the file object directly instead of materialising all lines
        for line in f:
            parts = line.split(delimiter, 1)
            if len(parts) != 2:
                # Skip broken lines
                continue
            key, value = parts
            value = value.strip()
            if strip_quotes:
                value = _strip_escape_quotes_of_config_value(value)
            data[key.strip()] = value
    return data


cache = {}


def load_once(path, ignore_errors=False):
    """
    Read ini file once (cached) and return its content as dict

    :param path: path to the file to read
    :param ignore_errors: cache and return an empty dict on IOError or
        configparser.Error instead of raising
    """
    try:
        res = cache[path]
    except KeyError:
        try:
            res = cache[path] = load(path)
        except (IOError, configparser.Error):
            if not ignore_errors:
                raise
            res = cache[path] = {}
    return res


def change_settings(settings_dict, path, tmp_path=None):
    """
    Update "key=value" settings of a config file.

    The file is rewritten into a temporary file which is then renamed over
    the original. Existing keys found in `settings_dict` get the new value,
    comments, blank lines and unrelated lines are copied verbatim, and keys
    that were not present in the file are appended at the end.

    :param settings_dict: dict of key -> new value
    :param path: path to the config file to change
    :param tmp_path: path of the temporary file, defaults to `path` + ".tmp"
    """
    if not tmp_path:
        tmp_path = path + ".tmp"

    used_keys = set()
    # Tracks whether the text written so far ends with a line break, so that
    # appended settings never get glued to an unterminated last line
    ends_with_newline = True
    with (open(path, 'r', encoding='utf-8') as fin,
          open(tmp_path, 'w', encoding='utf-8') as fout):
        for line in fin:
            stripped_line = line.strip()
            if stripped_line and not stripped_line.startswith('#'):
                # partition() tolerates lines without '=' (bare keys or junk)
                key = stripped_line.partition('=')[0].strip()
                if key in settings_dict:
                    fout.write(f'{key}={settings_dict[key]}\n')
                    used_keys.add(key)
                    ends_with_newline = True
                    continue
            fout.write(line)
            ends_with_newline = line.endswith('\n')

        missing_keys = [key for key in settings_dict if key not in used_keys]
        if missing_keys and not ends_with_newline:
            fout.write('\n')
        for key in missing_keys:
            fout.write(f'{key}={settings_dict[key]}\n')

    os.rename(tmp_path, path)


_NGINX_TOKENS_RE = re.compile(
    r"""
    (
      # Comments
      (?: \# .* $ )

      # Single-, double-quoted strings and bare strings without whitespaces
      | (?: "[^"\n]*?" )
      | (?: '[^'\n]*?' )
      | (?: [^"';\s\{\}]+ )

      # Structural characters
      | ;
      | \{
      | \}
      | \n
    )
    """,
    re.IGNORECASE | re.MULTILINE | re.VERBOSE,
)


def _ngx_tokenize(data):
    """Lazily split NGINX config text into tokens, dropping comments"""
    tokens = (
        match.group(0)
        for match in _NGINX_TOKENS_RE.finditer(data)
        if match and match.group(0)
    )
    # Explicitly ignore comments
    return (tok for tok in tokens if not tok.startswith('#'))


def _ngx_take_until(it, val):
    """
    Yield tokens from iterator `it` until a token contained in `val` is met.
    The terminating token is consumed but not yielded.
    """
    for tok in it:
        if tok in val:
            return
        yield tok


def _ngx_take_until_block_end(it):
    """
    Yield tokens from iterator `it` up to the `}` closing the current block.
    Nested blocks are yielded in full; the final `}` is consumed, not yielded.
    """
    lvl = 1
    for t in it:
        if t == "{":
            lvl += 1
        elif t == "}":
            lvl -= 1
        if lvl < 1:
            return
        yield t


def _ngx_scan_block_info(block_tokens, need_fields):
    """Scan a block for required fields, skips nested blocks"""
    info = {}
    for tok in block_tokens:
        # We need to skip until the end of inner block if it occurs
        if tok == "{":
            for _ in _ngx_take_until_block_end(block_tokens):
                pass
        # Now gather the value, the last occurrence is in priority
        if tok in need_fields:
            # A value ends at `;` or, to tolerate a missing `;`, at line end
            info[tok] = list(_ngx_take_until(block_tokens, (";", "\n")))
    return info


def nginx_conf_loose_parser(data):
    """
    Parse content of NGINX configuration in a manner tolerant to minor mistakes
    and extract relevant fields from all `server` directives.

    Relevant fields are:
    - `server_name`
    - `root` - returned as `document_root`
    - `ssl` - if `listen` field contains "ssl" word

    Only `server` blocks having non-empty `server_name` and `root` are yielded.

    Doesn't handle interpolated values (ex. `${val}`) outside of quoted strings
    """
    tokens = _ngx_tokenize(data)
    for tok in tokens:
        if tok != "server":
            continue

        # Nothing seems to be allowed between "server" directive and
        # the opening of his block, so we just discard everything
        # until first block opening seen
        for _ in _ngx_take_until(tokens, ("{",)):
            pass

        # Limit further scan by the inside of block
        block_tokens = _ngx_take_until_block_end(tokens)

        # By using only `block_tokens` we ensure all blocks are properly delimited
        info = _ngx_scan_block_info(block_tokens, ("server_name", "root", "listen"))
        try:
            server_name = info["server_name"]
            root = info["root"]
        except KeyError:
            continue

        # Both values are indexed below, so either one being empty
        # (ex. `server_name;`) makes the block unusable
        if not server_name or not root:
            continue

        yield {
            "server_name": _strip_escape_quotes_of_config_value(server_name[0]),
            "document_root": _strip_escape_quotes_of_config_value(root[0]),
            "ssl": "ssl" in info.get("listen", []),
        }


def nginx_conf_parser(conf_file):
    """Parse NGINX config file, see `nginx_conf_loose_parser` for more details"""
    if not os.path.isfile(conf_file):
        raise WebConfigMissing(f'File does not exists {conf_file}')

    dirty_data = read_unicode_file_with_decode_fallback(conf_file)

    return list(nginx_conf_loose_parser(dirty_data))


def _apache_vhost_port(address):
    """
    Extract port (as string) from `<VirtualHost>` address like `1.2.3.4:80>`.
    Returns '0' when the address has no port or can't be split unambiguously.
    """
    address = address.replace('>', '')
    host, sep, port = address.rpartition(':')
    if not sep:
        return '0'
    if ':' in host and not host.endswith(']'):
        # More than one colon outside of a bracketed IPv6 literal:
        # either `[::1]` without port or something we can't interpret
        return '0'
    return port


def apache_conf_parser(conf_file):
    """
    Parse Apache config file and extract data of all `<VirtualHost>` sections.

    Lines containing `#` are ignored entirely. Only the first address of
    a `<VirtualHost>` directive is used to detect the port.

    :param conf_file: path to the Apache config file
    :return: list of dicts with keys `user`, `server_name`, `document_root`,
        `server_alias`, `port`, `ssl`
    :raises: WebConfigMissing when the file does not exist
    """
    if not os.path.isfile(conf_file):
        raise WebConfigMissing(f'File does not exists {conf_file}')

    lines = read_unicode_file_with_decode_fallback(conf_file).splitlines()

    # Collect (port, directive lines) for every properly closed section
    vhosts = []
    port = None
    directives = []
    inside = False
    for line in lines:
        if '#' in line:
            continue
        if "<VirtualHost" in line:
            if not inside:
                port = _apache_vhost_port(line.split()[1])
                directives = []
                inside = True
            continue
        if "</VirtualHost>" in line:
            # A closing tag without an opening one carries no data
            if inside:
                vhosts.append((port, directives))
            inside = False
            continue
        if inside:
            directives.append(line)

    conf_data = []
    for port, directives in vhosts:
        data = {
            'user': None,
            'server_name': '',
            'document_root': '',
            'server_alias': None,
            'port': int(port),
            'ssl': False,
        }
        for line in directives:
            if "ServerName" in line:
                data['server_name'] = line.split()[1].strip().replace('www.', '')
                continue
            if "DocumentRoot" in line:
                # remove all whitespaces (first strip) and also quotes (second one)
                data['document_root'] = line.split()[1].strip().strip('"')
                continue
            if "ServerAlias" in line:
                data['server_alias'] = ','.join(str(n) for n in line.split()[1:])
                continue
            if "SuexecUserGroup" in line:
                data['user'] = line.split()[1].strip()
            if "SSLEngine" in line:
                data['ssl'] = line.split()[1].strip().lower() == 'on'
        conf_data.append(data)

    return conf_data


# Open Litespeed directive prefix -> key of resulting virtual host dict.
# Order matters: the first matching prefix handles the line.
_OLS_SINGLE_VALUE_DIRECTIVES = (
    ('user', 'user'),
    ('docRoot', 'document_root'),
    ('vhDomain', 'server_name'),
    ('vhAliases', 'server_alias'),
)


def openlitespeed_conf_parser(conf_file):
    """
    Parsing Open Litespeed config
    Retrieves data from user's openlitespeed.conf
    :param: full httpd.conf path (/usr/local/directadmin/data/users/<username>/openlitespeed.conf)
    :return: list of virtual host configurations.
    :raises: WebConfigMissing when the file does not exist
    """
    if not os.path.isfile(conf_file):
        raise WebConfigMissing(f'File does not exist {conf_file}')

    conf_data = []
    data_all = read_unicode_file_with_decode_fallback(conf_file).splitlines()

    vhost_dict = None

    for line in data_all:
        line = line.strip()

        if line.startswith('virtualHost'):
            # Add the previous virtual host to the list
            if vhost_dict:
                conf_data.append(vhost_dict)

            # Start a new virtual host
            vhost_dict = {'user': None,
                          'server_name': None,
                          'document_root': None,
                          'server_alias': None,
                          'port': None,
                          'ssl': False}

            # Extract port information
            match_port = re.search(r'-(\d+)', line)
            vhost_dict['port'] = int(match_port.group(1)) if match_port else None

        # Directives met before the first `virtualHost` line are ignored
        if vhost_dict is None:
            continue

        for prefix, field in _OLS_SINGLE_VALUE_DIRECTIVES:
            if line.startswith(prefix):
                # Only "<directive> <value>" form is accepted
                parts = line.split()
                if len(parts) == 2:
                    vhost_dict[field] = parts[1]
                break
        else:
            if line.startswith('vhssl'):
                vhost_dict['ssl'] = True

    # Add the last virtual host to the list
    if vhost_dict:
        conf_data.append(vhost_dict)

    return conf_data


PamLVECfg = namedtuple('PamLVECfg', ['min_uid', 'cagefs_enabled', 'groups'])


def parse_pam_lve_config(configfile):
    """
    Parse string like:
    "session      required      pam_lve.so      500      1     group1,group2"
    :param configfile: path to config file to parse
    :type configfile: str
    :return: PamLVECfg instance when pam_lve configuratiom is found, None otherwise
    :rtype: namedtuple
    :raises: IOError, ValueError
    """
    with open(configfile, 'r', encoding='utf-8') as f:
        for line in f:
            if line.startswith('#'):
                continue
            s = line.split()
            if len(s) >= 3 and s[2] == 'pam_lve.so':
                # parse config string taking pam_lve defaults into account
                min_uid = int(s[3]) if len(s) >= 4 else 500
                cagefs_enabled = bool(int(s[4])) if len(s) >= 5 else False
                groups = s[5].split(',') if len(s) >= 6 else ['wheel']
                return PamLVECfg(min_uid, cagefs_enabled, groups)
    # pam_lve line is not found in config file
    return None


def read_unicode_file_with_decode_fallback(file_path: str) -> str:
    """
    Read the whole file as text, decoding it as utf-8.

    When the content is not valid utf-8, a warning is written to syslog and
    invalid sequences are replaced with placeholders instead of raising.
    """
    with open(file_path, 'rb') as f:
        raw_data = f.read()
        try:
            return raw_data.decode()
        except UnicodeDecodeError:
            syslog.syslog(
                syslog.LOG_WARNING,
                f'Failed to decode "{file_path}" content as utf-8 - loading with placeholders for invalid unicode sequences'
            )
            return raw_data.decode(errors='replace')
Upload File
Create Folder