#!/usr/bin/env python3
"""
DaSiWa I2V/FLF2V API server for ComfyUI.

Runs next to ComfyUI on the same machine and exposes an asynchronous,
RunPod-like HTTP API:

    POST /run             -> {"id": "job_id", "status": "IN_QUEUE"}
    GET  /status/<job_id> -> {"id": ..., "status": "IN_QUEUE|IN_PROGRESS|COMPLETED|FAILED", "output": ...}
    POST /purge/<job_id>  -> {"id": ..., "purged": true}
    GET  /health          -> {"status": "ok", "comfyui": "ok", "queue": 0}

Every endpoint except /health requires a valid HMAC signature (see
``check_hmac_auth``). Jobs are executed one at a time by a single background
worker thread.
"""

import base64
import binascii
import json
import logging
import os
import queue
import random
import shutil
import subprocess
import sys
import threading
import time
import urllib.error
import urllib.parse
import urllib.request
import uuid

import websocket as ws_client
from flask import Flask, request, jsonify

from hmac_auth import verify_request

# ============================================================================
# Configuration
# ============================================================================

_BASE_DIR = os.path.dirname(os.path.abspath(__file__))

COMFY_HOST = os.getenv("COMFY_HOST", "127.0.0.1")
COMFY_PORT = os.getenv("COMFY_PORT", "8188")
API_PORT = int(os.getenv("API_PORT", "8080"))
WORKFLOW_FILE = os.path.join(_BASE_DIR, "workflow_api.json")
KEYS_FILE = os.path.join(_BASE_DIR, "keys.json")
COMFY_OUTPUT_DIR = os.getenv("COMFY_OUTPUT_DIR", "/ComfyUI/output")

# ============================================================================
# Initialisation
# ============================================================================

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s"
)
logger = logging.getLogger(__name__)

app = Flask(__name__)

# Load the HMAC credentials; the server refuses to start without them.
if not os.path.exists(KEYS_FILE):
    logger.error(f"❌ Файл ключей не найден: {KEYS_FILE}")
    logger.error(" Запусти: python generate_keys.py")
    sys.exit(1)

with open(KEYS_FILE, "r", encoding="utf-8") as f:
    keys = json.load(f)

CLIENT_ID = keys["client_id"]
SECRET_KEY = keys["secret_key"]
logger.info(f"🔐 Ключи загружены. Client ID: {CLIENT_ID}")

# Set of already-used nonces (replay-attack protection), handed to
# verify_request on every authenticated request.
# NOTE(review): nothing in this file ever removes entries; presumably
# verify_request adds to it — confirm whether it is pruned anywhere.
used_nonces = set()

# Client id shared by the /prompt submission and the WebSocket connection.
ws_client_id = str(uuid.uuid4())

# ============================================================================
# Job queue (asynchronous queue, RunPod-style)
# ============================================================================

job_queue = queue.Queue()
# job_id -> {status, input, temp_dir, output, error, created_at, started_at, completed_at}
jobs = {}
jobs_lock = threading.Lock()


# ============================================================================
# Utilities
# ============================================================================

def to_nearest_multiple_of_16(value):
    """Round *value* to the nearest multiple of 16, with a minimum of 16.

    Args:
        value: Anything ``float()`` accepts (number or numeric string).

    Returns:
        The rounded value as an ``int`` (never less than 16).

    Raises:
        ValueError: If *value* cannot be converted to a finite number.
    """
    try:
        adjusted = int(round(float(value) / 16.0) * 16)
    except (TypeError, ValueError, OverflowError) as exc:
        # ValueError/OverflowError also cover NaN and infinity, which
        # round() cannot turn into an integer.
        raise ValueError(f"width/height value is not a number: {value}") from exc
    return max(adjusted, 16)


def save_base64_to_file(base64_data, temp_dir, filename):
    """Decode *base64_data* and write it to ``temp_dir/filename``.

    Args:
        base64_data: Base64 text (or bytes) to decode.
        temp_dir: Directory to write into; created if missing.
        filename: Name of the file inside *temp_dir*.

    Returns:
        Absolute path of the written file.

    Raises:
        ValueError: If the payload is not decodable base64.
    """
    try:
        decoded = base64.b64decode(base64_data)
    except (binascii.Error, ValueError, TypeError) as exc:
        raise ValueError(f"Invalid base64 data for {filename}") from exc

    os.makedirs(temp_dir, exist_ok=True)
    file_path = os.path.abspath(os.path.join(temp_dir, filename))
    with open(file_path, "wb") as f:
        f.write(decoded)
    return file_path


