Files
docker-migrate/core/state.py

255 lines
7.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
state.py — Управление состоянием (FSM) и resume
Состояние хранится в JSON файле рядом со скриптом (vars/lib)
"""
import os
import json
import sys
import signal
import atexit
from datetime import datetime
STATE_FILE = None
_LAST_STATE = None
# Жёсткий путь для state.json — один, навсегда
STATE_PATH = os.path.join(os.path.expanduser("~"), ".migrate", "state.json")
def _state_path():
global STATE_FILE
if STATE_FILE:
return STATE_FILE
# Всегда используем ~/.migrate/state.json
d = os.path.dirname(STATE_PATH)
try:
if not os.path.isdir(d):
os.makedirs(d, exist_ok=True)
except Exception:
pass
STATE_FILE = STATE_PATH
return STATE_FILE
def load_state():
global _LAST_STATE
sp = _state_path()
if os.path.isfile(sp):
try:
with open(sp, "r", encoding="utf-8") as f:
data = json.load(f)
_LAST_STATE = data
return data
except json.JSONDecodeError:
pass
_LAST_STATE = _default_state()
return _LAST_STATE
def _default_state():
return {
"stage": "INIT",
"mode": None,
"manifest_path": None,
"archive_path": None,
"target_host": None,
"target_user": None,
"target_port": 22,
"last_error": None,
"completed_steps": [],
"interrupted_at": None,
"paused": False,
"created_at": datetime.now().isoformat(),
}
def flush_state():
"""Сохраняет текущее _LAST_STATE на диск."""
global _LAST_STATE
sp = _state_path()
with open(sp, "w", encoding="utf-8") as f:
json.dump(_LAST_STATE, f, indent=2, ensure_ascii=False)
def save_state(state):
global _LAST_STATE
sp = _state_path()
state["updated_at"] = datetime.now().isoformat()
_LAST_STATE = state
try:
with open(sp, "w", encoding="utf-8") as f:
json.dump(state, f, indent=2, ensure_ascii=False)
except Exception as e:
# Fallback: stderr
import sys
print(f"[STATE ERROR] Не удалось сохранить state в {sp}: {e}", file=sys.stderr)
import traceback
traceback.print_exc(file=sys.stderr)
def set_stage(stage, **kwargs):
state = load_state()
state["stage"] = stage
for k, v in kwargs.items():
state[k] = v
save_state(state)
return state
def mark_completed(step):
state = load_state()
if step not in state["completed_steps"]:
state["completed_steps"].append(step)
save_state(state)
def is_completed(step):
return step in load_state().get("completed_steps", [])
def set_error(step, stdout, stderr, suggestion=""):
state = load_state()
state["paused"] = True
state["last_error"] = {
"step": step,
"timestamp": datetime.now().isoformat(),
"stdout": stdout,
"stderr": stderr,
"suggestion": suggestion,
}
save_state(state)
def reset_state(mode=None):
"""Сбрасывает состояние для нового запуска (source, target или полностью)."""
global _LAST_STATE
state = load_state()
state["stage"] = "INIT"
state["mode"] = mode
state["completed_steps"] = []
state["interrupted_at"] = None
state["resumable_hint"] = None
state["last_error"] = None
state["paused"] = False
state["updated_at"] = datetime.now().isoformat()
_LAST_STATE = state
save_state(state)
def mark_completed(step):
state = load_state()
if step not in state["completed_steps"]:
state["completed_steps"].append(step)
save_state(state)
def is_completed(step):
return step in load_state().get("completed_steps", [])
def set_error(step, stdout, stderr, suggestion=""):
state = load_state()
state["paused"] = True
state["last_error"] = {
"step": step,
"timestamp": datetime.now().isoformat(),
"stdout": stdout,
"stderr": stderr,
"suggestion": suggestion,
}
save_state(state)
def clear_error():
state = load_state()
state["paused"] = False
state["last_error"] = None
save_state(state)
def reset_state(mode=None):
"""Сбрасывает состояние для нового запуска (source, target или полностью)."""
global _LAST_STATE
# Загружаем текущее, чтобы не потерять пути к архивам если пользователь хочет
# Но completed_steps и stage сбрасываем
state = load_state()
state["stage"] = "INIT"
state["mode"] = mode
state["completed_steps"] = []
state["interrupted_at"] = None
state["resumable_hint"] = None
state["last_error"] = None
state["paused"] = False
state["updated_at"] = datetime.now().isoformat()
_LAST_STATE = state
save_state(state)
def set_interrupted():
"""Помечает, что скрипт был прерван (Ctrl+C, SIGTERM)."""
state = load_state()
state["interrupted_at"] = datetime.now().isoformat()
state["resumable_hint"] = f"Скрипт был прерван на шаге '{state.get('stage', 'INIT')}'. Запустите: docker-migrate --resume"
save_state(state)
def recover_after_interrupt():
"""Проверяет, был ли предыдущий запуск прерван. Если да — сбрасывает флаг."""
state = load_state()
if state.get("interrupted_at"):
return {
"stage": state.get("stage"),
"mode": state.get("mode"),
"hint": state.pop("resumable_hint", ""),
}
return None
def show_status():
from core.color import info, warn, error as cerror, bold, divider
state = load_state()
div = divider
print("\n")
div()
info(f"Текущий stage: {bold(state.get('stage', 'INIT'))}")
info(f"Режим: {bold(state.get('mode') or 'не выбран')}")
info(f"Completed steps: {', '.join(state.get('completed_steps', [])) or '(none)'}")
if state.get("paused"):
warn("Скрипт на паузе из-за ошибки")
if state.get("interrupted_at"):
warn(f"Последний запуск был прерван в {state['interrupted_at']} на шаге '{state.get('stage', 'INIT')}'")
print(f" Запустите: docker-migrate --resume")
if state.get("manifest_path"):
info(f"Manifest: {state['manifest_path']}")
if state.get("archive_path"):
info(f"Archive: {state['archive_path']}")
if state.get("last_error"):
err = state["last_error"]
cerror(f"Последняя ошибка на шаге: {bold(err['step'])}")
print(f" Время: {err['timestamp']}")
if err.get("suggestion"):
print(f" Рекомендация: {err['suggestion']}")
div()
def show_logs():
state = load_state()
err = state.get("last_error")
if not err:
print("Нет сохранённых ошибок")
return
from core.color import error as cerror, bold
cerror(f"=== Лог ошибки (шаг: {bold(err['step'])}) ===")
print(f"Время: {err['timestamp']}\n")
if err.get("stdout"):
print("STDOUT:")
print(err["stdout"])
if err.get("stderr"):
print("\nSTDERR:")
print(err["stderr"])
if err.get("suggestion"):
print(f"\nРекомендация:")
print(err["suggestion"])