File: //lib64/nagios/plugins/check_jetbackup.py
#!/usr/bin/env python3
"""
JetBackup Health Check Plugin for Icinga
Description:
- Checks JetBackup MongoDB connection
- Verifies JetBackup license status
- Monitors backup job durations (--cdur / --wdur)
- Monitors recent JetBackup alerts (--calert / --walert)
- Prints backup destination(s)
Usage:
./check_jetbackup.py [--cdur 24] [--wdur 12] [--calert 24] [--walert 12]
Exit Codes:
0 OK
1 WARNING
2 CRITICAL
3 UNKNOWN
"""
import os
import sys
import subprocess
import shutil
import json
import re
from datetime import datetime, timedelta
import argparse
def run_command(command):
    """Run an external command and capture its output as text.

    Args:
        command: Argument list handed to ``subprocess.run`` (no shell).

    Returns:
        The ``subprocess.CompletedProcess`` (with ``stdout``/``stderr`` as
        ``str``) when the command exits with status 0, otherwise ``None``.
        ``None`` is also returned when the command cannot be started at all,
        e.g. because the binary is missing or not executable.
    """
    try:
        return subprocess.run(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            check=True,
            universal_newlines=True,
        )
    except (subprocess.CalledProcessError, OSError):
        # OSError covers FileNotFoundError / PermissionError raised when the
        # executable cannot be launched; callers only ever test for None.
        return None
def check_jetmongo():
    """Check that the JetBackup MongoDB instance answers a stats query.

    The password is read from the ``PASS=`` line of the JetBackup mongod auth
    file, then the first executable client among ``mongo`` / ``mongosh`` is
    used to evaluate ``db.stats().ok`` on the ``jetbackup`` database
    (port 27217, user ``root``).

    Returns:
        0 when the query prints ``1`` or when the auth file does not exist
        (reported as "JetBackup is not installed"), 2 for every failure.
    """
    auth_file = "/usr/local/jetapps/etc/.mongod.auth"
    if not os.path.isfile(auth_file):
        print("OK: JetBackup is not installed on this server")
        return 0

    # Take the value of the first PASS= line; an empty value counts as missing.
    password = None
    try:
        with open(auth_file) as handle:
            for raw_line in handle:
                if raw_line.startswith("PASS="):
                    password = raw_line.strip().split("=", 1)[1]
                    break
    except Exception as e:
        print(f"CRITICAL: Failed to read auth file: {e}")
        return 2
    if not password:
        print("CRITICAL: Password not found in auth file")
        return 2

    # Prefer the legacy "mongo" shell and fall back to "mongosh".
    mongo_cli = None
    for candidate in (
        "/usr/local/jetapps/usr/bin/mongo",
        "/usr/local/jetapps/usr/bin/mongosh",
    ):
        if os.access(candidate, os.X_OK):
            mongo_cli = candidate
            break
    if mongo_cli is None:
        print("CRITICAL: mongo cli not found")
        return 2

    query = [
        mongo_cli,
        "--port", "27217",
        "--username", "root",
        "--password", password,
        "--authenticationDatabase", "admin",
        "--quiet", "jetbackup",
        "--eval", "printjson(db.stats().ok)",
    ]
    outcome = run_command(query)
    if outcome is None:
        print("CRITICAL: Failed to connect to JetMongo")
        return 2

    if outcome.stdout.strip() == "1":
        print("OK: JetMongo is working")
        return 0
    print("CRITICAL: JetMongo is not working")
    return 2
def check_jetbackup5_license():
    """Check the JetBackup 5 license flag reported by ``getDashboardDetails``.

    Runs ``sudo /usr/bin/jetbackup5api -F getDashboardDetails -O json`` and
    looks for ``"licenseIssue": false`` in the output.

    Returns:
        0 when the output reports ``licenseIssue`` as ``false``, 2 when the
        API call fails or the flag is missing / not ``false``.
    """
    result = run_command([
        "sudo", "/usr/bin/jetbackup5api", "-F", "getDashboardDetails", "-O", "json"
    ])
    if result is None:
        print("CRITICAL: Failed to run jetbackup5api")
        return 2
    # Allow optional whitespace around the colon so that pretty-printed JSON
    # ('"licenseIssue": false') is not misreported as a licensing problem.
    if re.search(r'"licenseIssue"\s*:\s*false\b', result.stdout):
        print("OK: No licensing issues found (JetBackup 5)")
        return 0
    print("CRITICAL: There are licensing issues with the JetBackup 5 instance")
    return 2
