HEX
Server: LiteSpeed
System: Linux linux31.centraldnserver.com 4.18.0-553.83.1.lve.el8.x86_64 #1 SMP Wed Nov 12 10:04:12 UTC 2025 x86_64
User: salamatk (1501)
PHP: 8.1.33
Disabled: show_source, system, shell_exec, passthru, exec, popen, proc_open
Upload Files
File: //opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/utils/net_transport.py
"""Networking transport helpers for urllib.

This module provides a small abstraction on top of urllib.request so that
callers can keep using urllib.request.Request, but routing of connections
can be customized:

- hostname resolution is handled in user code;
- selected IP may be randomized or chosen using any complex logic;
- for HTTPS: connects to a chosen IP but keeps correct SNI and certificate
  hostname validation for the original hostname (NOT the IP).

Examples:

Default behavior (plain urllib):

    import urllib.request

    from defence360agent.utils.net_transport import UrlTransport

    transport = UrlTransport()
    req = urllib.request.Request(
        "https://files.imunify360.com/static/sigs/v1/description.json"
    )
    with transport.open(req, timeout=10) as resp:
        body = resp.read()

Randomize target IP on each connection (A/AAAA -> random choice):

    import urllib.request

    from defence360agent.utils.net_transport import UrlTransport, RandomIpChooser

    chooser = RandomIpChooser()
    transport = UrlTransport(ip_chooser=chooser)

    req = urllib.request.Request(
        "https://files.imunify360.com/static/sigs/v1/description.json"
    )
    with transport.open(req, timeout=10) as resp:
        body = resp.read()

Notes:

- HTTPS: connects to the chosen IP but keeps SNI/cert checks against original
  hostname.
- HTTP: Host header stays original hostname because urllib builds it from the
  URL.

"""

import http.client
import ipaddress
import random
import socket
import threading
import time
import urllib.request
from abc import ABC, abstractmethod
from logging import getLogger
from typing import Dict, Optional, Tuple, TYPE_CHECKING

if TYPE_CHECKING:
    import ssl

# Module-level logger, named after this module's import path.
logger = getLogger(__name__)

#: Default time-to-live, in seconds, for cached DNS results; used as the
#: default ``ttl_seconds`` of DnsCacheResolver.
_DNS_DEFAULT_TTL_SECONDS = 300.0


def _is_ipv4(ip: str) -> bool:
    """Tell whether *ip* parses as an IPv4 address.

    Parsing is delegated entirely to ipaddress.ip_address; anything it
    rejects (host names, empty strings, malformed literals) and any valid
    IPv6 literal yields False.
    """
    try:
        parsed = ipaddress.ip_address(ip)
    except ValueError:
        # Not an IP literal at all.
        return False
    return parsed.version == 4


class IpChooser(ABC):
    """Strategy interface: pick the IP to connect to for a hostname/port.

    Concrete choosers are free to keep state (caches, metrics, ...) between
    calls. Instances are also callable; calling one is the same as calling
    its choose() method.
    """

    def __call__(self, hostname: str, port: int) -> str:
        """Shorthand for ``self.choose(hostname, port)``."""
        return self.choose(hostname, port)

    @abstractmethod
    def choose(self, hostname: str, port: int) -> str:
        """Return the IP address (v4 or v6) to use for *hostname*:*port*."""
        raise NotImplementedError()


