"""Tests for the resume_pending session continuity path.

Covers the behaviour introduced to fix the ``Gateway shutting down ...
task will be interrupted`` follow-up bug (spec: PR #11852, builds on
PRs #9850, #9934, #7536):

1. When a gateway restart drain times out and agents are force-interrupted,
   the affected sessions are flagged ``resume_pending=True`` — not
   ``suspended`` — so the next user message on the same session_key
   auto-resumes from the existing transcript instead of getting routed
   through ``suspend_recently_active()`` and converted into a fresh
   session.

2. ``suspended=True`` (from ``/stop`` or stuck-loop escalation) still
   wins over ``resume_pending`` — the forced-wipe path is preserved.

3. The restart-resume system note injected into the next user message is
   a superset of the existing tool-tail auto-continue note (from
   PR #9934), using session-entry metadata rather than just transcript
   shape so it fires even when the interrupted transcript does NOT end
   with a ``tool`` role.

4. The existing ``.restart_failure_counts`` stuck-loop counter from
   PR #7536 remains the single source of escalation — no parallel
   counter is added on ``SessionEntry``.
"""

import asyncio
import time
from datetime import datetime, timedelta
from unittest.mock import AsyncMock, MagicMock, patch

import pytest

from gateway.config import GatewayConfig, HomeChannel, Platform, PlatformConfig
from gateway.platforms.base import MessageEvent, MessageType, SendResult
from gateway.run import (
    _auto_continue_freshness_window,
    _coerce_gateway_timestamp,
    _is_fresh_gateway_interruption,
    _last_transcript_timestamp,
    _should_clear_resume_pending_after_turn,
)
from gateway.session import SessionEntry, SessionSource, SessionStore
from tests.gateway.restart_test_helpers import (
    make_restart_runner,
    make_restart_source,
)


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------


def test_resume_pending_is_cleared_only_after_successful_turn():
    """Interrupted/failed drain results must keep the restart recovery marker.

    Regression for dogfood failure: during gateway restart the interrupted run
    returned an empty final response and was normalized into a user-facing
    fallback, but the gateway cleared ``resume_pending`` before startup could
    auto-resume it.
    """
    assert _should_clear_resume_pending_after_turn({"final_response": "done"}) is True
    assert _should_clear_resume_pending_after_turn({"completed": True}) is True
    assert _should_clear_resume_pending_after_turn({"interrupted": True}) is False
    assert _should_clear_resume_pending_after_turn({"completed": False}) is False
    assert _should_clear_resume_pending_after_turn({"failed": True}) is False
    assert _should_clear_resume_pending_after_turn({"partial": True}) is False
    assert _should_clear_resume_pending_after_turn({"error": "boom"}) is False


def _make_source(platform=Platform.TELEGRAM, chat_id="123", user_id="u1"):
    return SessionSource(platform=platform, chat_id=chat_id, user_id=user_id)


def _make_store(tmp_path):
    return SessionStore(sessions_dir=tmp_path, config=GatewayConfig())


def _build_agent_history(history: list) -> list:
    """Mirror gateway/run.py's ``history → agent_history`` conversion.

    This is the transformation that strips ``timestamp`` off tool/tool_call
    rows before the agent sees them.  Tests that check the freshness gate
    must go through this conversion so they exercise the *real* data the
    note-injection code sees.
    """
    agent_history: list = []
    for msg in history:
        role = msg.get("role")
        if not role or role in ("session_meta", "system"):
            continue
        has_tool_calls = "tool_calls" in msg
        has_tool_call_id = "tool_call_id" in msg
        is_tool_message = role == "tool"
        if has_tool_calls or has_tool_call_id or is_tool_message:
            agent_history.append({k: v for k, v in msg.items() if k != "timestamp"})
        else:
            content = msg.get("content")
            if content:
                agent_history.append({"role": role, "content": content})
    return agent_history


def _simulate_note_injection(
    history: list,
    user_message: str,
    resume_entry: SessionEntry | None,
    *,
    agent_history: list | None = None,
    window_secs: float | None = None,
) -> str:
    """Mirror the note-injection logic in gateway/run.py _run_agent().

    The freshness signal reads ``history[-1].timestamp`` (the raw transcript
    row), NOT ``agent_history[-1].timestamp`` (which has been stripped).
    Tests pass the raw ``history`` — ``agent_history`` is derived from it
    via the real conversion if not supplied explicitly.
    """
    if agent_history is None:
        agent_history = _build_agent_history(history)

    window = (
        float(window_secs)
        if window_secs is not None
        else _auto_continue_freshness_window()
    )
    interruption_is_fresh = _is_fresh_gateway_interruption(
        _last_transcript_timestamp(history),
        window_secs=window,
    )

    message = user_message
    is_resume_pending = bool(
        resume_entry is not None
        and getattr(resume_entry, "resume_pending", False)
        and interruption_is_fresh
    )
    has_fresh_tool_tail = bool(
        agent_history
        and agent_history[-1].get("role") == "tool"
        and interruption_is_fresh
    )

    if is_resume_pending:
        reason = getattr(resume_entry, "resume_reason", None) or "restart_timeout"
        reason_phrase = (
            "a gateway restart"
            if reason == "restart_timeout"
            else "a gateway shutdown"
            if reason == "shutdown_timeout"
            else "a gateway interruption"
        )
        message = (
            f"[System note: Your previous turn in this session was interrupted "
            f"by {reason_phrase}. The conversation history below is intact. "
            f"If it contains unfinished tool result(s), process them first and "
            f"summarize what was accomplished, then address the user's new "
            f"message below.]\n\n"
            + message
        )
    elif has_fresh_tool_tail:
        message = (
            "[System note: Your previous turn was interrupted before you could "
            "process the last tool result(s). The conversation history contains "
            "tool outputs you haven't responded to yet. Please finish processing "
            "those results and summarize what was accomplished, then address the "
            "user's new message below.]\n\n"
            + message
        )
    return message


# ---------------------------------------------------------------------------
# SessionEntry field + serialization
# ---------------------------------------------------------------------------


