Monitoring#
Command assert#
I use this script to check that the result of shell commands corresponds to some expected output. You can execute any arbitrary shell command.
If the resulting output is an unexpected one, a notification is sent.
The script also creates an RSS feed to complement the standard notifications. The RSS feed file should be accessible by an HTTP server such as Apache.
Basic setup#
install the dependencies
apt-get install python3-pip python3-venv
install fpyutils. See reference
create a new user
useradd --system -s /bin/bash -U command-assert
passwd command-assert
usermod -aG jobs command-assert
create the jobs directories. See reference
mkdir -p /home/jobs/{scripts,services}/by-user/command-assert
create a new virtual environment
cd /home/jobs/scripts/by-user/command-assert
python3 -m venv .venv
. .venv/bin/activate
create the
requirements.txt
file

apprise
feedgenerator
PyYAML
install the dependencies
pip3 install -r requirements.txt
deactivate
create the
script
#!/usr/bin/env python3
#
# command_assert.py
#
# Copyright (C) 2020-2024 Franco Masotti
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
r"""command_assert.py.

Run configured shell commands, compare their output and return value
against expected values, send Apprise notifications on mismatch and
maintain an Atom feed (plus a YAML cache) of failures.
"""

import datetime
import pathlib
import re
import shlex
import subprocess  # nosec
import sys
import uuid

import apprise
import feedgenerator
import yaml


class InvalidCache(Exception):
    r"""The cache file does not have the expected structure."""


class InvalidConfiguration(Exception):
    r"""The configuration file does not have the expected structure."""


def send_notification(message: str, apobj, title: str = 'command assert'):
    r"""Send a notification through an Apprise object.

    :param message: the notification body.
    :param apobj: an apprise.Apprise instance with destinations added.
    :param title: the notification title.
    """
    apobj.notify(
        body=message,
        title=title,
    )


def run_command(
    command: str,
    file_descriptor: str,
    process_timeout_interval: int = 60,
    process_in_timeout_retval: int = -131072,
    process_in_timeout_output: str = '<--##--##-->',
) -> tuple:
    r"""Run the command and capture the selected output and return value.

    :param command: the command line; split with shlex and executed
        without a shell.
    :param file_descriptor: one of ``stdout``, ``stderr``, ``both``.
    :param process_timeout_interval: kill the process after this many
        seconds.
    :param process_in_timeout_retval: sentinel return value reported
        when the process goes in timeout.
    :param process_in_timeout_output: sentinel output reported when the
        process goes in timeout.
    :returns: an ``(output, retval)`` tuple.
    :raises ValueError: if ``file_descriptor`` is not a valid selector.
    """
    if file_descriptor not in ['stderr', 'stdout', 'both']:
        raise ValueError
    argv = shlex.split(command)
    try:
        # No exception is raised unless the process goes in timeout.
        result = subprocess.run(argv,
                                capture_output=True,
                                shell=False,
                                timeout=process_timeout_interval)  # nosec
        if file_descriptor == 'stdout':
            output = result.stdout
        elif file_descriptor == 'stderr':
            output = result.stderr
        elif file_descriptor == 'both':
            output = result.stdout + result.stderr
        output = output.decode('UTF-8')
        retval = result.returncode
    except subprocess.TimeoutExpired:
        output = process_in_timeout_output
        retval = process_in_timeout_retval
    return output, retval


def assert_output(output: str,
                  expected_output: str,
                  retval: int,
                  expected_retval: int,
                  strict_matching: bool = False) -> bool:
    r"""Check that the output and the return value correspond to expected values.

    ``expected_output`` is treated as a plain string, not a regular
    expression: special regex characters are escaped before matching.

    :param strict_matching: if True the output must match from its
        beginning (``re.match``); otherwise a substring match anywhere
        is enough (``re.search``), similar to grep.
    """
    # Escape special regex characters.
    pattern = re.escape(expected_output)
    if strict_matching:
        matched = re.match(pattern, output) is not None
    else:
        # Similar to grep.
        matched = re.search(pattern, output) is not None
    return matched and retval == expected_retval


########
# Feed #
########
def add_feed_element(feed, id: int, title: str, content: str,
                     date: datetime.datetime, description: str,
                     author_email: str, author_name: str, link: str):
    r"""Append one item to the feed.

    *date* is used both as publication and as update timestamp.
    """
    feed.add_item(
        unique_id=str(id),
        title=title,
        link=link,
        description=description,
        author_email=author_email,
        author_name=author_name,
        pubdate=date,
        updatedate=date,
        content=content,
    )


