diff --git a/transfer/ssh.py b/transfer/ssh.py index 53733fa..30c3115 100644 --- a/transfer/ssh.py +++ b/transfer/ssh.py @@ -1,38 +1,16 @@ # -*- coding: utf-8 -*- """ ssh.py — SSH-подготовка для transfer на target-сервер. -Логика: найти ключ → проверить подключение к target. -Если ключ зашифрован (passphrase) — пропускаем (пользователь должен был разблокировать через ssh-agent). -Если нет ключа или не подходит — меню: пароль target / новый ключ. +НЕ ищем ключи на source — спрашиваем пользователя как подключиться к target. """ import os -import glob from core.color import info, warn, success, error as cerror, prompt, confirm from core.runner import run, exists -_SSH_DIR = os.path.expanduser("~/.ssh") - - -def list_private_keys(): - """Ищет приватные SSH-ключи в ~/.ssh/ (id_* без .pub).""" - if not os.path.isdir(_SSH_DIR): - return [] - keys = [] - for pattern in ("id_rsa", "id_ed25519", "id_ecdsa", "id_dsa"): - for f in glob.glob(os.path.join(_SSH_DIR, pattern)): - if not f.endswith(".pub") and os.path.isfile(f): - pub = f + ".pub" - keys.append({ - "private": f, - "public": pub if os.path.isfile(pub) else None, - "type": os.path.basename(f).replace("id_", ""), - }) - return keys - def check_ssh_keyless(host, user, port, key_path, timeout=10): - """Проверяет подключение к target с данным ключом. БЕЗ passphrase-промптов.""" + """Проверяет подключение к target с данным ключом.""" if not exists("ssh"): return (False, "ssh не найден") identity = f"-i '{key_path}'" if key_path and os.path.isfile(key_path) else "" @@ -40,13 +18,10 @@ def check_ssh_keyless(host, user, port, key_path, timeout=10): r = run(cmd, check=False, capture=True, timeout=30) if r.returncode == 0 and "migrate-ok" in r.stdout: return (True, "OK") - # Разбираем ошибку err = (r.stderr or "").strip() if "passphrase" in err.lower(): return (False, "PASSPHRASE") - if "permission denied" in err.lower() or "too many auth" in err.lower(): - return (False, "DENIED") - return (False, err[:100]) + return (False, err[:120]) def generate_temp_keypair(host_label): @@ -65,7 +40,7 @@ def generate_temp_keypair(host_label): def ssh_copy_id(host, user, port, pubkey_path): - """ssh-copy-id на target. Интерактивно (пользователь вводит пароль target).""" + """ssh-copy-id на target. Интерактивно (пароль target).""" if not exists("ssh-copy-id"): return False warn("⚠ Сейчас запустится ssh-copy-id — введите ПАРОЛЬ от target-сервера ↓") @@ -73,27 +48,7 @@ def ssh_copy_id(host, user, port, pubkey_path): f"ssh-copy-id -p {port} -o ConnectTimeout=30 -o StrictHostKeyChecking=accept-new -i '{pubkey_path}' {user}@{host}", check=False, capture=False, timeout=120 ) - if r.returncode == 0: - success("Ключ добавлен на target") - return True - warn(f"ssh-copy-id не удался (код: {r.returncode})") - return False - - -def manual_copy_instructions(host, user, port, pubkey_path): - """Инструкции по ручному добавлению ключа на target.""" - print() - cerror("SSH-подключение к target не удалось.") - info("Варианты:") - try: - with open(pubkey_path, "r") as f: - pk = f.read().strip() - except Exception: - pk = "(не удалось прочитать)" - print(f" 1) Вручную на target: mkdir -p ~/.ssh; echo '{pk}' >> ~/.ssh/authorized_keys") - print(f" 2) Или: ssh-copy-id -p {port} -i {pubkey_path} {user}@{host}") - print(f" 3) Потом: docker-migrate --resume") - print() + return r.returncode == 0 def ensure_sshpass(): @@ -113,69 +68,54 @@ def ensure_sshpass(): def pick_or_setup_ssh_key(host, user, port): """ - Возвращает путь к рабочему приватному ключу для target. + Возвращает путь к приватному ключу для target. Если None — значит нужен пароль target (sshpass). """ - keys = list_private_keys() - if keys: - info(f"Найдено SSH-ключей на этом сервере: {len(keys)}") - for k in keys: - pub_ok = "✓ .pub" if k["public"] else "✗ нет .pub" - print(f" - {os.path.basename(k['private'])} ({pub_ok})") - - # Проверяем каждый ключ - usable = None - encrypted = 0 - denied = 0 - for k in keys: - label = os.path.basename(k["private"]) - ok, why = check_ssh_keyless(host, user, port, k["private"]) - if ok: - success(f" ✓ Ключ '{label}' подходит для {host}") - usable = k["private"] - break - if why == "PASSPHRASE": - warn(f" ! Ключ '{label}' зашифрован passphrase (разблокируйте через ssh-add, если нужен)") - encrypted += 1 - elif why == "DENIED": - info(f" ✗ Ключ '{label}' не принят target (Permission denied)") - denied += 1 - else: - info(f" ✗ Ключ '{label}': {why[:60]}") - - if usable: - return usable - - # Выводим диагностику - print() - if encrypted and not denied: - info(f"Все ключи зашифрованы passphrase. Если хотите использовать их — разблокируйте:") - info(f" eval $(ssh-agent) && ssh-add") - info(f" Затем перезапустите: docker-migrate --resume") - elif denied: - info(f"Ключи есть, но target их не принимает (не добавлены в ~/.ssh/authorized_keys).") - - # Меню выбора info("") info("Как подключиться к target-серверу?") - print(" 1) Пароль — ввести пароль от target (sshpass)") - print(" 2) Новый ключ — сгенерировать ed25519 + ssh-copy-id на target") + print(" 1) У меня есть приватный ключ от target (файл .pem/.key/.ppk)") + print(" 2) У меня есть логин и пароль от target") print(" 0) Отмена") choice = prompt("Ваш выбор", default="1").strip() - if choice == "2": - temp = generate_temp_keypair(host.replace(".", "_")) - if not temp: - raise RuntimeError("Не удалось сгенерировать ключ") - if ssh_copy_id(host, user, port, temp["public"]): - ok, _ = check_ssh_keyless(host, user, port, temp["private"]) - if ok: - success("Временный ключ работает!") - return temp["private"] - manual_copy_instructions(host, user, port, temp["public"]) - raise RuntimeError("ssh-copy-id не удался. Настройте вручную и запустите --resume") - if choice == "1": - return None # caller запросит пароль + # Пользователь даёт путь к ключу target + key_path = prompt("Путь к приватному ключу (например, /root/server.pem)").strip() + if not key_path or not os.path.isfile(key_path): + raise RuntimeError(f"Файл не найден: {key_path}") + ok, why = check_ssh_keyless(host, user, port, key_path) + if ok: + success("Ключ подходит для target!") + return key_path + if why == "PASSPHRASE": + warn(f"Ключ защищён passphrase. Разблокируйте: ssh-add {key_path}") + raise RuntimeError("Ключ зашифрован. Разблокируйте через ssh-add и перезапустите.") + raise RuntimeError(f"Ключ не подходит: {why}") + + if choice == "2": + # Пароль target + info("") + info("Выберите способ:") + print(" 1) Ввести пароль от target — scp через sshpass") + print(" 2) Сгенерировать временный ключ + ssh-copy-id (target спросит пароль)") + sub = prompt("Ваш выбор", default="1").strip() + + if sub == "2": + temp = generate_temp_keypair(host.replace(".", "_")) + if not temp: + raise RuntimeError("Не удалось сгенерировать ключ") + if ssh_copy_id(host, user, port, temp["public"]): + ok, _ = check_ssh_keyless(host, user, port, temp["private"]) + if ok: + success("Временный ключ добавлен на target!") + return temp["private"] + # ssh-copy-id не сработал + warn("ssh-copy-id не удался. Варианты:") + print(f" 1) Вручную на target: mkdir -p ~/.ssh; echo '$(cat {temp['public']})' >> ~/.ssh/authorized_keys") + print(f" 2) Затем: docker-migrate --resume") + raise RuntimeError("ssh-copy-id не удался.") + + # sub == "1" или любое другое — возвращаем None, caller запросит пароль + return None raise RuntimeError("Перенос отменён пользователем.")