Files
docker-migrate/transfer/ssh.py

182 lines
7.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
ssh.py — SSH-подготовка для transfer на target-сервер.
Логика: найти ключ → проверить подключение к target.
Если ключ зашифрован (passphrase) — пропускаем (пользователь должен был разблокировать через ssh-agent).
Если нет ключа или не подходит — меню: пароль target / новый ключ.
"""
import os
import glob
from core.color import info, warn, success, error as cerror, prompt, confirm
from core.runner import run, exists
_SSH_DIR = os.path.expanduser("~/.ssh")
def list_private_keys():
"""Ищет приватные SSH-ключи в ~/.ssh/ (id_* без .pub)."""
if not os.path.isdir(_SSH_DIR):
return []
keys = []
for pattern in ("id_rsa", "id_ed25519", "id_ecdsa", "id_dsa"):
for f in glob.glob(os.path.join(_SSH_DIR, pattern)):
if not f.endswith(".pub") and os.path.isfile(f):
pub = f + ".pub"
keys.append({
"private": f,
"public": pub if os.path.isfile(pub) else None,
"type": os.path.basename(f).replace("id_", ""),
})
return keys
def check_ssh_keyless(host, user, port, key_path, timeout=10):
"""Проверяет подключение к target с данным ключом. БЕЗ passphrase-промптов."""
if not exists("ssh"):
return (False, "ssh не найден")
identity = f"-i '{key_path}'" if key_path and os.path.isfile(key_path) else ""
cmd = f"ssh {identity} -p {port} -o ConnectTimeout={timeout} -o BatchMode=yes -o StrictHostKeyChecking=accept-new {user}@{host} 'echo migrate-ok'"
r = run(cmd, check=False, capture=True, timeout=30)
if r.returncode == 0 and "migrate-ok" in r.stdout:
return (True, "OK")
# Разбираем ошибку
err = (r.stderr or "").strip()
if "passphrase" in err.lower():
return (False, "PASSPHRASE")
if "permission denied" in err.lower() or "too many auth" in err.lower():
return (False, "DENIED")
return (False, err[:100])
def generate_temp_keypair(host_label):
"""Генерирует временную пару ed25519 в /tmp/."""
temp_dir = "/tmp/docker-migrate-ssh-keys"
os.makedirs(temp_dir, exist_ok=True)
priv = os.path.join(temp_dir, f"migrate_{host_label}")
pub = priv + ".pub"
info("Генерируем временную SSH-пару ed25519 ...")
run(f"ssh-keygen -t ed25519 -C 'docker-migrate-temp' -f '{priv}' -N ''", check=False)
if os.path.isfile(priv) and os.path.isfile(pub):
os.chmod(priv, 0o600)
success(f"Временная пара создана: {priv}")
return {"private": priv, "public": pub, "type": "ed25519", "temp": True}
return None
def ssh_copy_id(host, user, port, pubkey_path):
"""ssh-copy-id на target. Интерактивно (пользователь вводит пароль target)."""
if not exists("ssh-copy-id"):
return False
warn("⚠ Сейчас запустится ssh-copy-id — введите ПАРОЛЬ от target-сервера ↓")
r = run(
f"ssh-copy-id -p {port} -o ConnectTimeout=30 -o StrictHostKeyChecking=accept-new -i '{pubkey_path}' {user}@{host}",
check=False, capture=False, timeout=120
)
if r.returncode == 0:
success("Ключ добавлен на target")
return True
warn(f"ssh-copy-id не удался (код: {r.returncode})")
return False
def manual_copy_instructions(host, user, port, pubkey_path):
"""Инструкции по ручному добавлению ключа на target."""
print()
cerror("SSH-подключение к target не удалось.")
info("Варианты:")
try:
with open(pubkey_path, "r") as f:
pk = f.read().strip()
except Exception:
pk = "(не удалось прочитать)"
print(f" 1) Вручную на target: mkdir -p ~/.ssh; echo '{pk}' >> ~/.ssh/authorized_keys")
print(f" 2) Или: ssh-copy-id -p {port} -i {pubkey_path} {user}@{host}")
print(f" 3) Потом: docker-migrate --resume")
print()
def ensure_sshpass():
"""Устанавливает sshpass если нужно."""
if exists("sshpass"):
return True
info("sshpass не найден. Устанавливаем ...")
r1 = run("apt-get update", check=False, timeout=60)
if r1.returncode != 0:
warn(f"apt-get update: {r1.stderr.strip()[:120]}")
r = run("apt-get install -y sshpass", check=False, timeout=120)
if r.returncode != 0:
warn(f"Не удалось установить sshpass: {r.stderr.strip()[:120]}")
return False
return exists("sshpass")
def pick_or_setup_ssh_key(host, user, port):
"""
Возвращает путь к рабочему приватному ключу для target.
Если None — значит нужен пароль target (sshpass).
"""
keys = list_private_keys()
if keys:
info(f"Найдено SSH-ключей на этом сервере: {len(keys)}")
for k in keys:
pub_ok = "✓ .pub" if k["public"] else "✗ нет .pub"
print(f" - {os.path.basename(k['private'])} ({pub_ok})")
# Проверяем каждый ключ
usable = None
encrypted = 0
denied = 0
for k in keys:
label = os.path.basename(k["private"])
ok, why = check_ssh_keyless(host, user, port, k["private"])
if ok:
success(f" ✓ Ключ '{label}' подходит для {host}")
usable = k["private"]
break
if why == "PASSPHRASE":
warn(f" ! Ключ '{label}' зашифрован passphrase (разблокируйте через ssh-add, если нужен)")
encrypted += 1
elif why == "DENIED":
info(f" ✗ Ключ '{label}' не принят target (Permission denied)")
denied += 1
else:
info(f" ✗ Ключ '{label}': {why[:60]}")
if usable:
return usable
# Выводим диагностику
print()
if encrypted and not denied:
info(f"Все ключи зашифрованы passphrase. Если хотите использовать их — разблокируйте:")
info(f" eval $(ssh-agent) && ssh-add")
info(f" Затем перезапустите: docker-migrate --resume")
elif denied:
info(f"Ключи есть, но target их не принимает (не добавлены в ~/.ssh/authorized_keys).")
# Меню выбора
info("")
info("Как подключиться к target-серверу?")
print(" 1) Пароль — ввести пароль от target (sshpass)")
print(" 2) Новый ключ — сгенерировать ed25519 + ssh-copy-id на target")
print(" 0) Отмена")
choice = prompt("Ваш выбор", default="1").strip()
if choice == "2":
temp = generate_temp_keypair(host.replace(".", "_"))
if not temp:
raise RuntimeError("Не удалось сгенерировать ключ")
if ssh_copy_id(host, user, port, temp["public"]):
ok, _ = check_ssh_keyless(host, user, port, temp["private"])
if ok:
success("Временный ключ работает!")
return temp["private"]
manual_copy_instructions(host, user, port, temp["public"])
raise RuntimeError("ssh-copy-id не удался. Настройте вручную и запустите --resume")
if choice == "1":
return None # caller запросит пароль
raise RuntimeError("Перенос отменён пользователем.")