Files
docker-migrate/transfer/ssh.py

267 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
ssh.py — SSH-диагностика: поиск ключей, проверка подключения, генерация временных ключей
"""
import os
import glob
import re
from core.color import info, warn, success, error as cerror, prompt, confirm
from core.runner import run, exists
from core import state
_SSH_DIR = os.path.expanduser("~/.ssh")
def list_private_keys():
"""Ищет приватные SSH-ключи в ~/.ssh/"""
if not os.path.isdir(_SSH_DIR):
return []
keys = []
# Стандартные паттерны приватных ключей (не .pub)
for pattern in ("id_rsa", "id_ed25519", "id_ecdsa", "id_dsa"):
for f in glob.glob(os.path.join(_SSH_DIR, pattern)):
# Убедимся что это не .pub файл
if not f.endswith(".pub") and os.path.isfile(f):
# Проверим что есть соответствующий .pub
pub = f + ".pub"
has_pub = os.path.isfile(pub)
keys.append({
"private": f,
"public": pub if has_pub else None,
"type": os.path.basename(f).replace("id_", ""),
"has_pub": has_pub,
})
# Ищем и другие файлы без расширения (которые содержат PRIVATE KEY)
for f in glob.glob(os.path.join(_SSH_DIR, "*")):
if os.path.isfile(f) and not f.endswith(".pub") and not f.endswith(".config"):
try:
with open(f, "r", encoding="utf-8", errors="ignore") as fh:
first = fh.read(100)
if "PRIVATE KEY" in first or "OPENSSH" in first:
pub = f + ".pub"
has_pub = os.path.isfile(pub)
if f not in [k["private"] for k in keys]:
keys.append({
"private": f,
"public": pub if has_pub else None,
"type": "unknown",
"has_pub": has_pub,
})
except Exception:
pass
return keys
def check_ssh_connectivity(host, user, port, key_path=None, timeout=10):
"""Проверяет SSH-соединение. Возвращает (ok, stdout, stderr)."""
if not exists("ssh"):
return (False, "", "ssh не найден")
identity = f"-i '{key_path}'" if key_path and os.path.isfile(key_path) else ""
cmd = f"ssh {identity} -p {port} -o ConnectTimeout={timeout} -o BatchMode=yes -o StrictHostKeyChecking=accept-new {user}@{host} 'echo migrate-ok'"
r = run(cmd, check=False, capture=True, timeout=30)
if r.returncode == 0 and "migrate-ok" in r.stdout:
return (True, r.stdout, r.stderr)
return (False, r.stdout, r.stderr)
def test_keys_against_target(host, user, port, keys):
"""Перебирает ключи и проверяет какой работает с target."""
if not keys:
return None
info(f"Проверяем SSH-подключение к {user}@{host}:{port} с найденными ключами ...")
working = None
for k in keys:
label = os.path.basename(k["private"])
ok, out, err = check_ssh_connectivity(host, user, port, key_path=k["private"])
if ok:
success(f" ✓ Ключ '{label}' работает!")
working = k
break
else:
# Проверим, не passphrase ли
if "passphrase" in err.lower() or "password" in err.lower():
warn(f" ! Ключ '{label}' требует passphrase (зашифрован). Пропускаем (нужен ssh-agent).")
else:
info(f" ✗ Ключ '{label}' не подходит ({err.strip()[:60]}).")
return working
def generate_temp_keypair(host_label):
"""Генерирует временную пару ed25519 в /tmp/ для миграции."""
temp_dir = "/tmp/docker-migrate-ssh-keys"
os.makedirs(temp_dir, exist_ok=True)
priv = os.path.join(temp_dir, f"migrate_{host_label}")
pub = priv + ".pub"
info("Генерируем временную SSH-пару ed25519 ...")
run(f"ssh-keygen -t ed25519 -C 'docker-migrate-temp' -f '{priv}' -N ''", check=False)
if os.path.isfile(priv) and os.path.isfile(pub):
success(f"Временная пара создана: {priv}")
# Устанавливаем безопасные права
os.chmod(priv, 0o600)
return {"private": priv, "public": pub, "type": "ed25519", "temp": True}
return None
def ssh_copy_id(host, user, port, pubkey_path):
"""Копирует публичный ключ на target через ssh-copy-id или вручную."""
if exists("ssh-copy-id"):
info("Копируем публичный ключ через ssh-copy-id ...")
# Добавляем SSH опции чтобы избежать зависания
r = run(
f"ssh-copy-id -p {port} -o ConnectTimeout=10 -o StrictHostKeyChecking=accept-new -i '{pubkey_path}' {user}@{host}",
check=False,
timeout=60
)
if r.returncode == 0:
success("Ключ добавлен на target через ssh-copy-id")
return True
else:
warn(f"ssh-copy-id не удался: {r.stderr.strip()[:120]}")
return False
def manual_copy_instructions(host, user, port, pubkey_path):
"""Выводит инструкции по ручному добавлению ключа."""
print()
cerror("SSH-подключение к target не удалось ни с одним ключом.")
print()
info("Варианты решения:")
print(f" {yellow('1')} Добавьте публичный ключ на target вручную:")
try:
with open(pubkey_path, "r") as f:
pubkey_content = f.read().strip()
except Exception:
pubkey_content = "(не удалось прочитать)"
print(f" На {user}@{host} выполните:")
print(f" mkdir -p ~/.ssh && chmod 700 ~/.ssh")
print(f" echo '{pubkey_content}' >> ~/.ssh/authorized_keys")
print(f" chmod 600 ~/.ssh/authorized_keys")
print()
print(f" {yellow('2')} Или скопируйте через ssh-copy-id (если есть парольный доступ):")
print(f" ssh-copy-id -p {port} -i {pubkey_path} {user}@{host}")
print()
print(f" {yellow('3')} После добавления ключа запустите миграцию заново:")
print(f" docker-migrate --resume")
print()
def ensure_sshpass():
"""Проверяет наличие sshpass, при необходимости ставит через apt."""
if exists("sshpass"):
return True
info("sshpass не найден. Устанавливаем ...")
r1 = run("apt-get update", check=False, timeout=60)
if r1.returncode != 0:
warn(f"apt-get update не удался: {r1.stderr.strip()[:120]}")
r = run("apt-get install -y sshpass", check=False, timeout=120)
if r.returncode != 0:
warn(f"Не удалось установить sshpass: {r.stderr.strip()[:120]}")
return False
return exists("sshpass")
def try_key_with_passphrase(key_path, passphrase):
"""Проверяет, работает ли ключ с введённым passphrase через ssh-agent или expect."""
# Способ 1: ssh-agent + ssh-add
from core.runner import run
import subprocess
try:
# Запускаем ssh-agent и добавляем ключ
agent_out = subprocess.run(["ssh-agent", "-s"], capture_output=True, text=True)
if agent_out.returncode != 0:
return False
# Парсим переменные окружения из ssh-agent -s
env_lines = agent_out.stdout.strip().split(";")
agent_env = {}
for ln in env_lines:
if "=" in ln and "SSH_" in ln:
k, v = ln.strip().split("=", 1)
agent_env[k] = v
# ssh-add с passphrase через PIPE
add_proc = subprocess.Popen(
["ssh-add", key_path],
env={**os.environ, **agent_env},
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
stdout, stderr = add_proc.communicate(input=passphrase + "\n", timeout=10)
if add_proc.returncode == 0:
return True
# Если не сработало — убиваем ssh-agent
subprocess.run(["ssh-agent", "-k"], env={**os.environ, **agent_env}, capture_output=True)
except Exception:
pass
return False
def pick_or_setup_ssh_key(host, user, port):
"""
Главная функция SSH-подготовки. Возвращает путь к приватному ключу.
Если None — значит используем пароль (sshpass).
"""
# 1. Найти существующие ключи
keys = list_private_keys()
if keys:
info(f"Найдено SSH-ключей: {len(keys)}")
for k in keys:
label = os.path.basename(k["private"])
status = "(есть .pub)" if k["has_pub"] else "(❗ нет .pub!)"
print(f" - {label} {status}")
# 2. Проверить ключи БЕЗ passphrase
working_key = test_keys_against_target(host, user, port, keys)
if working_key:
return working_key["private"]
# 3. Проверить ключи С passphrase (если пользователь знает)
for k in keys:
label = os.path.basename(k["private"])
# Проверим, не passphrase ли мешает
ok, _, err = check_ssh_connectivity(host, user, port, key_path=k["private"])
if not ok and ("passphrase" in err.lower() or "password" in err.lower()):
if confirm(f"Ключ '{label}' зашифрован. Ввести passphrase и попробовать", default="y"):
pw = prompt(f"Passphrase для {label} (ввод скрыт)")
if pw and try_key_with_passphrase(k["private"], pw):
# После ssh-add ключ должен работать
ok2, _, _ = check_ssh_connectivity(host, user, port, key_path=k["private"])
if ok2:
success(f"Ключ '{label}' разблокирован и работает!")
return k["private"]
else:
warn(f"Passphrase введён, но ключ всё равно не подходит для {host}")
# 4. Ничего не работает — выбор метода
print()
info("Выберите способ подключения к target:")
print(" 1 Пароль (sshpass) — ввести пароль прямо сейчас")
print(" 2 Сгенерировать временный ed25519 ключ — скрипт сам сделает ssh-copy-id")
print(" 0 Отмена — передать архив вручную")
choice = prompt("Ваш выбор", default="1").strip()
if choice == "2":
# Генерация временного ключа
temp_key = generate_temp_keypair(host.replace(".", "_"))
if not temp_key:
cerror("Не удалось сгенерировать ключ.")
return None
if ssh_copy_id(host, user, port, temp_key["public"]):
ok, _, _ = check_ssh_connectivity(host, user, port, key_path=temp_key["private"])
if ok:
success("Временный ключ настроен и работает!")
return temp_key["private"]
# ssh-copy-id не сработал — показываем инструкции
manual_copy_instructions(host, user, port, temp_key["public"])
raise RuntimeError("ssh-copy-id не удался. Настройте ключ вручную и запустите: docker-migrate --resume")
elif choice == "1":
# Пароль — caller запросит его позже
return None
else:
raise RuntimeError("Перенос отменён пользователем.")