Files
docker-migrate/core/fsm.py

109 lines
3.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
fsm.py — Конечный автомат (Finite State Machine) для миграции.
Каждый mode (source/target) имеет свой pipeline steps.
"""
from core import state
from core.color import header, subheader, success, error as cerror, warn, info, step
class FSM:
    """Linear finite state machine that drives one migration run.

    Each mode has its own ordered list of step names. Steps are executed
    strictly in list order; progress and failures are recorded through the
    ``state`` module so that an interrupted run can be resumed.
    """

    # Ordered step names for the "source" mode.
    SOURCE_STEPS = [
        "INIT",
        "SOURCE_DISCOVER",
        "SOURCE_MANIFEST_REVIEW",
        "SOURCE_PACK",
        "SOURCE_STOP_SERVICE",
        "TRANSFER",
        "DONE",
    ]

    # Ordered step names for the "target" mode.
    TARGET_STEPS = [
        "INIT",
        "TARGET_PREFLIGHT",
        "TARGET_INSTALL",
        "TARGET_BACKUP",
        "TARGET_RESTORE",
        "TARGET_VERIFY",
        "DONE",
    ]

    def __init__(self, mode: str = "source") -> None:
        """Select the pipeline for *mode*.

        Args:
            mode: ``"source"`` selects ``SOURCE_STEPS``. Any other value
                selects ``TARGET_STEPS`` (no validation is performed).
        """
        self.mode = mode
        self.steps = self.SOURCE_STEPS if mode == "source" else self.TARGET_STEPS

    def _index(self, stage: str) -> int:
        """Return the position of *stage* in the active pipeline, or -1."""
        try:
            return self.steps.index(stage)
        except ValueError:
            return -1

    def resume_from(self, stage: str) -> None:
        """Run the pipeline starting at *stage*.

        An unknown stage produces a warning and restarts from the first
        step. Steps that ``state.is_completed`` reports as done are skipped.

        Args:
            stage: Name of the step to start from.

        Raises:
            Exception: Whatever a step raises; it is re-raised by
                ``_run_step`` and stops the pipeline at that step.
        """
        idx = self._index(stage)
        if idx < 0:
            warn(f"Неизвестный stage '{stage}', начинаем с начала")
            idx = 0
        for s in self.steps[idx:]:
            if state.is_completed(s):
                info(f"Шаг {s} уже выполнен, пропускаем")
                continue
            self._run_step(s)

    def _run_step(self, step_name: str) -> None:
        """Execute one step and record its outcome in ``state``.

        On success the step is marked completed and becomes the current
        stage. On failure the error is printed, stored via
        ``state.set_error`` together with a resume hint, and re-raised.

        Args:
            step_name: Name of the step to execute.

        Raises:
            Exception: Any exception raised while running or recording
                the step.
        """
        header(f"ШАГ: {step_name}")
        try:
            if self.mode == "source":
                self._source_step(step_name)
            else:
                self._target_step(step_name)
            # Only reached when the handler returned without raising.
            state.mark_completed(step_name)
            state.set_stage(step_name)
        except Exception as e:
            msg = str(e)
            cerror(f"Ошибка на шаге {step_name}: {msg}")
            state.set_error(
                step_name,
                "",
                msg,
                suggestion="Исправьте проблему и запустите: docker-migrate --resume",
            )
            raise

    def _source_step(self, name: str) -> None:
        """Dispatch a source-mode step to its handler.

        Handlers are imported inside their branch, so ``source.source`` is
        only loaded when a step that needs it actually runs.

        Args:
            name: Step name from ``SOURCE_STEPS``.

        Raises:
            ValueError: If *name* has no handler.
        """
        if name == "INIT":
            info("Инициализация source-режима")
        elif name == "SOURCE_DISCOVER":
            from source.source import do_discovery
            do_discovery()
        elif name == "SOURCE_MANIFEST_REVIEW":
            from source.source import do_manifest_review
            do_manifest_review()
        elif name == "SOURCE_PACK":
            from source.source import do_pack
            do_pack()
        elif name == "SOURCE_STOP_SERVICE":
            from source.source import do_stop_service
            do_stop_service()
        elif name == "TRANSFER":
            from source.source import do_transfer_offer
            do_transfer_offer()
        elif name == "DONE":
            success("Source-режим завершён")
        else:
            # Without this, a step lacking a handler would fall through and
            # be marked completed by _run_step although nothing was executed.
            raise ValueError(f"Нет обработчика для шага '{name}' в source-режиме")

    def _target_step(self, name: str) -> None:
        """Dispatch a target-mode step to its handler.

        Handlers are imported inside their branch, so ``target.target`` is
        only loaded when a step that needs it actually runs.

        Args:
            name: Step name from ``TARGET_STEPS``.

        Raises:
            ValueError: If *name* has no handler.
        """
        if name == "INIT":
            info("Инициализация target-режима")
        elif name == "TARGET_PREFLIGHT":
            from target.target import do_preflight
            do_preflight()
        elif name == "TARGET_INSTALL":
            from target.target import do_install
            do_install()
        elif name == "TARGET_BACKUP":
            from target.target import do_backup_existing
            do_backup_existing()
        elif name == "TARGET_RESTORE":
            from target.target import do_restore
            do_restore()
        elif name == "TARGET_VERIFY":
            from target.target import do_verify
            do_verify()
        elif name == "DONE":
            success("Target-режим завершён")
        else:
            # Without this, a step lacking a handler would fall through and
            # be marked completed by _run_step although nothing was executed.
            raise ValueError(f"Нет обработчика для шага '{name}' в target-режиме")