class TestSessionEntryResumeFields:
    def test_defaults(self):
        now = datetime.now()
        entry = SessionEntry(
            session_key="agent:main:telegram:dm:1",
            session_id="sid",
            created_at=now,
            updated_at=now,
        )
        assert entry.resume_pending is False
        assert entry.resume_reason is None
        assert entry.last_resume_marked_at is None

    def test_roundtrip_with_resume_fields(self):
        now = datetime(2026, 4, 18, 12, 0, 0)
        entry = SessionEntry(
            session_key="agent:main:telegram:dm:1",
            session_id="sid",
            created_at=now,
            updated_at=now,
            resume_pending=True,
            resume_reason="restart_timeout",
            last_resume_marked_at=now,
        )
        restored = SessionEntry.from_dict(entry.to_dict())
        assert restored.resume_pending is True
        assert restored.resume_reason == "restart_timeout"
        assert restored.last_resume_marked_at == now

    def test_from_dict_legacy_without_resume_fields(self):
        """Old sessions.json without the new fields deserialize cleanly."""
        now = datetime.now()
        legacy = {
            "session_key": "agent:main:telegram:dm:1",
            "session_id": "sid",
            "created_at": now.isoformat(),
            "updated_at": now.isoformat(),
            "chat_type": "dm",
        }
        restored = SessionEntry.from_dict(legacy)
        assert restored.resume_pending is False
        assert restored.resume_reason is None
        assert restored.last_resume_marked_at is None

    def test_malformed_timestamp_is_tolerated(self):
        now = datetime.now()
        data = {
            "session_key": "k",
            "session_id": "sid",
            "created_at": now.isoformat(),
            "updated_at": now.isoformat(),
            "resume_pending": True,
            "resume_reason": "restart_timeout",
            "last_resume_marked_at": "not-a-timestamp",
        }
        restored = SessionEntry.from_dict(data)
        # resume_pending still honoured, only the broken timestamp drops
        assert restored.resume_pending is True
        assert restored.resume_reason == "restart_timeout"
        assert restored.last_resume_marked_at is None


# ---------------------------------------------------------------------------
# SessionStore.mark_resume_pending / clear_resume_pending
# ---------------------------------------------------------------------------


class TestMarkResumePending:
    def test_marks_existing_session(self, tmp_path):
        store = _make_store(tmp_path)
        source = _make_source()
        entry = store.get_or_create_session(source)

        assert store.mark_resume_pending(entry.session_key) is True
        refreshed = store._entries[entry.session_key]
        assert refreshed.resume_pending is True
        assert refreshed.resume_reason == "restart_timeout"
        assert refreshed.last_resume_marked_at is not None

    def test_custom_reason_persists(self, tmp_path):
        store = _make_store(tmp_path)
        source = _make_source()
        entry = store.get_or_create_session(source)

        store.mark_resume_pending(entry.session_key, reason="shutdown_timeout")
        assert store._entries[entry.session_key].resume_reason == "shutdown_timeout"

    def test_returns_false_for_unknown_key(self, tmp_path):
        store = _make_store(tmp_path)
        assert store.mark_resume_pending("no-such-key") is False

    def test_does_not_override_suspended(self, tmp_path):
        """suspended wins — mark_resume_pending is a no-op on a suspended entry."""
        store = _make_store(tmp_path)
        source = _make_source()
        entry = store.get_or_create_session(source)
        store.suspend_session(entry.session_key)

        assert store.mark_resume_pending(entry.session_key) is False
        e = store._entries[entry.session_key]
        assert e.suspended is True
        assert e.resume_pending is False

    def test_survives_roundtrip_through_json(self, tmp_path):
        store = _make_store(tmp_path)
        source = _make_source()
        entry = store.get_or_create_session(source)
        store.mark_resume_pending(entry.session_key, reason="restart_timeout")

        # Reload from disk
        store2 = _make_store(tmp_path)
        store2._ensure_loaded()
        reloaded = store2._entries[entry.session_key]
        assert reloaded.resume_pending is True
        assert reloaded.resume_reason == "restart_timeout"


class TestClearResumePending:
    def test_clears_flag(self, tmp_path):
        store = _make_store(tmp_path)
        source = _make_source()
        entry = store.get_or_create_session(source)
        store.mark_resume_pending(entry.session_key)

        assert store.clear_resume_pending(entry.session_key) is True
        e = store._entries[entry.session_key]
        assert e.resume_pending is False
        assert e.resume_reason is None
        assert e.last_resume_marked_at is None

    def test_returns_false_when_not_pending(self, tmp_path):
        store = _make_store(tmp_path)
        source = _make_source()
        entry = store.get_or_create_session(source)
        # Not marked
        assert store.clear_resume_pending(entry.session_key) is False

    def test_returns_false_for_unknown_key(self, tmp_path):
        store = _make_store(tmp_path)
        assert store.clear_resume_pending("no-such-key") is False


# ---------------------------------------------------------------------------
# SessionStore.get_or_create_session resume_pending behaviour
# ---------------------------------------------------------------------------