#########
# Files #
#########
def read_yaml_file(file: str) -> dict:
    r"""Load a YAML file as a dict.

    :returns: the parsed content, or an empty dict when the file does
        not exist or is empty.
    """
    data: dict = {}
    if pathlib.Path(file).is_file():
        # Close the handle explicitly (the original leaked it) and
        # coerce an empty document, which PyYAML loads as None, to an
        # empty dict so callers always get a dict.
        with open(file) as f:
            data = yaml.load(f, Loader=yaml.SafeLoader) or {}
    return data


def read_cache_file(file: str) -> dict:
    r"""Load and validate the feed cache.

    :raises InvalidCache: if the cache structure is not valid.
    """
    cache = read_yaml_file(file)
    if not check_cache_structure(cache):
        raise InvalidCache
    return cache


def write_cache(cache: dict, cache_file: str):
    r"""Serialize the cache dict to the cache file as YAML."""
    with open(cache_file, 'w') as f:
        f.write(yaml.dump(cache))


##################################
# Check configuration structure  #
##################################
def check_configuration_structure(configuration: dict) -> bool:
    r"""Check that the configuration has all required keys and value types.

    Early returns (instead of the accumulated ``ok`` flag) also avoid
    the original KeyError/AttributeError when ``commands`` is missing
    or is not a dict.

    :returns: True when the structure is valid, False otherwise.
    """
    if not ('message_status' in configuration
            and 'process_in_timeout' in configuration
            and 'feed' in configuration
            and 'commands' in configuration):
        return False
    if not ('ok' in configuration['message_status']
            and 'error' in configuration['message_status']
            and 'retval' in configuration['process_in_timeout']
            and 'output' in configuration['process_in_timeout']
            and 'enabled' in configuration['feed']
            and 'feed' in configuration['feed']
            and 'cache' in configuration['feed']
            and 'total_last_feeds_to_keep' in configuration['feed']
            and 'title' in configuration['feed']
            and 'link' in configuration['feed']
            and 'author_name' in configuration['feed']
            and 'author_email' in configuration['feed']
            and 'description' in configuration['feed']
            and isinstance(configuration['message_status']['ok'], str)
            and isinstance(configuration['message_status']['error'], str)
            and isinstance(configuration['process_in_timeout']['retval'], int)
            and isinstance(configuration['process_in_timeout']['output'], str)
            and isinstance(configuration['feed']['enabled'], bool)
            and isinstance(configuration['feed']['feed'], str)
            and isinstance(configuration['feed']['cache'], str)
            and isinstance(
                configuration['feed']['total_last_feeds_to_keep'], int)
            and isinstance(configuration['feed']['title'], str)
            and isinstance(configuration['feed']['link'], str)
            and isinstance(configuration['feed']['author_name'], str)
            and isinstance(configuration['feed']['author_email'], str)
            and isinstance(configuration['feed']['description'], str)):
        return False
    if not isinstance(configuration['commands'], dict):
        return False
    for cmd in configuration['commands'].values():
        if not ('command' in cmd
                and 'file_descriptor' in cmd
                and 'strict_matching' in cmd
                and 'expected_output' in cmd
                and 'expected_retval' in cmd
                and 'timeout_interval' in cmd
                and 'log_if_ok' in cmd
                and 'feed' in cmd
                and isinstance(cmd['command'], str)
                and isinstance(cmd['file_descriptor'], str)
                and isinstance(cmd['strict_matching'], bool)
                and isinstance(cmd['expected_output'], str)
                and isinstance(cmd['expected_retval'], int)
                and isinstance(cmd['timeout_interval'], int)
                and isinstance(cmd['log_if_ok'], bool)
                and isinstance(cmd['feed'], dict)):
            return False
        feed = cmd['feed']
        if not ('enabled' in feed
                and 'title' in feed
                and 'content' in feed
                and 'description' in feed
                and 'no_repeat_timeout_seconds' in feed
                and isinstance(feed['enabled'], bool)
                and isinstance(feed['title'], str)
                and isinstance(feed['content'], str)
                and isinstance(feed['description'], str)
                and isinstance(feed['no_repeat_timeout_seconds'], int)):
            return False
    return True


#########################
# Check cache structure #
#########################
def check_cache_structure(cache: dict) -> bool:
    r"""Check that the cache has integer, ordered keys and valid elements.

    :returns: True when the structure is valid, False otherwise.  An
        empty cache is valid.
    """
    keys = list(cache.keys())
    if not keys:
        return True
    # All keys must be integers; positive keys must be non-decreasing.
    highest = keys[0]
    for key in keys:
        if not isinstance(key, int):
            return False
        if key > 0:
            if key < highest:
                return False
            highest = key
    # Every element must carry all feed fields with the right types.
    for key in keys:
        element = cache[key]
        if not ('command_id' in element
                and 'content' in element
                and 'description' in element
                and 'email' in element
                and 'link' in element
                and 'name' in element
                and 'pub_date' in element
                and 'title' in element
                and isinstance(element['command_id'], str)
                and isinstance(element['content'], str)
                and isinstance(element['description'], str)
                and isinstance(element['email'], str)
                and isinstance(element['link'], str)
                and isinstance(element['name'], str)
                and isinstance(element['pub_date'], datetime.datetime)
                and isinstance(element['title'], str)):
            return False
    return True