def download_file(url, temp_dir, filename):
    """Download *url* into ``temp_dir/filename`` using ``wget``.

    Args:
        url: Source URL (comes from the job input).
        temp_dir: Directory to write into; created if missing.
        filename: Name of the file inside *temp_dir*.

    Returns:
        Absolute path of the downloaded file.

    Raises:
        RuntimeError: If wget exits with a non-zero status.
        subprocess.TimeoutExpired: If the download takes longer than 120 s.
    """
    os.makedirs(temp_dir, exist_ok=True)
    file_path = os.path.abspath(os.path.join(temp_dir, filename))
    result = subprocess.run(
        # "--" ends option parsing so a URL that starts with "-" cannot be
        # interpreted by wget as a command-line option.
        ["wget", "-O", file_path, "--no-verbose", "--", url],
        capture_output=True, text=True, timeout=120
    )
    if result.returncode != 0:
        raise RuntimeError(f"Download failed: {result.stderr}")
    return file_path


def process_image_input(job_input, prefix, temp_dir):
    """Resolve one image of the job input to a file path.

    Looks, in this order, for ``<prefix>_path``, ``<prefix>_url`` and
    ``<prefix>_base64`` in *job_input* and uses the first non-empty one.

    Args:
        job_input: Job parameters (dict).
        prefix: ``"image"`` or ``"last_image"``.
        temp_dir: Directory for downloaded / decoded files.

    Returns:
        ``(file_path, True)`` when an image was supplied, otherwise
        ``(None, False)``.
    """
    path_value = job_input.get(f"{prefix}_path")
    if path_value:
        return path_value, True

    url_value = job_input.get(f"{prefix}_url")
    if url_value:
        return download_file(url_value, temp_dir, f"{prefix}.png"), True

    b64_value = job_input.get(f"{prefix}_base64")
    if b64_value:
        return save_base64_to_file(b64_value, temp_dir, f"{prefix}.png"), True

    return None, False


def queue_prompt(prompt):
    """Submit *prompt* to ComfyUI's ``/prompt`` endpoint.

    Returns:
        The decoded JSON response (the caller reads ``"prompt_id"`` from it).

    Raises:
        RuntimeError: If the server answers with an HTTP error status; the
            response body is included in the message to aid debugging.
    """
    url = f"http://{COMFY_HOST}:{COMFY_PORT}/prompt"
    data = json.dumps({"prompt": prompt, "client_id": ws_client_id}).encode("utf-8")
    req = urllib.request.Request(url, data=data)
    try:
        with urllib.request.urlopen(req) as response:
            return json.loads(response.read())
    except urllib.error.HTTPError as exc:
        detail = exc.read().decode("utf-8", errors="replace")
        raise RuntimeError(
            f"ComfyUI rejected the prompt (HTTP {exc.code}): {detail}"
        ) from exc


def get_history(prompt_id):
    """Fetch the execution history of *prompt_id* from ComfyUI as a dict."""
    url = f"http://{COMFY_HOST}:{COMFY_PORT}/history/{prompt_id}"
    with urllib.request.urlopen(url) as response:
        return json.loads(response.read())


def _wait_for_completion(ws, prompt_id):
    """Block on *ws* until ComfyUI reports that *prompt_id* has finished.

    Raises:
        RuntimeError: If an ``execution_error`` message for this prompt
            arrives.
    """
    while True:
        out = ws.recv()
        if not isinstance(out, str):
            # Non-text frames carry no status information for us.
            continue
        message = json.loads(out)
        msg_type = message["type"]
        if msg_type == "executing":
            data = message["data"]
            # node == None for our prompt id marks the end of execution.
            if data["node"] is None and data["prompt_id"] == prompt_id:
                return
        elif msg_type == "execution_error":
            data = message.get("data") or {}
            if data.get("prompt_id") == prompt_id:
                raise RuntimeError(
                    "ComfyUI execution error: "
                    f"{data.get('exception_message', 'unknown error')}"
                )


