You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

109 lines
3.7 KiB

"""
HMAC авторизация для API запросов.
Используется и на сервере, и на клиенте.
Принцип:
1. Клиент подписывает тело запроса секретным ключом + timestamp + nonce
2. Сервер проверяет подпись тем же ключом
3. Перехват бесполезен — подпись уникальна для каждого запроса
4. Replay-атака невозможна — timestamp + nonce проверяются
"""
import hashlib
import hmac
import json
import time
import secrets
# Максимальное расхождение времени (секунды) между клиентом и сервером
MAX_TIMESTAMP_DRIFT = 300 # 5 минут
def sign_request(body: bytes, secret_key: str, client_id: str) -> dict:
"""
Подписывает запрос. Возвращает заголовки для отправки.
Args:
body: тело запроса (bytes)
secret_key: секретный ключ (общий для клиента и сервера)
client_id: публичный ID клиента
Returns:
dict с заголовками: X-Client-Id, X-Timestamp, X-Nonce, X-Signature
"""
timestamp = str(int(time.time()))
nonce = secrets.token_hex(16)
# Строка для подписи: timestamp + nonce + body
message = f"{timestamp}.{nonce}.".encode() + body
signature = hmac.new(
secret_key.encode(),
message,
hashlib.sha256
).hexdigest()
return {
"X-Client-Id": client_id,
"X-Timestamp": timestamp,
"X-Nonce": nonce,
"X-Signature": signature,
}
def verify_request(body: bytes, headers: dict, secret_key: str, client_id: str,
used_nonces: set = None) -> tuple[bool, str]:
"""
Проверяет подпись запроса.
Args:
body: тело запроса (bytes)
headers: заголовки запроса
secret_key: секретный ключ
client_id: ожидаемый client_id
used_nonces: множество уже использованных nonce (для защиты от replay)
Returns:
(is_valid, error_message)
"""
req_client_id = headers.get("X-Client-Id", "")
timestamp = headers.get("X-Timestamp", "")
nonce = headers.get("X-Nonce", "")
signature = headers.get("X-Signature", "")
# Проверка client_id
if not hmac.compare_digest(req_client_id, client_id):
return False, "Invalid client ID"
# Проверка timestamp
try:
req_time = int(timestamp)
except (ValueError, TypeError):
return False, "Invalid timestamp"
now = int(time.time())
if abs(now - req_time) > MAX_TIMESTAMP_DRIFT:
return False, f"Timestamp expired (drift: {abs(now - req_time)}s)"
# Проверка nonce (защита от replay-атак)
if used_nonces is not None:
if nonce in used_nonces:
return False, "Nonce already used (replay attack?)"
used_nonces.add(nonce)
# Чистим старые nonce (старше MAX_TIMESTAMP_DRIFT)
# В реальном проде это делается через Redis TTL, тут — просто ограничиваем размер
if len(used_nonces) > 10000:
used_nonces.clear()
# Проверка подписи
message = f"{timestamp}.{nonce}.".encode() + body
expected = hmac.new(
secret_key.encode(),
message,
hashlib.sha256
).hexdigest()
if not hmac.compare_digest(signature, expected):
return False, "Invalid signature"
return True, "OK"