HEX
Server: LiteSpeed
System: Linux linux31.centraldnserver.com 4.18.0-553.83.1.lve.el8.x86_64 #1 SMP Wed Nov 12 10:04:12 UTC 2025 x86_64
User: salamatk (1501)
PHP: 8.1.33
Disabled: show_source, system, shell_exec, passthru, exec, popen, proc_open
Upload Files
File: //opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/wordpress/changelog_processor.py
"""Processor for WordPress rule disable/enable changelog files.

The PHP WordPress plugin writes rule change actions to changelog.php when a user
disables or enables protection rules from the WordPress admin panel. This module
reads, parses, and applies those actions to the agent database.

The changelog.php file uses the same format as incident files:
    <?php __halt_compiler();
    #{base64-encoded JSON for action 1}
    #{base64-encoded JSON for action 2}

Each JSON action has the form:
    {"action": "disable"|"enable", "rule_id": "xyz", "ts": ...}

The user_id stored with each action is the system UID of the WordPress site
owner (site.uid).
"""

import logging
from pathlib import Path

from defence360agent.contracts.messages import MessageType
from defence360agent.contracts.plugins import MessageSink
from defence360agent.model.wordpress import WPSite, WordpressSite
from defence360agent.model.wp_disabled_rule import WPDisabledRule
from defence360agent.wordpress.cli import get_data_dir
from defence360agent.wordpress.incident_parser import IncidentFileParser
from defence360agent.wordpress.utils import parse_php_with_embedded_json

logger = logging.getLogger(__name__)

CHANGELOG_FILENAME = "changelog.php"
DISABLED_RULES_FILENAME = "disabled-rules.php"

ACTION_DISABLE = "disable"
ACTION_ENABLE = "enable"