def main():
    r"""Run the pipeline."""
    # Load the configuration.  The path is taken as-is from argv:
    # quoting it with shlex.quote (as the original did) would corrupt
    # paths containing spaces, since the string is passed to open(),
    # not to a shell.
    configuration_file = sys.argv[1]
    with open(configuration_file) as cf:
        config = yaml.load(cf, Loader=yaml.SafeLoader)
    if not check_configuration_structure(config):
        raise InvalidConfiguration

    # Create an Apprise instance and add all of the notification
    # services by their server url.  Do not print the URIs: they may
    # embed credentials.
    apobj = apprise.Apprise()
    for uri in config['apprise_notifiers']['dest']:
        apobj.add(uri)

    commands = config['commands']

    # Create a new feed.
    feed = feedgenerator.Atom1Feed(
        title=config['feed']['title'],
        link=config['feed']['link'],
        author_name=config['feed']['author_name'],
        author_email=config['feed']['author_email'],
        description=config['feed']['description'],
    )
    now = datetime.datetime.now(datetime.timezone.utc)

    # Load feed cache.  read_yaml_file guarantees a dict, so no
    # None-check is needed here.
    cache = read_cache_file(config['feed']['cache'])

    # First and last key will be used as offsets.
    if len(cache) > 0:
        cache_keys = list(cache.keys())
        first_key = cache_keys[0]
        last_key = cache_keys[-1]
    else:
        last_key = 0
        first_key = 1

    # Keep only the last existing n elements.  Elements added in the
    # running session will be purged on the next run.
    old_cache_len = len(cache)
    cache = dict(
        list(cache.items())[-config['feed']['total_last_feeds_to_keep']:])
    # Update the first key by removing the first elements.
    first_key += old_cache_len - config['feed']['total_last_feeds_to_keep']
    # Set a default value if there are not enough elements.
    if first_key < 0:
        first_key = 1

    # i is the unique id of the feed, excluding the offset.
    i = 0
    for c in cache:
        # Replay existing cache.
        add_feed_element(
            feed,
            first_key + i,
            cache[c]['title'],
            cache[c]['content'],
            cache[c]['pub_date'],
            cache[c]['description'],
            cache[c]['email'],
            cache[c]['name'],
            cache[c]['link'],
        )
        i += 1

    # Counter for the cache elements.
    k = 1
    for command in commands:
        output, retval = run_command(
            commands[command]['command'],
            commands[command]['file_descriptor'],
            commands[command]['timeout_interval'],
            config['process_in_timeout']['retval'],
            config['process_in_timeout']['output'],
        )
        assertion_passes = assert_output(output,
                                         commands[command]['expected_output'],
                                         retval,
                                         commands[command]['expected_retval'],
                                         commands[command]['strict_matching'])
        if assertion_passes:
            result = config['message_status']['ok']
        else:
            result = config['message_status']['error']

        # Log results.
        if not assertion_passes or commands[command]['log_if_ok']:
            message = command + ' returned: ' + result
            try:
                send_notification(message, apobj,
                                  config['apprise_notifiers']['title'])
            except Exception as e:
                print(e)

        # Create new feed.
        if commands[command]['feed']['enabled']:
            command_id = str(uuid.uuid3(uuid.NAMESPACE_DNS, command))
            found = False
            idx = None
            existing_keys = list(cache.keys())
            # Get the most recent item, filtering by uuid.  Dicts
            # iterate in insertion order, so walk the keys backwards.
            j = len(existing_keys) - 1
            while not found and j >= 0:
                if cache[existing_keys[j]]['command_id'] == command_id:
                    found = True
                    idx = existing_keys[j]
                j -= 1
            timeout = commands[command]['feed']['no_repeat_timeout_seconds']
            # total_seconds() accounts for the whole elapsed interval;
            # the original used timedelta.seconds, which silently drops
            # the days component and so re-enabled repeats after 24h.
            if (not found or
                    (now - cache[idx]['pub_date']).total_seconds() > timeout):
                add_feed_element(
                    feed,
                    first_key + i,
                    commands[command]['feed']['title'],
                    commands[command]['feed']['content'],
                    now,
                    config['feed']['description'],
                    config['feed']['author_email'],
                    config['feed']['author_name'],
                    '',
                )
                # Always append: last_key + k is always > last_key.
                cache[last_key + k] = {
                    'title': commands[command]['feed']['title'],
                    'content': commands[command]['feed']['content'],
                    'pub_date': now,
                    'description': config['feed']['description'],
                    'email': config['feed']['author_email'],
                    'name': config['feed']['author_name'],
                    'link': '',
                    'command_id': command_id,
                }
                k += 1
                i += 1

    # k > 1 means that new elements were added in this run.
    if ((k > 1 or not pathlib.Path(config['feed']['feed']).is_file())
            and config['feed']['enabled']):
        write_cache(cache, config['feed']['cache'])
        with open(config['feed']['feed'], 'w') as fp:
            feed.write(fp, 'utf-8')


