# SPDX-FileCopyrightText: Copyright © 2023 Idiap Research Institute <contact@idiap.ch>
#
# SPDX-License-Identifier: GPL-3.0-or-later
"""Tools for interacting with the running computer or GPU."""
from __future__ import annotations
import logging
import multiprocessing
import os
import queue
import shutil
import subprocess
import time
from typing import TypeAlias
import numpy
import psutil
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Resolved once, at import time; ``shutil.which`` yields ``None`` when no
# ``nvidia-smi`` executable can be found on the search path.
_nvidia_smi = shutil.which("nvidia-smi")
"""Location of the nvidia-smi program, if one exists."""
# NOTE: 2**30 bytes is, strictly speaking, one gibibyte (GiB).  This constant
# is the divisor used in this module to convert byte counts to "gigabytes".
GB = float(2**30)
"""The number of bytes in a gigabyte."""
def run_nvidia_smi(
    query: list | tuple, rename: list | tuple | None = None
) -> tuple | None:
    """Returns GPU information from query.

    For a comprehensive list of options and help, execute ``nvidia-smi
    --help-query-gpu`` on a host with a GPU.

    If the environment variable ``CUDA_VISIBLE_DEVICES`` is set (and not
    empty), its value is forwarded to ``nvidia-smi`` through ``--id``.
    Otherwise no ``--id`` option is passed.  When ``nvidia-smi`` reports more
    than one row (one per GPU), only the first row is used.

    :param query: A list of query strings as defined by ``nvidia-smi --help-query-gpu``
    :param rename: A list of keys to yield in the return value for each entry above. It
        gives you the opportunity to rewrite some key names for convenience.
        This list, if provided, must be of the same length as ``query``.
    :return: An ordered dictionary (organized as 2-tuples) containing the queried
        parameters (``rename`` versions).  If ``nvidia-smi`` is not available,
        or if running it fails, returns ``None``.  Percentage information is
        left alone, memory information is transformed to gigabytes
        (floating-point).
    :raises ValueError: If ``rename`` is given and ``nvidia-smi`` is available,
        but ``rename`` and ``query`` have different lengths.
    """
    if _nvidia_smi is None:
        return None

    if rename is None:
        rename = query
    elif len(rename) != len(query):
        raise ValueError(
            f"`rename` has {len(rename)} entries, but `query` has "
            f"{len(query)} - both must have the same length"
        )

    # An argument list (instead of a shell string) keeps query entries from
    # ever being interpreted by a shell.
    command = [
        _nvidia_smi,
        "--query-gpu=" + ",".join(query),
        "--format=csv,noheader",
    ]

    # Only restrict the query to specific devices if some were selected:
    # formatting an unset variable would otherwise produce ``--id=None``.
    visible_devices = os.environ.get("CUDA_VISIBLE_DEVICES")
    if visible_devices:
        command.append(f"--id={visible_devices}")

    completed = subprocess.run(
        command, capture_output=True, text=True, check=False
    )
    rows = completed.stdout.strip().splitlines()
    if completed.returncode != 0 or not rows:
        logger.warning(
            "`%s` failed (exit code %d): %s",
            " ".join(command),
            completed.returncode,
            completed.stderr.strip(),
        )
        return None

    # One CSV row is printed per GPU; keep the first one so that values from
    # different devices are never fused together.
    t_values: list[str | float] = []
    for field in rows[0].split(","):
        field = field.strip()
        if field.endswith("%"):
            t_values.append(float(field[:-1].strip()))
        elif field.endswith("MiB"):
            t_values.append(float(field[:-3].strip()) / 1024)
        else:
            t_values.append(field)  # unchanged
    return tuple(zip(rename, t_values))
def gpu_constants() -> tuple | None:
    """Returns GPU (static) information using nvidia-smi.

    See :py:func:`run_nvidia_smi` for operational details.

    :return: Whatever :py:func:`run_nvidia_smi` returns for the query below:
        ``None`` if ``nvidia-smi`` is not available, otherwise an ordered
        dictionary (organized as 2-tuples) with the following ``nvidia-smi``
        query information:

        * ``gpu_name``, as ``gpu_name`` (:py:class:`str`)
        * ``driver_version``, as ``gpu_driver_version`` (:py:class:`str`)
        * ``memory.total``, as ``gpu_memory_total`` (transformed to gigabytes,
          :py:class:`float`)
    """
    # names understood by ``nvidia-smi --query-gpu``
    queried_fields = ("gpu_name", "driver_version", "memory.total")
    # names under which each of the fields above is reported back
    reported_names = ("gpu_name", "gpu_driver_version", "gpu_memory_total")
    return run_nvidia_smi(queried_fields, reported_names)
