Files
docker-migrate/transfer/ssh.py

125 lines
6.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
ssh.py — SSH-подготовка для переноса на новый сервер.
НЕ ищем ключи на старом сервере — спрашиваем пользователя как подключиться к новому.
"""
import os
from core.color import info, warn, success, error as cerror, prompt, confirm
from core.runner import run, exists
def check_ssh_keyless(host, user, port, key_path, timeout=10):
"""Проверяет подключение к новому серверу с данным ключом."""
if not exists("ssh"):
return (False, "ssh не найден")
identity = f"-i '{key_path}'" if key_path and os.path.isfile(key_path) else ""
cmd = f"ssh {identity} -p {port} -o ConnectTimeout={timeout} -o BatchMode=yes -o StrictHostKeyChecking=accept-new {user}@{host} 'echo migrate-ok'"
r = run(cmd, check=False, capture=True, timeout=30)
if r.returncode == 0 and "migrate-ok" in r.stdout:
return (True, "OK")
err = (r.stderr or "").strip()
if "passphrase" in err.lower():
return (False, "PASSPHRASE")
return (False, err[:120])
def generate_temp_keypair(host_label):
"""Генерирует временную пару ed25519 в /tmp/."""
temp_dir = "/tmp/docker-migrate-ssh-keys"
os.makedirs(temp_dir, exist_ok=True)
priv = os.path.join(temp_dir, f"migrate_{host_label}")
pub = priv + ".pub"
info("Генерируем временную SSH-пару ed25519 ...")
run(f"ssh-keygen -t ed25519 -C 'docker-migrate-temp' -f '{priv}' -N ''", check=False)
if os.path.isfile(priv) and os.path.isfile(pub):
os.chmod(priv, 0o600)
success(f"Временная пара создана: {priv}")
return {"private": priv, "public": pub, "type": "ed25519", "temp": True}
return None
def ssh_copy_id(host, user, port, pubkey_path):
"""ssh-copy-id на новый сервер. Интерактивно (пароль нового сервера)."""
if not exists("ssh-copy-id"):
return False
warn("⚠ Сейчас запустится ssh-copy-id — введите ПАРОЛЬ от НОВОГО сервера ↓")
r = run(
f"ssh-copy-id -p {port} -o ConnectTimeout=30 -o StrictHostKeyChecking=accept-new -i '{pubkey_path}' {user}@{host}",
check=False, capture=False, timeout=120
)
return r.returncode == 0
def ensure_sshpass():
"""Устанавливает sshpass если нужно."""
if exists("sshpass"):
return True
info("sshpass не найден. Устанавливаем ...")
r1 = run("apt-get update", check=False, timeout=60)
if r1.returncode != 0:
warn(f"apt-get update: {r1.stderr.strip()[:120]}")
r = run("apt-get install -y sshpass", check=False, timeout=120)
if r.returncode != 0:
warn(f"Не удалось установить sshpass: {r.stderr.strip()[:120]}")
return False
return exists("sshpass")
# ...
def pick_or_setup_ssh_key(host, user, port):
"""
Возвращает путь к приватному SSH-ключу для нового сервера.
Если None - значит нужен пароль нового сервера (sshpass).
"""
info("")
info("Как подключиться к новому серверу?")
print(" 1) У меня есть приватный SSH-ключ от нового сервера (файл .pem/.key/.ppk)")
print(" 2) У меня есть логин и пароль от нового сервера")
print(" 0) Отмена")
choice = prompt("Ваш выбор", default="1").strip()
if choice == "1":
# Пользователь даёт путь к ключу нового сервера
key_path = prompt("Путь к приватному SSH-ключу (например, /root/server.pem)").strip()
if not key_path or not os.path.isfile(key_path):
raise RuntimeError(f"Файл не найден: {key_path}")
ok, why = check_ssh_keyless(host, user, port, key_path)
if ok:
success("Приватный SSH-ключ подходит для нового сервера!")
return key_path
if why == "PASSPHRASE":
warn(f"Приватный SSH-ключ защищён passphrase. Разблокируйте: ssh-add {key_path}")
raise RuntimeError("Приватный SSH-ключ зашифрован. Разблокируйте через ssh-add и перезапустите.")
raise RuntimeError(f"Приватный SSH-ключ не подходит: {why}")
if choice == "2":
# Пароль нового сервера
info("")
info("Выберите способ:")
print(" 1) Ввести пароль от нового сервера — scp через sshpass")
print(" 2) Сгенерировать временный SSH-ключ + ssh-copy-id (новый сервер спросит пароль)")
sub = prompt("Ваш выбор", default="1").strip()
if sub == "2":
temp = generate_temp_keypair(host.replace(".", "_"))
if not temp:
raise RuntimeError("Не удалось сгенерировать SSH-ключ")
if ssh_copy_id(host, user, port, temp["public"]):
ok, _ = check_ssh_keyless(host, user, port, temp["private"])
if ok:
success("Временный SSH-ключ добавлен на новый сервер!")
return temp["private"]
# ssh-copy-id не сработал
warn("ssh-copy-id не удался. Варианты:")
print(f" 1) Вручную на новом сервере: mkdir -p ~/.ssh; echo '$(cat {temp['public']})' >> ~/.ssh/authorized_keys")
print(f" 2) Затем: docker-migrate --resume")
raise RuntimeError("ssh-copy-id не удался.")
# sub == "1" или любое другое — возвращаем None, caller запросит пароль
return None
raise RuntimeError("Перенос отменён пользователем.")