class DnsCacheResolver:
    """DNS cache for socket.getaddrinfo() results.

    It caches per (hostname, port, family). This is intentionally small and
    local: it is meant only to avoid excessive getaddrinfo() calls.

    Access to the cache dictionary is guarded by a lock. The resolution
    itself runs outside the lock, so concurrent cache misses for the same
    key may each call getaddrinfo(); the last one to finish wins.
    """

    def __init__(
        self,
        *,
        family: int = socket.AF_UNSPEC,
        ttl_seconds: float = _DNS_DEFAULT_TTL_SECONDS,
    ):
        """Create a resolver.

        :param family: address family passed to socket.getaddrinfo()
        :param ttl_seconds: how long a resolved entry stays valid
        """
        self._family = family
        self._ttl_seconds = ttl_seconds
        # key -> (monotonic expiry timestamp, resolved IPs)
        self._cache: Dict[
            Tuple[str, int, int], Tuple[float, Tuple[str, ...]]
        ] = {}
        self._lock = threading.Lock()

    def get_ips(self, hostname: str, port: int) -> Tuple[str, ...]:
        """Return the IPs for *hostname*:*port*, using the cache when fresh.

        :return: tuple of unique IP strings in getaddrinfo() order
        :raises OSError: if resolution fails (socket.gaierror is an OSError
            subclass) or yields no addresses
        """
        key = (hostname, port, self._family)
        # Use a monotonic clock: wall-clock adjustments (NTP steps, manual
        # changes) must neither expire entries early nor keep stale ones.
        now = time.monotonic()

        with self._lock:
            cached = self._cache.get(key)
            if cached is not None:
                expires_at, ips = cached
                if now < expires_at:
                    logger.debug(
                        "DnsCacheResolver cache hit for %s:%s (family=%s)",
                        hostname,
                        port,
                        self._family,
                    )
                    return ips

        logger.debug(
            "DnsCacheResolver cache miss/expired for %s:%s (family=%s)",
            hostname,
            port,
            self._family,
        )

        infos = socket.getaddrinfo(
            hostname,
            port,
            self._family,
            socket.SOCK_STREAM,
        )

        # sockaddr is the 5th element; its first item is the IP string.
        # dict.fromkeys() drops duplicates while keeping first-seen order.
        ips_t = tuple(dict.fromkeys(info[4][0] for info in infos))

        if not ips_t:
            raise OSError("No IPs resolved for {}:{}".format(hostname, port))

        # Expiry is measured from before the lookup started, so a slow
        # lookup shortens (never lengthens) the effective TTL.
        with self._lock:
            self._cache[key] = (now + self._ttl_seconds, ips_t)

        logger.debug(
            "DnsCacheResolver resolved %s:%s (family=%s) to %s",
            hostname,
            port,
            self._family,
            ips_t,
        )
        return ips_t


class RandomIpChooserWithIPv6Toggle(IpChooser):
    """Random IP chooser whose use of IPv6 can be switched at runtime.

    - IPv6 enabled: pick among every resolved address (IPv4 and IPv6).
    - IPv6 disabled: pick among the resolved IPv4 addresses only.

    The most recently selected address is remembered and can be inspected
    through last_ip() / last_ip_was_ipv6().
    """

    def __init__(
        self,
        *,
        resolver: Optional[DnsCacheResolver] = None,
        rng: Optional[random.Random] = None,
        ipv6_enabled: bool = True,
    ):
        """Set up the chooser.

        :param resolver: source of candidate IPs; a fresh DnsCacheResolver
            is created when omitted
        :param rng: random generator used for the pick; a fresh
            random.Random is created when omitted
        :param ipv6_enabled: initial state of the IPv6 toggle
        """
        self._last_ip: Optional[str] = None
        self._ipv6_enabled = ipv6_enabled
        self._rng = rng if rng else random.Random()
        self._resolver = resolver if resolver else DnsCacheResolver()

    def is_ipv6_enabled(self) -> bool:
        """Report the current state of the IPv6 toggle."""
        return self._ipv6_enabled

    def enable_ipv6(self) -> None:
        """Allow IPv6 addresses to be selected from now on."""
        self._ipv6_enabled = True

    def disable_ipv6(self) -> None:
        """Restrict future selections to IPv4 addresses."""
        self._ipv6_enabled = False

    def last_ip(self) -> Optional[str]:
        """Return the last selected IP, or None if nothing was chosen yet."""
        return self._last_ip

    def last_ip_was_ipv6(self) -> bool:
        """Return True if the last selected IP contains a colon."""
        previous = self._last_ip
        if not previous:
            return False
        return ":" in previous

    def choose(self, hostname: str, port: int) -> str:
        """Resolve *hostname*:*port* and return one random candidate.

        :raises OSError: when IPv6 is disabled and no IPv4 address was
            resolved (resolver errors propagate unchanged)
        """
        candidates = self._resolver.get_ips(hostname, port)

        if not self._ipv6_enabled:
            v4_candidates = tuple(filter(_is_ipv4, candidates))
            if not v4_candidates:
                raise OSError(
                    "No IPv4 IPs resolved for {}:{}".format(hostname, port)
                )

            picked = self._rng.choice(v4_candidates)
            self._last_ip = picked
            logger.debug(
                "RandomIpChooserWithIPv6Toggle selected IPv4 IP %s for %s:%s "
                "(IPv6 disabled)",
                picked,
                hostname,
                port,
            )
            return picked

        picked = self._rng.choice(candidates)
        self._last_ip = picked
        logger.debug(
            "RandomIpChooserWithIPv6Toggle selected IP %s for %s:%s "
            "(IPv6 enabled)",
            picked,
            hostname,
            port,
        )
        return picked


