X7ROOT File Manager
Current Path:
/opt/cloudlinux/venv/lib/python3.11/site-packages/lvestats/lib/commons
opt
/
cloudlinux
/
venv
/
lib
/
python3.11
/
site-packages
/
lvestats
/
lib
/
commons
/
??
..
??
__init__.py
(219 B)
??
__pycache__
??
argparse_utils.py
(11.25 KB)
??
dateutil.py
(5.6 KB)
??
decorators.py
(893 B)
??
func.py
(15.89 KB)
??
htpasswd.py
(2.25 KB)
??
litespeed.py
(6.67 KB)
??
logsetup.py
(4.5 KB)
??
proctitle.py
(2.9 KB)
??
profiler.py
(575 B)
??
progress.py
(1016 B)
??
sentry.py
(6.17 KB)
??
server_status.py
(1.31 KB)
??
sizeutil.py
(2.59 KB)
??
users_manager.py
(2.97 KB)
Editing: users_manager.py
#!/usr/bin/python # coding=utf-8 # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved # # Licensed under CLOUD LINUX LICENSE AGREEMENT # http://cloudlinux.com/docs/LICENSE.TXT import logging from collections import namedtuple from typing import Optional, Dict, Generator, Tuple, List # NOQA from clcommon import cpapi, FormattedException from lvestats.lib.commons import func class UserNotFoundError(FormattedException): pass class User(namedtuple('User', ['username', 'domain', 'reseller'])): pass class UsersInfoManager(object): """ Implements some different functions for user management; """ def __init__(self): self.users_cache = {} # type: Dict[str, User] def build_users_cache(self, reseller): # type: (Optional[str]) -> None """Cache data from cpapi for given reseller""" try: self.users_cache = dict(self._iter_panel_users(reseller)) except cpapi.NotSupported: logging.info("Control panel API is not implemented, " "some features may not work properly") def _iter_panel_users(self, reseller): # type: (Optional[str]) -> Generator[Tuple[str, User]] for login, reseller_, domain in self._iter_panel_users_tuples(reseller): yield login, User(username=login, reseller=reseller_, domain=domain) @staticmethod def _iter_panel_users_tuples(reseller): # type: (Optional[str]) -> Tuple[str, str, str] if reseller is None: for login, reseller_, domain in cpapi.cpinfo(keyls=('cplogin', 'reseller', 'dns')): yield login, reseller_, domain else: # TODO: do we really need get_reseller_domains? for login, domain in list(func.get_reseller_domains(reseller).items()): yield login, reseller, domain def get_domain(self, username, raise_exc=True): # type: (str, bool) -> Optional[str] """Get domain for user""" try: return self.users_cache[username].domain except KeyError as e: if raise_exc: raise UserNotFoundError({ 'message': "An error occurred while getting domain for user %(username)s", 'context': {'username': username}}) from e return None def get_reseller(self, username, raise_exc=True): # type: (str, bool) -> Optional[str] """Get reseller for user""" try: return self.users_cache[username].reseller except KeyError as e: if raise_exc: raise UserNotFoundError({ 'message': "An error occurred while getting reseller for user %(username)s", 'context': {'username': username}}) from e return None def get_login_list(self): # type: () -> List[str] """Get list of cached users""" return list(self.users_cache.keys()) g_users_manager = UsersInfoManager()
Upload File
Create Folder