File: //opt/imunify360/venv/lib64/python3.11/site-packages/defence360agent/utils/net_transport.py
"""Networking transport helpers for urllib.
This module provides a small abstraction on top of urllib.request so that
callers can keep using urllib.request.Request, but routing of connections
can be customized:
- hostname resolution is handled in user code;
- selected IP may be randomized or chosen using any complex logic;
- for HTTPS: connects to a chosen IP but keeps correct SNI and certificate
hostname validation for the original hostname (NOT the IP).
Examples:
Default behavior (plain urllib):
from defence360agent.utils.net_transport import UrlTransport
transport = UrlTransport()
req = urllib.request.Request(
"https://files.imunify360.com/static/sigs/v1/description.json"
)
with transport.open(req, timeout=10) as resp:
body = resp.read()
Randomize target IP on each connection (A/AAAA -> random choice):
from defence360agent.utils.net_transport import UrlTransport, RandomIpChooser
chooser = RandomIpChooser()
transport = UrlTransport(ip_chooser=chooser)
req = urllib.request.Request(
"https://files.imunify360.com/static/sigs/v1/description.json"
)
with transport.open(req, timeout=10) as resp:
body = resp.read()
Notes:
- HTTPS: connects to the chosen IP but keeps SNI/cert checks against original
hostname.
- HTTP: Host header stays original hostname because urllib builds it from the
URL.
"""
import http.client
import ipaddress
import random
import socket
import threading
import time
import urllib.request
from abc import ABC, abstractmethod
from logging import getLogger
from typing import Dict, Optional, Tuple, TYPE_CHECKING
if TYPE_CHECKING:
import ssl
logger = getLogger(__name__)
#: Default time-to-live, in seconds, of a cached DNS answer; used as the
#: default ``ttl_seconds`` of DnsCacheResolver.
_DNS_DEFAULT_TTL_SECONDS = 300.0
def _is_ipv4(ip: str) -> bool:
"""Return True if *ip* is an IPv4 address string.
Implementation relies solely on ipaddress.ip_address for correctness.
"""
try:
return isinstance(ipaddress.ip_address(ip), ipaddress.IPv4Address)
except ValueError:
return False
class IpChooser(ABC):
    """Strategy interface: pick the IP to connect to for a hostname/port.

    Concrete choosers are free to hold state (caches, metrics, ...).
    Instances are also callable; calling one is the same as calling
    ``choose()``.
    """

    @abstractmethod
    def choose(self, hostname: str, port: int) -> str:
        """Return an IP address (v4 or v6) for *hostname*:*port*."""
        raise NotImplementedError

    def __call__(self, hostname: str, port: int) -> str:
        # Callable shorthand that simply forwards to choose().
        selected = self.choose(hostname, port)
        return selected
class DnsCacheResolver:
    """TTL cache in front of ``socket.getaddrinfo()``.

    Results are cached per ``(hostname, port, family)``. The cache is
    intentionally small and local: its only purpose is to avoid excessive
    ``getaddrinfo()`` calls.

    Expiry is tracked with ``time.monotonic()`` rather than the wall clock,
    so system clock adjustments can neither keep a stale entry alive nor
    expire a fresh one early.

    The cache dictionary is guarded by a lock, but the lock is not held
    while resolving, so concurrent misses for the same key may each call
    ``getaddrinfo()``; the last writer wins.
    """

    def __init__(
        self,
        *,
        family: int = socket.AF_UNSPEC,
        ttl_seconds: float = _DNS_DEFAULT_TTL_SECONDS,
    ):
        """Create a resolver.

        :param family: address family passed to ``getaddrinfo()``
            (``socket.AF_UNSPEC`` by default).
        :param ttl_seconds: how long a resolved answer stays valid.
        """
        self._family = family
        self._ttl_seconds = ttl_seconds
        # key -> (monotonic expiry timestamp, resolved IPs)
        self._cache: Dict[
            Tuple[str, int, int], Tuple[float, Tuple[str, ...]]
        ] = {}
        self._lock = threading.Lock()

    def get_ips(self, hostname: str, port: int) -> Tuple[str, ...]:
        """Return the de-duplicated IPs for *hostname*:*port*.

        A non-expired cached answer is returned as is; otherwise the name is
        resolved with ``socket.getaddrinfo()`` (``SOCK_STREAM``) and the
        result is cached. The order reported by ``getaddrinfo()`` is kept.

        :raises OSError: if resolution fails (``socket.gaierror`` is an
            ``OSError``) or yields no addresses.
        """
        key = (hostname, port, self._family)
        with self._lock:
            cached = self._cache.get(key)
            if cached is not None:
                expires_at, ips = cached
                if time.monotonic() < expires_at:
                    logger.debug(
                        "DnsCacheResolver cache hit for %s:%s (family=%s)",
                        hostname,
                        port,
                        self._family,
                    )
                    return ips
        logger.debug(
            "DnsCacheResolver cache miss/expired for %s:%s (family=%s)",
            hostname,
            port,
            self._family,
        )
        infos = socket.getaddrinfo(
            hostname,
            port,
            self._family,
            socket.SOCK_STREAM,
        )
        # dict.fromkeys() drops duplicates while preserving first-seen order.
        ips_t = tuple(dict.fromkeys(sockaddr[0] for *_, sockaddr in infos))
        if not ips_t:
            raise OSError("No IPs resolved for {}:{}".format(hostname, port))
        # Stamp the expiry only after resolving, so a slow lookup does not
        # eat into the TTL.
        expires_at = time.monotonic() + self._ttl_seconds
        with self._lock:
            self._cache[key] = (expires_at, ips_t)
        logger.debug(
            "DnsCacheResolver resolved %s:%s (family=%s) to %s",
            hostname,
            port,
            self._family,
            ips_t,
        )
        return ips_t
