HEX
Server: LiteSpeed
System: Linux linux31.centraldnserver.com 4.18.0-553.83.1.lve.el8.x86_64 #1 SMP Wed Nov 12 10:04:12 UTC 2025 x86_64
User: salamatk (1501)
PHP: 8.1.33
Disabled: show_source, system, shell_exec, passthru, exec, popen, proc_open
Upload Files
File: //opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/simple_rpc/wp_disabled_rules.py
"""RPC endpoints for WordPress disabled protection rules."""

import asyncio
import logging
import pwd
import time

from defence360agent.contracts.messages import MessageType
from defence360agent.contracts.plugins import MessageSink
from defence360agent.files import Index, WP_RULES
from defence360agent.model.wp_disabled_rule import WPDisabledRule
from defence360agent.rpc_tools import ValidationError
from defence360agent.rpc_tools.lookup import CommonEndpoints, bind
from defence360agent.subsys.panels import hosting_panel
from defence360agent.utils import Scope, log_future_errors
from defence360agent.wordpress.changelog_processor import (
    ChangelogProcessor,
)
from defence360agent.wordpress.plugin import (
    redeploy_wp_rules,
    update_disabled_rules_on_sites,
)
from defence360agent.wordpress.site_repository import (
    get_installed_sites_by_domains,
)
from defence360agent.wordpress.wp_rules import get_wp_rules_data

logger = logging.getLogger(__name__)


async def _get_user_domains(user: str) -> list[str]:
    """
    Get domains for a user from the hosting panel.

    Returns:
        List of domains the user owns, or empty list on error.
    """
    try:
        hp = hosting_panel.HostingPanel()
        domains_per_user = await hp.get_domains_per_user()
        return domains_per_user.get(user, [])
    except Exception as e:
        logger.warning("Failed to get domains for user %s: %s", user, e)
        return []


async def _validate_user_domains(
    user: str, domains: list[str] | None
) -> list[str]:
    """
    Validate and filter domains for a non-root user.

    If no domains specified, returns all user's domains.
    If domains specified, filters to only those the user owns.

    Args:
        user: Username to validate domains for
        domains: Requested domains, or None for all user's domains

    Returns:
        List of validated domains the user can access

    Raises:
        ValidationError: If user has no domains or no access to requested domains
    """
    user_domains = await _get_user_domains(user)
    if not domains:
        if not user_domains:
            raise ValidationError("No domains found for user")
        return user_domains

    authorized_domains = [d for d in domains if d in user_domains]
    if not authorized_domains:
        raise ValidationError(
            "You don't have access to any of the specified domains"
        )
    return authorized_domains


def _enrich_with_metadata(
    disabled_rules: list[dict], wp_rules_data: dict | None
) -> list[dict]:
    """
    Enrich disabled rules with metadata from wp-rules.yaml.

    Args:
        disabled_rules: List of disabled rule dicts from WPDisabledRule.fetch()
        wp_rules_data: Parsed wp-rules.yaml data, or None if unavailable

    Returns:
        List of enriched rule dicts with component and versions added
    """
    enriched = []
    for rule in disabled_rules:
        rule_id = rule["rule_id"]
        metadata = wp_rules_data.get(rule_id, {}) if wp_rules_data else {}

        enriched.append(
            {
                **rule,
                "component": metadata.get("target"),
                "versions": metadata.get("versions"),
            }
        )

    return enriched


async def _jit_sync_changelogs(
    domains: list[str], sink: MessageSink | None = None
) -> None:
    """Process pending changelog files for the given domains before an API change.

    This "Just-in-Time" sync ensures the database reflects any WordPress-side
    changes before the agent applies its own disable/enable operation.
    File regeneration (disabled-rules.php) is intentionally skipped here because
    the calling API endpoint will regenerate files after its own DB mutation.
    """
    try:
        sites = get_installed_sites_by_domains(domains)
        if not sites:
            return
        await ChangelogProcessor().process_changelogs_for_sites(
            sites, sink=sink
        )
    except Exception as e:
        logger.warning("JIT changelog sync failed: %s", e, exc_info=True)