class RandomIpChooser(IpChooser):
    """Chooser that resolves the hostname and picks one address at random."""

    def __init__(
        self,
        *,
        resolver: Optional[DnsCacheResolver] = None,
        rng: Optional[random.Random] = None,
    ):
        """Set up the chooser.

        :param resolver: source of candidate IPs; a fresh DnsCacheResolver
            is created when omitted
        :param rng: random generator used for the pick; a fresh
            random.Random is created when omitted
        """
        self._rng = rng if rng else random.Random()
        self._resolver = resolver if resolver else DnsCacheResolver()

    def choose(self, hostname: str, port: int) -> str:
        """Return a random IP out of those resolved for *hostname*:*port*."""
        candidates = self._resolver.get_ips(hostname, port)
        picked = self._rng.choice(candidates)
        logger.debug(
            "RandomIpChooser selected IP %s for %s:%s",
            picked,
            hostname,
            port,
        )
        return picked


class ForcedIPHTTPConnection(http.client.HTTPConnection):
    """HTTPConnection that connects to a chosen IP.

    Important: urllib builds the request URL with the original hostname,
    therefore the Host header stays correct.
    """

    def __init__(
        self,
        hostname: str,
        port: Optional[int] = None,
        *,
        ip_chooser: IpChooser,
        timeout=socket._GLOBAL_DEFAULT_TIMEOUT,
        source_address=None,
    ):
        """Create the connection object (no network activity happens here).

        :param hostname: host name the request is addressed to
        :param port: TCP port; None lets http.client pick its default
        :param ip_chooser: strategy returning the IP to actually connect to
        :param timeout: socket timeout forwarded to http.client
        :param source_address: optional (host, port) to bind the socket to
        """
        super().__init__(
            hostname,
            port=port,
            timeout=timeout,
            source_address=source_address,
        )
        self._ip_chooser = ip_chooser

    def connect(self) -> None:
        """Open a TCP connection to the IP selected for ``self.host``.

        If a tunnel was configured via set_tunnel(), the CONNECT handshake
        is performed once the socket is open, as the standard
        HTTPConnection.connect() does.

        :raises OSError: if the chooser or the TCP connect fails
        """
        port = self.port or 80
        ip = self._ip_chooser.choose(self.host, port)
        logger.debug(
            "ForcedIPHTTPConnection connecting to %s:%s for hostname %s",
            ip,
            port,
            self.host,
        )

        self.sock = socket.create_connection(
            (ip, port),
            self.timeout,
            self.source_address,
        )

        # Honour set_tunnel(); without this a configured tunnel would be
        # silently ignored and the request sent to the proxy as-is.
        if self._tunnel_host:
            self._tunnel()


class ForcedIPHTTPSConnection(http.client.HTTPSConnection):
    """HTTPSConnection that connects to a chosen IP.

    TLS details:
    - Uses original hostname for SNI (server_hostname in wrap_socket)
    - Certificate hostname validation is performed for the original hostname
    - When tunnelling through a proxy (set_tunnel), the "original hostname"
      is the tunnel target, not the proxy this connection dials
    """

    def __init__(
        self,
        hostname: str,
        port: Optional[int] = None,
        *,
        ip_chooser: IpChooser,
        context: "ssl.SSLContext",
        timeout=socket._GLOBAL_DEFAULT_TIMEOUT,
        source_address=None,
    ):
        """Create the connection object (no network activity happens here).

        :param hostname: host name the request is addressed to
        :param port: TCP port; None lets http.client pick its default
        :param ip_chooser: strategy returning the IP to actually connect to
        :param context: SSL context used to wrap the socket
        :param timeout: socket timeout forwarded to http.client
        :param source_address: optional (host, port) to bind the socket to
        """
        super().__init__(
            hostname,
            port=port,
            context=context,
            timeout=timeout,
            source_address=source_address,
        )
        self._ip_chooser = ip_chooser

    def connect(self) -> None:
        """Open a TCP connection to the selected IP and start TLS on it.

        :raises OSError: if the chooser, the TCP connect, the proxy tunnel
            or the TLS handshake fails (ssl.SSLError is an OSError subclass)
        """
        port = self.port or 443
        ip = self._ip_chooser.choose(self.host, port)
        logger.debug(
            "ForcedIPHTTPSConnection connecting to %s:%s for hostname %s",
            ip,
            port,
            self.host,
        )

        raw_sock = socket.create_connection(
            (ip, port),
            self.timeout,
            self.source_address,
        )

        try:
            if self._tunnel_host:
                # _tunnel() talks over self.sock, so expose the plain socket
                # for the CONNECT handshake before TLS is layered on top.
                self.sock = raw_sock
                self._tunnel()
                raw_sock = self.sock
                # self.host is the proxy here; TLS is end-to-end with the
                # tunnel target, so SNI/cert checks must use that name.
                server_hostname = self._tunnel_host
            else:
                server_hostname = self.host

            self.sock = self._context.wrap_socket(
                raw_sock,
                server_hostname=server_hostname,
            )
        except BaseException:
            # Do not leak the TCP socket (or leave a half-set-up one on the
            # connection) when tunnel setup or the TLS handshake fails.
            self.sock = None
            if raw_sock is not None:
                raw_sock.close()
            raise