class TestGetOrCreateResumePending:
    def test_resume_pending_preserves_session_id(self, tmp_path):
        """This is THE core behavioural fix — resume_pending ≠ new session."""
        store = _make_store(tmp_path)
        source = _make_source()
        first = store.get_or_create_session(source)
        original_sid = first.session_id
        store.mark_resume_pending(first.session_key)

        second = store.get_or_create_session(source)
        assert second.session_id == original_sid
        assert second.was_auto_reset is False
        assert second.auto_reset_reason is None
        # Flag is NOT cleared on read — only on successful turn completion.
        assert second.resume_pending is True

    def test_suspended_still_creates_new_session(self, tmp_path):
        """Regression guard — suspended must still force a clean slate."""
        store = _make_store(tmp_path)
        source = _make_source()
        first = store.get_or_create_session(source)
        original_sid = first.session_id
        store.suspend_session(first.session_key)

        second = store.get_or_create_session(source)
        assert second.session_id != original_sid
        assert second.was_auto_reset is True
        assert second.auto_reset_reason == "suspended"

    def test_suspended_overrides_resume_pending(self, tmp_path):
        """Terminal escalation: a session that somehow has BOTH flags must
        behave like ``suspended`` — forced wipe + auto_reset_reason."""
        store = _make_store(tmp_path)
        source = _make_source()
        first = store.get_or_create_session(source)
        original_sid = first.session_id

        # Force the pathological state directly (normally mark_resume_pending
        # refuses to run when suspended=True, but a stuck-loop escalation
        # can set suspended=True AFTER resume_pending is set).
        with store._lock:
            e = store._entries[first.session_key]
            e.resume_pending = True
            e.resume_reason = "restart_timeout"
            e.suspended = True
            store._save()

        second = store.get_or_create_session(source)
        assert second.session_id != original_sid
        assert second.was_auto_reset is True
        assert second.auto_reset_reason == "suspended"


# ---------------------------------------------------------------------------
# SessionStore.suspend_recently_active skip behaviour
# ---------------------------------------------------------------------------


class TestSuspendRecentlyActiveSkipsResumePending:
    def test_resume_pending_entries_not_suspended(self, tmp_path):
        store = _make_store(tmp_path)
        source = _make_source()
        entry = store.get_or_create_session(source)
        store.mark_resume_pending(entry.session_key)

        count = store.suspend_recently_active()
        assert count == 0
        e = store._entries[entry.session_key]
        assert e.suspended is False
        assert e.resume_pending is True

    def test_non_resume_pending_gets_resume_pending(self, tmp_path):
        """Non-resume sessions are now marked resume_pending (not suspended)."""
        store = _make_store(tmp_path)
        source_a = _make_source(chat_id="a")
        source_b = _make_source(chat_id="b")
        entry_a = store.get_or_create_session(source_a)
        entry_b = store.get_or_create_session(source_b)
        store.mark_resume_pending(entry_a.session_key)

        count = store.suspend_recently_active()
        # entry_a is already resume_pending → skipped. entry_b gets marked.
        assert count == 1
        assert store._entries[entry_a.session_key].suspended is False
        assert store._entries[entry_b.session_key].resume_pending is True
        assert store._entries[entry_b.session_key].suspended is False


# ---------------------------------------------------------------------------
# Restart-resume system-note injection
# ---------------------------------------------------------------------------