def generate_video(prompt):
    """Run *prompt* on ComfyUI and return the resulting video.

    Opens a WebSocket to ComfyUI, queues the prompt, waits until execution
    finishes, then reads the first file listed under a ``"gifs"`` output in
    the prompt history.

    Returns:
        The video file content as a base64 string, or ``None`` when the
        history contains no ``"gifs"`` entry.
    """
    ws_url = f"ws://{COMFY_HOST}:{COMFY_PORT}/ws?clientId={ws_client_id}"
    ws = ws_client.WebSocket()
    ws.connect(ws_url)
    try:
        logger.info("🔌 WebSocket подключён к ComfyUI")

        prompt_id = queue_prompt(prompt)["prompt_id"]
        logger.info(f"📤 Prompt отправлен: {prompt_id}")

        _wait_for_completion(ws, prompt_id)
    finally:
        # Always release the socket, including when submission or waiting fails.
        ws.close()
    logger.info("✅ Генерация завершена")

    # Extract the video from the history outputs.
    history = get_history(prompt_id)[prompt_id]
    for node_output in history["outputs"].values():
        for video in node_output.get("gifs", []):
            video_path = video["fullpath"]
            with open(video_path, "rb") as f:
                video_b64 = base64.b64encode(f.read()).decode("utf-8")
            # Best-effort removal of the file we just read.
            try:
                os.remove(video_path)
            except OSError as exc:
                logger.debug("Could not remove %s: %s", video_path, exc)
            return video_b64

    return None


# ============================================================================
# Background worker (processes jobs from the queue)
# ============================================================================

def build_prompt(job_input, image_path, last_image_path, use_flf2v):
    """Load the workflow template and patch it with the job parameters.

    Args:
        job_input: Job parameters (dict); missing keys fall back to defaults.
        image_path: Path of the first-frame image.
        last_image_path: Path of the last-frame image, or ``None``.
        use_flf2v: Whether a last-frame image was supplied.

    Returns:
        ``(prompt, seed, width, height)`` — the patched workflow dict, the
        seed actually used, and the width/height after rounding to a
        multiple of 16.

    Raises:
        ValueError: If width or height is not numeric.
    """
    # JSON is UTF-8 by specification; do not depend on the process locale.
    with open(WORKFLOW_FILE, "r", encoding="utf-8") as f:
        prompt = json.load(f)

    width = to_nearest_multiple_of_16(job_input.get("width", 528))
    height = to_nearest_multiple_of_16(job_input.get("height", 768))
    length = job_input.get("length", 81)
    steps = job_input.get("steps", 4)
    cfg = job_input.get("cfg", 1.0)
    seed = job_input.get("seed", -1)
    fps = job_input.get("fps", 16)
    sampler_name = job_input.get("sampler_name", "euler")
    scheduler = job_input.get("scheduler", "linear_quadratic")

    # -1 means "pick a random seed".
    if seed == -1:
        seed = random.randint(0, 2**63 - 1)

    # Node 5: positive prompt
    prompt["5"]["inputs"]["text"] = job_input.get("prompt", "")

    # Node 6: negative prompt (workflow default unless overridden)
    negative_prompt = job_input.get("negative_prompt", prompt["6"]["inputs"]["text"])
    prompt["6"]["inputs"]["text"] = negative_prompt

    # Node 7: first-frame image
    prompt["7"]["inputs"]["image"] = image_path

    # Node 15: last-frame image (FLF2V mode only)
    if use_flf2v and last_image_path:
        prompt["15"]["inputs"]["image"] = last_image_path
        logger.info(f"🎬 FLF2V: last frame = {last_image_path}")
    else:
        # I2V mode: switch node 8 to WanImageToVideo and drop everything
        # related to the end image.
        prompt["8"]["class_type"] = "WanImageToVideo"
        prompt["8"]["inputs"].pop("end_image", None)
        prompt.pop("15", None)
        logger.info("🎬 I2V: single image mode")

    # Node 8: WanFirstLastFrameToVideo / WanImageToVideo
    node8_inputs = prompt["8"]["inputs"]
    node8_inputs["width"] = width
    node8_inputs["height"] = height
    node8_inputs["length"] = length

    # The two samplers share every setting; they split the step range in half.
    split_step = steps // 2

    # Node 11: KSampler High — runs steps [0, split_step)
    high_inputs = prompt["11"]["inputs"]
    high_inputs["noise_seed"] = seed
    high_inputs["steps"] = steps
    high_inputs["cfg"] = cfg
    high_inputs["sampler_name"] = sampler_name
    high_inputs["scheduler"] = scheduler
    high_inputs["end_at_step"] = split_step

    # Node 12: KSampler Low — continues from split_step
    low_inputs = prompt["12"]["inputs"]
    low_inputs["noise_seed"] = seed
    low_inputs["steps"] = steps
    low_inputs["cfg"] = cfg
    low_inputs["sampler_name"] = sampler_name
    low_inputs["scheduler"] = scheduler
    low_inputs["start_at_step"] = split_step

    # Node 14: video output
    prompt["14"]["inputs"]["frame_rate"] = fps

    return prompt, seed, width, height


