mirror of
https://github.com/healthchecks/healthchecks.git
synced 2025-03-14 20:32:51 +00:00
497 lines
18 KiB
Python
497 lines
18 KiB
Python
from __future__ import annotations
|
|
|
|
import random
|
|
import uuid
|
|
from datetime import date, datetime
|
|
from datetime import timedelta as td
|
|
from secrets import token_urlsafe
|
|
from typing import TYPE_CHECKING, Any
|
|
from urllib.parse import quote, urlencode
|
|
from zoneinfo import ZoneInfo
|
|
|
|
from django.conf import settings
|
|
from django.contrib.auth.hashers import check_password, make_password
|
|
from django.contrib.auth.models import User
|
|
from django.core.signing import BadSignature, TimestampSigner
|
|
from django.db import models
|
|
from django.db.models import Q, QuerySet
|
|
from django.db.models.functions import Lower
|
|
from django.urls import reverse
|
|
from django.utils.timezone import now
|
|
|
|
from hc.lib import emails
|
|
from hc.lib.date import month_boundaries, week_boundaries
|
|
from hc.lib.signing import sign_bounce_id
|
|
from hc.lib.urls import absolute_reverse
|
|
|
|
if TYPE_CHECKING:
|
|
# Importing Check at runtime would cause a circular import, so only import it
|
|
# during type checking
|
|
from hc.api.models import Check
|
|
|
|
CheckQuerySet = QuerySet[Check]
|
|
|
|
|
|
NO_NAG = td()
|
|
NAG_PERIODS = (
|
|
(NO_NAG, "Disabled"),
|
|
(td(hours=1), "Hourly"),
|
|
(td(days=1), "Daily"),
|
|
)
|
|
|
|
REPORT_CHOICES = (("off", "Off"), ("weekly", "Weekly"), ("monthly", "Monthly"))
|
|
# How long an account can be over limits before it is scheduled for deletion
|
|
OVER_LIMIT_GRACE = td(days=31)
|
|
# When scheduling for deletion, how many days in the future to schedule
|
|
DELETION_GRACE = td(days=31)
|
|
|
|
|
|
def month(dt: datetime) -> date:
|
|
"""For a given datetime, return the matching first-day-of-month date."""
|
|
return dt.date().replace(day=1)
|
|
|
|
|
|
class ProfileManager(models.Manager["Profile"]):
|
|
def for_user(self, user: User) -> Profile:
|
|
try:
|
|
return user.profile
|
|
except Profile.DoesNotExist:
|
|
profile = Profile(user=user)
|
|
if not settings.USE_PAYMENTS:
|
|
# If not using payments, set high limits
|
|
profile.check_limit = 10000
|
|
profile.sms_limit = 10000
|
|
profile.call_limit = 10000
|
|
profile.team_limit = 10000
|
|
|
|
profile.save()
|
|
return profile
|
|
|
|
|
|
class Profile(models.Model):
|
|
user = models.OneToOneField(User, models.CASCADE)
|
|
next_report_date = models.DateTimeField(null=True, blank=True)
|
|
reports = models.CharField(max_length=10, default="monthly", choices=REPORT_CHOICES)
|
|
nag_period = models.DurationField(default=NO_NAG, choices=NAG_PERIODS)
|
|
next_nag_date = models.DateTimeField(null=True, blank=True)
|
|
ping_log_limit = models.IntegerField(default=100)
|
|
check_limit = models.IntegerField(default=20)
|
|
token = models.CharField(max_length=128, blank=True)
|
|
|
|
last_sms_date = models.DateTimeField(null=True, blank=True)
|
|
sms_limit = models.IntegerField(default=5)
|
|
sms_sent = models.IntegerField(default=0)
|
|
|
|
last_call_date = models.DateTimeField(null=True, blank=True)
|
|
call_limit = models.IntegerField(default=0)
|
|
calls_sent = models.IntegerField(default=0)
|
|
|
|
team_limit = models.IntegerField(default=2)
|
|
sort = models.CharField(max_length=20, default="created")
|
|
# The date when "Inactive Account Notification" is sent
|
|
deletion_notice_date = models.DateTimeField(null=True, blank=True)
|
|
# Set manually by admin, causes an orange banner in web UI
|
|
deletion_scheduled_date = models.DateTimeField(null=True, blank=True)
|
|
# If the account is over its check limit, the date when it went over the limit
|
|
over_limit_date = models.DateTimeField(null=True, blank=True)
|
|
last_active_date = models.DateTimeField(null=True, blank=True)
|
|
tz = models.CharField(max_length=36, default="UTC")
|
|
theme = models.CharField(max_length=10, null=True, blank=True)
|
|
|
|
totp = models.CharField(max_length=32, null=True, blank=True)
|
|
totp_created = models.DateTimeField(null=True, blank=True)
|
|
|
|
objects = ProfileManager()
|
|
|
|
def __str__(self) -> str:
|
|
return f"Profile for {self.user.email}"
|
|
|
|
def notifications_url(self) -> str:
|
|
return absolute_reverse("hc-notifications")
|
|
|
|
def reports_unsub_url(self) -> str:
|
|
signer = TimestampSigner(salt="reports")
|
|
signed_username = signer.sign(self.user.username)
|
|
return absolute_reverse("hc-unsubscribe-reports", args=[signed_username])
|
|
|
|
def prepare_token(self) -> str:
|
|
token = token_urlsafe(24)
|
|
# Store a hashed transformation of the login token
|
|
self.token = make_password(token, "login")
|
|
self.save()
|
|
# Sign the token so we can check its age later
|
|
return TimestampSigner().sign(token)
|
|
|
|
def check_token(self, token: str) -> bool:
|
|
try:
|
|
token = TimestampSigner().unsign(token, max_age=3600)
|
|
except BadSignature:
|
|
return False
|
|
|
|
return "login" in self.token and check_password(token, self.token)
|
|
|
|
def send_instant_login_link(
|
|
self, membership: Member | None = None, redirect_url: str | None = None
|
|
) -> None:
|
|
token = self.prepare_token()
|
|
url = absolute_reverse("hc-check-token", args=[self.user.username, token])
|
|
if redirect_url:
|
|
url += "?next=%s" % redirect_url
|
|
|
|
ctx = {
|
|
"button_text": "Log In",
|
|
"button_url": url,
|
|
"membership": membership,
|
|
}
|
|
emails.login(self.user.email, ctx)
|
|
|
|
def send_change_email_link(self, new_email: str) -> None:
|
|
payload = {
|
|
"u": self.user.username,
|
|
"t": self.prepare_token(),
|
|
"e": new_email,
|
|
}
|
|
signed_payload = TimestampSigner().sign_object(payload)
|
|
url = absolute_reverse("hc-change-email-verify", args=[signed_payload])
|
|
|
|
ctx = {
|
|
"button_text": "Log In",
|
|
"button_url": url,
|
|
}
|
|
emails.login(new_email, ctx)
|
|
|
|
def send_transfer_request(self, project: Project) -> None:
|
|
token = self.prepare_token()
|
|
settings_path = reverse("hc-project-settings", args=[project.code])
|
|
url = absolute_reverse("hc-check-token", args=[self.user.username, token])
|
|
url += f"?next={settings_path}"
|
|
|
|
ctx = {
|
|
"button_text": "Project Settings",
|
|
"button_url": url,
|
|
"project": project,
|
|
}
|
|
emails.transfer_request(self.user.email, ctx)
|
|
|
|
def send_sms_limit_notice(self, transport: str) -> None:
|
|
ctx = {"transport": transport, "limit": self.sms_limit}
|
|
if self.sms_limit != 500 and settings.USE_PAYMENTS:
|
|
ctx["url"] = absolute_reverse("hc-pricing")
|
|
|
|
emails.sms_limit(self.user.email, ctx)
|
|
|
|
def send_call_limit_notice(self) -> None:
|
|
ctx: dict[str, Any] = {"limit": self.call_limit}
|
|
if self.call_limit != 500 and settings.USE_PAYMENTS:
|
|
ctx["url"] = absolute_reverse("hc-pricing")
|
|
|
|
emails.call_limit(self.user.email, ctx)
|
|
|
|
def projects(self) -> QuerySet[Project]:
|
|
"""Return a queryset of all projects we have access to."""
|
|
|
|
is_owner = Q(owner_id=self.user_id)
|
|
is_member = Q(member__user_id=self.user_id)
|
|
q = Project.objects.filter(is_owner | is_member)
|
|
return q.distinct().order_by(Lower("name"))
|
|
|
|
def checks_from_all_projects(self) -> CheckQuerySet:
|
|
"""Return a queryset of checks from projects we have access to."""
|
|
|
|
from hc.api.models import Check
|
|
|
|
return Check.objects.filter(project__in=self.projects())
|
|
|
|
def send_report(self, nag: bool = False) -> bool:
|
|
q = self.checks_from_all_projects()
|
|
|
|
# Has there been a ping in last 6 months?
|
|
result = q.aggregate(models.Max("last_ping"))
|
|
last_ping = result["last_ping__max"]
|
|
|
|
six_months_ago = now() - td(days=180)
|
|
if last_ping is None or last_ping < six_months_ago:
|
|
return False
|
|
|
|
# Sort checks by project. Need this because will group by project in template.
|
|
q = q.select_related("project").order_by("project_id")
|
|
# list() executes the query, to avoid DB access while rendering the template.
|
|
checks = list(q)
|
|
|
|
unsub_url = self.reports_unsub_url()
|
|
headers = {
|
|
"X-Bounce-ID": sign_bounce_id("r.%s" % self.user.username),
|
|
"List-Unsubscribe": "<%s>" % unsub_url,
|
|
"List-Unsubscribe-Post": "List-Unsubscribe=One-Click",
|
|
}
|
|
ctx: dict[str, Any] = {
|
|
"sort": self.sort,
|
|
"unsub_link": unsub_url,
|
|
"notifications_url": self.notifications_url(),
|
|
"tz": self.tz,
|
|
}
|
|
|
|
if not nag:
|
|
# For weekly and monthly reports, calculate the downtimes,
|
|
# throw away the current period, keep two previous periods
|
|
if self.reports == "weekly":
|
|
boundaries = week_boundaries(3, self.tz)
|
|
else:
|
|
boundaries = month_boundaries(3, self.tz)
|
|
|
|
for check in checks:
|
|
downtimes = check.downtimes_by_boundary(boundaries, self.tz)
|
|
# downtimes_by_boundary returns records in descending order,
|
|
# but the template will need them in ascending order:
|
|
downtimes.reverse()
|
|
setattr(check, "past_downtimes", downtimes[:-1])
|
|
|
|
# boundaries are in descending order, but the template
|
|
# will need them in ascending order:
|
|
boundaries.reverse()
|
|
ctx["checks"] = checks
|
|
ctx["boundaries"] = boundaries[:-1]
|
|
ctx["monthly_or_weekly"] = self.reports
|
|
emails.report(self.user.email, ctx, headers)
|
|
|
|
if nag:
|
|
# For nags, only show checks that are currently down
|
|
checks = [c for c in checks if c.get_status() == "down"]
|
|
if not checks:
|
|
return False
|
|
ctx["checks"] = checks
|
|
ctx["num_down"] = len(checks)
|
|
ctx["nag_period"] = self.nag_period.total_seconds()
|
|
emails.nag(self.user.email, ctx, headers)
|
|
|
|
return True
|
|
|
|
def sms_sent_this_month(self) -> int:
|
|
# IF last_sms_date was never set, we have not sent any messages yet.
|
|
if not self.last_sms_date:
|
|
return 0
|
|
|
|
# If last sent date is not from this month, we've sent 0 this month.
|
|
if month(now()) > month(self.last_sms_date):
|
|
return 0
|
|
|
|
return self.sms_sent
|
|
|
|
def authorize_sms(self) -> bool:
|
|
"""If monthly limit not exceeded, increase counter and return True"""
|
|
|
|
sent_this_month = self.sms_sent_this_month()
|
|
if sent_this_month >= self.sms_limit:
|
|
return False
|
|
|
|
self.sms_sent = sent_this_month + 1
|
|
self.last_sms_date = now()
|
|
self.save()
|
|
return True
|
|
|
|
def calls_sent_this_month(self) -> int:
|
|
# IF last_call_date was never set, we have not made any phone calls yet.
|
|
if not self.last_call_date:
|
|
return 0
|
|
|
|
# If last sent date is not from this month, we've made 0 calls this month.
|
|
if month(now()) > month(self.last_call_date):
|
|
return 0
|
|
|
|
return self.calls_sent
|
|
|
|
def authorize_call(self) -> bool:
|
|
"""If monthly limit not exceeded, increase counter and return True"""
|
|
|
|
sent_this_month = self.calls_sent_this_month()
|
|
if sent_this_month >= self.call_limit:
|
|
return False
|
|
|
|
self.calls_sent = sent_this_month + 1
|
|
self.last_call_date = now()
|
|
self.save()
|
|
return True
|
|
|
|
def num_checks_used(self) -> int:
|
|
from hc.api.models import Check
|
|
|
|
return Check.objects.filter(project__owner_id=self.user_id).count()
|
|
|
|
def num_checks_available(self) -> int:
|
|
return self.check_limit - self.num_checks_used()
|
|
|
|
def can_accept(self, project: Project) -> bool:
|
|
return project.check_set.count() <= self.num_checks_available()
|
|
|
|
def update_next_nag_date(self) -> None:
|
|
any_down = self.checks_from_all_projects().filter(status="down").exists()
|
|
if any_down and self.next_nag_date is None and self.nag_period:
|
|
self.next_nag_date = now() + self.nag_period
|
|
self.save(update_fields=["next_nag_date"])
|
|
elif not any_down and self.next_nag_date:
|
|
self.next_nag_date = None
|
|
self.save(update_fields=["next_nag_date"])
|
|
|
|
def choose_next_report_date(self) -> datetime | None:
|
|
"""Calculate the target date for the next monthly/weekly report.
|
|
|
|
Monthly reports should get sent on 1st of each month, between
|
|
9AM and 11AM in user's timezone.
|
|
|
|
Weekly reports should get sent on Mondays, between
|
|
9AM and 11AM in user's timezone.
|
|
|
|
"""
|
|
|
|
if self.reports == "off":
|
|
return None
|
|
|
|
dt = now().astimezone(ZoneInfo(self.tz))
|
|
dt = dt.replace(hour=9, minute=0) + td(minutes=random.randrange(0, 120))
|
|
|
|
while True:
|
|
dt += td(days=1)
|
|
if self.reports == "monthly" and dt.day == 1:
|
|
return dt
|
|
elif self.reports == "weekly" and dt.weekday() == 0:
|
|
return dt
|
|
|
|
def is_past_over_limit_grace(self) -> bool:
|
|
"""Return True if this profile is over limits for 31 or more days."""
|
|
if not self.over_limit_date:
|
|
return False
|
|
|
|
return now() > self.over_limit_date + OVER_LIMIT_GRACE
|
|
|
|
def schedule_for_deletion(self) -> None:
|
|
self.deletion_scheduled_date = now() + DELETION_GRACE
|
|
self.save()
|
|
|
|
|
|
class Project(models.Model):
|
|
code = models.UUIDField(default=uuid.uuid4, unique=True)
|
|
name = models.CharField(max_length=200, blank=True)
|
|
owner = models.ForeignKey(User, models.CASCADE)
|
|
api_key = models.CharField(max_length=128, blank=True, db_index=True)
|
|
api_key_readonly = models.CharField(max_length=128, blank=True, db_index=True)
|
|
badge_key = models.CharField(max_length=150, unique=True)
|
|
ping_key = models.CharField(max_length=128, blank=True, null=True, unique=True)
|
|
show_slugs = models.BooleanField(default=False)
|
|
|
|
def __str__(self) -> str:
|
|
return self.name or self.owner.email
|
|
|
|
@property
|
|
def owner_profile(self) -> Profile:
|
|
return Profile.objects.for_user(self.owner)
|
|
|
|
def num_checks_available(self) -> int:
|
|
return self.owner_profile.num_checks_available()
|
|
|
|
def invite_suggestions(self) -> QuerySet[User]:
|
|
q = User.objects.filter(memberships__project__owner_id=self.owner_id)
|
|
q = q.exclude(memberships__project=self)
|
|
return q.distinct().order_by("email")
|
|
|
|
def can_invite_new_users(self) -> bool:
|
|
q = User.objects.filter(memberships__project__owner_id=self.owner_id)
|
|
used = q.distinct().count()
|
|
return used < self.owner_profile.team_limit
|
|
|
|
def invite(self, user: User, role: str) -> bool:
|
|
if Member.objects.filter(user=user, project=self).exists():
|
|
return False
|
|
|
|
if self.owner_id == user.id:
|
|
return False
|
|
|
|
m = Member.objects.create(user=user, project=self, role=role)
|
|
checks_url = reverse("hc-checks", args=[self.code])
|
|
|
|
if settings.EMAIL_HOST:
|
|
profile = Profile.objects.for_user(user)
|
|
profile.send_instant_login_link(membership=m, redirect_url=checks_url)
|
|
return True
|
|
|
|
def update_next_nag_dates(self) -> None:
|
|
"""Update next_nag_date on profiles of all members of this project."""
|
|
|
|
is_owner = Q(user_id=self.owner_id)
|
|
is_member = Q(user__memberships__project=self)
|
|
q = Profile.objects.filter(is_owner | is_member).exclude(nag_period=NO_NAG)
|
|
|
|
for profile in q:
|
|
profile.update_next_nag_date()
|
|
|
|
return None
|
|
|
|
def get_n_down(self) -> int:
|
|
result = 0
|
|
for check in self.check_set.all():
|
|
if check.get_status() == "down":
|
|
result += 1
|
|
|
|
return result
|
|
|
|
def have_channel_issues(self) -> bool:
|
|
errors = list(self.channel_set.values_list("last_error", flat=True))
|
|
|
|
# It's a problem if a project has no integrations at all
|
|
if len(errors) == 0:
|
|
return True
|
|
|
|
# It's a problem if any integration has a logged error
|
|
return True if max(errors) else False
|
|
|
|
def transfer_request(self) -> Member | None:
|
|
return self.member_set.filter(transfer_request_date__isnull=False).first()
|
|
|
|
def dashboard_url(self) -> str | None:
|
|
if not self.api_key_readonly:
|
|
return None
|
|
|
|
frag = urlencode({self.api_key_readonly: str(self)}, quote_via=quote)
|
|
return reverse("hc-dashboard") + "#" + frag
|
|
|
|
def checks_url(self) -> str:
|
|
return absolute_reverse("hc-checks", args=[self.code])
|
|
|
|
def auth_metrics_url(self) -> str:
|
|
return absolute_reverse("hc-auth-metrics", args=[self.code])
|
|
|
|
def get_absolute_url(self) -> str:
|
|
return reverse("hc-checks", args=[self.code])
|
|
|
|
|
|
class Member(models.Model):
|
|
class Role(models.TextChoices):
|
|
READONLY = "r", "Read-only"
|
|
REGULAR = "w", "Member"
|
|
MANAGER = "m", "Manager"
|
|
|
|
user = models.ForeignKey(User, models.CASCADE, related_name="memberships")
|
|
project = models.ForeignKey(Project, models.CASCADE)
|
|
transfer_request_date = models.DateTimeField(null=True, blank=True)
|
|
role = models.CharField(max_length=1, default=Role.REGULAR, choices=Role.choices)
|
|
|
|
class Meta:
|
|
constraints = [
|
|
models.UniqueConstraint(
|
|
fields=["user", "project"], name="accounts_member_no_duplicates"
|
|
)
|
|
]
|
|
|
|
def can_accept(self) -> bool:
|
|
return self.user.profile.can_accept(self.project)
|
|
|
|
@property
|
|
def is_rw(self) -> bool:
|
|
return self.role in (Member.Role.REGULAR, Member.Role.MANAGER)
|
|
|
|
|
|
class Credential(models.Model):
|
|
code = models.UUIDField(default=uuid.uuid4, unique=True)
|
|
name = models.CharField(max_length=100)
|
|
user = models.ForeignKey(User, models.CASCADE, related_name="credentials")
|
|
created = models.DateTimeField(auto_now_add=True)
|
|
data = models.BinaryField()
|