class RandomIpChooserWithIPv6Toggle(IpChooser):
    """Random IP chooser whose IPv6 usage can be switched at runtime.

    - IPv6 enabled: pick among every resolved address (IPv4 and IPv6).
    - IPv6 disabled: pick among the resolved IPv4 addresses only.

    The most recently selected IP is remembered and exposed through
    ``last_ip()`` / ``last_ip_was_ipv6()``.
    """

    def __init__(
        self,
        *,
        resolver: Optional[DnsCacheResolver] = None,
        rng: Optional[random.Random] = None,
        ipv6_enabled: bool = True,
    ):
        # Fall back to private defaults when no collaborator is injected.
        self._resolver = resolver if resolver else DnsCacheResolver()
        self._rng = rng if rng else random.Random()
        self._ipv6_enabled = ipv6_enabled
        self._last_ip: Optional[str] = None

    def enable_ipv6(self) -> None:
        """Allow IPv6 addresses to be selected."""
        self._ipv6_enabled = True

    def disable_ipv6(self) -> None:
        """Restrict selection to IPv4 addresses."""
        self._ipv6_enabled = False

    def is_ipv6_enabled(self) -> bool:
        """Return the current state of the IPv6 toggle."""
        return self._ipv6_enabled

    def last_ip(self) -> Optional[str]:
        """Return the IP picked by the latest ``choose()``, or None."""
        return self._last_ip

    def last_ip_was_ipv6(self) -> bool:
        """Return True if the last picked IP contains a colon."""
        previous = self._last_ip
        if not previous:
            # Nothing has been chosen yet.
            return False
        return ":" in previous

    def choose(self, hostname: str, port: int) -> str:
        """Resolve *hostname*:*port* and return one random candidate.

        :raises OSError: when IPv6 is disabled and no IPv4 address was
            resolved.
        """
        candidates = self._resolver.get_ips(hostname, port)

        if not self._ipv6_enabled:
            v4_only = tuple(filter(_is_ipv4, candidates))
            if not v4_only:
                raise OSError(
                    "No IPv4 IPs resolved for {}:{}".format(hostname, port)
                )
            picked = self._rng.choice(v4_only)
            self._last_ip = picked
            logger.debug(
                "RandomIpChooserWithIPv6Toggle selected IPv4 IP %s for %s:%s "
                "(IPv6 disabled)",
                picked,
                hostname,
                port,
            )
            return picked

        picked = self._rng.choice(candidates)
        self._last_ip = picked
        logger.debug(
            "RandomIpChooserWithIPv6Toggle selected IP %s for %s:%s "
            "(IPv6 enabled)",
            picked,
            hostname,
            port,
        )
        return picked
