You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
109 lines
3.7 KiB
109 lines
3.7 KiB
""" |
|
HMAC авторизация для API запросов. |
|
Используется и на сервере, и на клиенте. |
|
|
|
Принцип: |
|
1. Клиент подписывает тело запроса секретным ключом + timestamp + nonce |
|
2. Сервер проверяет подпись тем же ключом |
|
3. Перехват бесполезен — подпись уникальна для каждого запроса |
|
4. Replay-атака невозможна — timestamp + nonce проверяются |
|
""" |
|
|
|
import hashlib |
|
import hmac |
|
import json |
|
import time |
|
import secrets |
|
|
|
|
|
# Максимальное расхождение времени (секунды) между клиентом и сервером |
|
MAX_TIMESTAMP_DRIFT = 300 # 5 минут |
|
|
|
|
|
def sign_request(body: bytes, secret_key: str, client_id: str) -> dict: |
|
""" |
|
Подписывает запрос. Возвращает заголовки для отправки. |
|
|
|
Args: |
|
body: тело запроса (bytes) |
|
secret_key: секретный ключ (общий для клиента и сервера) |
|
client_id: публичный ID клиента |
|
|
|
Returns: |
|
dict с заголовками: X-Client-Id, X-Timestamp, X-Nonce, X-Signature |
|
""" |
|
timestamp = str(int(time.time())) |
|
nonce = secrets.token_hex(16) |
|
|
|
# Строка для подписи: timestamp + nonce + body |
|
message = f"{timestamp}.{nonce}.".encode() + body |
|
signature = hmac.new( |
|
secret_key.encode(), |
|
message, |
|
hashlib.sha256 |
|
).hexdigest() |
|
|
|
return { |
|
"X-Client-Id": client_id, |
|
"X-Timestamp": timestamp, |
|
"X-Nonce": nonce, |
|
"X-Signature": signature, |
|
} |
|
|
|
|
|
def verify_request(body: bytes, headers: dict, secret_key: str, client_id: str, |
|
used_nonces: set = None) -> tuple[bool, str]: |
|
""" |
|
Проверяет подпись запроса. |
|
|
|
Args: |
|
body: тело запроса (bytes) |
|
headers: заголовки запроса |
|
secret_key: секретный ключ |
|
client_id: ожидаемый client_id |
|
used_nonces: множество уже использованных nonce (для защиты от replay) |
|
|
|
Returns: |
|
(is_valid, error_message) |
|
""" |
|
req_client_id = headers.get("X-Client-Id", "") |
|
timestamp = headers.get("X-Timestamp", "") |
|
nonce = headers.get("X-Nonce", "") |
|
signature = headers.get("X-Signature", "") |
|
|
|
# Проверка client_id |
|
if not hmac.compare_digest(req_client_id, client_id): |
|
return False, "Invalid client ID" |
|
|
|
# Проверка timestamp |
|
try: |
|
req_time = int(timestamp) |
|
except (ValueError, TypeError): |
|
return False, "Invalid timestamp" |
|
|
|
now = int(time.time()) |
|
if abs(now - req_time) > MAX_TIMESTAMP_DRIFT: |
|
return False, f"Timestamp expired (drift: {abs(now - req_time)}s)" |
|
|
|
# Проверка nonce (защита от replay-атак) |
|
if used_nonces is not None: |
|
if nonce in used_nonces: |
|
return False, "Nonce already used (replay attack?)" |
|
used_nonces.add(nonce) |
|
# Чистим старые nonce (старше MAX_TIMESTAMP_DRIFT) |
|
# В реальном проде это делается через Redis TTL, тут — просто ограничиваем размер |
|
if len(used_nonces) > 10000: |
|
used_nonces.clear() |
|
|
|
# Проверка подписи |
|
message = f"{timestamp}.{nonce}.".encode() + body |
|
expected = hmac.new( |
|
secret_key.encode(), |
|
message, |
|
hashlib.sha256 |
|
).hexdigest() |
|
|
|
if not hmac.compare_digest(signature, expected): |
|
return False, "Invalid signature" |
|
|
|
return True, "OK"
|
|
|