class TestResumePendingSystemNote:
    def _pending_entry(self, reason="restart_timeout") -> SessionEntry:
        now = datetime.now()
        return SessionEntry(
            session_key="agent:main:telegram:dm:1",
            session_id="sid",
            created_at=now,
            updated_at=now,
            resume_pending=True,
            resume_reason=reason,
            last_resume_marked_at=now,
        )

    def test_resume_pending_restart_note_mentions_restart(self):
        entry = self._pending_entry(reason="restart_timeout")
        result = _simulate_note_injection(
            history=[
                {"role": "assistant", "content": "in progress", "timestamp": time.time()},
            ],
            user_message="what happened?",
            resume_entry=entry,
        )
        assert "[System note:" in result
        assert "gateway restart" in result
        assert "what happened?" in result

    def test_resume_pending_shutdown_note_mentions_shutdown(self):
        entry = self._pending_entry(reason="shutdown_timeout")
        result = _simulate_note_injection(
            history=[
                {"role": "assistant", "content": "in progress", "timestamp": time.time()},
            ],
            user_message="ping",
            resume_entry=entry,
        )
        assert "gateway shutdown" in result

    def test_resume_pending_fires_without_tool_tail(self):
        """Key improvement over PR #9934: the restart-resume note fires
        even when the transcript's last role is NOT ``tool``."""
        entry = self._pending_entry()
        history = [
            {"role": "user", "content": "run a long thing", "timestamp": time.time() - 10},
            {"role": "assistant", "content": "ok, starting...", "timestamp": time.time()},
        ]
        result = _simulate_note_injection(history, "ping", resume_entry=entry)
        assert "[System note:" in result
        assert "gateway restart" in result

    def test_resume_pending_subsumes_tool_tail_note(self):
        """When BOTH conditions are true, the restart-resume note wins —
        no duplicate notes."""
        entry = self._pending_entry()
        history = [
            {"role": "assistant", "content": None, "tool_calls": [
                {"id": "c1", "function": {"name": "x", "arguments": "{}"}},
            ], "timestamp": time.time() - 1},
            {"role": "tool", "tool_call_id": "c1", "content": "result",
             "timestamp": time.time()},
        ]
        result = _simulate_note_injection(history, "ping", resume_entry=entry)
        assert result.count("[System note:") == 1
        assert "gateway restart" in result
        # Old tool-tail wording absent
        assert "haven't responded to yet" not in result

    def test_no_resume_pending_preserves_tool_tail_note(self):
        """Regression: the old PR #9934 tool-tail behaviour is unchanged."""
        history = [
            {"role": "assistant", "content": None, "tool_calls": [
                {"id": "c1", "function": {"name": "x", "arguments": "{}"}},
            ], "timestamp": time.time() - 1},
            {"role": "tool", "tool_call_id": "c1", "content": "result",
             "timestamp": time.time()},
        ]
        result = _simulate_note_injection(history, "ping", resume_entry=None)
        assert "[System note:" in result
        assert "tool result" in result

    def test_stale_resume_pending_does_not_inject_restart_note(self):
        """Old restart markers must not revive an unrelated stale task.

        The transcript's last row is from an hour ago — well outside the
        default 1h freshness window (fixture uses window=1800 to exercise
        the stale path without tying the test to the production default).
        """
        entry = self._pending_entry()
        entry.last_resume_marked_at = datetime.now() - timedelta(hours=1)

        history = [
            {"role": "assistant", "content": "old in progress",
             "timestamp": time.time() - 3600},
        ]
        result = _simulate_note_injection(
            history=history,
            user_message="start a new task",
            resume_entry=entry,
            window_secs=1800,
        )
        assert result == "start a new task"

    def test_fresh_tool_tail_preserves_auto_continue_note(self):
        history = [
            {"role": "assistant", "content": None, "tool_calls": [
                {"id": "c1", "function": {"name": "x", "arguments": "{}"}},
            ], "timestamp": time.time() - 1},
            {
                "role": "tool",
                "tool_call_id": "c1",
                "content": "result",
                "timestamp": time.time(),
            },
        ]
        result = _simulate_note_injection(history, "ping", resume_entry=None)
        assert "[System note:" in result
        assert "tool result" in result

    def test_stale_tool_tail_does_not_inject_auto_continue_note(self):
        """The core bug fix: stale tool-tail must not revive a dead task.

        Uses window_secs=1800 (30 min) to verify the gate fires at 1h —
        keeps the test stable regardless of the production default.
        """
        history = [
            {"role": "assistant", "content": None, "tool_calls": [
                {"id": "c1", "function": {"name": "x", "arguments": "{}"}},
            ], "timestamp": time.time() - 3601},
            {
                "role": "tool",
                "tool_call_id": "c1",
                "content": "stale result",
                "timestamp": time.time() - 3600,
            },
        ]
        result = _simulate_note_injection(
            history,
            "start a new task",
            resume_entry=None,
            window_secs=1800,
        )
        assert result == "start a new task"

    def test_stale_tool_tail_with_production_data_shape(self):
        """Regression guard for #16802: exercise the REAL production path
        where ``agent_history`` has been stripped of timestamps.

        The original PR #16802 fix read ``agent_history[-1].get("timestamp")``
        — which is always ``None`` at runtime because the gateway strips
        ``timestamp`` off tool/tool_call rows in ``history → agent_history``.
        This test builds a stale history, runs it through the real
        ``_build_agent_history`` conversion, then asserts:

          1. The stripped ``agent_history`` carries NO timestamp (protects
             against someone "fixing" the original PR by re-adding the
             stripped field — which would break the API contract).
          2. The freshness gate still correctly classifies the transcript
             as stale because the signal is read from ``history`` BEFORE
             the strip.
          3. No auto-continue note is injected.
        """
        history = [
            {"role": "assistant", "content": None, "tool_calls": [
                {"id": "c1", "function": {"name": "x", "arguments": "{}"}},
            ], "timestamp": time.time() - 7201},
            {
                "role": "tool",
                "tool_call_id": "c1",
                "content": "stale result",
                "timestamp": time.time() - 7200,  # 2 hours old
            },
        ]
        agent_history = _build_agent_history(history)

        # Invariant 1: strip contract preserved
        assert agent_history[-1]["role"] == "tool"
        assert "timestamp" not in agent_history[-1], (
            "agent_history tool rows must NOT carry a timestamp — the "
            "freshness gate must read from raw history, not agent_history"
        )

        # Invariant 2+3: stale classification, no note injection
        result = _simulate_note_injection(
            history,
            "start a new task",
            resume_entry=None,
            agent_history=agent_history,
        )
        assert result == "start a new task"

    def test_freshness_gate_disabled_via_zero_window(self):
        """window_secs=0 restores pre-fix behaviour (always inject)."""
        history = [
            {"role": "assistant", "content": None, "tool_calls": [
                {"id": "c1", "function": {"name": "x", "arguments": "{}"}},
            ], "timestamp": time.time() - 86400},
            {
                "role": "tool",
                "tool_call_id": "c1",
                "content": "day-old result",
                "timestamp": time.time() - 86400,  # 24 hours old
            },
        ]
        result = _simulate_note_injection(
            history, "ping", resume_entry=None, window_secs=0,
        )
        assert "[System note:" in result
        assert "tool result" in result

    def test_legacy_history_without_timestamps_still_injects(self):
        """Transcripts predating timestamp persistence must keep the old
        behaviour — freshness unknown → treat as fresh."""
        history = [
            {"role": "assistant", "content": None, "tool_calls": [
                {"id": "c1", "function": {"name": "x", "arguments": "{}"}},
            ]},
            {"role": "tool", "tool_call_id": "c1", "content": "result"},
        ]
        result = _simulate_note_injection(history, "ping", resume_entry=None)
        assert "[System note:" in result
        assert "tool result" in result

    def test_no_note_when_nothing_to_resume(self):
        history = [
            {"role": "user", "content": "hello", "timestamp": time.time() - 2},
            {"role": "assistant", "content": "hi", "timestamp": time.time() - 1},
        ]
        result = _simulate_note_injection(history, "ping", resume_entry=None)
        assert result == "ping"


# ---------------------------------------------------------------------------
# Freshness helpers
# ---------------------------------------------------------------------------


