diff --git a/source/source.py b/source/source.py index 64e72f5..438fb94 100644 --- a/source/source.py +++ b/source/source.py @@ -379,16 +379,15 @@ def do_transfer_offer(): if key_path: state.set_stage("TRANSFER", target_host=host, target_user=user, target_port=port_int, ssh_key=key_path) else: - # Нет SSH-ключа — спрашиваем пароль и пробуем sshpass - password = prompt_password(f"Пароль для {user}@{host} (Enter для отмены)") + # Нет SSH-ключа — запрашиваем пароль от TARGET сервера + password = prompt_password(f"Пароль от target-сервера {user}@{host} (Enter для отмены)") if not password: - state.set_error("ssh_key_setup", "", "Пароль не введён", suggestion="Запустите docker-migrate --resume или настройте SSH-ключ") - raise RuntimeError("Пароль не введён. Установите SSH-ключ или введите пароль.") - # Проверяем/устанавливаем sshpass + state.set_error("ssh_key_setup", "", "Пароль не введён", suggestion="Запустите docker-migrate --resume или настройте SSH-ключ на target") + raise RuntimeError("Пароль от target не введён.") from transfer.ssh import ensure_sshpass if not ensure_sshpass(): - state.set_error("ssh_key_setup", "", "sshpass не найден и не удалось установить", suggestion="Установите sshpass: apt-get install -y sshpass") - raise RuntimeError("sshpass не найден. Установите: apt-get install -y sshpass") + state.set_error("ssh_key_setup", "", "sshpass не найден", suggestion="apt-get install -y sshpass") + raise RuntimeError("sshpass не установлен.") state.set_stage("TRANSFER", target_host=host, target_user=user, target_port=port_int, ssh_key=None, ssh_password=password) from transfer.transfer import do_transfer diff --git a/transfer/ssh.py b/transfer/ssh.py index 51d40e9..53733fa 100644 --- a/transfer/ssh.py +++ b/transfer/ssh.py @@ -1,95 +1,56 @@ # -*- coding: utf-8 -*- """ -ssh.py — SSH-диагностика: поиск ключей, проверка подключения, генерация временных ключей +ssh.py — SSH-подготовка для transfer на target-сервер. +Логика: найти ключ → проверить подключение к target. +Если ключ зашифрован (passphrase) — пропускаем (пользователь должен был разблокировать через ssh-agent). +Если нет ключа или не подходит — меню: пароль target / новый ключ. """ import os import glob -import re from core.color import info, warn, success, error as cerror, prompt, confirm from core.runner import run, exists -from core import state _SSH_DIR = os.path.expanduser("~/.ssh") def list_private_keys(): - """Ищет приватные SSH-ключи в ~/.ssh/""" + """Ищет приватные SSH-ключи в ~/.ssh/ (id_* без .pub).""" if not os.path.isdir(_SSH_DIR): return [] keys = [] - # Стандартные паттерны приватных ключей (не .pub) for pattern in ("id_rsa", "id_ed25519", "id_ecdsa", "id_dsa"): for f in glob.glob(os.path.join(_SSH_DIR, pattern)): - # Убедимся что это не .pub файл if not f.endswith(".pub") and os.path.isfile(f): - # Проверим что есть соответствующий .pub pub = f + ".pub" - has_pub = os.path.isfile(pub) keys.append({ "private": f, - "public": pub if has_pub else None, + "public": pub if os.path.isfile(pub) else None, "type": os.path.basename(f).replace("id_", ""), - "has_pub": has_pub, }) - # Ищем и другие файлы без расширения (которые содержат PRIVATE KEY) - for f in glob.glob(os.path.join(_SSH_DIR, "*")): - if os.path.isfile(f) and not f.endswith(".pub") and not f.endswith(".config"): - try: - with open(f, "r", encoding="utf-8", errors="ignore") as fh: - first = fh.read(100) - if "PRIVATE KEY" in first or "OPENSSH" in first: - pub = f + ".pub" - has_pub = os.path.isfile(pub) - if f not in [k["private"] for k in keys]: - keys.append({ - "private": f, - "public": pub if has_pub else None, - "type": "unknown", - "has_pub": has_pub, - }) - except Exception: - pass return keys -def check_ssh_connectivity(host, user, port, key_path=None, timeout=10): - """Проверяет SSH-соединение. Возвращает (ok, stdout, stderr).""" +def check_ssh_keyless(host, user, port, key_path, timeout=10): + """Проверяет подключение к target с данным ключом. БЕЗ passphrase-промптов.""" if not exists("ssh"): - return (False, "", "ssh не найден") - + return (False, "ssh не найден") identity = f"-i '{key_path}'" if key_path and os.path.isfile(key_path) else "" cmd = f"ssh {identity} -p {port} -o ConnectTimeout={timeout} -o BatchMode=yes -o StrictHostKeyChecking=accept-new {user}@{host} 'echo migrate-ok'" r = run(cmd, check=False, capture=True, timeout=30) if r.returncode == 0 and "migrate-ok" in r.stdout: - return (True, r.stdout, r.stderr) - return (False, r.stdout, r.stderr) - - -def test_keys_against_target(host, user, port, keys): - """Перебирает ключи и проверяет какой работает с target.""" - if not keys: - return None - info(f"Проверяем SSH-подключение к {user}@{host}:{port} с найденными ключами ...") - working = None - for k in keys: - label = os.path.basename(k["private"]) - ok, out, err = check_ssh_connectivity(host, user, port, key_path=k["private"]) - if ok: - success(f" ✓ Ключ '{label}' работает!") - working = k - break - else: - # Проверим, не passphrase ли - if "passphrase" in err.lower() or "password" in err.lower(): - warn(f" ! Ключ '{label}' требует passphrase (зашифрован). Можно ввести passphrase позже.") - else: - info(f" ✗ Ключ '{label}' не подходит ({err.strip()[:60]}).") - return working + return (True, "OK") + # Разбираем ошибку + err = (r.stderr or "").strip() + if "passphrase" in err.lower(): + return (False, "PASSPHRASE") + if "permission denied" in err.lower() or "too many auth" in err.lower(): + return (False, "DENIED") + return (False, err[:100]) def generate_temp_keypair(host_label): - """Генерирует временную пару ed25519 в /tmp/ для миграции.""" + """Генерирует временную пару ed25519 в /tmp/.""" temp_dir = "/tmp/docker-migrate-ssh-keys" os.makedirs(temp_dir, exist_ok=True) priv = os.path.join(temp_dir, f"migrate_{host_label}") @@ -97,65 +58,52 @@ def generate_temp_keypair(host_label): info("Генерируем временную SSH-пару ed25519 ...") run(f"ssh-keygen -t ed25519 -C 'docker-migrate-temp' -f '{priv}' -N ''", check=False) if os.path.isfile(priv) and os.path.isfile(pub): - success(f"Временная пара создана: {priv}") - # Устанавливаем безопасные права os.chmod(priv, 0o600) + success(f"Временная пара создана: {priv}") return {"private": priv, "public": pub, "type": "ed25519", "temp": True} return None def ssh_copy_id(host, user, port, pubkey_path): - """Копирует публичный ключ на target через ssh-copy-id. Интерактивно, т.к. запрашивает пароль target.""" - if exists("ssh-copy-id"): - warn("⚠ Сейчас запустится ssh-copy-id — на target запросит ПАРОЛЬ. Введите его ниже ↓") - # Запускаем без capture_output, чтобы prompt пароля был виден пользователю - r = run( - f"ssh-copy-id -p {port} -o ConnectTimeout=30 -o StrictHostKeyChecking=accept-new -i '{pubkey_path}' {user}@{host}", - check=False, - capture=False, - timeout=120 - ) - if r.returncode == 0: - success("Ключ добавлен на target через ssh-copy-id") - return True - else: - warn(f"ssh-copy-id не удался (код: {r.returncode})") + """ssh-copy-id на target. Интерактивно (пользователь вводит пароль target).""" + if not exists("ssh-copy-id"): + return False + warn("⚠ Сейчас запустится ssh-copy-id — введите ПАРОЛЬ от target-сервера ↓") + r = run( + f"ssh-copy-id -p {port} -o ConnectTimeout=30 -o StrictHostKeyChecking=accept-new -i '{pubkey_path}' {user}@{host}", + check=False, capture=False, timeout=120 + ) + if r.returncode == 0: + success("Ключ добавлен на target") + return True + warn(f"ssh-copy-id не удался (код: {r.returncode})") return False def manual_copy_instructions(host, user, port, pubkey_path): - """Выводит инструкции по ручному добавлению ключа.""" + """Инструкции по ручному добавлению ключа на target.""" print() - cerror("SSH-подключение к target не удалось ни с одним ключом.") - print() - info("Варианты решения:") - print(f" {yellow('1')} Добавьте публичный ключ на target вручную:") + cerror("SSH-подключение к target не удалось.") + info("Варианты:") try: with open(pubkey_path, "r") as f: - pubkey_content = f.read().strip() + pk = f.read().strip() except Exception: - pubkey_content = "(не удалось прочитать)" - print(f" На {user}@{host} выполните:") - print(f" mkdir -p ~/.ssh && chmod 700 ~/.ssh") - print(f" echo '{pubkey_content}' >> ~/.ssh/authorized_keys") - print(f" chmod 600 ~/.ssh/authorized_keys") - print() - print(f" {yellow('2')} Или скопируйте через ssh-copy-id (если есть парольный доступ):") - print(f" ssh-copy-id -p {port} -i {pubkey_path} {user}@{host}") - print() - print(f" {yellow('3')} После добавления ключа запустите миграцию заново:") - print(f" docker-migrate --resume") + pk = "(не удалось прочитать)" + print(f" 1) Вручную на target: mkdir -p ~/.ssh; echo '{pk}' >> ~/.ssh/authorized_keys") + print(f" 2) Или: ssh-copy-id -p {port} -i {pubkey_path} {user}@{host}") + print(f" 3) Потом: docker-migrate --resume") print() def ensure_sshpass(): - """Проверяет наличие sshpass, при необходимости ставит через apt.""" + """Устанавливает sshpass если нужно.""" if exists("sshpass"): return True info("sshpass не найден. Устанавливаем ...") r1 = run("apt-get update", check=False, timeout=60) if r1.returncode != 0: - warn(f"apt-get update не удался: {r1.stderr.strip()[:120]}") + warn(f"apt-get update: {r1.stderr.strip()[:120]}") r = run("apt-get install -y sshpass", check=False, timeout=120) if r.returncode != 0: warn(f"Не удалось установить sshpass: {r.stderr.strip()[:120]}") @@ -163,99 +111,71 @@ def ensure_sshpass(): return exists("sshpass") -def try_key_with_passphrase(key_path, passphrase): - """Проверяет, работает ли ключ с введённым passphrase через ssh-agent или expect.""" - # Способ 1: ssh-agent + ssh-add - from core.runner import run - import subprocess - try: - # Запускаем ssh-agent и добавляем ключ - agent_out = subprocess.run(["ssh-agent", "-s"], capture_output=True, text=True) - if agent_out.returncode != 0: - return False - # Парсим переменные окружения из ssh-agent -s - env_lines = agent_out.stdout.strip().split(";") - agent_env = {} - for ln in env_lines: - if "=" in ln and "SSH_" in ln: - k, v = ln.strip().split("=", 1) - agent_env[k] = v - - # ssh-add с passphrase через PIPE - add_proc = subprocess.Popen( - ["ssh-add", key_path], - env={**os.environ, **agent_env}, - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - text=True - ) - stdout, stderr = add_proc.communicate(input=passphrase + "\n", timeout=10) - if add_proc.returncode == 0: - return True - # Если не сработало — убиваем ssh-agent - subprocess.run(["ssh-agent", "-k"], env={**os.environ, **agent_env}, capture_output=True) - except Exception: - pass - return False - - def pick_or_setup_ssh_key(host, user, port): """ - Главная функция SSH-подготовки. Возвращает путь к приватному ключу. - Если None — значит используем пароль (sshpass). + Возвращает путь к рабочему приватному ключу для target. + Если None — значит нужен пароль target (sshpass). """ - # 1. Найти существующие ключи keys = list_private_keys() if keys: - info(f"Найдено SSH-ключей: {len(keys)}") + info(f"Найдено SSH-ключей на этом сервере: {len(keys)}") for k in keys: - label = os.path.basename(k["private"]) - status = "(есть .pub)" if k["has_pub"] else "(❗ нет .pub!)" - print(f" - {label} {status}") + pub_ok = "✓ .pub" if k["public"] else "✗ нет .pub" + print(f" - {os.path.basename(k['private'])} ({pub_ok})") - # 2. Проверить ключи БЕЗ passphrase - working_key = test_keys_against_target(host, user, port, keys) - if working_key: - return working_key["private"] - - # 3. Проверить ключи — если зашифрован и не подходит, просто сообщаем и идём дальше - encrypted_count = 0 + # Проверяем каждый ключ + usable = None + encrypted = 0 + denied = 0 for k in keys: label = os.path.basename(k["private"]) - ok, _, err = check_ssh_connectivity(host, user, port, key_path=k["private"]) - if not ok and ("passphrase" in err.lower() or "password" in err.lower()): - warn(f" ! Ключ '{label}' зашифрован (passphrase). Пропускаем.") - encrypted_count += 1 - if encrypted_count and not working_key: - info(f"Все найденные ключи зашифрованы или не подходят для {host}.") + ok, why = check_ssh_keyless(host, user, port, k["private"]) + if ok: + success(f" ✓ Ключ '{label}' подходит для {host}") + usable = k["private"] + break + if why == "PASSPHRASE": + warn(f" ! Ключ '{label}' зашифрован passphrase (разблокируйте через ssh-add, если нужен)") + encrypted += 1 + elif why == "DENIED": + info(f" ✗ Ключ '{label}' не принят target (Permission denied)") + denied += 1 + else: + info(f" ✗ Ключ '{label}': {why[:60]}") - # 4. Ничего не работает — выбор метода + if usable: + return usable + + # Выводим диагностику print() - info("Выберите способ подключения к target:") - print(" 1 Пароль (sshpass) — ввести пароль от target прямо сейчас") - print(" 2 Сгенерировать временный ed25519 ключ — скрипт сам сделает ssh-copy-id") - print(" 0 Отмена — передать архив вручную") + if encrypted and not denied: + info(f"Все ключи зашифрованы passphrase. Если хотите использовать их — разблокируйте:") + info(f" eval $(ssh-agent) && ssh-add") + info(f" Затем перезапустите: docker-migrate --resume") + elif denied: + info(f"Ключи есть, но target их не принимает (не добавлены в ~/.ssh/authorized_keys).") + + # Меню выбора + info("") + info("Как подключиться к target-серверу?") + print(" 1) Пароль — ввести пароль от target (sshpass)") + print(" 2) Новый ключ — сгенерировать ed25519 + ssh-copy-id на target") + print(" 0) Отмена") choice = prompt("Ваш выбор", default="1").strip() if choice == "2": - # Генерация временного ключа - temp_key = generate_temp_keypair(host.replace(".", "_")) - if not temp_key: - cerror("Не удалось сгенерировать ключ.") - return None - if ssh_copy_id(host, user, port, temp_key["public"]): - ok, _, _ = check_ssh_connectivity(host, user, port, key_path=temp_key["private"]) + temp = generate_temp_keypair(host.replace(".", "_")) + if not temp: + raise RuntimeError("Не удалось сгенерировать ключ") + if ssh_copy_id(host, user, port, temp["public"]): + ok, _ = check_ssh_keyless(host, user, port, temp["private"]) if ok: - success("Временный ключ настроен и работает!") - return temp_key["private"] - # ssh-copy-id не сработал — показываем инструкции - manual_copy_instructions(host, user, port, temp_key["public"]) - raise RuntimeError("ssh-copy-id не удался. Настройте ключ вручную и запустите: docker-migrate --resume") + success("Временный ключ работает!") + return temp["private"] + manual_copy_instructions(host, user, port, temp["public"]) + raise RuntimeError("ssh-copy-id не удался. Настройте вручную и запустите --resume") - elif choice == "1": - # Пароль — caller запросит его позже - return None + if choice == "1": + return None # caller запросит пароль - else: - raise RuntimeError("Перенос отменён пользователем.") + raise RuntimeError("Перенос отменён пользователем.")