Source code for autonomy_toolkit.containers.docker_client

# SPDX-License-Identifier: MIT
"""Helpful utilities for interacting with docker."""

# Imports from autonomy_toolkit
from autonomy_toolkit.utils.logger import LOGGER
from autonomy_toolkit.utils.atk_config import ATKConfig
from autonomy_toolkit.utils.files import file_exists

# External imports
import subprocess
import os
from typing import Optional, Any, List, Dict

ENV = os.environ.copy()
ENV["COMPOSE_IGNORE_ORPHANS"] = 1


[docs] class ContainerException(Exception): """ Exception class that is used by the :class:`ContainerClient` when an error occurs Args: message (Any): The message to be stored in the base class Exception stdout (str): The stdout from the command stderr (str): The stderr from the command """ def __init__(self, message: Any, stdout: str = None, stderr: str = None): super().__init__(message) self.stdout = stdout self.stderr = stderr
[docs] class DockerClient: """Client interface for interacting with docker compose orchestration. Args: config (ATKConfig): The ATK configuration object. dry_run (bool): Whether to actually run the commands or just print them. Use DEBUG logging level to see the commands. opts (List[str]): Options to pass to the ``docker compose`` command. args (List[str]): Arguments to pass to the ``docker compose <command>`` command. """ def __init__( self, config: ATKConfig, *, dry_run: bool = False, opts: List[str] = [], args: List[str] = [], ): self.config = config self.dry_run = dry_run self.services = config.services # Options passed to the compose command # i.e. .. compose ..opts <command> self._opts = opts self._opts = ["-f", self.config.compose_file] + self._opts for env_file in self.config.env_files: if file_exists(env_file): self._opts = ["--env-file", env_file] + self._opts # Args passed to the compose subcommand # i.e. .. compose <command> ...args self._args = args
[docs] def down(self) -> bool: """Bring down the containers. Returns: bool: Whether the command succeeded. """ return self.run_cmd("down")
[docs] def build(self) -> bool: """Build the images. Returns: bool: Whether the command succeeded. """ return self.run_cmd("build")
[docs] def up(self) -> bool: """Bring up the containers. Returns: bool: Whether the command succeeded. """ return self.run_cmd("up", "-d")
[docs] def run(self) -> bool: """Run a command in a container. Returns: bool: Whether the command succeeded. """ return self.run_cmd("run")
[docs] def attach(self) -> bool: """Attach to a container. NOTE: We assume a shell session is desired. This is somewhat difficult, as we don't know the default shell of the user in the container. We therefore have to determine this at runtime with a pretty nasty command. Returns: bool: Whether the command succeeded. """ # fmt: off exec_cmd = "($(awk -F: -v user=\"$(whoami)\" '$1 == user {print $NF}' /etc/passwd) || true)" # fmt: on self._args += ["sh", "-c", exec_cmd] return self.run_cmd("exec")
[docs] def run_cmd(self, cmd, *args, without_ots: bool = False, **kwargs) -> bool: """Run a command using the system wide ``compose`` command Additional positional args (``*args``) will be passed as command arguments when running the command. Named arguments will be passed to :meth:`subprocess.run` (`see their docs <https://docs.python.org/3/library/subprocess.html#subprocess.run>`_). Args: cmd (str): The command to run. Returns: Bool: Whether the command succeeded. """ return self.run_compose_cmd( *self._opts, cmd, *args, *self.services, *self._args, **kwargs )
[docs] def run_compose_cmd(self, *args, **kwargs) -> bool: """Run a docker compose command.""" return self._run_cmd("docker", "compose", *args, **kwargs)
def _run_cmd(self, *args, return_output=False, **kwargs): if return_output: kwargs["stdout"] = subprocess.PIPE kwargs["stderr"] = subprocess.PIPE cmd = " ".join([str(arg) for arg in args]) LOGGER.debug(f"{cmd}") def post_process_stream(stream: Optional[bytes]): if stream is None: return "" stream = stream.decode() if len(stream) != 0 and stream[-1] == "\n": stream = stream[:-1] return stream args = [arg for arg in args if arg] if not self.dry_run: completed_process = subprocess.run(args, **kwargs) else: LOGGER.info(f"'dry_run' set to true. Not running command.") return "", "" stdout = post_process_stream(completed_process.stdout) stderr = post_process_stream(completed_process.stderr) if completed_process.returncode: LOGGER.debug( f"Got an error code of '{completed_process.returncode}': {cmd}: {stdout}: {stderr}", ) if return_output: return stdout, stderr return completed_process.returncode