def gpu_log() -> tuple | None:
    """Returns GPU information about current non-static status using nvidia-
    smi.

    See :py:func:`run_nvidia_smi` for operational details.

    :return: If :py:func:`run_nvidia_smi` returns ``None`` (e.g. ``nvidia-smi``
        is not available), returns ``None``, otherwise, we return an ordered
        dictionary (organized as 2-tuples) containing the following
        ``nvidia-smi`` query information:

        * ``memory.used``, as ``gpu_memory_used`` (transformed to gigabytes,
          :py:class:`float`)
        * ``memory.free``, as ``gpu_memory_free`` (transformed to gigabytes,
          :py:class:`float`)
        * ``100*memory.used/memory.total``, as ``gpu_memory_percent``,
          (:py:class:`float`, in percent)
        * ``utilization.gpu``, as ``gpu_percent``,
          (:py:class:`float`, in percent)
    """
    retval = run_nvidia_smi(
        (
            "memory.total",
            "memory.used",
            "memory.free",
            "utilization.gpu",
        ),
        (
            "gpu_memory_total",
            "gpu_memory_used",
            "gpu_memory_free",
            "gpu_percent",
        ),
    )

    # Nothing to re-compose if the query produced no result: honour the
    # documented contract instead of failing while indexing ``None``.
    if retval is None:
        return None

    # ``memory.total`` is only queried to derive the occupancy percentage; it
    # is not part of the returned entries.
    memory_total = retval[0][1]
    memory_used = retval[1][1]

    # re-compose the output to generate expected values
    return (
        retval[1],  # gpu_memory_used
        retval[2],  # gpu_memory_free
        ("gpu_memory_percent", 100 * (memory_used / memory_total)),
        retval[3],  # gpu_percent
    )
def cpu_constants() -> tuple:
    """Returns static CPU information about the current system.

    :return: An ordered dictionary (organized as 2-tuples) containing these entries:

        0. ``cpu_memory_total`` (:py:class:`float`): total memory reported by
           :py:func:`psutil.virtual_memory`, in gigabytes
        1. ``cpu_count`` (:py:class:`int`): number of logical CPUs, as
           reported by :py:func:`psutil.cpu_count`
    """
    total_memory_gb = psutil.virtual_memory().total / GB
    logical_cpus = psutil.cpu_count(logical=True)
    return (
        ("cpu_memory_total", total_memory_gb),
        ("cpu_count", logical_cpus),
    )
