224 lines
8.2 KiB
Python
224 lines
8.2 KiB
Python
# -*- coding: utf-8 -*-
|
||
"""
|
||
nginx.py — Discovery nginx-конфигов через nginx -T, lsof, ss
|
||
"""
|
||
|
||
import os
|
||
import re
|
||
import json
|
||
from core.runner import run, run_json, exists
|
||
from core.color import info, warn, success, log_cmd
|
||
|
||
|
||
def find_nginx_processes():
|
||
"""Ищет запущенные nginx master/worker процессы"""
|
||
procs = []
|
||
if exists("pgrep"):
|
||
out = run("pgrep -a nginx", check=False)
|
||
for ln in out.stdout.strip().splitlines():
|
||
parts = ln.strip().split(None, 1)
|
||
if parts:
|
||
pid = parts[0]
|
||
cmd = parts[1] if len(parts) > 1 else ""
|
||
procs.append({"pid": pid, "cmd": cmd})
|
||
return procs
|
||
|
||
|
||
def get_nginx_full_config():
|
||
"""
|
||
Получает раскрытый конфиг через nginx -T.
|
||
Если nginx не запущен/невалиден — ищем через find в /etc/nginx.
|
||
"""
|
||
if not exists("nginx"):
|
||
warn("nginx не установлен")
|
||
return None
|
||
|
||
# Пробуем nginx -T
|
||
result = run("nginx -T 2>&1", check=False)
|
||
if result.returncode == 0:
|
||
return result.stdout
|
||
|
||
warn("nginx -T не удалось (возможно, конфиг невалиден). Fallback на ручной поиск конфигов.")
|
||
return None
|
||
|
||
|
||
def find_nginx_conf_files():
|
||
"""Находит все .conf и конфиги nginx для ручного анализа"""
|
||
candidates = ["/etc/nginx", "/usr/local/etc/nginx", "/opt/local/etc/nginx"]
|
||
files = []
|
||
for base in candidates:
|
||
if os.path.isdir(base):
|
||
for root, _, fnames in os.walk(base):
|
||
for f in fnames:
|
||
if f.endswith(".conf") or f.endswith(".inc"):
|
||
files.append(os.path.join(root, f))
|
||
return files
|
||
|
||
|
||
def parse_nginx_config_dump(raw_text, service_ports, service_domain_hints):
|
||
"""
|
||
Парсит выхлоп nginx -T (раскрытый конфиг).
|
||
Ищет server{} блоки, связанные с сервисом по:
|
||
- proxy_pass на localhost/127.0.0.1 + наш порт
|
||
- server_name совпадает с доменом из env
|
||
Возвращает список: {"file": ..., "server_name": ..., "proxy_pass": ..., "ssl_cert": ..., "ssl_key": ...}
|
||
"""
|
||
# nginx -T выводит каждый файл с комментарием вида:
|
||
# # configuration file /etc/nginx/conf.d/site.conf:
|
||
file_comment_re = re.compile(r"#\s*configuration file\s+(.+?):")
|
||
|
||
blocks = []
|
||
current_file = None
|
||
current_block_lines = []
|
||
|
||
lines = raw_text.splitlines()
|
||
for ln in lines:
|
||
m = file_comment_re.match(ln)
|
||
if m:
|
||
current_file = m.group(1)
|
||
continue
|
||
current_block_lines.append(ln)
|
||
|
||
# Простая регулярка: ищем server{ ... } — ловим всё между server { и }
|
||
# Это не 100%, но для раскрытого конфига достаточно.
|
||
text = raw_text
|
||
servers = []
|
||
# Ищем server-блоки более грубо: поиск от server{ до следующей закрывающей } на нулевом уровне
|
||
i = 0
|
||
while i < len(text):
|
||
idx = text.find("server {", i)
|
||
if idx == -1:
|
||
break
|
||
# найдем закрывающую }
|
||
start = idx
|
||
depth = 0
|
||
j = start
|
||
while j < len(text):
|
||
if text[j] == '{':
|
||
depth += 1
|
||
elif text[j] == '}':
|
||
depth -= 1
|
||
if depth == 0:
|
||
break
|
||
j += 1
|
||
block = text[start:j+1]
|
||
|
||
# определим файл: ищем ближайший file comment до start
|
||
file_match = None
|
||
for m in file_comment_re.finditer(text[:start]):
|
||
file_match = m.group(1)
|
||
servers.append({"file": file_match, "block": block})
|
||
i = j + 1
|
||
|
||
related = []
|
||
# port hints (например, 8000, 443)
|
||
# domain hints (например, example.com)
|
||
for s in servers:
|
||
block = s["block"]
|
||
score = 0
|
||
proxy_pass_match = re.search(r"proxy_pass\s+(\S+)", block)
|
||
sn_match = re.search(r"server_name\s+([^;]+)", block)
|
||
ssl_cert_match = re.search(r"ssl_certificate\s+([^;]+)", block)
|
||
ssl_key_match = re.search(r"ssl_certificate_key\s+([^;]+)", block)
|
||
|
||
proxy_pass = proxy_pass_match.group(1) if proxy_pass_match else None
|
||
server_name = sn_match.group(1).strip() if sn_match else None
|
||
ssl_cert = ssl_cert_match.group(1).strip() if ssl_cert_match else None
|
||
ssl_key = ssl_key_match.group(1).strip() if ssl_key_match else None
|
||
|
||
# Если proxy_pass указывает на 127.0.0.1 или localhost и порт — проверяем
|
||
if proxy_pass:
|
||
for hp in service_ports:
|
||
if str(hp) in proxy_pass:
|
||
score += 100
|
||
if "127.0.0.1" in proxy_pass or "localhost" in proxy_pass:
|
||
score += 20
|
||
|
||
# server_name совпадает с доменом
|
||
if server_name:
|
||
for hint in service_domain_hints:
|
||
if hint in server_name:
|
||
score += 80
|
||
# wildcard *.domain
|
||
if hint.startswith("*."):
|
||
domain = hint.lstrip("*.")
|
||
if domain in server_name:
|
||
score += 80
|
||
|
||
if score > 0:
|
||
related.append({
|
||
"file": s["file"],
|
||
"server_name": server_name,
|
||
"proxy_pass": proxy_pass,
|
||
"ssl_certificate": ssl_cert,
|
||
"ssl_certificate_key": ssl_key,
|
||
"raw_block": block,
|
||
"score": score,
|
||
})
|
||
|
||
return related
|
||
|
||
|
||
def discover_nginx(service_ports, service_domain_hints):
|
||
"""
|
||
Главная функция discovery nginx.
|
||
service_ports: список портов из Docker (например, [8000, 443])
|
||
service_domain_hints: список доменов из .env или labels.
|
||
Возвращает list dict.
|
||
"""
|
||
info("Ищем связанные nginx-конфиги ...")
|
||
procs = find_nginx_processes()
|
||
if procs:
|
||
success(f"Найден nginx: {len(procs)} процесс(ов)")
|
||
else:
|
||
warn("nginx-просессы не найдены")
|
||
|
||
raw = get_nginx_full_config()
|
||
if raw:
|
||
related = parse_nginx_config_dump(raw, service_ports, service_domain_hints)
|
||
if related:
|
||
success(f"Найдено связанных nginx server-блоков: {len(related)}")
|
||
return related
|
||
else:
|
||
# fallback: поиск файлов и grep proxy_pass/server_name
|
||
files = find_nginx_conf_files()
|
||
related = []
|
||
for f in files:
|
||
try:
|
||
content = open(f, "r", encoding="utf-8", errors="ignore").read()
|
||
except Exception:
|
||
continue
|
||
score = 0
|
||
for hp in service_ports:
|
||
if f":{hp}" in content or f"127.0.0.1:{hp}" in content:
|
||
score += 100
|
||
for d in service_domain_hints:
|
||
if d in content:
|
||
score += 80
|
||
if score > 0:
|
||
related.append({
|
||
"file": f,
|
||
"score": score,
|
||
"method": "fallback_grep",
|
||
})
|
||
return related
|
||
|
||
|
||
def get_nginx_systemd_unit():
|
||
"""Ищет unit-файл nginx, возвращает уникальные (deduplicated по пути)"""
|
||
units = []
|
||
seen_paths = set()
|
||
for unit in ("nginx.service", "nginx"):
|
||
out = run(f"systemctl is-active {unit}", check=False)
|
||
if out.returncode == 0 or out.stdout.strip() in ("active", "inactive"):
|
||
# Найдём файл unit
|
||
try:
|
||
uout = run(f"systemctl show {unit} -p FragmentPath --value", check=False)
|
||
path = uout.stdout.strip()
|
||
if path and path not in seen_paths:
|
||
seen_paths.add(path)
|
||
units.append({"unit": unit, "path": path})
|
||
except Exception:
|
||
pass
|
||
return units
|