HEX
Server: LiteSpeed
System: Linux linux31.centraldnserver.com 4.18.0-553.83.1.lve.el8.x86_64 #1 SMP Wed Nov 12 10:04:12 UTC 2025 x86_64
User: salamatk (1501)
PHP: 8.1.33
Disabled: show_source, system, shell_exec, passthru, exec, popen, proc_open
Upload Files
File: //opt/imunify360/venv/lib64/python3.11/site-packages/defence360agent/contracts/myimunify_id.py
import pwd
import uuid
from pathlib import Path
from typing import Dict, List, Optional

from defence360agent.contracts.permissions import logger
from defence360agent.model import instance
from defence360agent.myimunify.model import MyImunify, update_users_protection
from defence360agent.subsys.panels.hosting_panel import HostingPanel
from defence360agent.utils import safe_fileops

MYIMUNIFY_ID_FILE_NAME = ".myimunify_id"

_BANNER = (
    "# DO NOT EDIT\n# This file contains MyImunify id unique to this user\n\n"
)
_ID_LEN = 32
_HEX = frozenset("0123456789abcdef")


class MyImunifyIdError(Exception):
    """Exception representing issues related to MyImunify id"""


async def add_myimunify_user(
    sink, user: str, protection: bool
) -> Optional[str]:
    """Save subscription type to the DB and generate id file"""

    myimunify, _ = MyImunify.get_or_create(user=user)
    myimunify.save()
    await update_users_protection(sink, [user], protection)
    logger.info("Applied setting MyImunify=%s for user %s", protection, user)

    try:
        myimunify_id = await _get_or_generate_id(user)
    except MyImunifyIdError:
        # User no longer exists
        return None

    return myimunify_id


async def get_myimunify_users() -> List[Dict]:
    """
    Get a list of MyImunify users, their subscription types and unique ids
    """

    users = []
    user_details = await HostingPanel().get_user_details()
    myimunify_user_to_id = await _myimunify_user_to_id()
    with instance.db.transaction():
        for user, myimunify_uid in sorted(myimunify_user_to_id.items()):
            record, _ = MyImunify.get_or_create(user=user)
            users.append(
                {
                    "email": user_details.get(user, {}).get("email", ""),
                    "username": user,
                    "myimunify_id": myimunify_uid,
                    "protection": record.protection,
                    "locale": user_details.get(user, {}).get("locale", ""),
                }
            )
    return users


async def _myimunify_user_to_id() -> Dict[str, str]:
    """Get a list of users and their MyImunify ids"""

    user_to_id = {}
    for user in await HostingPanel().get_users():
        try:
            user_to_id[user] = await _get_or_generate_id(user)
        except MyImunifyIdError:
            # User does not exist
            continue
        except safe_fileops.UnsafeFileOperation as e:
            logger.error(
                "Unable to generate id for user=%s, error=%s", user, str(e)
            )
            continue
    return user_to_id


async def _get_or_generate_id(user: str) -> str:
    """
    Read MyImunify id if exists and valid, or generate a new one and write into the file.
    Malformed files are regenerated.
    """
    id_file = await _get_myimunify_id_file(user)
    try:
        return _read_id(id_file)
    except (FileNotFoundError, MyImunifyIdError):
        myimunify_id = uuid.uuid1().hex
        return await _write_id(myimunify_id, id_file)


async def _write_id(myimunify_id: str, id_file: Path) -> str:
    """Write MyImunify id to file"""
    text = _BANNER + myimunify_id + "\n"
    try:
        await safe_fileops.write_text(str(id_file), text)
    except (OSError, PermissionError) as e:
        logger.error("Unable to write myimunify_id in user home dir: %s", e)
        raise MyImunifyIdError from e
    return myimunify_id


def _read_id(id_file: Path) -> str:
    """Read and validate MyImunify id from file. Raises MyImunifyIdError if malformed."""
    try:
        text = id_file.read_text(encoding="utf-8")
    except UnicodeDecodeError:
        raise MyImunifyIdError
    return _parse_id(text)


def _parse_id(text: str) -> str:
    """Read line by line: skip comments (#). First non-comment line must be valid id; nothing after it."""
    id_line = None
    for line in text.splitlines():
        s = line.strip()
        if not s:
            continue
        if s.startswith("#"):
            continue
        if id_line is not None:
            raise MyImunifyIdError
        if len(s) != _ID_LEN or not all(c in _HEX for c in s):
            raise MyImunifyIdError
        id_line = s
    if id_line is None:
        raise MyImunifyIdError
    return id_line


async def _get_myimunify_id_file(user: str) -> Path:
    """Get a file with MyImunify id and create it if does not exist"""

    try:
        user_pwd = pwd.getpwnam(user)
    except KeyError as e:
        logger.error("No such user: %s", user)
        raise MyImunifyIdError from e
    else:
        id_file = Path(user_pwd.pw_dir) / MYIMUNIFY_ID_FILE_NAME
        if not id_file.exists():
            if not id_file.parent.exists():
                logger.error("No such user homedir: %s", user)
                raise MyImunifyIdError
            try:
                await safe_fileops.touch(str(id_file))
            except (PermissionError, OSError) as e:
                logger.error(
                    "Unable to put myimunify_id in user home dir: %s", e
                )
                raise MyImunifyIdError from e
    return id_file