X7ROOT File Manager
Current Path:
/lib/python3.6/site-packages/up2date_client
lib
/
python3.6
/
site-packages
/
up2date_client
/
??
..
??
__init__.py
(0 B)
??
__pycache__
??
capabilities.py
(7.28 KB)
??
clientCaps.py
(2.16 KB)
??
clpwd.py
(3.56 KB)
??
config.py
(14.46 KB)
??
debUtils.py
(2.76 KB)
??
getMethod.py
(4.2 KB)
??
haltree.py
(4.53 KB)
??
hardware.py
(31.91 KB)
??
hardware_gudev.py
(12.91 KB)
??
hardware_hal.py
(11.33 KB)
??
hardware_udev.py
(12.99 KB)
??
pkgUtils.py
(295 B)
??
pkgplatform.py
(309 B)
??
pmPlugin.py
(2.79 KB)
??
rhnChannel.py
(4.91 KB)
??
rhnHardware.py
(328 B)
??
rhnPackageInfo.py
(2.34 KB)
??
rhncli.py
(9.12 KB)
??
rhnreg.py
(31.35 KB)
??
rhnreg_constants.py
(18.13 KB)
??
rhnserver.py
(9.31 KB)
??
rpcServer.py
(11.72 KB)
??
rpmUtils.py
(5.2 KB)
??
transaction.py
(4.09 KB)
??
tui.py
(43.7 KB)
??
up2dateAuth.py
(10.69 KB)
??
up2dateErrors.py
(10.26 KB)
??
up2dateLog.py
(2.06 KB)
??
up2dateUtils.py
(5.06 KB)
Editing: up2dateAuth.py
#

import os
import pickle
import sys
import time
import socket
import subprocess

from up2date_client import rpcServer

try: # python2
    from types import DictType
except ImportError: # python3
    DictType = dict

from rhn import rpclib
from up2date_client import clientCaps
from up2date_client import config
from up2date_client import rhnserver
from up2date_client import up2dateErrors
from up2date_client import up2dateLog
from up2date_client import up2dateUtils

# Cached login information; filled in by the login helpers of this module.
loginInfo = None
# Location of the pickled copy of loginInfo.
pcklAuthFileName = "/var/spool/up2date/loginAuth.pkl"


def getSystemId():
    """
    Return the contents of the system id file, or None if it is not readable.

    The file location is taken from the "systemIdPath" configuration option.
    """
    cfg = config.initUp2dateConfig()
    path = cfg["systemIdPath"]
    if not os.access(path, os.R_OK):
        return None

    # "with" guarantees the handle is closed even if read() raises.
    with open(path, "r") as f:
        return f.read()


def getPreferredInterface():
    """
    Extract the preferred_interface parameter from system_id XML

    Returns 'IPv4' or 'IPv6' if specified, otherwise 'IPv4' as default.
    Any other value found in the system id is logged and ignored, and any
    failure to read or parse the system id also yields the default.
    """
    log = up2dateLog.initLog()
    systemId = getSystemId()
    preferred_interface = "IPv4"

    if systemId is None:
        return preferred_interface

    try:
        # Read the system ID XML and parse it
        # rpclib.xmlrpclib is the standard way to do this across the project
        params = rpclib.xmlrpclib.loads(systemId)[0][0]
        cfg_interface = params.get("preferred_interface", "IPv4")

        # Only 'IPv4' and 'IPv6' are valid options. The value read from the
        # system id is what must be validated, not the default itself.
        if cfg_interface in ("IPv4", "IPv6"):
            preferred_interface = cfg_interface
        else:
            log.log_me(
                "Invalid preferred_interface value '%s' in system_id, "
                "defaulting to '%s'" % (cfg_interface, preferred_interface)
            )
        return preferred_interface
    except Exception:
        # If there's any error parsing the XML, default to IPv4
        log.log_me("Failed to parse system_id XML, preferred_interface defaulting to 'IPv4'")
        log.log_exception(*sys.exc_info())
        return preferred_interface