class CPULogger:
    """Logs CPU information using :py:mod:`psutil`

    The logger keeps a "cluster" of :py:class:`psutil.Process` objects: the
    observed process first, followed by its (recursive) children.  The very
    same objects are kept between calls to :py:meth:`log` because
    ``cpu_percent(interval=None)`` is documented by psutil to measure CPU usage
    since the previous call on that object.

    Parameters
    ----------
    pid : :py:class:`int`, Optional
        Process identifier of the main process (parent process) to observe
    """

    def __init__(self, pid: int | None = None):
        this = psutil.Process(pid=pid)
        self.cluster = [this] + this.children(recursive=True)
        # touch cpu_percent() at least once for all processes in the cluster
        for k in self.cluster:
            try:
                k.cpu_percent(interval=None)
            except (psutil.ZombieProcess, psutil.NoSuchProcess):
                # child process is gone meanwhile - the process list is
                # refreshed by the next call to log()
                pass

    def log(self) -> tuple:
        """Returns current process cluster information.

        :return: An ordered dictionary (organized as 2-tuples) containing these entries:

            0. ``cpu_memory_used`` (:py:class:`float`): total memory used from
               the system, in gigabytes
            1. ``cpu_rss`` (:py:class:`float`): RAM currently used by
               process and children, in gigabytes
            2. ``cpu_vms`` (:py:class:`float`): total memory (RAM + swap) currently
               used by process and children, in gigabytes
            3. ``cpu_percent`` (:py:class:`float`): percentage of the total CPU
               used by this process and children (recursively) since last call
               (first time called should be ignored). This number depends on the
               number of CPUs in the system and can be greater than 100%
            4. ``cpu_processes`` (:py:class:`int`): total number of processes
               including self and children (recursively)
            5. ``cpu_open_files`` (:py:class:`int`): total number of open files by
               self and children
        """
        # check all cluster components and update process list
        # done so we can keep the cpu_percent() initialization
        stored_children = self.cluster[1:]
        current_children = set(self.cluster[0].children(recursive=True))

        # Children we already track AND that are still running.  We iterate
        # over the *stored* objects on purpose: they carry the cpu_percent()
        # baseline, whereas the freshly listed (equal) objects do not.
        keep_children = [k for k in stored_children if k in current_children]

        # Children that appeared since the last call: initialise their
        # cpu_percent() baseline, skipping any that vanished meanwhile.
        new_children = []
        for k in current_children.difference(stored_children):
            try:
                k.cpu_percent(interval=None)
            except (psutil.ZombieProcess, psutil.NoSuchProcess):
                # child process is gone meanwhile
                continue
            new_children.append(k)

        self.cluster = self.cluster[:1] + keep_children + new_children

        memory_info = []
        cpu_percent = []
        open_files = []
        gone = 0
        for k in self.cluster:
            try:
                # read everything first, so a process disappearing half-way
                # through does not contribute a partial set of values
                k_memory = k.memory_info()
                k_cpu = k.cpu_percent(interval=None)
                k_files = len(k.open_files())
            except (psutil.ZombieProcess, psutil.NoSuchProcess):
                # child process is gone meanwhile, just ignore it
                # it is too late to update any intermediate list
                # at this point, but ensures to update counts later on
                gone += 1
                continue
            memory_info.append(k_memory)
            cpu_percent.append(k_cpu)
            open_files.append(k_files)

        return (
            ("cpu_memory_used", psutil.virtual_memory().used / GB),
            ("cpu_rss", sum(k.rss for k in memory_info) / GB),
            ("cpu_vms", sum(k.vms for k in memory_info) / GB),
            ("cpu_percent", sum(cpu_percent)),
            ("cpu_processes", len(self.cluster) - gone),
            ("cpu_open_files", sum(open_files)),
        )
class _InformationGatherer:
    """A container to store monitoring information.

    Measurements are kept column-wise: ``data[i]`` holds every value
    accumulated so far for ``keys[i]``.  CPU columns come first, followed by
    the GPU columns when ``has_gpu`` is set.

    :param has_gpu: A flag indicating if we have a GPU installed on the
        platform or not
    :param main_pid: The main process identifier to monitor
    :param logger: A logger to be used for logging messages
    """

    def __init__(self, has_gpu: bool, main_pid: int, logger: logging.Logger):
        self.cpu_logger = CPULogger(main_pid)
        cpu_keys = [name for name, _ in self.cpu_logger.log()]
        self.cpu_keys_len = len(cpu_keys)
        self.has_gpu = has_gpu
        self.logger = logger
        gpu_keys = [name for name, _ in gpu_log()] if self.has_gpu else []
        self.keys = cpu_keys + gpu_keys
        self.data: list = [[] for _ in self.keys]

    def acc(self):
        """Accumulates another measurement."""
        for column, (_, value) in enumerate(self.cpu_logger.log()):
            self.data[column].append(value)

        if not self.has_gpu:
            return

        # GPU columns are stored right after the CPU ones
        for column, (_, value) in enumerate(
            gpu_log(), start=self.cpu_keys_len
        ):
            self.data[column].append(value)

    def summary(self):
        """Returns the current data.

        :return: A tuple of ``(key, values)`` 2-tuples, one per monitored key,
            where ``values`` is the list of measurements accumulated so far.
            An error is logged if the first column is still empty.
        """
        if not self.data[0]:
            self.logger.error("CPU/GPU logger was not able to collect any data")
        return tuple(zip(self.keys, self.data))
