Source code for pat2vec.util.docker_elastic

import subprocess
import socket
import os
import time
import logging
import uuid
import requests
import atexit
import struct
from typing import Optional, Tuple

logger = logging.getLogger(__name__)


[docs] class ElasticContainer: """Manages a transient Elasticsearch Docker container for testing.""" def __init__(self, image: Optional[str] = None, port: int = 19200): # Use env var if set, otherwise argument, otherwise default # Note: We default to Docker Hub (elasticsearch:8.17.0) instead of docker.elastic.co # because corporate proxies often block the redirects to Cloudflare R2 used by # Elastic's own registry, whereas Docker Hub's infrastructure is usually permitted. self.image = os.environ.get( "PAT2VEC_ELASTIC_IMAGE", image or "elasticsearch:8.17.0" ) self.port = port self.container_name = f"pat2vec-test-elastic-{uuid.uuid4().hex[:8]}" self.password = "test_password_123" self.container_id: Optional[str] = None self.host = "127.0.0.1" def __enter__(self): if not self.start(): raise RuntimeError("Failed to start Elasticsearch container") return self def __exit__(self, exc_type, exc_val, exc_tb): self.stop()
[docs] @staticmethod def cleanup_orphans(): """Finds and removes any orphaned pat2vec test containers.""" logger.info("Cleaning up any orphaned test containers...") try: # Find containers with the specific name pattern result = subprocess.run( [ "docker", "ps", "-a", "--filter", "name=pat2vec-test-elastic-", "--format", "{{.ID}}", ], capture_output=True, text=True, ) if result.returncode != 0: logger.warning(f"Could not list orphaned containers: {result.stderr}") return container_ids = [cid for cid in result.stdout.strip().split("\n") if cid] if not container_ids: logger.info("No orphaned containers found.") return logger.info( f"Found {len(container_ids)} orphaned container(s): {container_ids}. Removing..." ) # Remove them subprocess.run( ["docker", "rm", "-f"] + container_ids, stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) except FileNotFoundError: logger.warning( "Docker command not found, cannot clean up orphaned containers." ) except Exception as e: logger.error(f"An error occurred during orphan cleanup: {e}")
def _ensure_image(self) -> bool: """Ensures the Docker image exists locally, pulling with retries if needed.""" # Check if image exists check = subprocess.run( ["docker", "image", "inspect", self.image], stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) if check.returncode == 0: return True # Pull with retries retries = 3 for i in range(retries): logger.info(f"Pulling image {self.image} (attempt {i+1}/{retries})...") result = subprocess.run( ["docker", "pull", self.image], capture_output=True, text=True ) if result.returncode == 0: return True logger.warning(f"Pull failed: {result.stderr.strip()}") time.sleep(10) return False def _is_port_free(self, port: int) -> bool: """Checks if a local port is free.""" with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: # connect_ex returns 0 if connection succeeds (port is busy) return s.connect_ex(("localhost", port)) != 0 def _get_mapped_port(self) -> int: """Retrieves the host port mapped to container port 9200.""" try: result = subprocess.run( ["docker", "port", self.container_name, "9200"], capture_output=True, text=True, ) if result.returncode == 0 and result.stdout: # Output format example: 0.0.0.0:32768 return int(result.stdout.strip().split(":")[-1]) except Exception as e: logger.warning(f"Could not retrieve mapped port: {e}") return self.port def _get_container_ip(self) -> Optional[str]: """Retrieves the internal IP address of the container.""" if not self.container_id: return None try: result = subprocess.run( [ "docker", "inspect", self.container_id, "-f", "{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}", ], capture_output=True, text=True, ) return result.stdout.strip() except Exception: return None def _get_host_gateway_ip(self) -> str: """Attempts to find the default gateway IP (usually the host).""" try: # Method 1: Check /proc/net/route (standard Linux way, works even if 'ip' command is missing) if os.path.exists("/proc/net/route"): with open("/proc/net/route") as f: for line in f: fields = line.strip().split() if len(fields) > 2 and fields[1] == "00000000": # Default route return socket.inet_ntoa( struct.pack("<L", int(fields[2], 16)) ) except Exception: pass try: result = subprocess.run( ["ip", "route", "show", "default"], capture_output=True, text=True ) if result.returncode == 0 and "via" in result.stdout: return result.stdout.split("via")[1].split()[0] except Exception: pass return "127.0.0.1"
[docs] def start(self, timeout: int = 180) -> bool: """Starts an Elasticsearch container. Returns True if successful.""" # First, clean up any containers from previous failed runs self.cleanup_orphans() # Check if docker is available try: subprocess.run( ["docker", "--version"], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) except (subprocess.CalledProcessError, FileNotFoundError): logger.warning("Docker not found. Skipping Elasticsearch container start.") return False if not self._ensure_image(): logger.error(f"Failed to pull Docker image: {self.image}") return False # Determine port mapping. Use random port if configured port is busy. port_mapping = f"{self.port}:9200" if not self._is_port_free(self.port): logger.warning( f"Port {self.port} is in use. Letting Docker assign a random port." ) port_mapping = "9200" cmd = [ "docker", "run", "-d", "--name", self.container_name, "-p", port_mapping, "--add-host=host.docker.internal:host-gateway", "-e", "discovery.type=single-node", "-e", "xpack.security.enabled=true", "-e", "xpack.security.http.ssl.enabled=false", "-e", f"ELASTIC_PASSWORD={self.password}", "-e", f"ELASTICSEARCH_PASSWORD={self.password}", # Compatibility with Bitnami images "-e", "ES_JAVA_OPTS=-Xms512m -Xmx512m", self.image, ] logger.info(f"Starting Elasticsearch container: {self.container_name}") try: result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode != 0: logger.error(f"Failed to start container: {result.stderr}") return False self.container_id = result.stdout.strip() self.port = self._get_mapped_port() logger.info(f"Container started on port {self.port}") atexit.register(self.stop) self._wait_for_ready(timeout) return True except Exception as e: logger.error(f"Error starting container: {e}") try: logs = subprocess.run( ["docker", "logs", "--tail", "20", self.container_name], capture_output=True, text=True, ) logger.error(f"Container logs:\n{logs.stdout}\n{logs.stderr}") except Exception: pass self.stop() return False
[docs] def stop(self) -> None: """Stops and removes the container.""" try: # Safely unregister the exit handler atexit.unregister(self.stop) except Exception: pass if self.container_id or self.container_name: # Try stopping by name if ID is missing (cleanup) target = self.container_id or self.container_name logger.info(f"Stopping container: {target}") subprocess.run( ["docker", "rm", "-f", target], stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) self.container_id = None
def _wait_for_ready(self, timeout: int) -> None: """Waits for Elasticsearch to be responsive.""" start_time = time.time() auth = ("elastic", self.password) logger.info("Waiting for Elasticsearch to become ready...") # Use a Session with trust_env=False to completely bypass environment proxy settings. # This ensures the health check hits the local container loopback even if # http_proxy is set, which is common in self-hosted CI runners. session = requests.Session() session.trust_env = False # Determine connection targets: Localhost, Gateway, and internal Container IP # Determine connection targets container_ip = self._get_container_ip() gateway_ip = self._get_host_gateway_ip() # Target list: (host, port) targets = [ ("127.0.0.1", self.port), (gateway_ip, self.port), ("host.docker.internal", self.port), ] if container_ip: targets.append((container_ip, 9200)) logger.info(f"Waiting for Elasticsearch health on targets: {targets}") while time.time() - start_time < timeout: for test_host, test_port in targets: test_url = f"http://{test_host}:{test_port}/_cluster/health" try: response = session.get(test_url, auth=auth, timeout=2) if response.status_code == 200: status = response.json().get("status") if status in ["green", "yellow"]: logger.info( f"✅ Elasticsearch is ready at {test_url} (status: {status})." ) # Update instance state to the successful connection info self.host = test_host self.port = test_port return except (requests.exceptions.RequestException, ConnectionError): continue except Exception as e: logger.warning( f"Unexpected health check error for {test_host}: {e}" ) time.sleep(5) raise TimeoutError("Elasticsearch container failed to start within timeout.")
[docs] def get_credentials(self) -> Tuple[str, str, str]: """Returns (host_url, username, password).""" return f"http://{self.host}:{self.port}", "elastic", self.password