# if a user has upgraded to a newer release of Red Hat but still
# has a systemid from their older release, they need to get an updated
# systemid from the RHN servers. This takes care of that.
def _open_owner_only(path, mode):
    """
    Open path for writing, creating it with 0600 permissions.

    The permission bits are applied at creation time, so a newly created
    file is never visible with wider, umask-derived permissions. An already
    existing file keeps its current mode; callers chmod it if needed.
    """
    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, int('0600', 8))
    return os.fdopen(fd, mode)


def maybeUpdateVersion():
    """
    Request a new system id when the stored one is for another OS release.

    Does nothing while LEAPP_IPU_IN_PROGRESS is set in the environment or
    when the "channelOverride" option is set. Otherwise, if the 'os_release'
    recorded in the system id differs from up2dateUtils.getVersion(), an
    upgraded system id is requested from the server and written to
    "systemIdPath" (an existing file is first renamed to "<path>.save").

    Returns 0 when the system id cannot be parsed or the new file cannot be
    put in place; returns None otherwise.
    """
    # if process is in the middle of system upgrade, do not switch channel automatically
    if os.environ.get('LEAPP_IPU_IN_PROGRESS'):
        return

    cfg = config.initUp2dateConfig()
    try:
        idVer = rpclib.xmlrpclib.loads(getSystemId())[0][0]['os_release']
    except Exception:
        # they may not even have a system id yet.
        return 0

    systemVer = up2dateUtils.getVersion()

    if not cfg['channelOverride'] and idVer != systemVer:
        s = rhnserver.RegistrationRhnServer()
        newSystemId = s.registration.upgrade_version(getSystemId(), systemVer)

        path = cfg["systemIdPath"]
        idDir = os.path.dirname(path)
        if not os.access(idDir, os.W_OK):
            try:
                os.mkdir(idDir)
            except OSError:
                return 0
        if not os.access(idDir, os.W_OK):
            return 0

        if os.access(path, os.F_OK):
            # already have systemid file there; let's back it up
            savePath = path + ".save"
            try:
                os.rename(path, savePath)
            except OSError:
                return 0

        with _open_owner_only(path, "w") as f:
            f.write(newSystemId)
        try:
            os.chmod(path, int('0600', 8))
        except OSError:
            # best effort: the file has already been written
            pass


def writeCachedLogin():
    """
    Pickle loginInfo to a file
    Returns:
    True    -- wrote loginInfo to a pickle file
    False   -- did _not_ write loginInfo to a pickle file
    """
    log = up2dateLog.initLog()
    log.log_debug("writeCachedLogin() invoked")
    if not loginInfo:
        log.log_debug("writeCachedLogin() loginInfo is None, so bailing.")
        return False
    data = {'time': time.time(),
            'loginInfo': loginInfo}

    pcklDir = os.path.dirname(pcklAuthFileName)
    if not os.access(pcklDir, os.W_OK):
        try:
            os.mkdir(pcklDir)
            os.chmod(pcklDir, int('0700', 8))
        except OSError:
            log.log_me("Unable to write pickled loginInfo to %s" % pcklDir)
            return False

    with _open_owner_only(pcklAuthFileName, 'wb') as pcklAuth:
        # Tighten the mode of a file that already existed before this call.
        os.chmod(pcklAuthFileName, int('0600', 8))
        pickle.dump(data, pcklAuth)

    expireTime = data['time'] + float(loginInfo['X-RHN-Auth-Expire-Offset'])
    log.log_debug("Wrote pickled loginInfo at ", data['time'], " with expiration of ",
            expireTime, " seconds.")
    return True


