Monitoring#

Command assert#

I use this script to check that the result of shell commands corresponds to some expected output. You can execute any arbitrary shell command.

If the output or the return value differs from the expected one, a notification is sent.
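
The core idea can be sketched in a few lines of Python. This is only an illustration, not the script from the setup below: the helper name command_matches and its default timeout are assumptions made for this sketch.

```python
import shlex
import subprocess
import sys


def command_matches(command: str, expected_output: str,
                    expected_retval: int, timeout: int = 5) -> bool:
    """Return True if the command prints expected_output and exits with expected_retval."""
    try:
        # The command is split into arguments and executed without a shell.
        result = subprocess.run(shlex.split(command),
                                capture_output=True,
                                timeout=timeout)
    except subprocess.TimeoutExpired:
        # A command that hangs counts as a failed assertion.
        return False

    output = result.stdout.decode('UTF-8')
    # Plain substring test, similar to grep with a fixed string.
    return expected_output in output and result.returncode == expected_retval
```

With the webserver example from the configuration file, command_matches('curl --head https://my-server.com', 'Server: Apache', 0) would return True as long as the server answers with that header.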

The script also creates a web feed (in Atom format, readable by any RSS reader) to complement the standard notifications. The feed file should be accessible by an HTTP server such as Apache.

../../_images/command_assert_0.png

A Gotify notification showing a Gitea server error#
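
To keep the feed from filling up with duplicates, a failing command only gets a new feed entry if its previous entry is older than a configurable interval (the no_repeat_timeout_seconds option in the configuration file of the basic setup). The decision can be sketched as follows. The helper should_publish is hypothetical, written for this illustration only, and timezone-aware timestamps are assumed.

```python
import datetime
from typing import Optional


def should_publish(last_pub_date: Optional[datetime.datetime],
                   now: datetime.datetime,
                   no_repeat_timeout_seconds: int) -> bool:
    """Return True if a new feed entry should be added for a command."""
    if last_pub_date is None:
        # No previous entry for this command: always publish.
        return True

    elapsed = now - last_pub_date
    # total_seconds() includes whole days; the .seconds attribute does not.
    return elapsed.total_seconds() > no_repeat_timeout_seconds
```

With a timeout of 3600 seconds and a timer that fires every 30 minutes, an outage lasting several hours would add roughly one feed entry per hour instead of one per run.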

Basic setup#

  1. install the dependencies

    apt-get install python3-pip python3-venv
    
  2. install fpyutils. See reference

  3. create a new user

    useradd --system -s /bin/bash -U command-assert
    passwd command-assert
    usermod -aG jobs command-assert
    
  4. create the jobs directories. See reference

    mkdir -p /home/jobs/{scripts,services}/by-user/command-assert
    
  5. create a new virtual environment

    cd /home/jobs/scripts/by-user/command-assert
    python3 -m venv .venv
    . .venv/bin/activate
    
  6. create the requirements.txt file

    /home/jobs/scripts/by-user/command-assert/requirements.txt#
    apprise
    feedgenerator
    PyYAML
    
  7. install the dependencies

    pip3 install -r requirements.txt
    deactivate
    
  8. create the script

    /home/jobs/scripts/by-user/command-assert/command_assert.py#
    #!/usr/bin/env python3
    #
    # command_assert.py
    #
    # Copyright (C) 2020-2024 Franco Masotti
    #
    # This program is free software: you can redistribute it and/or modify
    # it under the terms of the GNU General Public License as published by
    # the Free Software Foundation, either version 3 of the License, or
    # (at your option) any later version.
    #
    # This program is distributed in the hope that it will be useful,
    # but WITHOUT ANY WARRANTY; without even the implied warranty of
    # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    # GNU General Public License for more details.
    #
    # You should have received a copy of the GNU General Public License
    # along with this program.  If not, see <http://www.gnu.org/licenses/>.
    r"""command_assert.py."""
    
    import datetime
    import pathlib
    import re
    import shlex
    import subprocess  # nosec
    import sys
    import uuid
    
    import apprise
    import feedgenerator
    import yaml
    
    
    class InvalidCache(Exception):
        pass
    
    
    class InvalidConfiguration(Exception):
        pass
    
    
    def send_notification(message: str, apobj, title: str = 'command assert'):
        apobj.notify(
            body=message,
            title=title,
        )
    
    
    def run_command(
        command: str,
        file_descriptor: str,
        process_timeout_interval: int = 60,
        process_in_timeout_retval: int = -131072,
        process_in_timeout_output: str = '<--##--##-->',
    ) -> tuple:
        r"""Run the command and capture the selected output and return value."""
        if file_descriptor not in ['stderr', 'stdout', 'both']:
            raise ValueError
    
        command = shlex.split(command)
        try:
            # No exception is raised unless the process goes in timeout.
            result = subprocess.run(command,
                                    capture_output=True,
                                    shell=False,
                                    timeout=process_timeout_interval)  # nosec
            if file_descriptor == 'stdout':
                output = result.stdout
            elif file_descriptor == 'stderr':
                output = result.stderr
            elif file_descriptor == 'both':
                output = result.stdout + result.stderr
            output = output.decode('UTF-8')
            retval = result.returncode
        except subprocess.TimeoutExpired:
            output = process_in_timeout_output
            retval = process_in_timeout_retval
    
        return output, retval
    
    
    def assert_output(output: str,
                      expected_output: str,
                      retval: int,
                      expected_retval: int,
                      strict_matching=False) -> bool:
        r"""Check that the output and the return value correspond to expected values."""
        # Escape special regex characters.
        expected_output = re.escape(expected_output)
    
        if strict_matching:
            assertion_passes = re.match(
                expected_output, output) is not None and retval == expected_retval
        else:
            # Similar to grep.
            assertion_passes = re.search(
                expected_output, output) is not None and retval == expected_retval
    
        return assertion_passes
    
    
    ########
    # Feed #
    ########
    def add_feed_element(feed, id: int, title: str, content: str,
                         date: datetime.datetime, description: str,
                         author_email: str, author_name: str, link: str):
        feed.add_item(
            unique_id=str(id),
            title=title,
            link=link,
            description=description,
            author_email=author_email,
            author_name=author_name,
            pubdate=date,
            updatedate=date,
            content=content,
        )
    
    
    #########
    # Files #
    #########
    def read_yaml_file(file: str) -> dict:
        data = dict()
        if pathlib.Path(file).is_file():
            with open(file) as f:
                # An empty file is parsed as None: fall back to an empty dict.
                data = yaml.load(f, Loader=yaml.SafeLoader) or dict()
    
        return data
    
    
    def read_cache_file(file: str) -> dict:
        cache = read_yaml_file(file)
        if not check_cache_structure(cache):
            raise InvalidCache
    
        return cache
    
    
    def write_cache(cache: dict, cache_file: str):
        with open(cache_file, 'w') as f:
            f.write(yaml.dump(cache))
    
    
    ##################################
    # Check configuration structure  #
    ##################################
    def check_configuration_structure(configuration: dict) -> bool:
        ok = True
        if ('message_status' in configuration
                and 'process_in_timeout' in configuration
                and 'feed' in configuration and 'commands' in configuration):
            ok = True
        else:
            ok = False
    
        if (ok and 'ok' in configuration['message_status']
                and 'error' in configuration['message_status']
                and 'retval' in configuration['process_in_timeout']
                and 'output' in configuration['process_in_timeout']
                and 'enabled' in configuration['feed']
                and 'feed' in configuration['feed']
                and 'cache' in configuration['feed']
                and 'total_last_feeds_to_keep' in configuration['feed']
                and 'title' in configuration['feed']
                and 'link' in configuration['feed']
                and 'author_name' in configuration['feed']
                and 'author_email' in configuration['feed']
                and 'description' in configuration['feed']
                and isinstance(configuration['message_status']['ok'], str)
                and isinstance(configuration['message_status']['error'], str)
                and isinstance(configuration['process_in_timeout']['retval'], int)
                and isinstance(configuration['process_in_timeout']['output'], str)
                and isinstance(configuration['feed']['enabled'], bool)
                and isinstance(configuration['feed']['feed'], str)
                and isinstance(configuration['feed']['cache'], str) and isinstance(
                    configuration['feed']['total_last_feeds_to_keep'], int)
                and isinstance(configuration['feed']['title'], str)
                and isinstance(configuration['feed']['link'], str)
                and isinstance(configuration['feed']['author_name'], str)
                and isinstance(configuration['feed']['author_email'], str)
                and isinstance(configuration['feed']['description'], str)):
            ok = ok & True
        else:
            ok = ok & False
    
        # Skip the lookup when an earlier check failed: 'commands' might be missing.
        if ok and isinstance(configuration['commands'], dict):
            commands_keys = list(configuration['commands'].keys())
        else:
            ok = False
            commands_keys = []
        i = 0
        while ok and i < len(commands_keys):
            cmd = configuration['commands'][commands_keys[i]]
            if ('command' in cmd and 'file_descriptor' in cmd
                    and 'strict_matching' in cmd and 'expected_output' in cmd
                    and 'expected_retval' in cmd and 'timeout_interval' in cmd
                    and 'log_if_ok' in cmd and 'feed' in cmd
                    and isinstance(cmd['command'], str)
                    and isinstance(cmd['file_descriptor'], str)
                    and isinstance(cmd['strict_matching'], bool)
                    and isinstance(cmd['expected_output'], str)
                    and isinstance(cmd['expected_retval'], int)
                    and isinstance(cmd['timeout_interval'], int)
                    and isinstance(cmd['log_if_ok'], bool)
                    and isinstance(cmd['feed'], dict)):
                ok = ok & True
                feed = cmd['feed']
            else:
                ok = ok & False
            if (ok and 'enabled' in feed and 'title' in feed and 'content' in feed
                    and 'description' in feed
                    and 'no_repeat_timeout_seconds' in feed
                    and isinstance(feed['enabled'], bool)
                    and isinstance(feed['title'], str)
                    and isinstance(feed['content'], str)
                    and isinstance(feed['description'], str)
                    and isinstance(feed['no_repeat_timeout_seconds'], int)):
                ok = ok & True
            else:
                ok = ok & False
    
            i += 1
    
        return ok
    
    
    #########################
    # Check cache structure #
    #########################
    def check_cache_structure(cache: dict) -> bool:
        i = 0
        ok = True
        elements = list(cache.keys())
    
        if len(elements) > 0:
            min = elements[0]
    
        while ok and i < len(elements):
            if not isinstance(elements[i], int):
                ok = ok & False
            if ok and elements[i] > 0:
                if elements[i] < min:
                    ok = ok & False
                else:
                    min = elements[i]
            i += 1
    
        i = 0
        while ok and i < len(cache):
            if (ok and 'command_id' in cache[elements[i]]
                    and 'content' in cache[elements[i]]
                    and 'description' in cache[elements[i]]
                    and 'email' in cache[elements[i]]
                    and 'link' in cache[elements[i]]
                    and 'name' in cache[elements[i]]
                    and 'pub_date' in cache[elements[i]]
                    and 'title' in cache[elements[i]]
                    and isinstance(cache[elements[i]]['command_id'], str)
                    and isinstance(cache[elements[i]]['content'], str)
                    and isinstance(cache[elements[i]]['description'], str)
                    and isinstance(cache[elements[i]]['email'], str)
                    and isinstance(cache[elements[i]]['link'], str)
                    and isinstance(cache[elements[i]]['name'], str) and isinstance(
                        cache[elements[i]]['pub_date'], datetime.datetime)
                    and isinstance(cache[elements[i]]['title'], str)):
                ok = ok & True
            else:
                ok = ok & False
    
            i += 1
    
        return ok
    
    
    if __name__ == '__main__':
    
        def main():
            r"""Run the pipeline."""
            # Load the configuration.
            # open() does not go through a shell: use the path as given.
            configuration_file = sys.argv[1]
            with open(configuration_file) as f:
                config = yaml.load(f, Loader=yaml.SafeLoader)
            if not check_configuration_structure(config):
                raise InvalidConfiguration
    
            # Create an Apprise instance.
            apobj = apprise.Apprise()
    
            # Add all of the notification services by their server url.
            for uri in config['apprise_notifiers']['dest']:
                apobj.add(uri)
                print(uri)
    
            commands = config['commands']
            # Create a new feed.
            feed = feedgenerator.Atom1Feed(
                title=config['feed']['title'],
                link=config['feed']['link'],
                author_name=config['feed']['author_name'],
                author_email=config['feed']['author_email'],
                description=config['feed']['description'],
            )
            now = datetime.datetime.now(datetime.timezone.utc)
    
            # Load feed cache.
            cache = read_cache_file(config['feed']['cache'])
            if cache is None:
                cache = dict()
    
            # First and last key will be used as offsets.
            if len(cache) > 0:
                last_key = list(cache.keys())[-1]
                first_key = list(cache.keys())[0]
            else:
                last_key = 0
                first_key = 1
    
            # Keep only the last existing n elements.
            # Elements added to the running session will be purged on
            # the next run.
            old_cache_len = len(cache)
            cache = dict(
                list(cache.items())
                [-config['feed']['total_last_feeds_to_keep']:len(cache)])
    
            # Update the first key by removing the first elements.
            first_key += old_cache_len - config['feed']['total_last_feeds_to_keep']
            # Set a default value if there are not enough elements.
            if first_key < 0:
                first_key = 1
    
            # i is the unique id of the feed, excluding the offset.
            i = 0
            for c in cache:
                # Replay existing cache.
                add_feed_element(
                    feed,
                    first_key + i,
                    cache[c]['title'],
                    cache[c]['content'],
                    cache[c]['pub_date'],
                    cache[c]['description'],
                    cache[c]['email'],
                    cache[c]['name'],
                    cache[c]['link'],
                )
                i += 1
    
            # Counter for the cache elements.
            k = 1
            for command in commands:
                output, retval = run_command(
                    commands[command]['command'],
                    commands[command]['file_descriptor'],
                    commands[command]['timeout_interval'],
                    config['process_in_timeout']['retval'],
                    config['process_in_timeout']['output'],
                )
                assertion_passes = assert_output(
                    output, commands[command]['expected_output'], retval,
                    commands[command]['expected_retval'],
                    commands[command]['strict_matching'])
                if assertion_passes:
                    result = config['message_status']['ok']
                else:
                    result = config['message_status']['error']
    
                # Log results.
                if not assertion_passes or commands[command]['log_if_ok']:
    
                    message = command + ' returned: ' + result
                    try:
                        send_notification(message, apobj,
                                          config['apprise_notifiers']['title'])
                    except Exception as e:
                        print(e)
    
                    # Create new feed.
                    if commands[command]['feed']['enabled']:
                        command_id = str(uuid.uuid3(uuid.NAMESPACE_DNS, command))
                        found = False
                        idx = None
                        j = len(cache) - 1
                        cache_keys = list(cache.keys())
                        # Get the most recent item. Filter by uuid.
                        # See
                        # https://docs.python.org/3.8/library/stdtypes.html#dict.values
                        # about dict order iteration.
                        while not found and j >= 0:
                            if cache[cache_keys[j]]['command_id'] == command_id:
                                found = True
                                idx = cache_keys[j]
                            j -= 1
    
                        timeout = commands[command]['feed'][
                            'no_repeat_timeout_seconds']
                        # total_seconds() also counts whole days, unlike .seconds.
                        if (found and
                            (now - cache[idx]['pub_date']).total_seconds() > timeout
                                or not found):
                            add_feed_element(
                                feed,
                                first_key + i,
                                commands[command]['feed']['title'],
                                commands[command]['feed']['content'],
                                now,
                                config['feed']['description'],
                                config['feed']['author_email'],
                                config['feed']['author_name'],
                                '',
                            )
    
                            # Always append.
                            # last_key+k always > last_key
                            cache[last_key + k] = {
                                'title': commands[command]['feed']['title'],
                                'content': commands[command]['feed']['content'],
                                'pub_date': now,
                                'description': config['feed']['description'],
                                'email': config['feed']['author_email'],
                                'name': config['feed']['author_name'],
                                'link': '',
                                'command_id': command_id,
                            }
    
                            k += 1
                            i += 1
    
            # if k > 1 means that new elements were added in the last run.
            if ((k > 1 or not pathlib.Path(config['feed']['feed']).is_file())
                    and config['feed']['enabled']):
                write_cache(cache, config['feed']['cache'])
                with open(config['feed']['feed'], 'w') as fp:
                    feed.write(fp, 'utf-8')
    
        main()
    
  9. create a configuration file

    /home/jobs/scripts/by-user/command-assert/command_assert.mypurpose.yaml#
    #
    # command_assert.mypurpose.yaml
    #
    # Copyright (C) 2020-2024 Franco Masotti
    #
    # This program is free software: you can redistribute it and/or modify
    # it under the terms of the GNU General Public License as published by
    # the Free Software Foundation, either version 3 of the License, or
    # (at your option) any later version.
    #
    # This program is distributed in the hope that it will be useful,
    # but WITHOUT ANY WARRANTY; without even the implied warranty of
    # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    # GNU General Public License for more details.
    #
    # You should have received a copy of the GNU General Public License
    # along with this program.  If not, see <http://www.gnu.org/licenses/>.
    
    # The string that is used for the notifications
    message_status:
        ok: 'OK'
        error: 'ERROR'
    
    # Default values if a process goes in timeout.
    process_in_timeout:
        retval: -131072
        output: '<--##--##-->'
    
    # XML feed header.
    feed:
        enabled: true
    
        # Path of the XML feed file.
        # This file is most useful if served with a web server.
        feed: '/home/command-assert/out/command_assert.mypurpose.xml'
    
        # Path of the cache file.
        cache: '/home/jobs/scripts/by-user/command-assert/.command_assert.mypurpose.yml'
    
        total_last_feeds_to_keep: 128
    
        # Feed metadata.
        title: 'Outages of mypurpose'
        link: 'https://outage.my.domain'
        author_name: 'bot'
        author_email: 'myusername@gmail.com'
        description: 'Updates on outages'
    
    commands:
        webserver SSL:
            # The command as you would execute in a shell.
            command: 'curl --head https://my-server.com'
    
            # {stdout,stderr,both}
            file_descriptor: 'stdout'
    
            # If set to true, the output must start with expected_output.
            # If set to false, expected_output may appear anywhere in the output.
            strict_matching: false
    
            # A string that needs to be found in the output.
            # Regular expressions are NOT supported: the string is matched literally.
            expected_output: 'Server: Apache'
    
            # The return value is usually 0 for successful processes.
            expected_retval: 0
    
            # Force kill the process after this time interval in seconds.
            timeout_interval: 5
    
            # if set to true, send notifications even if the process completes correctly.
            log_if_ok: false
    
            feed:
                enabled: true
                title: 'outage mypurpose'
    
                # use HTML.
                content: '<em>Sorry</em>, the webserver was down'
    
                description: 'outage mypurpose'
    
                # If an entry for this command was added to the feed less than
                # no_repeat_timeout_seconds ago, do not add a new one.
                no_repeat_timeout_seconds: 3600
    
        SSH server:
            command: 'ssh nonexistent@my-server.com'
            file_descriptor: 'stderr'
            strict_matching: false
            expected_output: 'NOTICE'
            expected_retval: 255
            timeout_interval: 5
            log_if_ok: false
            feed:
                enabled: true
                title: 'outage mypurpose'
                content: '<em>Sorry</em>, the SSH server was down'
                description: 'outage mypurpose'
                no_repeat_timeout_seconds: 3600
    
    apprise_notifiers:
      # Follow the examples on
      # https://github.com/caronc/apprise
      dest:
        - 'nctalks://myuser:mypassword@myhost/roomid/'
        - 'mailtos://myusername:my%20awesome%20password?port=465&mode=ssl&from=myusername@gmail.com&to=myusername@gmail.com'
      title: 'command assert'
    
  10. create a Systemd service unit file

    /home/jobs/services/by-user/command-assert/command-assert.mypurpose.service#
    [Unit]
    Description=Command assert mypurpose
    Requires=network-online.target
    After=network-online.target
    
    [Service]
    Type=simple
    WorkingDirectory=/home/jobs/scripts/by-user/command-assert
    ExecStart=/bin/sh -c '. .venv/bin/activate && ./command_assert.py ./command_assert.mypurpose.yaml; deactivate'
    User=command-assert
    Group=command-assert
    
    [Install]
    WantedBy=multi-user.target
    
  11. create a Systemd timer unit file

    /home/jobs/services/by-user/command-assert/command-assert.mypurpose.timer#
    [Unit]
    Description=Once every 30 minutes command assert mypurpose
    
    [Timer]
    OnCalendar=*:0/30
    Persistent=true
    
    [Install]
    WantedBy=timers.target
    
  12. fix owners and permissions

    chown -R command-assert:command-assert /home/jobs/{scripts,services}/by-user/command-assert
    chmod 700 -R /home/jobs/{scripts,services}/by-user/command-assert
    
    Note: avoid changing the permissions of the Python virtual environment!
    
  13. run the deploy script
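
The strict_matching and expected_output options from the configuration file can be confusing at first, so here is a minimal sketch of how the two matching modes behave. It mirrors the logic of assert_output in the script with the return value check left out. The function name output_matches is hypothetical.

```python
import re


def output_matches(output: str, expected_output: str,
                   strict_matching: bool = False) -> bool:
    """Literal match: anchored at the start if strict, anywhere otherwise."""
    # Escaping turns every regex metacharacter into a literal character.
    pattern = re.escape(expected_output)
    if strict_matching:
        # re.match only succeeds at the beginning of the output.
        return re.match(pattern, output) is not None
    # re.search scans the whole output, similar to grep.
    return re.search(pattern, output) is not None
```

For HTTP headers such as those printed by curl --head, the Server line is usually not the first line, so a check for 'Server: Apache' only passes with strict_matching set to false.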

Sharing RSS feeds#

We assume that the Apache HTTP webserver is up and running before following these steps.

  1. install the dependencies

    apt-get install bindfs
    
  2. add this entry to the fstab file. In this example the directory is mounted in /srv/http/command_assert

    /etc/fstab#
    /home/command-assert/out /srv/http/command_assert fuse.bindfs  auto,force-user=www-data,force-group=www-data,ro 0 0
    
  3. create a directory readable by Apache

    mkdir -p /srv/http/command_assert
    chown www-data:www-data /srv/http/command_assert
    chmod 700 /srv/http/command_assert
    
  4. serve the files via HTTP by creating a new Apache virtual host. Replace FQDN with the appropriate domain and include this file from the Apache configuration

    /etc/apache2/command_assert.apache.conf#
    <IfModule mod_ssl.c>
    <VirtualHost *:443>
    
        UseCanonicalName on
    
        Keepalive On
        RewriteEngine on
    
        ServerName ${FQDN}
    
        # Set the icons also to avoid 404 errors.
        Alias /icons/ "/usr/share/apache2/icons/"
    
        DocumentRoot "/srv/http/command_assert"
        <Directory "/srv/http/command_assert">
            Options -ExecCGI -Includes
            Options +Indexes +SymlinksIfOwnerMatch
            IndexOptions NameWidth=* +SuppressDescription FancyIndexing Charset=UTF-8 VersionSort FoldersFirst
    
            ReadmeName footer.html
            IndexIgnore header.html footer.html
    
            #
            # AllowOverride controls what directives may be placed in .htaccess files.
            # It can be "All", "None", or any combination of the keywords:
            #   AllowOverride FileInfo AuthConfig Limit
            #
            AllowOverride All
    
            #
            # Controls who can get stuff from this server.
            #
            Require all granted
        </Directory>
    
        SSLCompression      off
    
        Include /etc/letsencrypt/options-ssl-apache.conf
        SSLCertificateFile /etc/letsencrypt/live/${FQDN}/fullchain.pem
        SSLCertificateKeyFile /etc/letsencrypt/live/${FQDN}/privkey.pem
    </VirtualHost>
    </IfModule>
    
  5. create an HTML file to be served as an explanation for the RSS feeds

    /srv/http/command_assert/footer.html#
    <h1>Command assert</h1>
    
    <h2>My purpose</h2>
    
  6. restart the Apache webserver

    systemctl restart apache2
    
  7. run the bind-mount

    mount -a
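
Once the mount is active, a quick sanity check is to parse the feed file and list its entries. The following is a minimal sketch using only the Python standard library. It assumes the feed is an Atom document, as written by the script, and the helper name entry_titles is hypothetical.

```python
import xml.etree.ElementTree as ET

# Namespace used by Atom documents, in ElementTree's {uri}tag notation.
ATOM_NS = '{http://www.w3.org/2005/Atom}'


def entry_titles(feed_xml) -> list:
    """Return the titles of all entries in an Atom feed (str or bytes)."""
    root = ET.fromstring(feed_xml)
    return [entry.findtext(ATOM_NS + 'title', default='')
            for entry in root.iter(ATOM_NS + 'entry')]
```

For example, calling entry_titles(open(path, 'rb').read()) with path set to /srv/http/command_assert/command_assert.mypurpose.xml (a location assumed from the example configuration and mount point) should list one title per recorded outage. A parse error would point to a truncated or malformed feed file.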