HEX
Server: LiteSpeed
System: Linux linux31.centraldnserver.com 4.18.0-553.83.1.lve.el8.x86_64 #1 SMP Wed Nov 12 10:04:12 UTC 2025 x86_64
User: salamatk (1501)
PHP: 8.1.33
Disabled: show_source, system, shell_exec, passthru, exec, popen, proc_open
Upload Files
File: //opt/imunify360/venv/lib64/python3.11/site-packages/defence360agent/model/tls_check.py
import logging
import threading
import time
import traceback

from playhouse.sqlite_ext import SqliteExtDatabase

from defence360agent.internals.global_scope import g


class OverridingReset(Exception):
    """Raised by ``reset()`` when the thread already has an identity memo.

    A second reset on the same thread may indicate a logic error, so
    every call site where this can legitimately happen is expected to
    handle the exception explicitly.
    """


# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Per-thread storage: reset() writes the ``thread_ident_memo`` attribute
# here and _validate() reads it back.
_thread_local_storage = threading.local()

# Duration in seconds above which _TimedAtomic logs a warning.
_SLOW_TXN_THRESHOLD_S = 5.0


class _TimedAtomic:
    """Context manager that times another context manager.

    Delegates ``__enter__``/``__exit__`` to ``inner`` and logs a warning
    when the time between the two exceeds ``_SLOW_TXN_THRESHOLD_S``
    seconds.  The warning carries a short stack excerpt captured at
    entry so the slow caller can be identified.

    NOTE(review): only the ``with`` protocol is forwarded.  peewee's
    atomic objects can reportedly also be used as decorators; that is
    not available through this wrapper -- confirm no caller needs it
    when the wrapper is active.
    """

    def __init__(self, inner: object):
        """Store the wrapped context manager; timing starts on entry."""
        self._inner = inner
        self._start: float = 0.0
        self._caller: str = ""

    def __enter__(self):
        """Record start time and caller stack, then enter ``inner``.

        The clock starts before the wrapped ``__enter__`` runs, so the
        measured duration includes whatever time that call takes.

        Returns whatever the wrapped ``__enter__`` returns.
        """
        self._start = time.monotonic()
        # limit=4 keeps the four innermost frames; the last one is this
        # method itself, so it is dropped, leaving up to three callers.
        self._caller = "".join(traceback.format_stack(limit=4)[:-1])
        return self._inner.__enter__()

    def __exit__(self, *args):
        """Leave ``inner`` and warn if the block ran for too long.

        The duration check lives in ``finally`` so the warning is still
        emitted when the wrapped ``__exit__`` raises; the exception then
        propagates unchanged.

        Returns the wrapped ``__exit__`` result, preserving its decision
        on whether to suppress an in-flight exception.
        """
        try:
            return self._inner.__exit__(*args)
        finally:
            elapsed = time.monotonic() - self._start
            if elapsed > _SLOW_TXN_THRESHOLD_S:
                logger.warning(
                    "Slow transaction held for %.2fs\n%s",
                    elapsed,
                    self._caller,
                )


class SqliteDatabaseWrapper(SqliteExtDatabase):
    """``SqliteExtDatabase`` that validates the calling thread per statement.

    Every ``execute_sql`` call is first passed through ``_validate``.
    ``atomic`` defaults to the ``"IMMEDIATE"`` lock type and, when the
    global ``DEBUG`` flag is truthy, returns a timing wrapper.
    """

    def execute_sql(self, *args, **kwargs):
        """Run ``_validate`` on the arguments, then delegate to the parent."""
        _validate(*args, **kwargs)
        result = super().execute_sql(*args, **kwargs)
        return result

    def atomic(self, lock_type: str = "IMMEDIATE"):
        """Return the parent's atomic context manager for ``lock_type``.

        With ``g.get("DEBUG")`` truthy the context manager is wrapped in
        ``_TimedAtomic``; otherwise it is returned as is.
        """
        txn = super().atomic(lock_type)
        return _TimedAtomic(txn) if g.get("DEBUG") else txn


def reset(new_value=None):
    """Store the thread identity memo for the current thread.

    Args:
        new_value: Identity to store.  Any falsy value (including the
            default ``None``) means "use ``threading.get_ident()``".

    Raises:
        OverridingReset: If this thread already has a memo stored.
    """
    already_set = hasattr(_thread_local_storage, "thread_ident_memo")
    if already_set:
        raise OverridingReset()

    ident = new_value if new_value else threading.get_ident()
    _thread_local_storage.thread_ident_memo = ident


def _validate(*args, **kwargs):
    """Log an error if the current thread does not match the stored memo.

    Reads ``thread_ident_memo`` from the thread-local storage and
    compares it with ``threading.get_ident()``.  Nothing is raised:
    a missing memo or a mismatch is only reported through the logger.
    ``args`` and ``kwargs`` are used solely as context in the mismatch
    message.
    """
    expected_ident = getattr(_thread_local_storage, "thread_ident_memo", None)

    # No memo on this thread: reset() was never called here.
    if expected_ident is None:
        logger.error("wrong thread or _validate() was not preceded by reset()")
        return

    actual_ident = threading.get_ident()
    if expected_ident != actual_ident:
        logger.error(
            "thread_ident_memo check failed [%r != %r]\n"
            "context:\nargs: %s\nkwargs: %s",
            expected_ident,
            actual_ident,
            args,
            kwargs,
        )