def cleanup_comfy_output():
    """Delete everything inside ``COMFY_OUTPUT_DIR`` (best effort).

    Never raises: a failure on one entry is logged and the remaining
    entries are still processed.
    """
    try:
        entries = os.listdir(COMFY_OUTPUT_DIR)
    except FileNotFoundError:
        return
    except OSError as exc:
        logger.warning("Could not list %s: %s", COMFY_OUTPUT_DIR, exc)
        return

    for fname in entries:
        fpath = os.path.join(COMFY_OUTPUT_DIR, fname)
        try:
            if os.path.isfile(fpath):
                os.unlink(fpath)
            elif os.path.isdir(fpath):
                shutil.rmtree(fpath)
        except OSError as exc:
            logger.warning("Could not remove %s: %s", fpath, exc)


def _process_job(job_id, job):
    """Run one job end to end and store its result or error in *job*.

    Any exception raised while preparing inputs or generating the video is
    logged and recorded as a ``FAILED`` status; the job's temp directory and
    the ComfyUI output directory are cleaned up in every case.
    """
    logger.info("=" * 60)
    logger.info(f"🎬 Job {job_id}: Начинаем генерацию")
    logger.info("=" * 60)

    with jobs_lock:
        job["status"] = "IN_PROGRESS"
        job["started_at"] = time.time()
        job_input = job["input"]
        temp_dir = job["temp_dir"]

    try:
        # Resolve the input images.
        image_path, has_image = process_image_input(job_input, "image", temp_dir)
        if not has_image:
            raise ValueError("No input image provided")

        last_image_path, use_flf2v = process_image_input(job_input, "last_image", temp_dir)

        mode = "FLF2V" if use_flf2v else "I2V"
        logger.info(f"🎬 Job {job_id}: Режим {mode}")

        # Build the prompt.
        prompt, seed, width, height = build_prompt(job_input, image_path, last_image_path, use_flf2v)
        logger.info(f"📐 Job {job_id}: {width}x{height}, seed {seed}")

        # Generate.
        video_b64 = generate_video(prompt)
        if not video_b64:
            raise RuntimeError("Video generation failed — no output from ComfyUI")

        elapsed = time.time() - job["started_at"]
        logger.info(f"✅ Job {job_id}: Видео готово за {elapsed:.1f}s")

        with jobs_lock:
            job["status"] = "COMPLETED"
            job["completed_at"] = time.time()
            job["output"] = {
                "video": video_b64,
                "seed": seed,
                "mode": mode,
                "elapsed": round(elapsed, 1)
            }
    except Exception as e:
        logger.error(f"❌ Job {job_id}: {e}", exc_info=True)
        with jobs_lock:
            job["status"] = "FAILED"
            job["completed_at"] = time.time()
            job["error"] = str(e)
    finally:
        # Cleanup.
        if os.path.exists(temp_dir):
            shutil.rmtree(temp_dir, ignore_errors=True)
        cleanup_comfy_output()


def worker_loop():
    """Background worker: take jobs from the queue and run them one by one."""
    logger.info("⚙️ Worker thread started")

    while True:
        job_id = job_queue.get()  # blocks until a job is available
        try:
            with jobs_lock:
                job = jobs.get(job_id)
            if job:
                _process_job(job_id, job)
        finally:
            # Every get() is matched by a task_done(), including when the
            # job record is missing, so job_queue.join() can never hang.
            job_queue.task_done()


# ============================================================================
# API endpoints
# ============================================================================

@app.before_request
def check_hmac_auth():
    """Verify the HMAC signature of every request except the health check.

    Returns:
        ``None`` to let the request through, or a 401 JSON response when
        ``verify_request`` rejects it.
    """
    if request.path == "/health":
        return None

    body = request.get_data()
    headers = {
        "X-Client-Id": request.headers.get("X-Client-Id", ""),
        "X-Timestamp": request.headers.get("X-Timestamp", ""),
        "X-Nonce": request.headers.get("X-Nonce", ""),
        "X-Signature": request.headers.get("X-Signature", ""),
    }

    is_valid, error = verify_request(body, headers, SECRET_KEY, CLIENT_ID, used_nonces)
    if not is_valid:
        logger.warning(f"🚫 Auth failed: {error} from {request.remote_addr}")
        return jsonify({"error": "Unauthorized", "detail": error}), 401
    return None


@app.route("/health", methods=["GET"])
def health():
    """Health check (no authentication): API status, ComfyUI reachability, queue size."""
    try:
        url = f"http://{COMFY_HOST}:{COMFY_PORT}/"
        with urllib.request.urlopen(url, timeout=5):
            pass
        comfy_status = "ok"
    except Exception:
        # Any failure to reach ComfyUI is reported, never raised.
        comfy_status = "unavailable"

    return jsonify({
        "status": "ok",
        "comfyui": comfy_status,
        "queue": job_queue.qsize(),
        "timestamp": int(time.time())
    })