class RandomIpChooser(IpChooser):
    """Chooser that returns a random address among the resolved ones."""

    def __init__(
        self,
        *,
        resolver: Optional[DnsCacheResolver] = None,
        rng: Optional[random.Random] = None,
    ):
        # Fall back to private defaults when no collaborator is injected.
        self._resolver = resolver if resolver else DnsCacheResolver()
        self._rng = rng if rng else random.Random()

    def choose(self, hostname: str, port: int) -> str:
        """Resolve *hostname*:*port* and return one random candidate."""
        candidates = self._resolver.get_ips(hostname, port)
        picked = self._rng.choice(candidates)
        logger.debug(
            "RandomIpChooser selected IP %s for %s:%s",
            picked,
            hostname,
            port,
        )
        return picked
class ForcedIPHTTPConnection(http.client.HTTPConnection):
    """HTTPConnection that connects to an IP picked by an IpChooser.

    The connection object is still created with the original hostname, so
    everything derived from it (notably the Host header) is unaffected;
    only the TCP connect target is replaced.
    """

    def __init__(
        self,
        hostname: str,
        port: Optional[int] = None,
        *,
        ip_chooser: IpChooser,
        timeout=socket._GLOBAL_DEFAULT_TIMEOUT,
        source_address=None,
    ):
        """Create the connection.

        :param hostname: original hostname (passed to the base class).
        :param port: TCP port, or None for the base-class default.
        :param ip_chooser: object whose ``choose(host, port)`` returns the
            IP address to connect to.
        :param timeout: socket timeout, forwarded to the base class.
        :param source_address: optional local address to bind, forwarded to
            the base class.
        """
        super().__init__(
            hostname,
            port=port,
            timeout=timeout,
            source_address=source_address,
        )
        self._ip_chooser = ip_chooser

    def connect(self) -> None:
        """Open a TCP connection to the chosen IP.

        If a tunnel was configured with ``set_tunnel()``, the CONNECT
        handshake is performed on the new socket, as the stdlib
        ``HTTPConnection.connect()`` does.

        :raises OSError: if IP selection or the TCP connect fails.
        """
        port = self.port or 80
        ip = self._ip_chooser.choose(self.host, port)
        logger.debug(
            "ForcedIPHTTPConnection connecting to %s:%s for hostname %s",
            ip,
            port,
            self.host,
        )
        self.sock = socket.create_connection(
            (ip, port),
            self.timeout,
            self.source_address,
        )
        # Honour set_tunnel(); without this the request would be sent to
        # the proxy as if it were the origin server.
        if self._tunnel_host:
            self._tunnel()
class ForcedIPHTTPSConnection(http.client.HTTPSConnection):
    """HTTPSConnection that connects to an IP picked by an IpChooser.

    TLS details:
    - SNI (``server_hostname`` in ``wrap_socket``) uses the hostname of the
      server the TLS session is actually established with: the original
      hostname, or the tunnel target when ``set_tunnel()`` was used.
    - Certificate hostname validation is therefore performed against that
      hostname, never against the chosen IP.
    """

    def __init__(
        self,
        hostname: str,
        port: Optional[int] = None,
        *,
        ip_chooser: IpChooser,
        context: "ssl.SSLContext",
        timeout=socket._GLOBAL_DEFAULT_TIMEOUT,
        source_address=None,
    ):
        """Create the connection.

        :param hostname: original hostname (passed to the base class).
        :param port: TCP port, or None for the base-class default.
        :param ip_chooser: object whose ``choose(host, port)`` returns the
            IP address to connect to.
        :param context: SSL context used to wrap the socket.
        :param timeout: socket timeout, forwarded to the base class.
        :param source_address: optional local address to bind, forwarded to
            the base class.
        """
        super().__init__(
            hostname,
            port=port,
            context=context,
            timeout=timeout,
            source_address=source_address,
        )
        self._ip_chooser = ip_chooser

    def connect(self) -> None:
        """Open a TCP connection to the chosen IP and start TLS on it.

        When a tunnel is configured (``set_tunnel()``, which urllib uses for
        HTTPS through a proxy), ``self.host`` is the proxy and the TLS peer
        is the tunnel target, so the tunnel host is used for SNI and
        certificate validation, matching stdlib
        ``HTTPSConnection.connect()``.

        :raises OSError: if IP selection, the TCP connect, the tunnel
            handshake or the TLS handshake fails.
        """
        port = self.port or 443
        ip = self._ip_chooser.choose(self.host, port)
        logger.debug(
            "ForcedIPHTTPSConnection connecting to %s:%s for hostname %s",
            ip,
            port,
            self.host,
        )
        # Store the raw socket on the connection right away so that
        # close() can release it if the tunnel or TLS setup fails.
        self.sock = socket.create_connection(
            (ip, port),
            self.timeout,
            self.source_address,
        )
        if self._tunnel_host:
            self._tunnel()
            # TLS is end-to-end with the tunnel target, not the proxy.
            server_hostname = self._tunnel_host
        else:
            server_hostname = self.host
        self.sock = self._context.wrap_socket(
            self.sock,
            server_hostname=server_hostname,
        )