class TestFreshnessHelpers:
    def test_coerce_datetime(self):
        now = datetime.now()
        assert _coerce_gateway_timestamp(now) == pytest.approx(now.timestamp(), abs=1e-3)

    def test_coerce_epoch_seconds(self):
        assert _coerce_gateway_timestamp(1_700_000_000) == 1_700_000_000.0
        assert _coerce_gateway_timestamp(1_700_000_000.5) == 1_700_000_000.5

    def test_coerce_epoch_milliseconds(self):
        # Values > 10^10 treated as ms
        assert _coerce_gateway_timestamp(1_700_000_000_000) == 1_700_000_000.0

    def test_coerce_iso_string(self):
        iso = "2026-04-18T12:00:00+00:00"
        expected = datetime.fromisoformat(iso).timestamp()
        assert _coerce_gateway_timestamp(iso) == pytest.approx(expected, abs=1e-3)

    def test_coerce_iso_string_with_z_suffix(self):
        iso_z = "2026-04-18T12:00:00Z"
        expected = datetime.fromisoformat("2026-04-18T12:00:00+00:00").timestamp()
        assert _coerce_gateway_timestamp(iso_z) == pytest.approx(expected, abs=1e-3)

    def test_coerce_numeric_string(self):
        assert _coerce_gateway_timestamp("1700000000") == 1_700_000_000.0

    def test_coerce_rejects_garbage(self):
        assert _coerce_gateway_timestamp(None) is None
        assert _coerce_gateway_timestamp("") is None
        assert _coerce_gateway_timestamp("not-a-timestamp") is None
        assert _coerce_gateway_timestamp(True) is None  # bool rejected
        assert _coerce_gateway_timestamp(False) is None
        assert _coerce_gateway_timestamp([1, 2, 3]) is None

    def test_is_fresh_unknown_is_fresh(self):
        """Legacy-compat: unknown timestamp → fresh."""
        assert _is_fresh_gateway_interruption(None) is True
        assert _is_fresh_gateway_interruption("not-a-timestamp") is True

    def test_is_fresh_window_bounds(self):
        now = 1_700_000_000.0
        # 1h window, 30min old → fresh
        assert _is_fresh_gateway_interruption(
            now - 1800, now=now, window_secs=3600,
        ) is True
        # 1h window, 2h old → stale
        assert _is_fresh_gateway_interruption(
            now - 7200, now=now, window_secs=3600,
        ) is False
        # 1h window, exactly at boundary → fresh (<=)
        assert _is_fresh_gateway_interruption(
            now - 3600, now=now, window_secs=3600,
        ) is True

    def test_is_fresh_zero_window_always_fresh(self):
        """Opt-out: window_secs=0 disables the gate entirely."""
        assert _is_fresh_gateway_interruption(
            0.0, now=1_700_000_000.0, window_secs=0,
        ) is True
        assert _is_fresh_gateway_interruption(
            -1.0, now=1_700_000_000.0, window_secs=-5,
        ) is True

    def test_last_transcript_timestamp_skips_meta(self):
        history = [
            {"role": "user", "content": "hi", "timestamp": 100.0},
            {"role": "assistant", "content": "hey", "timestamp": 200.0},
            {"role": "session_meta", "content": "tools:{}", "timestamp": 999.0},
            {"role": "system", "content": "ignore", "timestamp": 999.0},
        ]
        assert _last_transcript_timestamp(history) == 200.0

    def test_last_transcript_timestamp_empty(self):
        assert _last_transcript_timestamp([]) is None
        assert _last_transcript_timestamp(None) is None

    def test_last_transcript_timestamp_row_without_timestamp(self):
        """Legacy transcript row (no timestamp) returns None → caller
        treats as fresh."""
        history = [
            {"role": "user", "content": "hi"},
            {"role": "assistant", "content": "hey"},
        ]
        assert _last_transcript_timestamp(history) is None

    def test_auto_continue_freshness_window_reads_env(self, monkeypatch):
        monkeypatch.setenv("HERMES_AUTO_CONTINUE_FRESHNESS", "7200")
        assert _auto_continue_freshness_window() == 7200.0

    def test_auto_continue_freshness_window_default_when_unset(self, monkeypatch):
        monkeypatch.delenv("HERMES_AUTO_CONTINUE_FRESHNESS", raising=False)
        # Default is 1 hour
        assert _auto_continue_freshness_window() == 3600.0

    def test_auto_continue_freshness_window_malformed_falls_back(self, monkeypatch):
        monkeypatch.setenv("HERMES_AUTO_CONTINUE_FRESHNESS", "not-a-number")
        assert _auto_continue_freshness_window() == 3600.0

    def test_auto_continue_freshness_window_empty_falls_back(self, monkeypatch):
        monkeypatch.setenv("HERMES_AUTO_CONTINUE_FRESHNESS", "")
        assert _auto_continue_freshness_window() == 3600.0


# ---------------------------------------------------------------------------
# Drain-timeout path marks sessions resume_pending
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_drain_timeout_marks_resume_pending():
    """End-to-end: a drain timeout during gateway stop should flag every
    active session as resume_pending BEFORE the interrupt fires, so the
    next startup's suspend_recently_active() does not destroy them."""
    runner, adapter = make_restart_runner()
    adapter.disconnect = AsyncMock()
    runner._restart_drain_timeout = 0.05

    running_agent = MagicMock()
    session_key_one = "agent:main:telegram:dm:A"
    session_key_two = "agent:main:telegram:dm:B"
    runner._running_agents = {
        session_key_one: running_agent,
        session_key_two: MagicMock(),
    }

    # Plug a mock session_store that records marks.
    session_store = MagicMock()
    session_store.mark_resume_pending = MagicMock(return_value=True)
    runner.session_store = session_store

    with patch("gateway.status.remove_pid_file"), patch(
        "gateway.status.write_runtime_status"
    ):
        await runner.stop()

    # Both active sessions were marked with the shutdown_timeout reason.
    calls = session_store.mark_resume_pending.call_args_list
    marked = {args[0][0] for args in calls}
    assert marked == {session_key_one, session_key_two}
    for args in calls:
        assert args[0][1] == "shutdown_timeout"


@pytest.mark.asyncio
async def test_drain_timeout_uses_restart_reason_when_restarting():
    runner, adapter = make_restart_runner()
    adapter.disconnect = AsyncMock()
    runner._restart_drain_timeout = 0.05
    runner._restart_requested = True

    running_agent = MagicMock()
    runner._running_agents = {"agent:main:telegram:dm:A": running_agent}

    session_store = MagicMock()
    session_store.mark_resume_pending = MagicMock(return_value=True)
    runner.session_store = session_store

    with patch("gateway.status.remove_pid_file"), patch(
        "gateway.status.write_runtime_status"
    ):
        await runner.stop(restart=True, detached_restart=False, service_restart=True)

    calls = session_store.mark_resume_pending.call_args_list
    assert calls, "expected at least one mark_resume_pending call"
    for args in calls:
        assert args[0][1] == "restart_timeout"