@app.route("/run", methods=["POST"])
def run_job():
    """Put a job on the queue and return its id immediately.

    Responds with 400 when the body is not a JSON object or contains none of
    ``image_base64`` / ``image_url`` / ``image_path``.
    """
    job_input = request.json or {}
    if not isinstance(job_input, dict):
        return jsonify({"error": "Request body must be a JSON object"}), 400

    # Validation: at least one first-frame image source is required.
    has_image = any(job_input.get(k) for k in ("image_base64", "image_url", "image_path"))
    if not has_image:
        return jsonify({"error": "No input image. Use image_base64, image_url, or image_path"}), 400

    job_id = str(uuid.uuid4())
    temp_dir = os.path.join("/tmp", f"job_{job_id[:8]}")

    # Log the parameters with base64 payloads replaced by their length.
    log_input = {
        k: (f"[{len(v)}chars]" if k.endswith("_base64") and isinstance(v, str) else v)
        for k, v in job_input.items()
    }
    logger.info(f"📥 Job {job_id}: поставлен в очередь")
    logger.info(f" Параметры: {json.dumps(log_input, ensure_ascii=False)}")

    with jobs_lock:
        jobs[job_id] = {
            "status": "IN_QUEUE",
            "input": job_input,
            "temp_dir": temp_dir,
            "output": None,
            "error": None,
            "created_at": time.time(),
            "started_at": None,
            "completed_at": None,
        }

    job_queue.put(job_id)

    return jsonify({
        "id": job_id,
        "status": "IN_QUEUE"
    })


@app.route("/status/<job_id>", methods=["GET"])
def job_status(job_id):
    """Return the status of a job; includes the output once it is COMPLETED.

    Responds with 404 for an unknown job id. FAILED jobs include ``error``.
    """
    # Snapshot all fields under the lock so a status can never be observed
    # without the output/error that belongs to it.
    with jobs_lock:
        job = jobs.get(job_id)
        if job:
            status = job["status"]
            output = job["output"]
            error = job["error"]

    if not job:
        return jsonify({"error": "Job not found"}), 404

    response = {
        "id": job_id,
        "status": status,
    }
    if status == "COMPLETED":
        response["output"] = output
    elif status == "FAILED":
        response["error"] = error

    return jsonify(response)


@app.route("/purge/<job_id>", methods=["POST"])
def purge_job(job_id):
    """Remove a finished job from memory (frees the RAM held by the base64 video).

    Responds with 404 for an unknown job id and 400 for a job that is still
    queued or running.
    """
    # Lookup, state check and removal form one critical section so two
    # concurrent purges cannot both try to delete the same entry.
    with jobs_lock:
        job = jobs.get(job_id)
        if not job:
            failure = ({"error": "Job not found"}, 404)
        elif job["status"] in ("IN_QUEUE", "IN_PROGRESS"):
            failure = ({"error": "Cannot purge active job"}, 400)
        else:
            failure = None
            del jobs[job_id]

    if failure:
        payload, status_code = failure
        return jsonify(payload), status_code
    return jsonify({"id": job_id, "purged": True})


# ============================================================================
# Entry point
# ============================================================================

if __name__ == "__main__":
    logger.info("=" * 60)
    logger.info("🚀 DaSiWa API Server (async worker mode)")
    logger.info(f" ComfyUI: http://{COMFY_HOST}:{COMFY_PORT}")
    logger.info(f" API Port: {API_PORT}")
    logger.info(f" Workflow: {WORKFLOW_FILE}")
    logger.info(" Endpoints:")
    logger.info(" POST /run → поставить задачу")
    logger.info(" GET /status/ → статус / результат")
    logger.info(" POST /purge/ → удалить задачу из памяти")
    logger.info(" GET /health → здоровье")
    logger.info("=" * 60)

    # Probe ComfyUI once; the server starts either way.
    try:
        with urllib.request.urlopen(f"http://{COMFY_HOST}:{COMFY_PORT}/", timeout=5):
            pass
        logger.info("✅ ComfyUI доступен")
    except Exception:
        logger.warning("⚠️ ComfyUI недоступен — запросы будут ждать"
                       " пока ComfyUI запустится")

    # Start the background worker.
    worker_thread = threading.Thread(target=worker_loop, daemon=True)
    worker_thread.start()

    app.run(host="0.0.0.0", port=API_PORT, debug=False)