class ChangelogProcessor:
    """Process WordPress rule disable/enable changelog files.

    Reads changelog.php from each site's data directory, applies
    disable/enable actions to the WPDisabledRule database, reports events
    to the correlation server, and deletes the file after processing.

    If no changelog exists (or no new entries), checks whether
    disabled-rules.php has been modified externally (e.g. backup restore)
    and flags the domain for regeneration.
    """

    def __init__(self) -> None:
        # changelog.php uses the same format as incident files
        # (base64-encoded JSON lines wrapped in PHP), so we reuse the parser
        self.parser = IncidentFileParser()

    async def process_changelogs_for_sites(
        self,
        sites: list[WPSite],
        sink: MessageSink | None,
    ) -> list[WPSite]:
        """Process changelog.php for all given sites.

        Args:
            sites: WordPress sites to process.
            sink: MessageSink for sending correlation events.

        Returns:
            Sites whose disabled rules were affected
            (needing disabled-rules.php regeneration).
        """
        affected: list[WPSite] = []

        for site in sites:
            if await self._process_site(site, sink):
                affected.append(site)

        if affected:
            logger.info(
                "Changelog processing affected %d site(s)",
                len(affected),
            )

        return affected

    async def _process_site(
        self,
        site: WPSite,
        sink: MessageSink | None,
    ) -> bool:
        """Process changelog.php for a single site.

        Args:
            site: WordPress site to process.
            sink: MessageSink for sending correlation events.

        Returns:
            True if the site's disabled rules were affected.
        """
        try:
            data_dir = await get_data_dir(site)
            if not data_dir.exists():
                return False

            changelog_path = data_dir / CHANGELOG_FILENAME
            if changelog_path.exists():
                if await self._process_changelog_file(
                    changelog_path, site, sink
                ):
                    return True

            if self._is_disabled_rules_file_stale(site, data_dir):
                return True

        except Exception as e:
            logger.error(
                "Error processing changelog for site %s: %s",
                site.docroot,
                e,
            )

        return False

    def _consume_changelog(
        self, changelog_path: Path, site: WPSite
    ) -> list[dict]:
        """Parse a changelog file and delete it.

        The file is deleted regardless of whether parsing succeeds.
        """
        try:
            return self.parser.parse_file(changelog_path)
        except (OSError, ValueError) as e:
            logger.error(
                "Failed to parse changelog for site %s: %s",
                site.docroot,
                e,
            )
            return []
        finally:
            try:
                changelog_path.unlink(missing_ok=True)
            except OSError as e:
                logger.error(
                    "Failed to delete changelog for site %s: %s",
                    site.docroot,
                    e,
                )

    async def _process_changelog_file(
        self,
        changelog_path: Path,
        site: WPSite,
        sink: MessageSink | None,
    ) -> bool:
        """Parse and apply actions from a changelog file.

        The file is always deleted after reading, even on parse errors.
        Actions older than the last sync timestamp are skipped to prevent
        stale changelog files (e.g. from backup restores) from undoing
        more recent changes.

        Returns:
            True if any DB changes occurred.
        """
        actions = self._consume_changelog(changelog_path, site)
        if not actions:
            return False

        last_sync_ts = self._get_last_sync_ts(site)

        changed = False
        for action in actions:
            try:
                timestamp = float(action.get("ts", 0))
                if timestamp <= 0:
                    raise ValueError(
                        "Missing or invalid timestamp in changelog action"
                        f" for rule {action.get('rule_id', '?')}"
                        f" on site {site.docroot}"
                    )
                if last_sync_ts is not None and timestamp <= last_sync_ts:
                    logger.info(
                        "Skipping stale changelog action for rule %s"
                        " on site %s (ts=%.0f <= sync_ts=%.0f)",
                        action.get("rule_id", "?"),
                        site.docroot,
                        timestamp,
                        last_sync_ts,
                    )
                    continue
                if self._process_action(action, site, timestamp):
                    changed = True
                await self._report_action(action, site, sink, timestamp)
            except ValueError as e:
                logger.warning("Skipping invalid changelog entry: %s", e)
            except Exception as e:
                logger.error(
                    "Failed to process changelog action %s for site %s: %s",
                    action,
                    site.docroot,
                    e,
                )

        logger.info(
            "Processed changelog for site %s: %d action(s), changed=%s",
            site.docroot,
            len(actions),
            changed,
        )
        return changed

    def _process_action(
        self, action: dict, site: WPSite, timestamp: float
    ) -> bool:
        """Apply a single changelog action to the database.

        Args:
            action: Parsed action dict with keys: action, rule_id, ts.
            site: The WordPress site the action belongs to.
            timestamp: Pre-resolved Unix timestamp for this action.

        Returns:
            True if the database state was modified.

        Raises:
            ValueError: If the action is missing required fields or has
                an unknown action type.
        """
        action_type = action.get("action")
        rule_id = action.get("rule_id")
        if not action_type or not rule_id:
            raise ValueError(
                f"Missing action or rule_id in changelog entry: {action}"
            )

        if action_type == ACTION_DISABLE:
            return self._apply_disable(rule_id, site, timestamp)
        elif action_type == ACTION_ENABLE:
            return self._apply_enable(rule_id, site)
        else:
            raise ValueError(
                f"Unknown changelog action '{action_type}'"
                f" for rule {rule_id} on site {site.docroot}"
            )

    @staticmethod
    def _get_last_sync_ts(site: WPSite) -> float | None:
        """Get the last disabled-rules sync timestamp for a site.

        Returns None if the site has no DB record or no sync timestamp,
        meaning all actions should be processed.
        """
        try:
            db_site = WordpressSite.get_by_id(site.docroot)
            return db_site.disabled_rules_sync_ts
        except WordpressSite.DoesNotExist:
            return None

    @staticmethod
    def _apply_disable(rule_id: str, site: WPSite, timestamp: float) -> bool:
        """Apply a disable action from the changelog.

        Returns:
            True if a new disable entry was created (not a no-op).
        """
        count = WPDisabledRule.store(
            rule_id=rule_id,
            domains=[site.domain],
            source=WPDisabledRule.SOURCE_WORDPRESS,
            user_id=site.uid,
            timestamp=timestamp,
        )
        return count > 0

    def _apply_enable(self, rule_id: str, site: WPSite) -> bool:
        """Apply an enable action from the changelog.

        Returns:
            True if a disable entry was removed.
        """
        count = WPDisabledRule.remove(
            rule_id=rule_id,
            domains=[site.domain],
        )
        return count > 0

    @staticmethod
    async def _report_action(
        action: dict,
        site: WPSite,
        sink: MessageSink | None,
        timestamp: float,
    ) -> None:
        """Send a rule change event to the correlation server.

        Must only be called for valid actions (after _process_action succeeds).
        """
        if sink is None:
            return

        action_type = action["action"]
        rule_id = action["rule_id"]

        if action_type == ACTION_DISABLE:
            message_cls = MessageType.WPRuleDisabled
        elif action_type == ACTION_ENABLE:
            message_cls = MessageType.WPRuleEnabled
        else:
            return

        try:
            await sink.process_message(
                message_cls(
                    plugin_id="wordpress",
                    rule=rule_id,
                    domains=[site.domain],
                    timestamp=timestamp,
                    user_id=site.uid,
                    source=WPDisabledRule.SOURCE_WORDPRESS,
                )
            )
        except Exception as e:
            logger.error(
                "Failed to report changelog action for rule %s on site %s: %s",
                rule_id,
                site.docroot,
                e,
            )

    @staticmethod
    def _is_disabled_rules_file_stale(
        site: WPSite,
        data_dir: Path,
    ) -> bool:
        """Check if disabled-rules.php was modified externally.

        Reads the embedded timestamp from the file and compares it against
        the stored sync timestamp in the database. If they differ
        (e.g. file restored from backup), returns True to trigger regeneration.
        """
        disabled_rules_path = data_dir / DISABLED_RULES_FILENAME
        if not disabled_rules_path.exists():
            return False

        try:
            content = disabled_rules_path.read_text()
            data = parse_php_with_embedded_json(content)
            file_ts = float(data.get("ts", 0))
        except (OSError, ValueError) as e:
            logger.warning(
                "Cannot read disabled-rules.php for site %s: %s",
                site.docroot,
                e,
            )
            return False

        try:
            db_site = WordpressSite.get_by_id(site.docroot)
        except WordpressSite.DoesNotExist:
            return False

        db_ts = db_site.disabled_rules_sync_ts
        return db_ts is None or abs(file_ts - db_ts) > 1.0