@pytest.mark.asyncio
async def test_clean_drain_does_not_mark_resume_pending():
    """If the drain completes within timeout (no force-interrupt), no
    sessions should be flagged — the normal shutdown path is unchanged."""
    runner, adapter = make_restart_runner()
    adapter.disconnect = AsyncMock()

    running_agent = MagicMock()
    runner._running_agents = {"agent:main:telegram:dm:A": running_agent}

    # Finish the agent before the (generous) drain deadline
    async def finish_agent():
        await asyncio.sleep(0.05)
        runner._running_agents.clear()

    asyncio.create_task(finish_agent())

    session_store = MagicMock()
    session_store.mark_resume_pending = MagicMock(return_value=True)
    runner.session_store = session_store

    with patch("gateway.status.remove_pid_file"), patch(
        "gateway.status.write_runtime_status"
    ):
        await runner.stop()

    session_store.mark_resume_pending.assert_not_called()
    running_agent.interrupt.assert_not_called()


@pytest.mark.asyncio
async def test_drain_timeout_only_marks_still_running_sessions():
    """A session that finished gracefully during the drain window must
    NOT be marked ``resume_pending`` — it completed cleanly and its
    next turn should be a normal fresh turn, not one prefixed with the
    restart-interruption system note.

    Regression guard for using ``self._running_agents`` at timeout
    rather than the ``active_agents`` drain-start snapshot.
    """
    runner, adapter = make_restart_runner()
    adapter.disconnect = AsyncMock()
    # Long enough for the finisher to exit, short enough to still time out
    # with the stuck session still present.
    runner._restart_drain_timeout = 0.3

    session_key_finisher = "agent:main:telegram:dm:A"
    session_key_stuck = "agent:main:telegram:dm:B"
    runner._running_agents = {
        session_key_finisher: MagicMock(),
        session_key_stuck: MagicMock(),
    }

    async def finish_one():
        await asyncio.sleep(0.05)
        runner._running_agents.pop(session_key_finisher, None)

    asyncio.create_task(finish_one())

    session_store = MagicMock()
    session_store.mark_resume_pending = MagicMock(return_value=True)
    runner.session_store = session_store

    with patch("gateway.status.remove_pid_file"), patch(
        "gateway.status.write_runtime_status"
    ):
        await runner.stop()

    calls = session_store.mark_resume_pending.call_args_list
    marked = {args[0][0] for args in calls}
    # Only the session still running at timeout is marked; the finisher is not.
    assert marked == {session_key_stuck}


@pytest.mark.asyncio
async def test_drain_timeout_skips_pending_sentinel_sessions():
    """Pending sentinels — sessions whose AIAgent construction hasn't
    produced a real agent yet — are skipped by
    ``_interrupt_running_agents()``.  The resume_pending marking must
    mirror that: no agent started means no turn was interrupted.
    """
    from gateway.run import _AGENT_PENDING_SENTINEL

    runner, adapter = make_restart_runner()
    adapter.disconnect = AsyncMock()
    runner._restart_drain_timeout = 0.05

    session_key_real = "agent:main:telegram:dm:A"
    session_key_sentinel = "agent:main:telegram:dm:B"
    runner._running_agents = {
        session_key_real: MagicMock(),
        session_key_sentinel: _AGENT_PENDING_SENTINEL,
    }

    session_store = MagicMock()
    session_store.mark_resume_pending = MagicMock(return_value=True)
    runner.session_store = session_store

    with patch("gateway.status.remove_pid_file"), patch(
        "gateway.status.write_runtime_status"
    ):
        await runner.stop()

    calls = session_store.mark_resume_pending.call_args_list
    marked = {args[0][0] for args in calls}
    assert marked == {session_key_real}


# ---------------------------------------------------------------------------
# Gateway startup auto-resume
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_startup_auto_resume_schedules_fresh_pending_sessions():
    """Fresh resume_pending sessions should continue automatically after startup.

    This closes the UX gap where restart recovery only happened if the user sent
    another message after the gateway came back.
    """
    runner, adapter = make_restart_runner()
    source = make_restart_source(chat_id="resume-chat", thread_id="topic-1")
    pending_entry = SessionEntry(
        session_key="agent:main:telegram:group:resume-chat:topic-1",
        session_id="sid",
        created_at=datetime.now(),
        updated_at=datetime.now(),
        origin=source,
        platform=Platform.TELEGRAM,
        chat_type="group",
        resume_pending=True,
        resume_reason="restart_timeout",
        last_resume_marked_at=datetime.now(),
    )
    runner.session_store._entries = {pending_entry.session_key: pending_entry}
    adapter.handle_message = AsyncMock()

    scheduled = runner._schedule_resume_pending_sessions()
    await asyncio.sleep(0)

    assert scheduled == 1
    adapter.handle_message.assert_awaited_once()
    event = adapter.handle_message.await_args.args[0]
    assert isinstance(event, MessageEvent)
    assert event.internal is True
    assert event.message_type == MessageType.TEXT
    assert event.source == source
    # Text is empty — the existing _is_resume_pending branch in
    # _handle_message_with_agent owns the system-note injection so we don't
    # double it up.
    assert event.text == ""


@pytest.mark.asyncio
async def test_startup_auto_resume_includes_crash_recovery():
    """Crash-recovered sessions (reason=restart_interrupted) are also auto-resumed.

    suspend_recently_active() marks in-flight sessions with resume_reason
    "restart_interrupted" when the previous gateway exit was not clean
    (crash/SIGKILL/OOM).  These should get the same magic continuation as
    drain-timeout interruptions.
    """
    runner, adapter = make_restart_runner()
    source = make_restart_source(chat_id="crash-chat")
    pending_entry = SessionEntry(
        session_key="agent:main:telegram:dm:crash-chat",
        session_id="sid",
        created_at=datetime.now(),
        updated_at=datetime.now(),
        origin=source,
        platform=Platform.TELEGRAM,
        chat_type="dm",
        resume_pending=True,
        resume_reason="restart_interrupted",
        last_resume_marked_at=datetime.now(),
    )
    runner.session_store._entries = {pending_entry.session_key: pending_entry}
    adapter.handle_message = AsyncMock()

    scheduled = runner._schedule_resume_pending_sessions()
    await asyncio.sleep(0)

    assert scheduled == 1
    adapter.handle_message.assert_awaited_once()


