Source code for quicken._cli

"""CLI wrapper interface for starting/using server process.
"""
from functools import wraps
import json
import logging
import multiprocessing
import os
from pathlib import Path
from typing import Callable, Dict, Optional, Union

from fasteners import InterProcessLock

from ._client import Client
from ._typing import NoneFunction
from ._constants import socket_name, server_state_name
from ._protocol import ProcessState, Request, RequestTypes
from ._signal import blocked_signals, forwarded_signals, SignalProxy
from ._xdg import cache_dir, chdir, RuntimeDir


logger = logging.getLogger(__name__)


CliFactoryT = Callable[[], NoneFunction]
BoolProvider = Callable[[], bool]


class QuickenError(Exception):
    """Generic error during server start - message has details.
    """
    pass


[docs]def cli_factory( name: str, *, runtime_dir_path: Optional[str] = None, log_file: Optional[str] = None, server_idle_timeout: Optional[float] = None, bypass_server: BoolProvider = None, reload_server: BoolProvider = None, ): """Decorator to mark a function that provides the main script entry point. To benefit most from the daemon speedup, you must do required imports within the factory function itself and then have the returned function itself do a minimal amount of configuration - only those things dependent on e.g. environment/cwd. If any imported top-level modules make use of environment then they must be reconfigured on invocation of the cli, otherwise the environment of future clients will not be taken into account. Args: name: the name used for the socket file. runtime_dir_path: the directory used for the socket and pid file. If not provided then we fall back to: `$XDG_RUNTIME_DIR/quicken-{name}` or `$TMPDIR/quicken-{name}-{uid}` or `/tmp/quicken-{name}-{uid}`. If the directory exists it must be owned by the current user and have permissions 700. log_file: optional log file used by the server, must be an absolute path. If not provided the default is `$XDG_CACHE_HOME/quicken-{name}/server.log` or `$HOME/.cache/quicken-{name}/server.log`. server_idle_timeout: time in seconds after which the server will shut down if no requests are being processed. bypass_server: if True then run command directly instead of trying to use daemon. reload_server: if True then restart the server before executing the function. Throws: QuickenError: If any directory used by runtime_dir does not have the correct permissions. """ def inner_cli_factory(factory_fn: CliFactoryT) -> NoneFunction: @wraps(factory_fn) def run_cli() -> Optional[int]: """ Returns: Result from function or remote execution, suitable for passing to :func:`sys.exit`. """ nonlocal log_file if log_file is None: log_file = cache_dir(f'quicken-{name}') / 'server.log' log_file = Path(log_file).absolute() if bypass_server and bypass_server(): logger.debug('Bypassing server') return factory_fn()() runtime_dir = RuntimeDir(f'quicken-{name}', runtime_dir_path) client = None with CliServerManager( factory_fn, runtime_dir, log_file, server_idle_timeout) as manager: try: client = manager.connect() if reload_server and reload_server(): logger.debug('Reloading server') client = manager.restart() # TODO: Get server version. # TODO: Get server context and kill without pid. except ConnectionRefusedError: logger.warning( 'Failed to connect to server - executing cli directly.') if not client: multiprocessing.current_process().authkey = os.urandom(32) return factory_fn()() return _run_client(client) return run_cli return inner_cli_factory
class CliServerManager: """Responsible for starting (if applicable) and connecting to the server. Race conditions are prevented by acquiring an exclusive lock on runtime_dir/admin during connection and start. """ def __init__( self, factory_fn, runtime_dir: RuntimeDir, log_file, server_idle_timeout): """ Args: factory_fn: function that provides the server request handler runtime_dir: runtime dir used for locks/socket log_file: server log file server_idle_timeout: idle timeout communicated to server if the process of connecting results in server start """ self._factory = factory_fn self._runtime_dir = runtime_dir self._log_file = log_file self._idle_timeout = server_idle_timeout self._lock = InterProcessLock('admin') # We initialize the Client and Listener classes without an authkey # parameter since there's no way to pre-share the secret securely # between processes not part of the same process tree. However, the # internal Client/Listener used as part of # multiprocessing.resource_sharer DOES initialize its own Client and # Listener with multiprocessing.current_process().authkey. We must have # some value so we use this dummy value. multiprocessing.current_process().authkey = b'0' * 32 def connect(self) -> Client: """Attempt to connect to the server, starting it if required. Args: timeout: seconds to wait for successful connection or startup Returns: Client connected to the server """ try: return self._get_client() except FileNotFoundError: # Server not up, no problem, we'll try to start it. logger.debug('Server not up, starting it.') except ConnectionRefusedError: socket_file = self._runtime_dir.path(socket_name) # Server may have died unexpectedly. logger.warning('Could not connect to server, starting it.') # Clean up the socket file before proceeding. socket_file.unlink() self._start_server() # Try to connect again, this time we don't catch anything, leave it to # the caller. return self._get_client() def restart(self) -> Client: """Restart the server and reconnect. """ self._stop_server() return self.connect() def _get_client(self) -> Client: """ Raises: FileNotFoundError if the socket file is not present ConnectionRefusedError if the socket file is present but the server is not accepting connections """ with chdir(self._runtime_dir): return Client(socket_name) def _get_server_state(self) -> Dict: """Retrieve server state data. """ with chdir(self._runtime_dir): return json.loads( Path(server_state_name).read_text(encoding='utf-8')) def _stop_server(self): from psutil import NoSuchProcess, Process server_state = self._get_server_state() pid = server_state['pid'] create_time = server_state['create_time'] try: process = Process(pid=pid) except NoSuchProcess: logger.debug( f'Daemon reload requested but process with pid {pid}' ' does not exist.') return if process.create_time() != create_time: logger.debug( 'Daemon reload requested but start time does not match' ' expected (probably new process re-using pid), skipping.') return try: # We don't want to leave it to the server to remove the socket since # we do not wait for it. with chdir(self._runtime_dir): os.unlink(socket_name) except FileNotFoundError: # No problem, if the file was removed at some point it doesn't # impact us. pass # This will cause the server to stop accepting clients and start # shutting down. It will wait for any still-running processes before # stopping completely, but it does not consume any other resources that # we are concerned with. process.terminate() def _start_server(self): """Start server as background process. The socket for the server has been created by the time this function returns. This function only returns in the parent, not the background process. """ cli = self._factory() # Lazy import so we only take the time to import if we have to start # the server. from ._server import run run( cli, log_file=self._log_file, runtime_dir=self._runtime_dir, server_idle_timeout=self._idle_timeout) def __enter__(self): """Enter the server admin lock context. """ with chdir(self._runtime_dir): # TODO: Ensure that this creates our lock file with 700 since # otherwise it might not be respected. self._lock.acquire() return self def __exit__(self, exc_type, exc_val, exc_tb): """Exit the server admin lock context. """ self._lock.release() def _run_client(client: Client) -> int: """Run command client against daemon listening at provided `socket_file`. Sends process context and waits for exit code. Process context includes: - environment - cwd - command line - file descriptors for stdin/out/err Args: sock: Socket connected to server. Must be a type appropriate for passing file descriptors. Returns: exit code of the process Raises: ConnectionRefusedError if server is not listening/available. """ logger.debug('Starting client communication') # Assume that we've already vetted the server and now we just need to run # the process. proxy = SignalProxy() # We must block signals before requesting remote process start otherwise # a user signal to the client may race with our ability to propagate it. with blocked_signals(forwarded_signals): state = ProcessState.for_current_process() logger.debug('Requesting process start') req = Request(RequestTypes.run_process, state) response = client.send(req) pid = response.contents logger.debug('Process running with pid: %d', pid) proxy.set_target(pid) logger.debug('Waiting for process to finish') response = client.send(Request(RequestTypes.wait_process_done, None)) return response.contents