ROOTPLOIT
Server: Apache
System: Linux node6122.myfcloud.com 6.14.3-x86_64-linode168 #1 SMP PREEMPT_DYNAMIC Mon Apr 21 19:47:55 EDT 2025 x86_64
User: bashacomputer (1004)
PHP: 7.4.33
Disabled: exec,passthru,shell_exec,system
Upload Files
File: //lib64/nagios/plugins/check_supervisord.py
#!/usr/bin/env python

# -*- coding: utf-8 -*-

# nagios-check-supervisord
# check_supervisord.py

# Copyright (c) 2015-2021 Alexei Andrushievich <vint21h@vint21h.pp.ua>
# Check supervisord programs status Nagios plugin [https://github.com/vint21h/nagios-check-supervisord/]  # noqa: E501
#
# This file is part of nagios-check-supervisord.
#
# nagios-check-supervisord is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.


from __future__ import unicode_literals

import os
import sys
import stat
from collections import OrderedDict
from argparse import Namespace, ArgumentParser  # pylint: disable=W0611  # noqa: F401


try:
    import xmlrpc.client as xmlrpclib
except ImportError:
    import xmlrpclib  # type: ignore


__all__ = [
    "main",
    "CheckSupervisord",
]


# metadata
VERSION = (2, 2, 1)
__version__ = ".".join(map(str, VERSION))


