Files
docker-migrate/transfer/ssh.py

122 lines
5.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
ssh.py — SSH-подготовка для transfer на target-сервер.
НЕ ищем ключи на source — спрашиваем пользователя как подключиться к target.
"""
import os
from core.color import info, warn, success, error as cerror, prompt, confirm
from core.runner import run, exists
def check_ssh_keyless(host, user, port, key_path, timeout=10):
"""Проверяет подключение к target с данным ключом."""
if not exists("ssh"):
return (False, "ssh не найден")
identity = f"-i '{key_path}'" if key_path and os.path.isfile(key_path) else ""
cmd = f"ssh {identity} -p {port} -o ConnectTimeout={timeout} -o BatchMode=yes -o StrictHostKeyChecking=accept-new {user}@{host} 'echo migrate-ok'"
r = run(cmd, check=False, capture=True, timeout=30)
if r.returncode == 0 and "migrate-ok" in r.stdout:
return (True, "OK")
err = (r.stderr or "").strip()
if "passphrase" in err.lower():
return (False, "PASSPHRASE")
return (False, err[:120])
def generate_temp_keypair(host_label):
"""Генерирует временную пару ed25519 в /tmp/."""
temp_dir = "/tmp/docker-migrate-ssh-keys"
os.makedirs(temp_dir, exist_ok=True)
priv = os.path.join(temp_dir, f"migrate_{host_label}")
pub = priv + ".pub"
info("Генерируем временную SSH-пару ed25519 ...")
run(f"ssh-keygen -t ed25519 -C 'docker-migrate-temp' -f '{priv}' -N ''", check=False)
if os.path.isfile(priv) and os.path.isfile(pub):
os.chmod(priv, 0o600)
success(f"Временная пара создана: {priv}")
return {"private": priv, "public": pub, "type": "ed25519", "temp": True}
return None
def ssh_copy_id(host, user, port, pubkey_path):
"""ssh-copy-id на target. Интерактивно (пароль target)."""
if not exists("ssh-copy-id"):
return False
warn("⚠ Сейчас запустится ssh-copy-id — введите ПАРОЛЬ от target-сервера ↓")
r = run(
f"ssh-copy-id -p {port} -o ConnectTimeout=30 -o StrictHostKeyChecking=accept-new -i '{pubkey_path}' {user}@{host}",
check=False, capture=False, timeout=120
)
return r.returncode == 0
def ensure_sshpass():
"""Устанавливает sshpass если нужно."""
if exists("sshpass"):
return True
info("sshpass не найден. Устанавливаем ...")
r1 = run("apt-get update", check=False, timeout=60)
if r1.returncode != 0:
warn(f"apt-get update: {r1.stderr.strip()[:120]}")
r = run("apt-get install -y sshpass", check=False, timeout=120)
if r.returncode != 0:
warn(f"Не удалось установить sshpass: {r.stderr.strip()[:120]}")
return False
return exists("sshpass")
def pick_or_setup_ssh_key(host, user, port):
"""
Возвращает путь к приватному ключу для target.
Если None — значит нужен пароль target (sshpass).
"""
info("")
info("Как подключиться к target-серверу?")
print(" 1) У меня есть приватный ключ от target (файл .pem/.key/.ppk)")
print(" 2) У меня есть логин и пароль от target")
print(" 0) Отмена")
choice = prompt("Ваш выбор", default="1").strip()
if choice == "1":
# Пользователь даёт путь к ключу target
key_path = prompt("Путь к приватному ключу (например, /root/server.pem)").strip()
if not key_path or not os.path.isfile(key_path):
raise RuntimeError(f"Файл не найден: {key_path}")
ok, why = check_ssh_keyless(host, user, port, key_path)
if ok:
success("Ключ подходит для target!")
return key_path
if why == "PASSPHRASE":
warn(f"Ключ защищён passphrase. Разблокируйте: ssh-add {key_path}")
raise RuntimeError("Ключ зашифрован. Разблокируйте через ssh-add и перезапустите.")
raise RuntimeError(f"Ключ не подходит: {why}")
if choice == "2":
# Пароль target
info("")
info("Выберите способ:")
print(" 1) Ввести пароль от target — scp через sshpass")
print(" 2) Сгенерировать временный ключ + ssh-copy-id (target спросит пароль)")
sub = prompt("Ваш выбор", default="1").strip()
if sub == "2":
temp = generate_temp_keypair(host.replace(".", "_"))
if not temp:
raise RuntimeError("Не удалось сгенерировать ключ")
if ssh_copy_id(host, user, port, temp["public"]):
ok, _ = check_ssh_keyless(host, user, port, temp["private"])
if ok:
success("Временный ключ добавлен на target!")
return temp["private"]
# ssh-copy-id не сработал
warn("ssh-copy-id не удался. Варианты:")
print(f" 1) Вручную на target: mkdir -p ~/.ssh; echo '$(cat {temp['public']})' >> ~/.ssh/authorized_keys")
print(f" 2) Затем: docker-migrate --resume")
raise RuntimeError("ssh-copy-id не удался.")
# sub == "1" или любое другое — возвращаем None, caller запросит пароль
return None
raise RuntimeError("Перенос отменён пользователем.")