class WPDisabledRulesEndpoints(CommonEndpoints):
    """Endpoints for listing disabled WordPress protection rules."""

    SCOPE = Scope.AV_IM360

    @bind("wordpress-plugin", "rules", "list-disabled")
    async def list_disabled_rules(
        self,
        limit: int = 50,
        offset: int = 0,
        domains: list[str] | None = None,
        user: str | None = None,
    ) -> tuple[int, list[dict]]:
        """
        List disabled WordPress protection rules with metadata.

        When user is provided, returns rules for that user's domains.
        Otherwise, returns all disabled rules.

        Args:
            limit: Maximum number of rules to return
            offset: Number of rules to skip
            domains: Filter by specific domains (optional)
            user: Username (populated by middleware)

        Returns:
            Tuple of (total_count, list of enriched rule dicts)
        """

        if user:
            user_domains = await _get_user_domains(user)

            if not domains:
                domains = user_domains
            else:
                domains = [d for d in domains if d in user_domains]
                # if user cannot access any of the requested domains, return empty list
                if not domains:
                    return 0, []

        # Fetch disabled rules from database
        # Root users see all rules (including global), non-root only see their domain rules
        total_count, disabled_rules = WPDisabledRule.fetch(
            limit=limit,
            offset=offset,
            user_domains=domains,
            include_global=user is None,
        )

        # Load wp-rules metadata for enrichment
        try:
            wp_rules_index = Index(WP_RULES, integrity_check=False)
            wp_rules_data = get_wp_rules_data(wp_rules_index)
        except Exception as e:
            logger.warning("Failed to load wp-rules data: %s", e)
            wp_rules_data = None

        # Enrich with metadata
        enriched_rules = _enrich_with_metadata(disabled_rules, wp_rules_data)

        return total_count, enriched_rules

    async def _toggle_rule(
        self,
        action: str,
        rule: str,
        domains: list[str] | None,
        user: str | None,
    ) -> dict:
        """Shared implementation for disable/enable rule endpoints."""
        if user is None:
            user_id = 0
        else:
            try:
                user_id = pwd.getpwnam(user).pw_uid
            except KeyError:
                raise ValidationError(f"User '{user}' not found")

        if user:
            domains = await _validate_user_domains(user, domains)

        # JIT Sync: process pending changelogs before applying API changes.
        # Skipped for global operations (domains=None) because global and
        # domain-level disables are independent scopes and cannot conflict.
        if domains:
            await _jit_sync_changelogs(domains, self._sink)

        if action == "disable":
            WPDisabledRule.store(
                rule_id=rule,
                domains=domains,
                source=WPDisabledRule.SOURCE_AGENT,
                user_id=user_id,
            )
            message_cls = MessageType.WPRuleDisabled
        else:
            WPDisabledRule.remove(rule_id=rule, domains=domains)
            message_cls = MessageType.WPRuleEnabled

        try:
            await self._sink.process_message(
                message_cls(
                    plugin_id="wordpress",
                    rule=rule,
                    domains=domains or [],
                    timestamp=time.time(),
                    user_id=user_id,
                    source=WPDisabledRule.SOURCE_AGENT,
                )
            )
        except Exception as e:
            logger.error(
                "Failed to report rule %s for %s: %s", action, rule, e
            )

        if domains:
            # Domain-specific: update disabled-rules.php for affected sites
            task = asyncio.create_task(
                update_disabled_rules_on_sites(domains=domains)
            )
        else:
            # Global: re-deploy rules.php with the rule filtered out
            task = asyncio.create_task(redeploy_wp_rules())
        task.add_done_callback(log_future_errors)

        return {}

    @bind("wordpress-plugin", "rules", "disable")
    async def disable_rule(
        self,
        rule: str,
        domains: list[str] | None = None,
        user: str | None = None,
    ) -> dict:
        """
        Disable a WordPress protection rule globally or for specific domains.

        Root users can disable globally (no domains) or for specific domains.
        Non-root users can disable for all their domains (by specifying no
        domains) or for specific domains.
        Non-root users can only disable for domains they own.

        Args:
            rule: The rule ID to disable (e.g., "CVE-2025-001")
            domains: List of domains to disable the rule for, or None for global
            user: Username (populated by middleware for non-root users)

        Returns:
            Empty dict on success.
        """
        return await self._toggle_rule("disable", rule, domains, user)

    @bind("wordpress-plugin", "rules", "enable")
    async def enable_rule(
        self,
        rule: str,
        domains: list[str] | None = None,
        user: str | None = None,
    ) -> dict:
        """
        Re-enable a WordPress protection rule globally or for specific domains.

        Root users can enable globally (no domains) or for specific domains.
        Non-root users can enable for all their domains (no domains) or
        specific ones.
        Non-root users can only enable for domains they own.

        Note: Enabling at one scope doesn't affect the other scope.
        E.g., enabling globally leaves domain-specific disables intact.

        Args:
            rule: The rule ID to enable (e.g., "CVE-2025-001")
            domains: List of domains to enable the rule for, or None for global
            user: Username (populated by middleware for non-root users)

        Returns:
            Empty dict on success
        """
        return await self._toggle_rule("enable", rule, domains, user)