class CheckSupervisord(object):
    """
    Check supervisord programs status Nagios plugin.
    """

    OUTPUT_TEMPLATES = {
        "critical": {"text": "problem with '{name}': ({status})", "priority": 1},
        "warning": {
            "text": "something curiously with '{name}': ({status})",
            "priority": 2,
        },
        "unknown": {"text": "'{name}' not found in server response", "priority": 3},
        "ok": {"text": "'{name}': OK", "priority": 4},
    }
    STATUS_CRITICAL, STATUS_WARNING, STATUS_UNKNOWN, STATUS_OK = [
        "critical",
        "warning",
        "unknown",
        "ok",
    ]
    EXIT_CODES = {
        STATUS_OK: 0,
        STATUS_WARNING: 1,
        STATUS_CRITICAL: 2,
        STATUS_UNKNOWN: 3,
    }
    (
        STATE_STOPPED,
        STATE_RUNNING,
        STATE_STARTING,
        STATE_BACKOFF,
        STATE_STOPPING,
        STATE_EXITED,
        STATE_FATAL,
        STATE_UNKNOWN,
    ) = (
        "STOPPED",
        "RUNNING",
        "STARTING",
        "BACKOFF",
        "STOPPING",
        "EXITED",
        "FATAL",
        "UNKNOWN",
    )
    STATE_TO_TEMPLATE = {
        STATE_STOPPED: STATUS_OK,
        STATE_RUNNING: STATUS_OK,
        STATE_STARTING: STATUS_WARNING,
        STATE_BACKOFF: STATUS_WARNING,
        STATE_STOPPING: STATUS_WARNING,
        STATE_EXITED: STATUS_WARNING,
        STATE_FATAL: STATUS_CRITICAL,
        STATE_UNKNOWN: STATUS_UNKNOWN,
    }
    URI_TPL_HTTP, URI_TPL_HTTP_AUTH, URI_TPL_SOCKET = "http", "http-auth", "socket"
    URI_TEMPLATES = {
        URI_TPL_HTTP: "http://{server}:{port}",
        URI_TPL_HTTP_AUTH: "http://{username}:{password}@{server}:{port}",
        URI_TPL_SOCKET: "unix://{server}",
    }
    (
        PRIORITY_CRITICAL,
        PRIORITY_WARNING,
        PRIORITY_UNKNOWN,
        PRIORITY_OK,
    ) = range(1, 5)
    PRIORITY_TO_STATUS = {
        PRIORITY_CRITICAL: STATUS_CRITICAL,
        PRIORITY_WARNING: STATUS_WARNING,
        PRIORITY_UNKNOWN: STATUS_UNKNOWN,
        PRIORITY_OK: STATUS_OK,
    }
    STATUS_TO_PRIORITY = {
        STATUS_CRITICAL: PRIORITY_CRITICAL,
        STATUS_WARNING: PRIORITY_WARNING,
        STATUS_UNKNOWN: PRIORITY_UNKNOWN,
        STATUS_OK: PRIORITY_OK,
    }
    HELP_STATUSES = "Possible variants: {statuses}".format(
        statuses=", ".join(EXIT_CODES.keys())
    )

    def __init__(self):
        """
        Get command line args.
        """

        self.options = self._get_options()  # type: ignore

    def _get_options(self):
        """
        Parse commandline options arguments.

        :return: parsed command line arguments
        :rtype: Namespace
        """

        parser = ArgumentParser(
            description="Check supervisord programs status Nagios plugin"
        )
        parser.add_argument(
            "-s",
            "--server",
            action="store",
            dest="server",
            type=str,
            default="",
            metavar="SERVER",
            help="server name, IP address or unix socket path",
        )
        parser.add_argument(
            "-p",
            "--port",
            action="store",
            type=int,
            dest="port",
            default=9001,
            metavar="PORT",
            help="port number",
        )
        parser.add_argument(
            "-P",
            "--programs",
            action="store",
            dest="programs",
            type=str,
            default="",
            metavar="PROGRAMS",
            help="comma separated programs list, or empty for all programs in supervisord response",  # noqa: E501
        )
        parser.add_argument(
            "-u",
            "--username",
            action="store",
            dest="username",
            type=str,
            default="",
            metavar="USERNAME",
            help="supervisord user",
        )
        parser.add_argument(
            "-S",
            "--password",
            action="store",
            dest="password",
            type=str,
            default="",
            metavar="PASSWORD",
            help="supervisord user password",
        )
        parser.add_argument(
            "--stopped-state-exit-code",
            action="store",
            dest="stopped_state_exit_code",
            type=str,
            choices=self.EXIT_CODES.keys(),
            default=self.STATUS_OK,
            metavar="STOPPED_STATE_EXIT_CODE",
            help="stopped state exit code. {statuses}".format(
                statuses=self.HELP_STATUSES
            ),
        )
        parser.add_argument(
            "--starting-state-exit-code",
            action="store",
            dest="starting_state_exit_code",
            type=str,
            choices=self.EXIT_CODES.keys(),
            default=self.STATUS_WARNING,
            metavar="STARTING_STATE_EXIT_CODE",
            help="starting state exit code. {statuses}".format(
                statuses=self.HELP_STATUSES
            ),
        )
        parser.add_argument(
            "--network-errors-exit-code",
            action="store",
            dest="network_errors_exit_code",
            type=str,
            choices=self.EXIT_CODES.keys(),
            default=self.STATUS_UNKNOWN,
            metavar="NETWORK_ERRORS_EXIT_CODE",
            help="network errors exit code. {statuses}".format(
                statuses=self.HELP_STATUSES
            ),
        )
        parser.add_argument(
            "--no-programs-defined-exit-code",
            action="store",
            dest="no_programs_defined_exit_code",
            type=str,
            choices=self.EXIT_CODES.keys(),
            default=self.STATUS_UNKNOWN,
            metavar="NO_PROGRAMS_DEFINED_EXIT_CODE",
            help="no programs defined exit code. {statuses}".format(
                statuses=self.HELP_STATUSES
            ),
        )
        parser.add_argument(
            "-q",
            "--quiet",
            action="store_true",
            default=False,
            dest="quiet",
            help="be quiet",
        )
        parser.add_argument(
            "-v",
            "--version",
            action="version",
            version="{version}".format(version=__version__),
        )

        options = parser.parse_args()
        # update stopped state value from command line argument
        self.STATE_TO_TEMPLATE[self.STATE_STOPPED] = options.stopped_state_exit_code
        # update starting state value from command line argument
        self.STATE_TO_TEMPLATE[self.STATE_STARTING] = options.starting_state_exit_code

        # check mandatory command line options supplied
        if not options.server:
            parser.error(message="Required server address option missing")
        if options.username and not options.password:
            parser.error(message="Required supervisord user password missing")

        return options

    def _get_connection_uri(self, tpl):
        """
        Creates server connection URI formatted connection string.

        :param tpl: connection string template name
        :type tpl: str
        :return: server connection URI formatted connection string
        :rtype: str
        """

        payload = {
            "username": self.options.username,
            "password": self.options.password,
            "server": self.options.server,
            "port": self.options.port,
        }

        return self.URI_TEMPLATES[tpl].format(**payload)

    def _get_connection(self):
        """
        Creates and return connection to supervisord.

        :return: connection to supervisord
        :rtype: ServerProxy
        """

        if self.options.server.startswith("/") and stat.S_ISSOCK(
            os.stat(self.options.server).st_mode
        ):  # communicate with server via unix socket
            # (check is server address is path and path is unix socket)
            try:
                import supervisor.xmlrpc
            except ImportError as error:  # noqa: B014
                sys.stdout.write(
                    "ERROR: Couldn't load module. {error}\n".format(error=error)
                )
                sys.stdout.write(
                    "ERROR: Unix socket support not available! Please install nagios-check-supervisord with unix socket support: 'nagios-check-supervisord[unix-socket-support]' or install 'supervisor' separately.\n"  # noqa: E501
                )
                sys.exit(3)

            if all([self.options.username, self.options.password]):  # with auth
                connection = xmlrpclib.ServerProxy(
                    uri="https://",
                    transport=supervisor.xmlrpc.SupervisorTransport(
                        self.options.username,
                        self.options.password,
                        serverurl=self._get_connection_uri(tpl=self.URI_TPL_SOCKET),  # type: ignore  # noqa: E501
                    ),
                )
            else:
                connection = xmlrpclib.ServerProxy(
                    uri="https://",
                    transport=supervisor.xmlrpc.SupervisorTransport(
                        username=None,
                        password=None,
                        serverurl=self._get_connection_uri(tpl=self.URI_TPL_SOCKET),  # type: ignore  # noqa: E501
                    ),
                )

        else:  # communicate with server via http
            if all([self.options.username, self.options.password]):  # with auth
                connection = xmlrpclib.Server(
                    uri=self._get_connection_uri(tpl=self.URI_TPL_HTTP_AUTH)  # type: ignore  # noqa: E501
                )
            else:
                connection = xmlrpclib.Server(
                    uri=self._get_connection_uri(tpl=self.URI_TPL_HTTP)  # type: ignore  # noqa: E501
                )

        return connection

    def _get_data(self):
        """
        Get and return data from supervisord.

        :return: data from supervisord
        :rtype: List[Dict[str, Union[str, int]]]
        """

        try:
            connection = self._get_connection()  # type: ignore  # noqa: E501

            return connection.supervisor.getAllProcessInfo()

        except Exception as error:
            if not self.options.quiet:
                sys.stdout.write(
                    "ERROR: Server communication problem. {error}\n".format(error=error)
                )
            sys.exit(
                self.EXIT_CODES.get(
                    self.options.network_errors_exit_code, self.STATUS_UNKNOWN
                )
            )

    def _get_status(self, data):
        """
        Create main status.

        :param data: devices states info
        :type data: List[Dict[str, Union[str, int]]]
        :return: main check status
        :rtype: str
        """

        # for multiple check need to get main status by priority
        priority = (
            min(  # type: ignore  # noqa: C407
                [
                    self.OUTPUT_TEMPLATES[self.STATE_TO_TEMPLATE[info["statename"]]][
                        "priority"
                    ]
                    for info in data
                ]
            )
            if data
            else self.STATUS_TO_PRIORITY[self.options.no_programs_defined_exit_code]
        )
        status = self.PRIORITY_TO_STATUS.get(priority, self.PRIORITY_CRITICAL)  # type: ignore  # noqa: E501

        return status

    def _get_code(self, status):
        """
        Create exit code.

        :param status: main check status
        :type status: str
        :return: exit code
        :rtype: int
        """

        # create exit code (unknown if something happened wrong)
        return self.EXIT_CODES.get(status, self.STATUS_UNKNOWN)

    def _get_output(self, data, status):
        """
        Create Nagios and human readable supervisord statuses.

        :param data: supervisord XML-RPC call result
        :type data: List[Dict[str, Union[str, int]]]
        :param status: main check status
        :type status: str
        :return: human readable supervisord statuses
        :rtype: str
        """

        states = OrderedDict()
        programs = (
            map(
                lambda program: program.strip(),
                self.options.programs.strip().split(","),
            )
            if self.options.programs
            else map(lambda x: x["name"], data)
        )

        for program in programs:
            try:
                info = min(filter(lambda x: x["name"] == program, data))
                states.update(
                    {
                        program: {
                            "name": program,
                            "template": self.STATE_TO_TEMPLATE[info["statename"]],
                            "status": info["spawnerr"]
                            if info["spawnerr"]
                            else info["statename"],
                        }
                    }
                )
            except ValueError:
                states.update(
                    {program: {"name": program, "template": "unknown", "status": ""}}
                )

        output = (
            ", ".join(
                [
                    str(
                        self.OUTPUT_TEMPLATES[states[program]["template"]]["text"]
                    ).format(**states[program])
                    for program in sorted(
                        states.keys(),
                        key=lambda item: self.OUTPUT_TEMPLATES[  # type: ignore
                            states[item]["template"]
                        ]["priority"],
                    )
                ]
            )
            if len(states)
            else "No program configured/found"
        )

        # return full status string with main status
        # for multiple programs and all programs states
        return "{status}: {output}\n".format(
            **{"status": status.upper(), "output": output}
        )

    def check(self):
        """
        Get data from server and create plugin output.

        :return: plugin output and exit code
        :rtype: Tuple[str, int]
        """

        data = self._get_data()  # type: ignore
        status = self._get_status(data=data)  # type: ignore
        code = self._get_code(status=status)  # type: ignore

        return self._get_output(data=data, status=status), code  # type: ignore


def main():
    """
    Program main.
    """

    checker = CheckSupervisord()  # type: ignore
    output, code = checker.check()  # type: ignore
    sys.stdout.write(output)
    sys.exit(code)


if __name__ == "__main__":

    main()  # type: ignore