def _monitor_worker(
    interval: int | float,
    has_gpu: bool,
    main_pid: int,
    stop: "multiprocessing.synchronize.Event",
    queue: "multiprocessing.queues.Queue",
    logging_level: int,
):
    """A monitoring worker that measures resources and returns lists.

    Measurements are accumulated until ``stop`` is set; the accumulated
    summary is then put, once, on ``queue``.

    :param interval: Number of seconds to wait between each measurement (maybe a floating
        point number)
    :param has_gpu: A flag indicating if we have a GPU installed on the platform or not
    :param main_pid: The main process identifier to monitor
    :param stop: Indicates if we should continue running or stop
    :param queue: A queue, to send monitoring information back to the spawner
    :param logging_level: The logging level to use for logging from launched processes
    """
    logger = multiprocessing.log_to_stderr(level=logging_level)
    ra = _InformationGatherer(has_gpu, main_pid, logger)

    while not stop.is_set():
        try:
            ra.acc()
            # Waiting on the event (rather than sleeping) lets the worker
            # wake up as soon as it is asked to stop, instead of finishing a
            # full ``interval`` first.
            stop.wait(interval)
        except Exception:
            logger.warning(
                "Iterative CPU/GPU logging did not work properly this once",
                exc_info=True,
            )
            stop.wait(0.5)  # wait half a second, and try again!

    queue.put(ra.summary())
class ResourceMonitor:
    """An external, non-blocking CPU/GPU resource monitor.

    Meant to be used as a context manager: entering starts a separate
    monitoring process, exiting stops it and stores a summary of what was
    observed in the ``data`` attribute.

    :param interval: Number of seconds to wait between each measurement (maybe a floating
        point number)
    :param has_gpu: A flag indicating if we have a GPU installed on the platform or not
    :param main_pid: The main process identifier to monitor
    :param logging_level: The logging level to use for logging from launched processes
    """

    def __init__(
        self,
        interval: int | float,
        has_gpu: bool,
        main_pid: int,
        logging_level: int,
    ):
        self.interval = interval
        self.has_gpu = has_gpu
        self.main_pid = main_pid
        self.event = multiprocessing.Event()
        self.q: multiprocessing.Queue = multiprocessing.Queue()
        self.logging_level = logging_level

        self.monitor = multiprocessing.Process(
            target=_monitor_worker,
            name="ResourceMonitorProcess",
            args=(
                self.interval,
                self.has_gpu,
                self.main_pid,
                self.event,
                self.q,
                self.logging_level,
            ),
        )

        # summary of the last monitoring session, set by __exit__()
        self.data = None

    @staticmethod
    def monitored_keys(has_gpu):
        """Returns the names of the quantities that get monitored.

        :param has_gpu: If set, GPU keys are included after the CPU keys
        :return: The list of keys, in the order they are reported
        """
        return _InformationGatherer(has_gpu, None, logger).keys

    def __enter__(self):
        """Starts the monitoring process."""
        self.monitor.start()
        return self

    def _receive(self):
        """Fetches the report sent by the monitoring process.

        :return: Whatever the monitoring process put on the queue
        :raises queue.Empty: If the monitoring process ended without sending
            anything within the (``2 * interval``-second) timeout
        """
        timeout = 2 * self.interval
        # floor on the polling period, so a zero interval does not busy-loop
        poll = max(timeout, 0.1)
        while self.monitor.is_alive():
            try:
                return self.q.get(timeout=poll)
            except queue.Empty:
                continue  # the worker is still busy, keep waiting for it
        # the worker is gone: anything it sent is already available
        return self.q.get(timeout=timeout)

    def __exit__(self, *exc):
        """Stops the monitoring process and stores the summary of observations
        in ``self.data`` (``None`` if nothing could be retrieved)."""
        self.event.set()

        # The queue is read BEFORE joining: a process that has put items on a
        # queue does not terminate until those items are flushed, so joining
        # first may block forever when the report is large.
        try:
            data = self._receive()
        except queue.Empty:
            data = None

        self.monitor.join()
        if self.monitor.exitcode != 0:
            logger.error(
                f"CPU/GPU resource monitor process exited with code "
                f"{self.monitor.exitcode}. Check logs for errors!"
            )

        if data is None:
            logger.warning(
                f"CPU/GPU resource monitor did not provide anything when "
                f"joined (even after a {2*self.interval}-second timeout - "
                f"this is normally due to exceptions on the monitoring process. "
                f"Check above for other exceptions."
            )
            self.data = None
            return

        # summarize the returned data: counters are reported by their maximum,
        # everything else by its mean; keys without measurements yield 0.0
        summary = []
        for k, values in data:
            if not values:
                summary.append((k, 0.0))
            elif k in ("cpu_processes", "cpu_open_files"):
                summary.append((k, numpy.max(values)))
            else:
                summary.append((k, numpy.mean(values)))
        self.data = tuple(summary)