File: //opt/imunify360/venv/lib64/python3.11/site-packages/defence360agent/model/tls_check.py
import logging
import threading
import time
import traceback
from playhouse.sqlite_ext import SqliteExtDatabase
from defence360agent.internals.global_scope import g
class OverridingReset(Exception):
    """Raised by ``reset()`` when a thread identity is already recorded.

    Overwriting an existing value is likely a sign of a logic error, so
    every call site where this can legitimately happen is expected to
    catch and handle the exception explicitly.
    """
# Transactions whose context is held longer than this many seconds are
# reported by _TimedAtomic.
_SLOW_TXN_THRESHOLD_S = 5.0

# Per-thread storage; reset() records ``thread_ident_memo`` here and
# _validate() reads it back.
_thread_local_storage = threading.local()

logger = logging.getLogger(__name__)
class _TimedAtomic:
    """Context-manager wrapper that reports slow transactions.

    Delegates ``__enter__``/``__exit__`` to *inner* and measures the time
    between the two calls. When that time exceeds
    ``_SLOW_TXN_THRESHOLD_S`` a warning is logged together with a short
    stack excerpt captured on entry.

    NOTE(review): only the context-manager protocol is forwarded; any
    other use of the wrapped object (e.g. as a decorator) is not
    supported by this wrapper -- confirm against callers.
    """

    def __init__(self, inner: object):
        """Store the context manager to wrap.

        :param inner: object providing ``__enter__`` and ``__exit__``.
        """
        self._inner = inner
        self._start: float = 0.0
        self._caller: str = ""

    def __enter__(self):
        """Start the timer, remember the caller and enter *inner*.

        :return: whatever ``inner.__enter__()`` returns.
        """
        # The clock starts before entering the inner context, so time
        # spent inside ``inner.__enter__()`` is part of the measurement.
        self._start = time.monotonic()
        # Drop the last frame (this method) so the excerpt ends at the
        # code that opened the ``with`` block.
        self._caller = "".join(traceback.format_stack(limit=4)[:-1])
        return self._inner.__enter__()

    def __exit__(self, *args):
        """Exit *inner* and log a warning if the context was held too long.

        :return: the result of ``inner.__exit__`` so exception
            suppression behaves exactly as it does for *inner*.
        """
        try:
            return self._inner.__exit__(*args)
        finally:
            # Runs even when the inner exit raises (e.g. a failing
            # commit), so a slow transaction that ends in an error is
            # still reported before the exception propagates.
            elapsed = time.monotonic() - self._start
            if elapsed > _SLOW_TXN_THRESHOLD_S:
                logger.warning(
                    "Slow transaction held for %.2fs\n%s",
                    elapsed,
                    self._caller,
                )
class SqliteDatabaseWrapper(SqliteExtDatabase):
    """``SqliteExtDatabase`` with a thread check and optional timing.

    Every ``execute_sql`` call first runs ``_validate``; ``atomic``
    defaults to the ``"IMMEDIATE"`` lock type and, when the global
    ``DEBUG`` flag is set, wraps the result in ``_TimedAtomic``.
    """

    def execute_sql(self, *args, **kwargs):
        """Run the thread-identity check, then delegate to the parent.

        The same positional and keyword arguments are handed to
        ``_validate`` (for its log context) and to the parent
        implementation, whose result is returned unchanged.
        """
        _validate(*args, **kwargs)
        return super().execute_sql(*args, **kwargs)

    def atomic(self, lock_type: str = "IMMEDIATE"):
        """Return the parent's atomic context for *lock_type*.

        :param lock_type: passed through to the parent ``atomic``.
        :return: the parent's object, wrapped in ``_TimedAtomic`` when
            ``g.get("DEBUG")`` is truthy.
        """
        txn = super().atomic(lock_type)
        return _TimedAtomic(txn) if g.get("DEBUG") else txn
def reset(new_value=None):
    """Record the thread identity for the calling thread.

    :param new_value: identity to store; any falsy value (including the
        default ``None``) means "use ``threading.get_ident()``".
    :raises OverridingReset: if this thread already has a value stored.
    """
    already_recorded = hasattr(_thread_local_storage, "thread_ident_memo")
    if already_recorded:
        raise OverridingReset()
    ident = new_value if new_value else threading.get_ident()
    _thread_local_storage.thread_ident_memo = ident
def _validate(*args, **kwargs):
    """Log an error if the current thread differs from the recorded one.

    Compares the identity stored by ``reset()`` in thread-local storage
    with ``threading.get_ident()``. This function only logs; it never
    raises. *args* and *kwargs* are used solely as context in the
    mismatch message.
    """
    recorded = getattr(_thread_local_storage, "thread_ident_memo", None)
    if recorded is None:
        logger.error("wrong thread or _validate() was not preceded by reset()")
        return

    current = threading.get_ident()
    if recorded == current:
        return

    logger.error(
        "thread_ident_memo check failed [%r != %r]\n"
        "context:\nargs: %s\nkwargs: %s",
        recorded,
        current,
        args,
        kwargs,
    )