def check_jetbackup4_license():
    """Check the JetBackup 4 license status via ``jetapi backup -F licenseStatus``.

    Returns:
        2 when the API call fails or its output contains ``licenseIssue: 1``,
        otherwise 0.
    """
    api_call = ["sudo", "/usr/bin/jetapi", "backup", "-F", "licenseStatus"]
    outcome = run_command(api_call)
    if outcome is None:
        print("CRITICAL: Failed to run jetapi")
        return 2

    has_issue = "licenseIssue: 1" in outcome.stdout
    if not has_issue:
        print("OK: No licensing issues found (JetBackup 4)")
        return 0
    print("CRITICAL: There are licensing issues with the JetBackup 4 instance")
    return 2
def check_jetbackup_license():
    """Run the license check that matches the installed JetBackup CLI.

    Uses the JetBackup 5 check when ``jetbackup5api`` is on PATH and the
    JetBackup 4 check otherwise; returns that check's status code.
    """
    checker = (
        check_jetbackup5_license
        if shutil.which("jetbackup5api")
        else check_jetbackup4_license
    )
    return checker()
def parse_time(time_str):
    """Parse a ``YYYY-MM-DDTHH:MM:SS[+-HH:MM]`` timestamp into local time.

    The UTC offset (``+HH:MM``, ``+HHMM`` or a trailing ``Z``) is honoured,
    including its sign, and the instant is converted to the system's local
    timezone. The result is returned as a *naive* datetime so that it can be
    compared directly with ``datetime.now()``. A timestamp without an offset
    is taken as local time already.

    Args:
        time_str: Timestamp string, or an empty value.

    Returns:
        A naive ``datetime`` in local time, or ``None`` when ``time_str`` is
        empty.

    Raises:
        ValueError: If the string does not match the expected layout.
    """
    if not time_str:
        return None
    stamp = time_str.strip()
    if stamp.endswith(("Z", "z")):
        stamp = stamp[:-1] + "+0000"
    # Older strptime implementations only accept "+HHMM" for %z, so drop the
    # colon inside the offset (and only there, hence the explicit sign).
    stamp = re.sub(r"([+-]\d{2}):(\d{2})$", r"\1\2", stamp)
    if re.search(r"[+-]\d{4}$", stamp):
        aware = datetime.strptime(stamp, "%Y-%m-%dT%H:%M:%S%z")
        # astimezone() without arguments converts to the system local zone.
        return aware.astimezone().replace(tzinfo=None)
    return datetime.strptime(stamp, "%Y-%m-%dT%H:%M:%S")
def check_jetbackup_job_durations():
    """Flag backup jobs that have been running longer than the thresholds.

    Thresholds come from ``--cdur`` / ``--wdur`` (hours, defaults 24 / 12);
    other command-line arguments are ignored. The job list is fetched with
    ``listBackupJobs`` through ``jetbackup5api`` or, failing that,
    ``jetapi backup``. A job is considered only when its ``running`` flag is
    truthy and it has a ``last_run`` timestamp; its age is the current time
    minus that timestamp.

    Returns:
        0 (OK), 1 (WARNING), 2 (CRITICAL, also when no jobs exist) or
        3 (UNKNOWN: no CLI found, command failed, or unparsable JSON).
    """
    cli = argparse.ArgumentParser(add_help=False)
    cli.add_argument("--cdur", type=int, default=24, help="Critical duration threshold in hours (default: 24)")
    cli.add_argument("--wdur", type=int, default=12, help="Warning duration threshold in hours (default: 12)")
    options, _unused = cli.parse_known_args()
    critical_hours, warning_hours = options.cdur, options.wdur

    # Pick the API front-end; jetapi needs the "backup" module argument.
    if shutil.which("jetbackup5api"):
        api_prefix = ["jetbackup5api"]
    elif shutil.which("jetapi"):
        api_prefix = ["jetapi", "backup"]
    else:
        print("UNKNOWN: No suitable JetBackup command found")
        return 3

    jobs_output = run_command(api_prefix + ["-F", "listBackupJobs", "-O", "json"])
    if not jobs_output:
        print("UNKNOWN: Failed to get JetBackup job data")
        return 3

    try:
        payload = json.loads(jobs_output.stdout)
    except ValueError:
        print("UNKNOWN: Failed to parse JetBackup jobs JSON")
        return 3

    job_list = payload.get("data", {}).get("jobs", [])
    if not job_list:
        print("CRITICAL: No backup jobs found")
        return 2

    reference = datetime.now()
    over_critical = []
    over_warning = []
    for entry in job_list:
        # The timestamp is parsed for every job, running or not.
        started = parse_time(entry.get("last_run"))
        if not (entry.get("running", False) and started):
            continue
        elapsed = reference - started
        if elapsed > timedelta(hours=critical_hours):
            over_critical.append(entry.get("_id"))
        elif elapsed > timedelta(hours=warning_hours):
            over_warning.append(entry.get("_id"))

    if over_critical:
        print(f"CRITICAL: BackupJob(s) running > {critical_hours}h: {', '.join(over_critical)}")
        return 2
    if over_warning:
        print(f"WARNING: BackupJob(s) running > {warning_hours}h: {', '.join(over_warning)}")
        return 1
    print("OK: JetBackup Queue is ok")
    return 0
