""" HMAC авторизация для API запросов. Используется и на сервере, и на клиенте. Принцип: 1. Клиент подписывает тело запроса секретным ключом + timestamp + nonce 2. Сервер проверяет подпись тем же ключом 3. Перехват бесполезен — подпись уникальна для каждого запроса 4. Replay-атака невозможна — timestamp + nonce проверяются """ import hashlib import hmac import json import time import secrets # Максимальное расхождение времени (секунды) между клиентом и сервером MAX_TIMESTAMP_DRIFT = 300 # 5 минут def sign_request(body: bytes, secret_key: str, client_id: str) -> dict: """ Подписывает запрос. Возвращает заголовки для отправки. Args: body: тело запроса (bytes) secret_key: секретный ключ (общий для клиента и сервера) client_id: публичный ID клиента Returns: dict с заголовками: X-Client-Id, X-Timestamp, X-Nonce, X-Signature """ timestamp = str(int(time.time())) nonce = secrets.token_hex(16) # Строка для подписи: timestamp + nonce + body message = f"{timestamp}.{nonce}.".encode() + body signature = hmac.new( secret_key.encode(), message, hashlib.sha256 ).hexdigest() return { "X-Client-Id": client_id, "X-Timestamp": timestamp, "X-Nonce": nonce, "X-Signature": signature, } def verify_request(body: bytes, headers: dict, secret_key: str, client_id: str, used_nonces: set = None) -> tuple[bool, str]: """ Проверяет подпись запроса. Args: body: тело запроса (bytes) headers: заголовки запроса secret_key: секретный ключ client_id: ожидаемый client_id used_nonces: множество уже использованных nonce (для защиты от replay) Returns: (is_valid, error_message) """ req_client_id = headers.get("X-Client-Id", "") timestamp = headers.get("X-Timestamp", "") nonce = headers.get("X-Nonce", "") signature = headers.get("X-Signature", "") # Проверка client_id if not hmac.compare_digest(req_client_id, client_id): return False, "Invalid client ID" # Проверка timestamp try: req_time = int(timestamp) except (ValueError, TypeError): return False, "Invalid timestamp" now = int(time.time()) if abs(now - req_time) > MAX_TIMESTAMP_DRIFT: return False, f"Timestamp expired (drift: {abs(now - req_time)}s)" # Проверка nonce (защита от replay-атак) if used_nonces is not None: if nonce in used_nonces: return False, "Nonce already used (replay attack?)" used_nonces.add(nonce) # Чистим старые nonce (старше MAX_TIMESTAMP_DRIFT) # В реальном проде это делается через Redis TTL, тут — просто ограничиваем размер if len(used_nonces) > 10000: used_nonces.clear() # Проверка подписи message = f"{timestamp}.{nonce}.".encode() + body expected = hmac.new( secret_key.encode(), message, hashlib.sha256 ).hexdigest() if not hmac.compare_digest(signature, expected): return False, "Invalid signature" return True, "OK"