if __name__ == '__main__':
    main()
create a
configuration file
#
# command_assert.mypurpose.yaml
#
# Copyright (C) 2020-2024 Franco Masotti
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

# The strings that are used for the notifications.
# NOTE: the key must be `message_status` (with underscores): the script
# validates exactly this name in check_configuration_structure.
message_status:
  ok: 'OK'
  error: 'ERROR'

# Default values if a process goes in timeout.
# NOTE: the key must be `process_in_timeout` (with underscores).
process_in_timeout:
  retval: -131072
  output: '<--##--##-->'

# XML feed header.
feed:
  enabled: true
  # Path of the XML feed file.
  # This file is most useful if served with a web server.
  feed: '/home/command-assert/out/command_assert.mypurpose.xml'
  # Path of the cache file.
  cache: '/home/jobs/scripts/by-user/command-assert/.command_assert.mypurpose.yml'
  total_last_feeds_to_keep: 128
  # Feed metadata.
  title: 'Outages of mypurpose'
  link: 'https://outage.my.domain'
  author_name: 'bot'
  author_email: 'myusername@gmail.com'
  description: 'Updates on outages'

commands:
  webserver SSL:
    # The command as you would execute in a shell.
    command: 'curl --head https://my-server.com'
    # {stdout,stderr,both}
    file_descriptor: 'stdout'
    # If set to true match for the exact expected_output.
    strict_matching: false
    # A pattern that needs to be matched in the output.
    # Regex are NOT supported.
    expected_output: 'Server: Apache'
    # The return value is usually 0 for successful processes.
    expected_retval: 0
    # Force kill the process after this time interval in seconds.
    timeout_interval: 5
    # If set to true, send notifications even if the process completes
    # correctly.
    log_if_ok: false
    feed:
      enabled: true
      title: 'outage mypurpose'
      # Use HTML.
      content: '<em>Sorry</em>, the webserver was down'
      description: 'outage mypurpose'
      # If an error already exists in cache for less than
      # no_repeat_timeout_seconds, then do not repeat the feed.
      no_repeat_timeout_seconds: 3600
  SSH server:
    command: 'ssh -p nonexistent@my-server.com'
    file_descriptor: 'stderr'
    strict_matching: false
    expected_output: 'NOTICE'
    expected_retval: 255
    timeout_interval: 5
    log_if_ok: false
    feed:
      enabled: true
      title: 'outage mypurpose'
      content: '<em>Sorry</em>, the SSH server was down'
      description: 'outage mypurpose'
      no_repeat_timeout_seconds: 3600

apprise_notifiers:
  # Follow the examples on
  # https://github.com/caronc/apprise
  dest:
    - 'nctalks://myuser:mypassword@myhost/roomid/'
    - 'mailtos://myusername:my%20awesome%20password?port=465&mode=ssl&from=myusername@gmail.com&to=myusername@gmail.com'
  title: 'command assert'
create a
Systemd service unit file
[Unit]
Description=Command assert mypurpose
# Run only once the network is actually up.
Requires=network-online.target
After=network-online.target

[Service]
Type=simple
# The virtual environment and the script live in this directory.
WorkingDirectory=/home/jobs/scripts/by-user/command-assert
# Activate the venv, run the script with its configuration, then
# deactivate.
ExecStart=/bin/sh -c '. .venv/bin/activate && ./command_assert.py ./command_assert.mypurpose.yaml; deactivate'
# Run as the dedicated unprivileged user.
User=command-assert
Group=command-assert

[Install]
WantedBy=multi-user.target
create a
Systemd timer unit file
[Unit]
Description=Once every 30 minutes command assert mypurpose

[Timer]
# Fire at minute 0 and 30 of every hour.
OnCalendar=*:0/30
# Catch up on missed runs after downtime.
Persistent=true

[Install]
WantedBy=timers.target
fix owners and permissions
chown -R command-assert:command-assert /home/jobs/{scripts,services}/by-user/command-assert
chmod 700 -R /home/jobs/{scripts,services}/by-user/command-assert

.. note:: Avoid changing permissions for the Python virtual environment!
run the deploy script