File: //opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/simple_rpc/wp_disabled_rules.py
"""RPC endpoints for WordPress disabled protection rules."""
import asyncio
import logging
import pwd
import time
from defence360agent.contracts.messages import MessageType
from defence360agent.contracts.plugins import MessageSink
from defence360agent.files import Index, WP_RULES
from defence360agent.model.wp_disabled_rule import WPDisabledRule
from defence360agent.rpc_tools import ValidationError
from defence360agent.rpc_tools.lookup import CommonEndpoints, bind
from defence360agent.subsys.panels import hosting_panel
from defence360agent.utils import Scope, log_future_errors
from defence360agent.wordpress.changelog_processor import (
ChangelogProcessor,
)
from defence360agent.wordpress.plugin import (
redeploy_wp_rules,
update_disabled_rules_on_sites,
)
from defence360agent.wordpress.site_repository import (
get_installed_sites_by_domains,
)
from defence360agent.wordpress.wp_rules import get_wp_rules_data
logger = logging.getLogger(__name__)
async def _get_user_domains(user: str) -> list[str]:
    """
    Look up the domains that belong to ``user`` via the hosting panel.

    The lookup is best-effort: any exception raised while querying the
    panel is logged as a warning and treated as "no domains".

    Args:
        user: Username whose domains are requested

    Returns:
        The user's entry from the panel's domains-per-user mapping, or an
        empty list when the user has no entry or the lookup failed.
    """
    try:
        panel = hosting_panel.HostingPanel()
        owners_to_domains = await panel.get_domains_per_user()
        owned = owners_to_domains.get(user, [])
    except Exception as exc:
        logger.warning("Failed to get domains for user %s: %s", user, exc)
        owned = []
    return owned
async def _validate_user_domains(
    user: str, domains: list[str] | None
) -> list[str]:
    """
    Resolve the set of domains a non-root user is allowed to act on.

    With an explicit request, the result is the requested domains narrowed
    down to those the user owns (request order is kept). Without one
    (``None`` or an empty list), the result is every domain the user owns.

    Args:
        user: Username to validate domains for
        domains: Requested domains, or None/empty for all user's domains

    Returns:
        Non-empty list of domains the user can access

    Raises:
        ValidationError: If the user owns no domains at all, or owns none
            of the requested ones
    """
    owned = await _get_user_domains(user)

    if domains:
        # Explicit request: keep only what the user actually owns.
        permitted = [name for name in domains if name in owned]
        if permitted:
            return permitted
        raise ValidationError(
            "You don't have access to any of the specified domains"
        )

    # No explicit request: fall back to everything the user owns.
    if owned:
        return owned
    raise ValidationError("No domains found for user")
def _enrich_with_metadata(
disabled_rules: list[dict], wp_rules_data: dict | None
) -> list[dict]:
"""
Enrich disabled rules with metadata from wp-rules.yaml.
Args:
disabled_rules: List of disabled rule dicts from WPDisabledRule.fetch()
wp_rules_data: Parsed wp-rules.yaml data, or None if unavailable
Returns:
List of enriched rule dicts with component and versions added
"""
enriched = []
for rule in disabled_rules:
rule_id = rule["rule_id"]
metadata = wp_rules_data.get(rule_id, {}) if wp_rules_data else {}
enriched.append(
{
**rule,
"component": metadata.get("target"),
"versions": metadata.get("versions"),
}
)
return enriched
async def _jit_sync_changelogs(
    domains: list[str], sink: MessageSink | None = None
) -> None:
    """Apply pending changelog files for ``domains`` ahead of an API change.

    This "Just-in-Time" sync brings the database up to date with any
    WordPress-side changes before the agent performs its own disable/enable
    operation.

    disabled-rules.php is deliberately not regenerated here: the calling
    API endpoint regenerates files after its own DB mutation.

    The sync is best-effort; any failure is logged as a warning (with
    traceback) and never propagated to the caller.

    Args:
        domains: Domains whose installed sites should be synced
        sink: Optional message sink forwarded to the changelog processor
    """
    try:
        matched_sites = get_installed_sites_by_domains(domains)
        if matched_sites:
            processor = ChangelogProcessor()
            await processor.process_changelogs_for_sites(
                matched_sites, sink=sink
            )
    except Exception as exc:
        logger.warning("JIT changelog sync failed: %s", exc, exc_info=True)