class ForcedIPHTTPHandler(urllib.request.HTTPHandler):
    """urllib HTTP handler whose connections are ForcedIPHTTPConnection."""

    def __init__(self, *, ip_chooser: IpChooser):
        super().__init__()
        self._ip_chooser = ip_chooser

    def _connection_factory(self, host, **kwargs):
        # Used in place of a connection class by do_open(); only the
        # "timeout" keyword is forwarded, any other keyword is ignored.
        return ForcedIPHTTPConnection(
            host,
            ip_chooser=self._ip_chooser,
            timeout=kwargs.get("timeout"),
        )

    def http_open(self, req) -> http.client.HTTPResponse:
        """Open *req* through a connection routed by the IP chooser."""
        return self.do_open(self._connection_factory, req)
class ForcedIPHTTPSHandler(urllib.request.HTTPSHandler):
    """urllib HTTPS handler whose connections are ForcedIPHTTPSConnection."""

    def __init__(self, *, ip_chooser: IpChooser, context: "ssl.SSLContext"):
        super().__init__(context=context)
        self._ip_chooser = ip_chooser
        # Kept on the handler so the connection factory can pass it on.
        self._context = context

    def _connection_factory(self, host, **kwargs):
        # Used in place of a connection class by do_open(); only the
        # "timeout" keyword is forwarded, any other keyword is ignored.
        return ForcedIPHTTPSConnection(
            host,
            ip_chooser=self._ip_chooser,
            context=self._context,
            timeout=kwargs.get("timeout"),
        )

    def https_open(self, req) -> http.client.HTTPResponse:
        """Open *req* through a TLS connection routed by the IP chooser."""
        return self.do_open(self._connection_factory, req)
class UrlTransport:
    """Single entrypoint for opening urllib requests.

    If *ip_chooser* is provided, the transport connects to the selected IP
    address while keeping Host/SNI/certificate validation bound to the
    original hostname. If *ip_chooser* is not provided, it behaves like
    plain urllib, except that an explicitly supplied *ssl_context* is still
    used for HTTPS requests.
    """

    def __init__(
        self,
        *,
        ip_chooser: Optional[IpChooser] = None,
        ssl_context: Optional["ssl.SSLContext"] = None,
    ):
        """Build the underlying urllib opener.

        :param ip_chooser: optional chooser deciding which IP to connect to.
        :param ssl_context: optional SSL context for HTTPS; when omitted and
            an *ip_chooser* is given, ``ssl.create_default_context()`` is
            used.
        """
        # ssl is imported at module level only under TYPE_CHECKING, so it
        # has to be imported here for runtime use.
        import ssl as _ssl

        self._ssl_context = (
            ssl_context
            if ssl_context is not None
            else _ssl.create_default_context()
        )
        if ip_chooser is not None:
            handlers = [
                ForcedIPHTTPHandler(ip_chooser=ip_chooser),
                ForcedIPHTTPSHandler(
                    ip_chooser=ip_chooser,
                    context=self._ssl_context,
                ),
            ]
        elif ssl_context is not None:
            # No custom routing, but the caller asked for a specific TLS
            # configuration: do not silently drop it.
            handlers = [urllib.request.HTTPSHandler(context=ssl_context)]
        else:
            # Nothing customised: plain urllib defaults.
            handlers = []
        self._opener = urllib.request.build_opener(*handlers)

    def open(
        self,
        req: urllib.request.Request,
        *,
        timeout: Optional[float] = None,
    ) -> http.client.HTTPResponse:
        """Open *req* and return the response.

        :param req: request to send.
        :param timeout: socket timeout in seconds; when None the opener's
            own default timeout is used.
        """
        if timeout is None:
            # Omit the argument so the opener applies its default.
            return self._opener.open(req)
        return self._opener.open(req, timeout=timeout)