# nobodd: a boot configuration tool for the Raspberry Pi
#
# Copyright (c) 2024 Dave Jones <dave.jones@canonical.com>
# Copyright (c) 2024 Canonical Ltd.
#
# SPDX-License-Identifier: GPL-3.0
"""
Provides a simple interface to systemd's notification and watchdog services.
.. autoclass:: Systemd
"""
import os
import socket
from . import lang
[docs]
class Systemd:
"""
Provides a simple interface to systemd's notification and watchdog
services. It is suggested applications obtain a single, top-level instance
of this class via :func:`get_systemd` and use it to communicate with
systemd.
"""
__slots__ = ('_socket',)
LISTEN_FDS_START = 3
def __init__(self, address=None):
# Remove NOTIFY_SOCKET implicitly so child processes don't inherit it
self._socket = None
if address is None:
address = os.environ.pop('NOTIFY_SOCKET', None)
if address is not None:
if len(address) <= 1 or address[0] not in ('@', '/'):
return None
if address[0] == '@':
address = '\0' + address[1:] # abstract namespace socket
self._socket = socket.socket(
socket.AF_UNIX, socket.SOCK_DGRAM | socket.SOCK_CLOEXEC)
try:
self._socket.connect(address)
except IOError:
self._socket = None
[docs]
def available(self):
"""
If systemd's notification socket is not available, raises
:exc:`RuntimeError`. Services expecting systemd notifications to be
available can call this to assert that notifications will be noticed.
"""
if self._socket is None:
raise RuntimeError(lang._(
'systemd notification socket unavailable'))
[docs]
def notify(self, state):
"""
Send a notification to systemd. *state* is a string type (if it is a
unicode string it will be encoded with the 'ascii' codec).
"""
if self._socket is not None:
if isinstance(state, str):
state = state.encode('ascii')
self._socket.sendall(state)
[docs]
def ready(self):
"""
Notify systemd that service startup is complete.
"""
self.notify('READY=1')
[docs]
def reloading(self):
"""
Notify systemd that the service is reloading its configuration. Call
:func:`ready` when reload is complete.
"""
self.notify('RELOADING=1')
[docs]
def stopping(self):
"""
Notify systemd that the service is stopping.
"""
self.notify('STOPPING=1')
[docs]
def extend_timeout(self, timeout):
"""
Notify systemd to extend the start / stop timeout by *timeout* seconds.
A timeout will occur if the service does not call :func:`ready` or
terminate within *timeout* seconds but *only* if the original timeout
(set in the systemd configuration) has already been exceeded.
For example, if the stopping timeout is configured as 90s, and the
service calls :func:`stopping`, systemd expects the service to
terminate within 90s. After 10s the service calls
:func:`extend_timeout` with a *timeout* of 10s. 20s later the service
has not yet terminated but systemd does *not* consider the timeout
expired as only 30s have elapsed of the original 90s timeout.
"""
self.notify(f'EXTEND_TIMEOUT_USEC={timeout * 1000000:d}')
[docs]
def watchdog_ping(self):
"""
Ping the systemd watchdog. This must be done periodically if
:func:`watchdog_period` returns a value other than ``None``.
"""
self.notify('WATCHDOG=1')
[docs]
def watchdog_reset(self, timeout):
"""
Reset the systemd watchdog timer to *timeout* seconds.
"""
self.notify(f'WATCHDOG_USEC={timeout * 1000000:d}')
[docs]
def watchdog_period(self):
"""
Returns the time (in seconds) before which systemd expects the process
to call :func:`watchdog_ping`. If a watchdog timeout is not set, the
function returns ``None``.
"""
timeout = os.environ.get('WATCHDOG_USEC')
if timeout is not None:
pid = os.environ.get('WATCHDOG_PID')
if pid is None or int(pid) == os.getpid():
return int(timeout) / 1000000
return None
[docs]
def watchdog_clean(self):
"""
Unsets the watchdog environment variables so that no future child
processes will inherit them.
.. warning::
After calling this function, :func:`watchdog_period` will return
``None`` but systemd will continue expecting :func:`watchdog_ping`
to be called periodically. In other words, you should call
:func:`watchdog_period` first and store its result somewhere before
calling this function.
"""
os.environ.pop('WATCHDOG_USEC', None)
os.environ.pop('WATCHDOG_PID', None)
[docs]
def main_pid(self, pid=None):
"""
Report the main PID of the process to systemd (for services that
confuse systemd with their forking behaviour). If *pid* is None,
:func:`os.getpid` is called to determine the calling process' PID.
"""
if pid is None:
pid = os.getpid()
self.notify(f'MAINPID={pid:d}')
[docs]
def listen_fds(self):
"""
Return file-descriptors passed to the service by systemd, e.g. as part
of socket activation or file descriptor stores. It returns a
:class:`dict` mapping each file-descriptor to its name, or the string
"unknown" if no name was given.
"""
try:
if int(os.environ['LISTEN_PID']) != os.getpid():
raise ValueError(lang._('wrong LISTEN_PID'))
fds = int(os.environ['LISTEN_FDS'])
except (ValueError, KeyError):
return {}
try:
names = os.environ['LISTEN_FDNAMES'].split(':')
except KeyError:
names = ['unknown'] * fds
if len(names) != fds:
return {}
return {
fd: name
for fd, name in zip(
range(self.LISTEN_FDS_START, self.LISTEN_FDS_START + fds),
names)
}
_SYSTEMD = None
[docs]
def get_systemd():
"""
Return a single top-level instance of :class:`Systemd`; repeated calls will
return the same instance.
"""
global _SYSTEMD
if _SYSTEMD is None:
_SYSTEMD = Systemd()
return _SYSTEMD