class WPDisabledRulesEndpoints(CommonEndpoints):
    """Endpoints for listing, disabling and enabling WordPress protection rules."""

    SCOPE = Scope.AV_IM360

    # Strong references to in-flight fire-and-forget tasks. The event loop
    # keeps only weak references to tasks, so an otherwise unreferenced task
    # could be garbage-collected before it finishes. Each task is removed
    # from this set as soon as it completes.
    _background_tasks: set[asyncio.Task] = set()

    @bind("wordpress-plugin", "rules", "list-disabled")
    async def list_disabled_rules(
        self,
        limit: int = 50,
        offset: int = 0,
        domains: list[str] | None = None,
        user: str | None = None,
    ) -> tuple[int, list[dict]]:
        """
        List disabled WordPress protection rules with metadata.

        When user is provided, returns rules for that user's domains only
        (optionally narrowed by ``domains``) and excludes global rules.
        Otherwise, returns all disabled rules, including global ones.

        Args:
            limit: Maximum number of rules to return
            offset: Number of rules to skip
            domains: Filter by specific domains (optional)
            user: Username (populated by middleware); None means root

        Returns:
            Tuple of (total_count, list of enriched rule dicts). Each rule
            dict gains ``component`` and ``versions`` keys, which are None
            when wp-rules metadata cannot be loaded.
        """
        # ``None`` (not mere falsiness) marks a root caller, matching the
        # ``include_global`` flag below and ``_toggle_rule``. An empty
        # username is scoped like any other non-root user and so ends up
        # with no visible domains instead of seeing everyone's rules.
        if user is not None:
            user_domains = await _get_user_domains(user)
            if not domains:
                domains = user_domains
            else:
                domains = [d for d in domains if d in user_domains]
            # If the user cannot access any of the requested domains (or
            # owns none), return an empty page.
            if not domains:
                return 0, []

        # Fetch disabled rules from database.
        # Root users see all rules (including global), non-root users only
        # see rules for their own domains.
        total_count, disabled_rules = WPDisabledRule.fetch(
            limit=limit,
            offset=offset,
            user_domains=domains,
            include_global=user is None,
        )

        # Load wp-rules metadata for enrichment; listing still works
        # (without metadata) if this fails.
        try:
            wp_rules_index = Index(WP_RULES, integrity_check=False)
            wp_rules_data = get_wp_rules_data(wp_rules_index)
        except Exception as e:
            logger.warning("Failed to load wp-rules data: %s", e)
            wp_rules_data = None

        enriched_rules = _enrich_with_metadata(disabled_rules, wp_rules_data)
        return total_count, enriched_rules

    async def _toggle_rule(
        self,
        action: str,
        rule: str,
        domains: list[str] | None,
        user: str | None,
    ) -> dict:
        """
        Shared implementation for disable/enable rule endpoints.

        Args:
            action: "disable" to store the rule as disabled; any other
                value removes the disabled entry (i.e. enables the rule)
            rule: The rule ID to toggle
            domains: Target domains, or None/empty for a global operation
            user: Username for non-root callers, or None for root

        Returns:
            Empty dict on success.

        Raises:
            ValidationError: If the user does not exist, or has no access
                to any applicable domain
        """
        if user is None:
            # No user means a root caller; recorded with uid 0.
            user_id = 0
        else:
            try:
                user_id = pwd.getpwnam(user).pw_uid
            except KeyError as exc:
                raise ValidationError(f"User '{user}' not found") from exc
            # Non-root callers are always confined to domains they own.
            domains = await _validate_user_domains(user, domains)

        # JIT Sync: process pending changelogs before applying API changes.
        # Skipped for global operations (domains=None) because global and
        # domain-level disables are independent scopes and cannot conflict.
        if domains:
            await _jit_sync_changelogs(domains, self._sink)

        if action == "disable":
            WPDisabledRule.store(
                rule_id=rule,
                domains=domains,
                source=WPDisabledRule.SOURCE_AGENT,
                user_id=user_id,
            )
            message_cls = MessageType.WPRuleDisabled
        else:
            WPDisabledRule.remove(rule_id=rule, domains=domains)
            message_cls = MessageType.WPRuleEnabled

        # Reporting is best-effort: the DB change above has already been
        # applied, so a reporting failure is logged rather than raised.
        try:
            await self._sink.process_message(
                message_cls(
                    plugin_id="wordpress",
                    rule=rule,
                    domains=domains or [],
                    timestamp=time.time(),
                    user_id=user_id,
                    source=WPDisabledRule.SOURCE_AGENT,
                )
            )
        except Exception as e:
            logger.error(
                "Failed to report rule %s for %s: %s", action, rule, e
            )

        if domains:
            # Domain-specific: update disabled-rules.php for affected sites
            task = asyncio.create_task(
                update_disabled_rules_on_sites(domains=domains)
            )
        else:
            # Global: re-deploy rules.php with the rule filtered out
            task = asyncio.create_task(redeploy_wp_rules())

        # The file regeneration runs in the background; keep the task
        # referenced until it is done so it cannot be collected mid-run.
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)
        task.add_done_callback(log_future_errors)
        return {}

    @bind("wordpress-plugin", "rules", "disable")
    async def disable_rule(
        self,
        rule: str,
        domains: list[str] | None = None,
        user: str | None = None,
    ) -> dict:
        """
        Disable a WordPress protection rule globally or for specific domains.

        Root users can disable globally (no domains) or for specific domains.
        Non-root users can disable for all their domains (by specifying no
        domains) or for specific domains.
        Non-root users can only disable for domains they own.

        Args:
            rule: The rule ID to disable (e.g., "CVE-2025-001")
            domains: List of domains to disable the rule for, or None for
                global (root) / all owned domains (non-root)
            user: Username (populated by middleware for non-root users)

        Returns:
            Empty dict on success.

        Raises:
            ValidationError: If the user does not exist, or has no access
                to any applicable domain
        """
        return await self._toggle_rule("disable", rule, domains, user)

    @bind("wordpress-plugin", "rules", "enable")
    async def enable_rule(
        self,
        rule: str,
        domains: list[str] | None = None,
        user: str | None = None,
    ) -> dict:
        """
        Re-enable a WordPress protection rule globally or for specific domains.

        Root users can enable globally (no domains) or for specific domains.
        Non-root users can enable for all their domains (no domains) or
        specific ones.
        Non-root users can only enable for domains they own.

        Note: Enabling at one scope doesn't affect the other scope.
        E.g., enabling globally leaves domain-specific disables intact.

        Args:
            rule: The rule ID to enable (e.g., "CVE-2025-001")
            domains: List of domains to enable the rule for, or None for
                global (root) / all owned domains (non-root)
            user: Username (populated by middleware for non-root users)

        Returns:
            Empty dict on success.

        Raises:
            ValidationError: If the user does not exist, or has no access
                to any applicable domain
        """
        return await self._toggle_rule("enable", rule, domains, user)