@pytest.mark.asyncio
async def test_startup_auto_resume_skips_stale_entries():
    """Entries older than the freshness window must not be auto-resumed."""
    runner, adapter = make_restart_runner()
    source = make_restart_source(chat_id="stale-chat")
    stale_marker = datetime.now() - timedelta(
        seconds=_auto_continue_freshness_window() + 60
    )
    stale_entry = SessionEntry(
        session_key="agent:main:telegram:dm:stale-chat",
        session_id="sid",
        created_at=stale_marker,
        updated_at=stale_marker,
        origin=source,
        platform=Platform.TELEGRAM,
        chat_type="dm",
        resume_pending=True,
        resume_reason="restart_timeout",
        last_resume_marked_at=stale_marker,
    )
    runner.session_store._entries = {stale_entry.session_key: stale_entry}
    adapter.handle_message = AsyncMock()

    scheduled = runner._schedule_resume_pending_sessions()

    assert scheduled == 0
    adapter.handle_message.assert_not_called()


@pytest.mark.asyncio
async def test_startup_auto_resume_skips_suspended_and_originless():
    """suspended entries and entries with no origin are excluded."""
    runner, adapter = make_restart_runner()
    source = make_restart_source(chat_id="ok")
    suspended_entry = SessionEntry(
        session_key="agent:main:telegram:dm:suspended",
        session_id="sid-s",
        created_at=datetime.now(),
        updated_at=datetime.now(),
        origin=source,
        platform=Platform.TELEGRAM,
        chat_type="dm",
        resume_pending=True,
        resume_reason="restart_timeout",
        suspended=True,
        last_resume_marked_at=datetime.now(),
    )
    originless = SessionEntry(
        session_key="agent:main:telegram:dm:originless",
        session_id="sid-o",
        created_at=datetime.now(),
        updated_at=datetime.now(),
        origin=None,
        platform=Platform.TELEGRAM,
        chat_type="dm",
        resume_pending=True,
        resume_reason="restart_timeout",
        last_resume_marked_at=datetime.now(),
    )
    runner.session_store._entries = {
        suspended_entry.session_key: suspended_entry,
        originless.session_key: originless,
    }
    adapter.handle_message = AsyncMock()

    scheduled = runner._schedule_resume_pending_sessions()

    assert scheduled == 0
    adapter.handle_message.assert_not_called()


@pytest.mark.asyncio
async def test_startup_auto_resume_skips_disallowed_reasons():
    """Reasons outside the auto-resume set (e.g. a future custom reason) are skipped.

    These sessions still auto-resume on the next real user message via the
    existing _is_resume_pending branch — we just don't synthesize a turn
    for them at startup.
    """
    runner, adapter = make_restart_runner()
    source = make_restart_source(chat_id="other")
    other_entry = SessionEntry(
        session_key="agent:main:telegram:dm:other",
        session_id="sid",
        created_at=datetime.now(),
        updated_at=datetime.now(),
        origin=source,
        platform=Platform.TELEGRAM,
        chat_type="dm",
        resume_pending=True,
        resume_reason="manual_resume_request",
        last_resume_marked_at=datetime.now(),
    )
    runner.session_store._entries = {other_entry.session_key: other_entry}
    adapter.handle_message = AsyncMock()

    scheduled = runner._schedule_resume_pending_sessions()

    assert scheduled == 0
    adapter.handle_message.assert_not_called()


@pytest.mark.asyncio
async def test_startup_auto_resume_skips_when_adapter_unavailable():
    runner, adapter = make_restart_runner()
    source = make_restart_source(chat_id="resume-chat")
    pending_entry = SessionEntry(
        session_key="agent:main:telegram:dm:resume-chat",
        session_id="sid",
        created_at=datetime.now(),
        updated_at=datetime.now(),
        origin=source,
        platform=Platform.TELEGRAM,
        chat_type="dm",
        resume_pending=True,
        resume_reason="restart_timeout",
        last_resume_marked_at=datetime.now(),
    )
    runner.session_store._entries = {pending_entry.session_key: pending_entry}
    runner.adapters = {}
    adapter.handle_message = AsyncMock()

    scheduled = runner._schedule_resume_pending_sessions()

    assert scheduled == 0
    adapter.handle_message.assert_not_called()


# ---------------------------------------------------------------------------
# Shutdown banner wording
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_restart_banner_uses_try_to_resume_wording():
    """The notification sent before drain should hedge the resume promise
    — the session-continuity fix is best-effort (stuck-loop counter can
    still escalate to suspended)."""
    runner, adapter = make_restart_runner()
    runner._restart_requested = True
    runner._running_agents["agent:main:telegram:dm:999"] = MagicMock()

    await runner._notify_active_sessions_of_shutdown()

    assert len(adapter.sent) == 1
    msg = adapter.sent[0]
    assert "restarting" in msg
    assert "try to resume" in msg


@pytest.mark.asyncio
async def test_restart_notifies_home_channel_even_without_active_sessions():
    runner, adapter = make_restart_runner()
    runner._restart_requested = True
    runner.config.platforms[Platform.TELEGRAM].home_channel = HomeChannel(
        platform=Platform.TELEGRAM,
        chat_id="home-42",
        name="Ops Home",
    )

    await runner._notify_active_sessions_of_shutdown()

    assert adapter.sent == [
        "⚠️ Gateway restarting — Your current task will be interrupted. "
        "Send any message after restart and I'll try to resume where you left off."
    ]


@pytest.mark.asyncio
async def test_restart_home_channel_notification_dedupes_active_chat():
    runner, adapter = make_restart_runner()
    runner._restart_requested = True
    runner._running_agents["agent:main:telegram:dm:999"] = MagicMock()
    runner.config.platforms[Platform.TELEGRAM].home_channel = HomeChannel(
        platform=Platform.TELEGRAM,
        chat_id="999",
        name="Ops Home",
    )

    await runner._notify_active_sessions_of_shutdown()

    assert len(adapter.sent) == 1


