healthchecks/healthchecks: hc/lib/s3.py
Pēteris Caune b5d4f2aa74
Implement S3 outage mitigation
The mitigation is to not attempt GetObject calls if there have
been more than 3 S3 errors in the past minute. The implementation
uses the TokenBucket class that we normally use for rate-limiting.

An example scenario this is trying to avoid is:

* The S3 service becomes unavailable for 10 straight minutes.
  Each S3 request hangs until it hits the configured timeout
  (settings.S3_TIMEOUT).
* A client is frequently requesting the "Get ping's logged body"
  API call. Each call causes one webserver process to become
  busy for S3_TIMEOUT seconds.
* All workers become busy, the request backlog fills up, and our
  service starts returning 5xx errors.

With the mitigation in place, during an S3 outage only the calls that
retrieve a ping's logged body will return 503; the rest of the service
will (hopefully) keep working normally (see the sketch below).

Fixes: #1114
2025-01-13 14:21:42 +02:00
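
The token-bucket gate itself is not visible in the file excerpt below. As a
minimal sketch of the idea, assuming a hypothetical in-process ErrorBudget
helper and a guarded_get_object wrapper in place of the project's actual
TokenBucket-based implementation:

from __future__ import annotations

import time

from hc.lib.s3 import GetObjectError, get_object


class ErrorBudget:
    """Hypothetical stand-in for TokenBucket: allow at most `limit`
    errors per sliding `window` seconds."""

    def __init__(self, limit: int = 3, window: float = 60.0) -> None:
        self.limit = limit
        self.window = window
        self.timestamps: list[float] = []

    def record_error(self) -> None:
        self.timestamps.append(time.time())

    def exhausted(self) -> bool:
        # Drop timestamps older than the window, then check the count.
        cutoff = time.time() - self.window
        self.timestamps = [t for t in self.timestamps if t > cutoff]
        return len(self.timestamps) > self.limit


budget = ErrorBudget()


def guarded_get_object(code: str, n: int) -> bytes | None:
    # Fail fast while the error budget is exhausted, instead of letting
    # every request hang for S3_TIMEOUT seconds.
    if budget.exhausted():
        raise GetObjectError("Too many recent S3 errors")
    try:
        return get_object(code, n)
    except GetObjectError:
        budget.record_error()
        raise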

152 lines
4.5 KiB
Python

from __future__ import annotations

import logging
from io import BytesIO
from threading import Thread
from uuid import UUID

from django.conf import settings

from hc.lib.statsd import statsd

try:
    from minio import InvalidResponseError, Minio, S3Error
    from minio.deleteobjects import DeleteObject
    from urllib3 import PoolManager
    from urllib3.exceptions import HTTPError, ReadTimeoutError
    from urllib3.util import Retry
except ImportError:
    # minio is not installed. Clear S3_BUCKET so the rest of this module
    # treats object storage as not configured.
    settings.S3_BUCKET = None

_client = None
logger = logging.getLogger(__name__)


class GetObjectError(Exception):
    pass


def client() -> Minio:
    if not settings.S3_BUCKET:
        raise Exception("Object storage is not configured")

    # Lazily initialize a single shared Minio client on first use.
    global _client
    if _client is None:
        assert settings.S3_ENDPOINT
        _client = Minio(
            settings.S3_ENDPOINT,
            settings.S3_ACCESS_KEY,
            settings.S3_SECRET_KEY,
            region=settings.S3_REGION,
            secure=settings.S3_SECURE,
            http_client=PoolManager(
                timeout=settings.S3_TIMEOUT, retries=Retry(total=1)
            ),
        )

    return _client


ASCII_J = ord("j")
ASCII_Z = ord("z")


def enc(n: int) -> str:
    """Generate an object key in the "<sorting prefix>-<n>" form.

    >>> [enc(i) for i in range(0, 5)]
    ['zj-0', 'zi-1', 'zh-2', 'zg-3', 'zf-4']

    The purpose of the sorting prefix is to sort keys with smaller n values
    last:

    >>> sorted([enc(i) for i in range(0, 5)])
    ['zf-4', 'zg-3', 'zh-2', 'zi-1', 'zj-0']

    This allows efficient lookup of objects with n values below a specific
    threshold. For example, the following retrieves all keys at the bucket's
    root directory with n < 123:

    >>> client.list_objects(bucket_name, start_after=enc(123))
    """

    s = str(n)
    len_inverted = chr(ASCII_Z - len(s) + 1)
    inverted = "".join(chr(ASCII_J - int(c)) for c in s)
    return len_inverted + inverted + "-" + s
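
# Illustrative aside: because the key ends with "-<n>", the numeric value
# can be read back without reversing the prefix:
#
#     >>> int(enc(123).split("-")[1])
#     123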

def get_object(code: str, n: int) -> bytes | None:
    if not settings.S3_BUCKET:
        return None

    statsd.incr("hc.lib.s3.getObject")
    with statsd.timer("hc.lib.s3.getObjectTime"):
        key = f"{code}/{enc(n)}"
        response = None
        try:
            response = client().get_object(settings.S3_BUCKET, key)
            return response.read()
        except (S3Error, InvalidResponseError, HTTPError) as e:
            if isinstance(e, S3Error) and e.code == "NoSuchKey":
                # It's not an error condition if an object does not exist.
                # Return None, don't log, don't raise.
                return None

            logger.exception(f"{e.__class__.__name__} in hc.lib.s3.get_object")
            raise GetObjectError() from e
        finally:
            if response:
                response.close()
                response.release_conn()
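
# Sketch of a caller, per the commit message above: a hypothetical Django
# view that maps GetObjectError to a 503 response (the view name and response
# bodies are illustrative, not part of this module):
#
#     from django.http import HttpResponse
#
#     def ping_body(request, code, n):
#         try:
#             body = get_object(code, n)
#         except GetObjectError:
#             return HttpResponse("Object storage is unavailable", status=503)
#         if body is None:
#             return HttpResponse(status=404)
#         return HttpResponse(body, content_type="text/plain")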

def put_object(code: UUID, n: int, data: bytes) -> None:
    assert settings.S3_BUCKET
    key = "%s/%s" % (code, enc(n))
    retries = 10
    while True:
        try:
            client().put_object(settings.S3_BUCKET, key, BytesIO(data), len(data))
            break
        except S3Error as e:
            if e.code == "InternalError" and retries > 0:
                retries -= 1
                print("InternalError, retrying (retries=%d)..." % retries)
                continue
            raise e

def _remove_objects(code: UUID, upto_n: int) -> None:
    assert settings.S3_BUCKET
    if upto_n <= 0:
        return

    prefix = "%s/" % code
    # Keys sort in descending order of n, so start_after=enc(upto_n + 1)
    # yields precisely the keys with n <= upto_n.
    start_after = prefix + enc(upto_n + 1)
    q = client().list_objects(settings.S3_BUCKET, prefix, start_after=start_after)
    delete_objs = [DeleteObject(obj.object_name) for obj in q]
    if delete_objs:
        num_objs = len(delete_objs)
        try:
            with statsd.timer("hc.lib.s3.removeObjectsTime"):
                errors = client().remove_objects(settings.S3_BUCKET, delete_objs)
                for e in errors:
                    statsd.incr("hc.lib.s3.removeObjectsErrors")
                    logger.error("remove_objects error: [%s] %s", e.code, e.message)
        except ReadTimeoutError:
            logger.exception("ReadTimeoutError while removing %d objects", num_objs)
            statsd.incr("hc.lib.s3.removeObjectsErrors")

def remove_objects(check_code: str, upto_n: int, wait: bool = False) -> None:
    """Remove keys with n values less than or equal to `upto_n`.

    The S3 API calls can take seconds to complete, so run the removal
    code on a separate thread.
    """

    t = Thread(target=_remove_objects, args=(check_code, upto_n))
    t.start()
    if wait:
        t.join()
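
# Usage sketch (illustrative, assuming a `check` model instance): callers
# normally fire and forget, but tests can pass wait=True so assertions run
# only after the deletion completes:
#
#     remove_objects(str(check.code), upto_n=100)             # returns immediately
#     remove_objects(str(check.code), upto_n=100, wait=True)  # blocks until done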