def readCachedLogin():
    """
    Read pickle info from a file
    Caches authorization info for connecting to the server.

    Returns True when the cached loginInfo was loaded into the module-level
    loginInfo; False when the cache is unreadable, empty or corrupt, belongs
    to a different system id, or has expired.
    """
    log = up2dateLog.initLog()
    log.log_debug("readCachedLogin invoked")
    if not os.access(pcklAuthFileName, os.R_OK):
        log.log_debug("Unable to read pickled loginInfo at: %s" % pcklAuthFileName)
        return False

    # NOTE(review): pickle.load() executes whatever the file describes; this
    # is only safe as long as the cache file is writable by trusted users only.
    try:
        with open(pcklAuthFileName, 'rb') as pcklAuth:
            data = pickle.load(pcklAuth)
    except (EOFError, ValueError, pickle.UnpicklingError):
        # An empty, truncated or otherwise corrupt cache is simply a miss.
        log.log_debug("Unexpected EOF. Probably an empty file, "
                      "regenerate auth file")
        return False

    # Check if system_id has changed
    try:
        idVer = rpclib.xmlrpclib.loads(getSystemId())[0][0]['system_id']
        cidVer = "ID-%s" % data['loginInfo']['X-RHN-Server-Id']
        if idVer != cidVer:
            log.log_debug("system id version changed: %s vs %s" % (idVer, cidVer))
            return False
    except Exception:
        # Best effort: if the comparison cannot be made, fall through to the
        # expiration check below.
        pass

    createdTime = data['time']
    li = data['loginInfo']
    currentTime = time.time()
    expireTime = createdTime + float(li['X-RHN-Auth-Expire-Offset'])
    #Check if expired, offset is stored in "X-RHN-Auth-Expire-Offset"
    log.log_debug("Checking pickled loginInfo, currentTime=", currentTime,
            ", createTime=", createdTime, ", expire-offset=",
            float(li['X-RHN-Auth-Expire-Offset']))
    if currentTime > expireTime:
        log.log_debug("Pickled loginInfo has expired, created = %s, expire = %s." \
                %(createdTime, expireTime))
        return False
    _updateLoginInfo(li)
    log.log_debug("readCachedLogin(): using pickled loginInfo set to expire at ", expireTime)
    return True


def _updateLoginInfo(li):
    """
    Update the global var, "loginInfo"

    A dict is merged into the existing loginInfo dict in place (or becomes
    loginInfo if there is none yet); anything else resets loginInfo to None.
    """
    global loginInfo
    if isinstance(li, dict):
        if isinstance(loginInfo, dict):
            # must retain the reference.
            loginInfo.update(li)
        else:
            # this had better be the initial login or we lose the reference.
            loginInfo = li
    else:
        loginInfo = None


# allow to pass in a system id for use in rhnreg
# a bit of a kluge to make caps work correctly
def login(systemId=None, forceUpdate=False, timeout=None):
    """
    Log in to the up2date server and return the resulting loginInfo.

    Unless forceUpdate is set, a valid cached login is returned when no
    loginInfo is held yet. systemId defaults to the contents of the system
    id file; None is returned when no system id is available.
    """
    log = up2dateLog.initLog()
    log.log_debug("login(forceUpdate=%s) invoked" % (forceUpdate))
    if not forceUpdate and not loginInfo:
        if readCachedLogin():
            return loginInfo

    server = rhnserver.RhnServer(timeout=timeout)

    # send up the capabality info
    headerlist = clientCaps.caps.headerFormat()
    for (headerName, value) in headerlist:
        server.add_header(headerName, value)

    if systemId is None:
        systemId = getSystemId()

    if not systemId:
        return None

    maybeUpdateVersion()
    log.log_me("logging into up2date server")

    li = server.up2date.login(systemId, socket.getfqdn(), _get_panel_name(log))

    # figure out if were missing any needed caps
    server.capabilities.validate()
    _updateLoginInfo(li) #update global var, loginInfo
    writeCachedLogin() #pickle global loginInfo

    if loginInfo:
        log.log_me("successfully retrieved authentication token "
                   "from up2date server")

    log.log_debug("logininfo:", loginInfo)
    return loginInfo


def updateLoginInfo(timeout=None):
    """
    Force a fresh login and return the new loginInfo.

    Raises up2dateErrors.AuthenticationError when no loginInfo is obtained.
    """
    log = up2dateLog.initLog()
    log.log_me("updateLoginInfo() login info")
    # NOTE: login() updates the loginInfo object
    login(forceUpdate=True, timeout=timeout)
    if not loginInfo:
        raise up2dateErrors.AuthenticationError("Unable to authenticate")
    return loginInfo