@pytest.mark.asyncio
async def test_restart_home_channel_notification_not_deduped_across_threads():
    runner, adapter = make_restart_runner()
    runner._restart_requested = True
    session_key = "agent:main:telegram:group:999"
    runner.session_store._entries[session_key] = MagicMock(
        origin=SessionSource(
            platform=Platform.TELEGRAM,
            chat_id="999",
            chat_type="group",
            user_id="u1",
            thread_id="topic-7",
        )
    )
    runner._running_agents[session_key] = MagicMock()
    runner.config.platforms[Platform.TELEGRAM].home_channel = HomeChannel(
        platform=Platform.TELEGRAM,
        chat_id="999",
        name="Ops Home",
    )

    await runner._notify_active_sessions_of_shutdown()

    assert len(adapter.sent) == 2
    assert adapter.sent_calls[0][2] == {"thread_id": "topic-7"}
    assert adapter.sent_calls[1][2] is None


@pytest.mark.asyncio
async def test_restart_home_channel_notification_ignores_false_send_result():
    runner, adapter = make_restart_runner()
    runner._restart_requested = True
    runner.config.platforms[Platform.TELEGRAM].home_channel = HomeChannel(
        platform=Platform.TELEGRAM,
        chat_id="home-42",
        name="Ops Home",
    )
    adapter.send = AsyncMock(return_value=SendResult(success=False, error="network down"))

    await runner._notify_active_sessions_of_shutdown()

    adapter.send.assert_called_once()


# ---------------------------------------------------------------------------
# Stuck-loop escalation integration
# ---------------------------------------------------------------------------


class TestStuckLoopEscalation:
    """The existing .restart_failure_counts counter (PR #7536) remains the
    single source of terminal escalation — no parallel counter on
    SessionEntry was added.  After the configured threshold, the startup
    path flips suspended=True which overrides resume_pending."""

    def test_escalation_via_stuck_loop_counter_overrides_resume_pending(
        self, tmp_path, monkeypatch
    ):
        """Simulate a session that keeps getting restart-interrupted and
        hits the stuck-loop threshold: next startup should force it to
        fresh-session despite resume_pending being set."""
        import json

        from gateway.run import GatewayRunner

        store = _make_store(tmp_path)
        source = _make_source()
        entry = store.get_or_create_session(source)
        store.mark_resume_pending(entry.session_key, reason="restart_timeout")

        # Simulate counter already at threshold (3 consecutive interrupted
        # restarts).  _suspend_stuck_loop_sessions will flip suspended=True.
        counts_file = tmp_path / ".restart_failure_counts"
        counts_file.write_text(json.dumps({entry.session_key: 3}))

        monkeypatch.setattr("gateway.run._hermes_home", tmp_path)
        runner = object.__new__(GatewayRunner)
        runner.session_store = store

        suspended_count = GatewayRunner._suspend_stuck_loop_sessions(runner)
        assert suspended_count == 1
        assert store._entries[entry.session_key].suspended is True
        # resume_pending is still set on the entry, but suspended wins in
        # get_or_create_session so the next message still gets a new sid.
        second = store.get_or_create_session(source)
        assert second.session_id != entry.session_id
        assert second.auto_reset_reason == "suspended"

    def test_successful_turn_flow_clears_both_counter_and_resume_pending(
        self, tmp_path, monkeypatch
    ):
        """The gateway's post-turn cleanup should clear both signals so a
        future restart-interrupt starts with a fresh counter."""
        import json

        from gateway.run import GatewayRunner

        store = _make_store(tmp_path)
        source = _make_source()
        entry = store.get_or_create_session(source)
        store.mark_resume_pending(entry.session_key, reason="restart_timeout")

        counts_file = tmp_path / ".restart_failure_counts"
        counts_file.write_text(json.dumps({entry.session_key: 2}))

        monkeypatch.setattr("gateway.run._hermes_home", tmp_path)
        runner = object.__new__(GatewayRunner)
        runner.session_store = store

        GatewayRunner._clear_restart_failure_count(runner, entry.session_key)
        store.clear_resume_pending(entry.session_key)

        assert store._entries[entry.session_key].resume_pending is False
        assert not counts_file.exists()

    def test_increment_restart_failure_counts_uses_atomic_json_write(
        self, tmp_path, monkeypatch
    ):
        from gateway.run import GatewayRunner

        source = _make_source()
        session_key = _make_store(tmp_path).get_or_create_session(source).session_key

        monkeypatch.setattr("gateway.run._hermes_home", tmp_path)
        calls = []

        def _fake_atomic_json_write(path, payload, **kwargs):
            calls.append((path, payload, kwargs))

        monkeypatch.setattr("gateway.run.atomic_json_write", _fake_atomic_json_write)

        runner = object.__new__(GatewayRunner)
        runner._increment_restart_failure_counts({session_key})

        assert calls == [
            (
                tmp_path / ".restart_failure_counts",
                {session_key: 1},
                {"indent": None},
            )
        ]

    def test_clear_restart_failure_count_uses_atomic_json_write_when_entries_remain(
        self, tmp_path, monkeypatch
    ):
        import json

        from gateway.run import GatewayRunner

        source = _make_source()
        session_key = _make_store(tmp_path).get_or_create_session(source).session_key
        other_key = "agent:main:telegram:dm:other"
        counts_file = tmp_path / ".restart_failure_counts"
        counts_file.write_text(
            json.dumps({session_key: 2, other_key: 1}),
            encoding="utf-8",
        )

        monkeypatch.setattr("gateway.run._hermes_home", tmp_path)
        calls = []

        def _fake_atomic_json_write(path, payload, **kwargs):
            calls.append((path, payload, kwargs))

        monkeypatch.setattr("gateway.run.atomic_json_write", _fake_atomic_json_write)

        runner = object.__new__(GatewayRunner)
        runner._clear_restart_failure_count(session_key)

        assert calls == [
            (
                tmp_path / ".restart_failure_counts",
                {other_key: 1},
                {"indent": None},
            )
        ]