class ForcedIPHTTPHandler(urllib.request.HTTPHandler):
    """urllib HTTP handler whose connections are ForcedIPHTTPConnection."""

    def __init__(self, *, ip_chooser: IpChooser):
        """Remember *ip_chooser*; it is handed to every new connection."""
        super().__init__()
        self._ip_chooser = ip_chooser

    def _make_connection(self, host, **kwargs):
        """Connection factory given to do_open().

        Only the ``timeout`` keyword is forwarded; any other keyword
        argument supplied by do_open() is ignored.
        """
        return ForcedIPHTTPConnection(
            host,
            ip_chooser=self._ip_chooser,
            timeout=kwargs.get("timeout"),
        )

    def http_open(self, req) -> http.client.HTTPResponse:
        """Open *req* over a connection routed through the IP chooser."""
        return self.do_open(self._make_connection, req)


class ForcedIPHTTPSHandler(urllib.request.HTTPSHandler):
    """urllib HTTPS handler whose connections are ForcedIPHTTPSConnection."""

    def __init__(self, *, ip_chooser: IpChooser, context: "ssl.SSLContext"):
        """Remember *ip_chooser* and *context* for every new connection."""
        super().__init__(context=context)
        self._ip_chooser = ip_chooser
        self._context = context

    def _make_connection(self, host, **kwargs):
        """Connection factory given to do_open().

        Only the ``timeout`` keyword is forwarded; the SSL context always
        comes from this handler and other keyword arguments are ignored.
        """
        return ForcedIPHTTPSConnection(
            host,
            ip_chooser=self._ip_chooser,
            context=self._context,
            timeout=kwargs.get("timeout"),
        )

    def https_open(self, req) -> http.client.HTTPResponse:
        """Open *req* over a TLS connection routed through the IP chooser."""
        return self.do_open(self._make_connection, req)


class UrlTransport:
    """Single entrypoint for opening urllib requests.

    If *ip_chooser* is provided, the transport will connect to the selected IP
    address, while keeping correct Host/SNI/cert validation for the original
    hostname. If *ip_chooser* is not provided, it behaves like plain urllib.

    An explicitly supplied *ssl_context* is used for HTTPS in both modes.
    """

    def __init__(
        self,
        *,
        ip_chooser: Optional[IpChooser] = None,
        ssl_context: Optional["ssl.SSLContext"] = None,
    ):
        """Build the underlying urllib opener.

        :param ip_chooser: strategy selecting the IP to connect to; when
            None, urllib's regular connection handling is used
        :param ssl_context: SSL context for HTTPS requests; when None, a
            default context is created for the IP-forcing handlers and the
            plain opener keeps urllib's own default
        """
        # Imported lazily: at module level ssl is only imported for typing.
        import ssl as _ssl

        self._ssl_context = ssl_context or _ssl.create_default_context()

        if ip_chooser is not None:
            self._opener = urllib.request.build_opener(
                ForcedIPHTTPHandler(ip_chooser=ip_chooser),
                ForcedIPHTTPSHandler(
                    ip_chooser=ip_chooser,
                    context=self._ssl_context,
                ),
            )
        elif ssl_context is not None:
            # Plain urllib routing, but the caller's SSL context must still
            # be honoured rather than silently dropped.
            self._opener = urllib.request.build_opener(
                urllib.request.HTTPSHandler(context=ssl_context)
            )
        else:
            self._opener = urllib.request.build_opener()

    def open(
        self,
        req: urllib.request.Request,
        *,
        timeout: Optional[float] = None,
    ) -> http.client.HTTPResponse:
        """Open *req* and return the response.

        :param req: request to send
        :param timeout: socket timeout in seconds; None leaves the opener's
            default timeout in effect
        :return: the response object produced by the opener
        """
        if timeout is None:
            # Omit the argument so the opener applies its own default
            # instead of receiving an explicit None.
            return self._opener.open(req)
        return self._opener.open(req, timeout=timeout)