def check_jetbackup_alerts():
    """Evaluate recent JetBackup alerts against the alert time windows.

    The windows come from ``--calert`` / ``--walert`` (hours, defaults
    24 / 12); other command-line arguments are ignored. Alerts are fetched
    with ``listAlerts`` through ``jetbackup5api`` or, failing that,
    ``jetapi backup``. The number of hosting accounts (cPanel, Plesk or
    DirectAdmin) is used to downgrade "No accounts found for backup" alerts
    on servers without accounts.

    Returns:
        0 (OK, also when no JetBackup CLI is found), 1 (WARNING),
        2 (CRITICAL) or 3 (UNKNOWN: alerts could not be fetched or parsed).
    """
    parser = argparse.ArgumentParser(add_help=False)
    parser.add_argument("--calert", type=int, default=24, help="Critical alert threshold in hours")
    parser.add_argument("--walert", type=int, default=12, help="Warning alert threshold in hours")
    args, _ = parser.parse_known_args()

    def is_command_available(command):
        """Return True when *command* is an executable found on PATH."""
        # Look the binary up instead of executing it with no arguments.
        return shutil.which(command) is not None

    def shell_output(command):
        """Run a fixed shell pipeline and return its raw stdout (bytes)."""
        process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
        output, _ = process.communicate()
        return output

    def get_current_users_count():
        """Return the number of hosting accounts, or None if undeterminable."""
        # Each probe is best-effort: on failure fall through to the next
        # control panel instead of aborting the whole check.
        probe_errors = (OSError, ValueError, KeyError, TypeError)
        try:
            if os.path.exists("/usr/local/cpanel/cpanel"):
                data = json.loads(shell_output("whmapi1 --output=jsonpretty get_current_users_count"))
                return int(data['data']['users'])
        except probe_errors:
            pass
        try:
            if os.path.exists("/usr/local/psa/version"):
                return int(shell_output("plesk bin customer --list | wc -l").strip())
        except probe_errors:
            pass
        try:
            if os.path.exists("/usr/local/directadmin/directadmin"):
                listing = shell_output("/bin/ls -1 /usr/local/directadmin/data/users/ | /usr/bin/wc -l")
                return int(listing.decode('utf-8').strip())
        except probe_errors:
            pass
        return None

    def get_critical_titles(alerts, hours, source_cmd):
        """Join the titles of matching alerts created within *hours* hours."""
        threshold = datetime.now() - timedelta(hours=hours)
        exclude = ["Restore process for the account"]
        # Alert level this check reports on, per API flavour.
        wanted_level = {"jetapi": 128, "jetbackup5api": 4}.get(source_cmd)
        titles = []
        for alert in alerts:
            # NOTE(review): the trailing UTC offset is dropped and the value is
            # compared with naive local time - confirm the API reports local time.
            created = datetime.strptime(alert["created"][:-6], "%Y-%m-%dT%H:%M:%S")
            title = alert["title"]
            if any(ex in title for ex in exclude):
                continue
            if wanted_level is not None and alert.get("level", "0") == wanted_level and created > threshold:
                titles.append(title)
        return " | ".join(titles)

    if is_command_available("jetbackup5api"):
        output = run_command(["jetbackup5api", "-F", "listAlerts", "-D", "sort[created]=-1", "-O", "json"])
        command = "jetbackup5api"
    elif is_command_available("jetapi"):
        output = run_command(["jetapi", "backup", "-F", "listAlerts", "-O", "json"])
        command = "jetapi"
    else:
        print("JetBackup not found on the server")
        return 0
    if not output:
        print("UNKNOWN: Failed to retrieve alerts")
        return 3
    try:
        data = json.loads(output.stdout)
        alerts = data["data"]["alerts"]
    except Exception:
        print("UNKNOWN: Failed to parse JetBackup alerts")
        return 3

    users_count = get_current_users_count()
    # The count is None when no control panel could be queried; comparing
    # None with an int raises TypeError, so derive explicit flags instead.
    has_users = users_count is not None and users_count > 0
    no_users = users_count == 0
    users_label = "unknown" if users_count is None else users_count

    critical_titles = get_critical_titles(alerts, args.calert, command)
    warning_titles = get_critical_titles(alerts, args.walert, command)
    # "or ''" guards against fields that are present but null.
    found_no_accounts = any("No accounts found for backup" in (a.get("message") or "") for a in alerts)
    jb_config_export = any("JB configurations export Backup job Failed" in (a.get("title") or "") for a in alerts)
    jb_download_backup = any("Download process for the account" in (a.get("title") or "") for a in alerts)

    if critical_titles:
        single_title = len(critical_titles.split("|")) == 1
        if has_users and found_no_accounts and not jb_config_export:
            print(f"CRITICAL: {critical_titles} ({users_label} users)")
            return 2
        elif no_users and found_no_accounts:
            print(f"OK: No accounts found for backup ({users_label} users)")
            return 0
        elif jb_config_export and single_title:
            print(f"WARNING: JB configurations export Backup job Failed ({users_label} users)")
            return 1
        elif jb_download_backup and single_title:
            print(f"WARNING: {critical_titles} ({users_label} users)")
            return 1
        else:
            print(f"CRITICAL: {critical_titles} ({users_label} users)")
            return 2
    elif warning_titles:
        if no_users and found_no_accounts:
            print(f"OK: No accounts found for backup ({users_label} users)")
            return 0
        print(f"WARNING: {warning_titles} ({users_label} users)")
        return 1
    else:
        print(f"OK: JetBackup alerts clean ({users_label} users)")
        return 0
