Files
docker-migrate/transfer/ssh.py

190 lines
8.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
ssh.py — SSH-диагностика: поиск ключей, проверка подключения, генерация временных ключей
"""
import os
import glob
import re
from core.color import info, warn, success, error as cerror, prompt, confirm
from core.runner import run, exists
from core import state
_SSH_DIR = os.path.expanduser("~/.ssh")
def list_private_keys():
"""Ищет приватные SSH-ключи в ~/.ssh/"""
if not os.path.isdir(_SSH_DIR):
return []
keys = []
# Стандартные паттерны приватных ключей (не .pub)
for pattern in ("id_rsa", "id_ed25519", "id_ecdsa", "id_dsa"):
for f in glob.glob(os.path.join(_SSH_DIR, pattern)):
# Убедимся что это не .pub файл
if not f.endswith(".pub") and os.path.isfile(f):
# Проверим что есть соответствующий .pub
pub = f + ".pub"
has_pub = os.path.isfile(pub)
keys.append({
"private": f,
"public": pub if has_pub else None,
"type": os.path.basename(f).replace("id_", ""),
"has_pub": has_pub,
})
# Ищем и другие файлы без расширения (которые содержат PRIVATE KEY)
for f in glob.glob(os.path.join(_SSH_DIR, "*")):
if os.path.isfile(f) and not f.endswith(".pub") and not f.endswith(".config"):
try:
with open(f, "r", encoding="utf-8", errors="ignore") as fh:
first = fh.read(100)
if "PRIVATE KEY" in first or "OPENSSH" in first:
pub = f + ".pub"
has_pub = os.path.isfile(pub)
if f not in [k["private"] for k in keys]:
keys.append({
"private": f,
"public": pub if has_pub else None,
"type": "unknown",
"has_pub": has_pub,
})
except Exception:
pass
return keys
def check_ssh_connectivity(host, user, port, key_path=None, timeout=10):
"""Проверяет SSH-соединение. Возвращает (ok, stdout, stderr)."""
if not exists("ssh"):
return (False, "", "ssh не найден")
identity = f"-i '{key_path}'" if key_path and os.path.isfile(key_path) else ""
cmd = f"ssh {identity} -p {port} -o ConnectTimeout={timeout} -o BatchMode=yes -o StrictHostKeyChecking=accept-new {user}@{host} 'echo migrate-ok'"
r = run(cmd, check=False, capture=True, timeout=30)
if r.returncode == 0 and "migrate-ok" in r.stdout:
return (True, r.stdout, r.stderr)
return (False, r.stdout, r.stderr)
def test_keys_against_target(host, user, port, keys):
"""Перебирает ключи и проверяет какой работает с target."""
if not keys:
return None
info(f"Проверяем SSH-подключение к {user}@{host}:{port} с найденными ключами ...")
working = None
for k in keys:
label = os.path.basename(k["private"])
ok, out, err = check_ssh_connectivity(host, user, port, key_path=k["private"])
if ok:
success(f" ✓ Ключ '{label}' работает!")
working = k
break
else:
# Проверим, не passphrase ли
if "passphrase" in err.lower() or "password" in err.lower():
warn(f" ! Ключ '{label}' требует passphrase (зашифрован). Пропускаем (нужен ssh-agent).")
else:
info(f" ✗ Ключ '{label}' не подходит ({err.strip()[:60]}).")
return working
def generate_temp_keypair(host_label):
"""Генерирует временную пару ed25519 в /tmp/ для миграции."""
temp_dir = "/tmp/docker-migrate-ssh-keys"
os.makedirs(temp_dir, exist_ok=True)
priv = os.path.join(temp_dir, f"migrate_{host_label}")
pub = priv + ".pub"
info("Генерируем временную SSH-пару ed25519 ...")
run(f"ssh-keygen -t ed25519 -C 'docker-migrate-temp' -f '{priv}' -N ''", check=False)
if os.path.isfile(priv) and os.path.isfile(pub):
success(f"Временная пара создана: {priv}")
# Устанавливаем безопасные права
os.chmod(priv, 0o600)
return {"private": priv, "public": pub, "type": "ed25519", "temp": True}
return None
def ssh_copy_id(host, user, port, pubkey_path):
"""Копирует публичный ключ на target через ssh-copy-id или вручную."""
if exists("ssh-copy-id"):
info("Копируем публичный ключ через ssh-copy-id ...")
# Добавляем SSH опции чтобы избежать зависания
r = run(
f"ssh-copy-id -p {port} -o ConnectTimeout=10 -o StrictHostKeyChecking=accept-new -i '{pubkey_path}' {user}@{host}",
check=False,
timeout=60
)
if r.returncode == 0:
success("Ключ добавлен на target через ssh-copy-id")
return True
else:
warn(f"ssh-copy-id не удался: {r.stderr.strip()[:120]}")
return False
def manual_copy_instructions(host, user, port, pubkey_path):
"""Выводит инструкции по ручному добавлению ключа."""
print()
cerror("SSH-подключение к target не удалось ни с одним ключом.")
print()
info("Варианты решения:")
print(f" {yellow('1')} Добавьте публичный ключ на target вручную:")
try:
with open(pubkey_path, "r") as f:
pubkey_content = f.read().strip()
except Exception:
pubkey_content = "(не удалось прочитать)"
print(f" На {user}@{host} выполните:")
print(f" mkdir -p ~/.ssh && chmod 700 ~/.ssh")
print(f" echo '{pubkey_content}' >> ~/.ssh/authorized_keys")
print(f" chmod 600 ~/.ssh/authorized_keys")
print()
print(f" {yellow('2')} Или скопируйте через ssh-copy-id (если есть парольный доступ):")
print(f" ssh-copy-id -p {port} -i {pubkey_path} {user}@{host}")
print()
print(f" {yellow('3')} После добавления ключа запустите миграцию заново:")
print(f" docker-migrate --resume")
print()
def pick_or_setup_ssh_key(host, user, port):
"""
Главная функция SSH-подготовки.
Возвращает путь к приватному ключу для работы с target.
Если None — значит надо продолжить через пароль (или пользователь отказался).
"""
# 1. Найти существующие ключи
keys = list_private_keys()
if keys:
info(f"Найдено SSH-ключей: {len(keys)}")
for k in keys:
label = os.path.basename(k["private"])
status = "(есть .pub)" if k["has_pub"] else "(❗ нет .pub!)"
print(f" - {label} {status}")
# 2. Проверить подключение с существующими ключами
working_key = test_keys_against_target(host, user, port, keys)
if working_key:
return working_key["private"]
# 3. Если не удалось — спросить о генерации временного ключа
if not confirm("SSH-подключение не удалось. Сгенерировать временную пару ed25519", default="y"):
warn("Продолжаем без ключевой пары. Передача через пароль (или выберите 'нет' и перенесите архив вручную).")
return None
temp_key = generate_temp_keypair(host.replace(".", "_"))
if not temp_key:
cerror("Не удалось сгенерировать ключ.")
return None
# 4. Попробовать ssh-copy-id с временным ключом
if ssh_copy_id(host, user, port, temp_key["public"]):
# Проверим ещё раз
ok, _, _ = check_ssh_connectivity(host, user, port, key_path=temp_key["private"])
if ok:
return temp_key["private"]
# 5. Fallback на инструкции
manual_copy_instructions(host, user, port, temp_key["public"])
raise RuntimeError("SSH-подключение не настроено. Выполните инструкции выше и запустите: docker-migrate --resume")