def getLoginInfo(timeout=None):
    """Return the held loginInfo, logging in first if there is none."""
    global loginInfo
    try:
        loginInfo = loginInfo
    except NameError:
        loginInfo = None
    if loginInfo:
        return loginInfo
    # NOTE: login() updates the loginInfo object
    login(timeout=timeout)
    return loginInfo


class _FailedToGetPanelName(Exception):
    """Raised when the panel name cannot be obtained from cldetect."""
    pass


def _get_panel_name(log):
    """
    Return the lower-cased control panel name.

    cldetect is tried first; when it fails, the directory-probing fallback
    is used instead.
    """
    try:
        panel_name = _get_panel_name_via_cldetect()
    except _FailedToGetPanelName:
        # Do not spam logs when we're executed from under cldeploy script
        # and no cloudlinux packages are yet installed.
        if not _is_cldeploy_running():
            log.log_exception(*sys.exc_info())
            log.log_me("Failed to get panel name via cldetect, using fallback mechanism")
        panel_name = _fallback_get_panel_name()
    return panel_name.lower()


def _is_cldeploy_running():
    """
    Return True if the pid recorded in the cldeploy lock file belongs to a
    process whose command line mentions 'cldeploy', False otherwise.
    """
    # NOTE(vlebedev): This is more or less a direct port of shell code from cldeploy script.
    lock_file_path = '/var/lock/cldeploy.lck'
    try:
        with open(lock_file_path) as f:
            pid = f.read().strip()
        if not pid:
            return False
        # Read as bytes: the command line is NUL-separated and is not
        # guaranteed to be valid text.
        with open("/proc/%s/cmdline" % pid, 'rb') as f:
            return b'cldeploy' in f.read()
    except (IOError, OSError):
        # The lock file is missing, or the process is gone (it may vanish at
        # any moment): cldeploy is not running.
        return False


def _get_panel_name_via_cldetect():
    """
    Return the panel name printed by "cldetect --detect-cp-nameonly".

    Raises _FailedToGetPanelName when the binary is missing, cannot be
    started, or exits with a non-zero status.
    """
    binary = "/usr/bin/cldetect"
    cmd = [binary, "--detect-cp-nameonly"]
    if not os.path.exists(binary):
        raise _FailedToGetPanelName(
            "Failed to obtain panel name because '%s' does not exist" % binary
        )

    try:
        process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                                   universal_newlines=True)
        stdout, stderr = process.communicate()
    except OSError as e:
        # e.g. the file is not executable; callers only handle
        # _FailedToGetPanelName, so report it that way to reach the fallback.
        raise _FailedToGetPanelName(
            "Failed to obtain panel name using '%s' command; error: %s"
            % (" ".join(cmd), e)
        )

    if process.returncode != 0:
        raise _FailedToGetPanelName(
            "Failed to obtain panel name using '%s' command; stderr: %s"
            % (" ".join(cmd), stderr)
        )

    return stdout.strip()


def _fallback_get_panel_name():
    """
    Guess the control panel from well-known installation paths.

    Returns the name of the first match, or "unknown" when none is found.
    """
    # NOTE(vlebedev): This is more or less a direct port of shell code from cldeploy script.
    # TODO(vlebedev): Pass $PANEL from cldeploy instead of duplication?
    if os.path.isdir("/usr/local/psa/admin/"):
        return "plesk"
    if os.path.isdir("/usr/local/interworx/"):
        return "interworx"
    if os.path.isdir("/usr/local/cpanel/whostmgr/docroot/"):
        return "cpanel"
    if os.path.isdir("/usr/local/ispmgr/"):
        return "ispmgr"
    if os.path.isdir("/usr/local/directadmin/"):
        return "directadmin"
    if os.path.isfile("/usr/local/mgr5/sbin/mgrctl"):
        return "ispmgr5"
    return "unknown"
Upload File
Create Folder