def get_backup_destinations():
    """Print the destination(s) referenced by the configured backup jobs.

    Runs ``listBackupJobs`` through ``jetbackup5api`` or ``jetapi backup`` and
    collects, from each job's ``destination_details``, either the ``host``
    option or a ``bucket=/region=/endpoint=`` summary.

    Returns:
        0 when the job list was read (whether or not a destination was
        found), 3 (UNKNOWN) when no JetBackup CLI is available or its output
        cannot be retrieved or parsed. A status code is always returned so
        the caller never receives ``None``.
    """
    # jetapi needs the "backup" module argument, matching how the other
    # checks in this file invoke it.
    candidates = (
        ("jetbackup5api", []),
        ("jetapi", ["backup"]),
    )
    base_cmd = None
    for name, extra_args in candidates:
        fixed_path = os.path.join('/usr/bin', name)
        executable = shutil.which(name) or (fixed_path if os.path.exists(fixed_path) else None)
        if executable:
            base_cmd = [executable] + extra_args
            break
    if base_cmd is None:
        print("UNKNOWN: No suitable JetBackup command found")
        return 3

    try:
        # Run the API command and get output
        output = subprocess.check_output(
            base_cmd + ["-F", "listBackupJobs", "-O", "json"],
            stderr=subprocess.PIPE
        ).decode('utf-8')
        # Parse JSON and extract unique hosts
        data = json.loads(output)
        jet_destinations = set()
        jobs = (data.get('data') or {}).get('jobs') or []
        for job in jobs:
            for dest in job.get('destination_details') or []:
                options = dest.get('options') or {}
                host = options.get('host')
                if host:
                    jet_destinations.add(str(host))
                elif options.get('bucket'):
                    bucket = options.get('bucket')
                    region = options.get('region', '')
                    endpoint = options.get('endpoint', '')
                    jet_destinations.add(f"bucket={bucket}/region={region}/endpoint={endpoint}")
    except (subprocess.CalledProcessError, OSError, ValueError, AttributeError, TypeError) as e:
        # ValueError covers invalid JSON and undecodable output;
        # AttributeError/TypeError cover an unexpected JSON structure.
        print(f"UNKNOWN: Failed to retrieve backup destinations: {e}")
        return 3

    if jet_destinations:
        # Sorted so the status text is stable between runs.
        print("OK: Backup destination(s):", ",".join(sorted(jet_destinations)))
    else:
        print("OK: Could not find a backup destination")
    return 0
if __name__ == "__main__":
    # Nothing to monitor when no JetBackup CLI is on PATH.
    if not (shutil.which("jetbackup5api") or shutil.which("jetbackup4api") or shutil.which("jetapi")):
        print("OK: JetBackup is not installed on this server")
        sys.exit(0)
    # Servers that have Backuply installed are reported OK without
    # running the JetBackup checks.
    if shutil.which("backuply"):
        print("OK: Backuply is installed on this server")
        sys.exit(0)
    # Run the checks in order and stop at the first one that reports a
    # problem, using its status as the plugin exit code.
    for check_fn in [check_jetmongo, get_backup_destinations, check_jetbackup_license, check_jetbackup_job_durations, check_jetbackup_alerts]:
        result = check_fn()
        # Only a real non-zero status aborts the run. A check that yields
        # None must not end it: sys.exit(None) exits with status 0 and would
        # silently skip every remaining check.
        if result:
            sys.exit(result)
    sys.exit(0)