X7ROOT File Manager
Current Path:
/opt/cloudlinux/venv/lib/python3.11/site-packages/pyvirtualdisplay
opt
/
cloudlinux
/
venv
/
lib
/
python3.11
/
site-packages
/
pyvirtualdisplay
/
??
..
??
__init__.py
(241 B)
??
__pycache__
??
about.py
(20 B)
??
abstractdisplay.py
(12.29 KB)
??
display.py
(3.18 KB)
??
examples
??
py.typed
(0 B)
??
smartdisplay.py
(2.6 KB)
??
util.py
(486 B)
??
xauth.py
(1.14 KB)
??
xephyr.py
(1.6 KB)
??
xvfb.py
(1.9 KB)
??
xvnc.py
(1.87 KB)
Editing: abstractdisplay.py
import fnmatch import logging import os import select import subprocess import tempfile import time from threading import Lock from pyvirtualdisplay import xauth from pyvirtualdisplay.util import get_helptext, platform_is_osx log = logging.getLogger(__name__) # try: # import fcntl # except ImportError: # fcntl = None _mutex = Lock() _mutex_popen = Lock() _MIN_DISPLAY_NR = 1000 _USED_DISPLAY_NR_LIST = [] _X_START_TIMEOUT = 10 _X_START_TIME_STEP = 0.1 _X_START_WAIT = 0.1 class XStartTimeoutError(Exception): pass class XStartError(Exception): pass def _lock_files(): tmpdir = "/tmp" try: ls = os.listdir(tmpdir) except FileNotFoundError: log.warning("missing /tmp") return [] pattern = ".X*-lock" names = fnmatch.filter(ls, pattern) ls = [os.path.join(tmpdir, child) for child in names] ls = [p for p in ls if os.path.isfile(p)] return ls def _search_for_display(): # search for free display ls = list(map(lambda x: int(x.split("X")[1].split("-")[0]), _lock_files())) if len(ls): display = max(_MIN_DISPLAY_NR, max(ls) + 3) else: display = _MIN_DISPLAY_NR return display class AbstractDisplay(object): """ Common parent for X servers (Xvfb,Xephyr,Xvnc) """ def __init__(self, program, use_xauth, retries, extra_args, manage_global_env): self._extra_args = extra_args self._retries = retries self._program = program self.stdout = None self.stderr = None self.old_display_var = None self._subproc = None self.display = None self._is_started = False self._manage_global_env = manage_global_env self._reset_global_env = False self._pipe_wfd = None self._retries_current = 0 helptext = get_helptext(program) self._has_displayfd = "-displayfd" in helptext if not self._has_displayfd: log.debug("-displayfd flag is missing.") PYVIRTUALDISPLAY_DISPLAYFD = os.environ.get("PYVIRTUALDISPLAY_DISPLAYFD") if PYVIRTUALDISPLAY_DISPLAYFD: log.debug("PYVIRTUALDISPLAY_DISPLAYFD=%s", PYVIRTUALDISPLAY_DISPLAYFD) # '0'->false, '1'->true self._has_displayfd = bool(int(PYVIRTUALDISPLAY_DISPLAYFD)) else: # TODO: macos: displayfd is available on XQuartz-2.7.11 but it doesn't work, always 0 is returned if platform_is_osx(): self._has_displayfd = False self._check_flags(helptext) if use_xauth and not xauth.is_installed(): raise xauth.NotFoundError() self._use_xauth = use_xauth self._old_xauth = None self._xauth_filename = None def _check_flags(self, helptext): pass def _cmd(self): raise NotImplementedError() def _redirect_display(self, on): """ on: * True -> set $DISPLAY to virtual screen * False -> set $DISPLAY to original screen :param on: bool """ d = self.new_display_var if on else self.old_display_var if d is None: log.debug("unset $DISPLAY") try: del os.environ["DISPLAY"] except KeyError: log.warning("$DISPLAY was already unset.") else: log.debug("set $DISPLAY=%s", d) os.environ["DISPLAY"] = d def _env(self): env = os.environ.copy() env["DISPLAY"] = self.new_display_var return env def start(self): """ start display :rtype: self """ if self._is_started: raise XStartError(self, "Display was started twice.") self._is_started = True if self._has_displayfd: self._start1_has_displayfd() else: i = 0 while True: self._retries_current = i + 1 try: self._start1() break except XStartError: log.warning("start failed %s", i + 1) time.sleep(0.05) i += 1 if i >= self._retries: raise XStartError( "No success after %s retries. Last stderr: %s" % (self._retries, self.stderr) ) if self._manage_global_env: self._redirect_display(True) self._reset_global_env = True def _popen(self, use_pass_fds): with _mutex_popen: if use_pass_fds: self._subproc = subprocess.Popen( self._command, pass_fds=[self._pipe_wfd], stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=False, ) else: self._subproc = subprocess.Popen( self._command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=False, ) def _start1_has_displayfd(self): # stdout doesn't work on osx -> create own pipe rfd, self._pipe_wfd = os.pipe() self._command = self._cmd() + self._extra_args log.debug("command: %s", self._command) self._popen(use_pass_fds=True) self.display = int(self._wait_for_pipe_text(rfd)) os.close(rfd) os.close(self._pipe_wfd) self.new_display_var = ":%s" % int(self.display) if self._use_xauth: self._setup_xauth() # https://github.com/ponty/PyVirtualDisplay/issues/2 # https://github.com/ponty/PyVirtualDisplay/issues/14 self.old_display_var = os.environ.get("DISPLAY", None) def _start1(self): with _mutex: self.display = _search_for_display() while self.display in _USED_DISPLAY_NR_LIST: self.display += 1 self.new_display_var = ":%s" % int(self.display) _USED_DISPLAY_NR_LIST.append(self.display) self._command = self._cmd() + self._extra_args log.debug("command: %s", self._command) self._popen(use_pass_fds=False) self.new_display_var = ":%s" % int(self.display) if self._use_xauth: self._setup_xauth() # https://github.com/ponty/PyVirtualDisplay/issues/2 # https://github.com/ponty/PyVirtualDisplay/issues/14 self.old_display_var = os.environ.get("DISPLAY", None) # wait until X server is active start_time = time.time() d = self.new_display_var ok = False time.sleep(0.05) # give time for early exit while True: if not self.is_alive(): break try: xdpyinfo = subprocess.Popen( ["xdpyinfo"], env=self._env(), stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=False, ) _, _ = xdpyinfo.communicate() exit_code = xdpyinfo.returncode except FileNotFoundError: log.warning( "xdpyinfo was not found, X start can not be checked! Please install xdpyinfo!" ) time.sleep(_X_START_WAIT) # old method ok = True break # try: # xdpyinfo = EasyProcess(["xdpyinfo"], env=self._env()) # xdpyinfo.enable_stdout_log = False # xdpyinfo.enable_stderr_log = False # exit_code = xdpyinfo.call().return_code # except EasyProcessError: # log.warning( # "xdpyinfo was not found, X start can not be checked! Please install xdpyinfo!" # ) # time.sleep(_X_START_WAIT) # old method # ok = True # break if exit_code != 0: pass else: log.info('Successfully started X with display "%s".', d) ok = True break if time.time() - start_time >= _X_START_TIMEOUT: break time.sleep(_X_START_TIME_STEP) if not self.is_alive(): log.warning("process exited early. stderr:%s", self.stderr) msg = "Failed to start process: %s" raise XStartError(msg % self) if not ok: msg = 'Failed to start X on display "%s" (xdpyinfo check failed, stderr:[%s]).' raise XStartTimeoutError(msg % (d, xdpyinfo.stderr)) def _wait_for_pipe_text(self, rfd): s = "" start_time = time.time() while True: (rfd_changed_ls, _, _) = select.select([rfd], [], [], 0.1) if not self.is_alive(): raise XStartError( "%s program closed. command: %s stderr: %s" % (self._program, self._command, self.stderr) ) if rfd in rfd_changed_ls: c = os.read(rfd, 1) if c == b"\n": break s += c.decode("ascii") # this timeout is for "eternal" hang. see #62 if time.time() - start_time >= 600: # = 10 minutes raise XStartTimeoutError( "No reply from program %s. command:%s" % ( self._program, self._command, ) ) return s def stop(self): """ stop display :rtype: self """ if not self._is_started: raise XStartError("stop() is called before start().") if self._reset_global_env: self._redirect_display(False) if self.is_alive(): try: self._subproc.kill() except OSError as oserror: log.debug("exception in terminate:%s", oserror) self._subproc.wait() self._read_stdout_stderr() if self._use_xauth: self._clear_xauth() return self def _read_stdout_stderr(self): if self.stdout is None: (self.stdout, self.stderr) = self._subproc.communicate() log.debug("stdout=%s", self.stdout) log.debug("stderr=%s", self.stderr) def _setup_xauth(self): """ Set up the Xauthority file and the XAUTHORITY environment variable. """ handle, filename = tempfile.mkstemp( prefix="PyVirtualDisplay.", suffix=".Xauthority" ) self._xauth_filename = filename os.close(handle) # Save old environment self._old_xauth = {} self._old_xauth["AUTHFILE"] = os.getenv("AUTHFILE") self._old_xauth["XAUTHORITY"] = os.getenv("XAUTHORITY") os.environ["AUTHFILE"] = os.environ["XAUTHORITY"] = filename cookie = xauth.generate_mcookie() xauth.call("add", self.new_display_var, ".", cookie) def _clear_xauth(self): """ Clear the Xauthority file and restore the environment variables. """ os.remove(self._xauth_filename) for varname in ["AUTHFILE", "XAUTHORITY"]: if self._old_xauth[varname] is None: del os.environ[varname] else: os.environ[varname] = self._old_xauth[varname] self._old_xauth = None def __enter__(self): """used by the :keyword:`with` statement""" self.start() return self def __exit__(self, *exc_info): """used by the :keyword:`with` statement""" self.stop() def is_alive(self): if not self._subproc: return False # return self.return_code is None rc = self._subproc.poll() if rc is not None: # proc exited self._read_stdout_stderr() return rc is None # @property # def return_code(self): # if not self._subproc: # return None # rc = self._subproc.poll() # if rc is not None: # # proc exited # self._read_stdout_stderr() # return rc @property def pid(self): """ PID (:attr:`subprocess.Popen.pid`) :rtype: int """ if self._subproc: return self._subproc.pid
Upload File
Create Folder