Source code for pat2vec.util.elasticsearch_methods

import logging
from typing import Any, Dict, List, Optional

import pandas as pd
from elasticsearch import Elasticsearch, helpers
from elasticsearch.helpers import BulkIndexError
from tqdm import tqdm

from .credentials import *

logger = logging.getLogger(__name__)


def ingest_data_to_elasticsearch(
    temp_df: pd.DataFrame,
    index_name: str,
    index_mapping: Optional[Dict[str, Any]] = None,
    replace_index: bool = False,
) -> Dict[str, int]:
    """Ingests data from a DataFrame into Elasticsearch with error handling.

    Args:
        temp_df: The DataFrame containing the data to be ingested.
        index_name: Name of the Elasticsearch index.
        index_mapping: Optional mapping for the index.
        replace_index: Whether to replace the index if it exists.

    Returns:
        A summary containing the number of successful and failed operations.

    Raises:
        ConnectionError: If the Elasticsearch server is not reachable.
    """
    # Set default index mapping if none is provided
    index_mapping = index_mapping or {  # type: ignore
        "settings": {
            "number_of_shards": 1,
            "number_of_replicas": 1,
            "index.mapping.ignore_malformed": True,
            "index.mapping.total_fields.limit": 100000,
        }
    }

    # Initialize Elasticsearch client
    es = Elasticsearch(
        [{"host": host_name, "port": port, "scheme": scheme}],
        verify_certs=False,
        http_auth=(username, password),
    )

    # Check connection
    try:
        if not es.ping():
            raise ConnectionError("Elasticsearch server not reachable.")
    except Exception as e:
        logger.error(f"Error connecting to Elasticsearch: {e}")
        raise

    # Replace index if requested
    if es.indices.exists(index=index_name) and replace_index:
        response = es.indices.delete(index=index_name)
        logger.info(f"Index {index_name} deleted successfully.")
        logger.info(response)

    # Create the index if it does not exist
    if not es.indices.exists(index=index_name):
        try:
            es.indices.create(index=index_name, body=index_mapping)
            logger.info(f"Index {index_name} created successfully.")
        except Exception as e:
            logger.error(f"Error creating index: {e}")
            raise

    # Prepare documents for bulk indexing
    docs = temp_df.to_dict(orient="records")
    actions = [
        {"_op_type": "index", "_index": index_name, "_source": doc} for doc in docs
    ]

    success_count = 0
    failed_docs = []
    problematic_fields = {}

    try:
        for ok, result in helpers.streaming_bulk(es, actions):
            if not ok:
                failed_docs.append(result)

                # Extract error details
                error_info = result.get("index", {}).get("error", {})
                doc_id = result.get("index", {}).get("_id", "N/A")
                field_name = (
                    error_info.get("reason", "").split("field [")[1].split("]")[0]
                    if "field [" in error_info.get("reason", "")
                    else "Unknown"
                )

                # Log problematic field
                if field_name not in problematic_fields:
                    problematic_fields[field_name] = []
                problematic_fields[field_name].append(
                    error_info.get("reason", "Unknown")
                )
            else:
                success_count += 1

        logger.info(f"Successfully ingested {success_count} documents.")
        logger.warning(f"Failed to ingest {len(failed_docs)} documents.")

        # Log details of failed documents
        if failed_docs:
            logger.warning("\nDetails of failed documents:")
            for fail in failed_docs:
                error_info = fail.get("index", {}).get("error", {})
                doc_id = fail.get("index", {}).get("_id", "N/A")
                logger.warning(f"Failed Document ID: {doc_id}")
                logger.warning(f"Error Type: {error_info.get('type', 'Unknown')}")
                logger.warning(f"Error Reason: {error_info.get('reason', 'Unknown')}")

            # Log problematic fields summary
            logger.warning("\nProblematic fields summary:")
            for field, issues in problematic_fields.items():
                logger.warning(f"Field: {field}")
                for issue in set(issues):
                    logger.warning(f" - {issue}")

        return {"success": success_count, "failed": len(failed_docs)}

    except BulkIndexError as bulk_error:
        # Handle BulkIndexError and log detailed information
        logger.error(f"BulkIndexError: {bulk_error}")
        failed_docs = bulk_error.errors

        # Log failed documents
        logger.error("\nDetailed failure report:")
        for error in failed_docs:
            error_info = error.get("index", {}).get("error", {})
            doc_id = error.get("index", {}).get("_id", "N/A")
            logger.warning(f"Failed Document ID: {doc_id}")
            logger.warning(f"Error Type: {error_info.get('type', 'Unknown')}")
            logger.warning(f"Error Reason: {error_info.get('reason', 'Unknown')}")

        return {"success": success_count, "failed": len(failed_docs)}

    except Exception as e:
        logger.critical(f"Unexpected error during bulk ingestion: {e}")
        raise

# Example usage:
# ingest_data_to_elasticsearch(temp_df, "annotations_myeloma")
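
# Illustrative sketch (not part of the module): a fuller call with an inline,
# hypothetical DataFrame. It assumes a reachable Elasticsearch cluster whose
# host_name, port, scheme, username and password are supplied by .credentials.
#
#   sample_df = pd.DataFrame(
#       {
#           "client_idcode": ["P001", "P002"],
#           "observation_value": [1.2, 3.4],
#           "updatetime": ["2020-01-01", "2020-02-15"],
#       }
#   )
#   summary = ingest_data_to_elasticsearch(
#       sample_df, "example_index", replace_index=True
#   )
#   print(summary)  # e.g. {"success": 2, "failed": 0}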


def handle_inconsistent_dtypes(df: pd.DataFrame) -> pd.DataFrame:
    """Handles inconsistent data types in a DataFrame's columns.

    Iterates through each column, determines the majority data type (datetime,
    string, int, or float), and casts the entire column to that type. This is
    useful for cleaning data before ingestion into systems with strict schemas
    like Elasticsearch.

    Args:
        df: The DataFrame to process.

    Returns:
        The DataFrame with columns cast to their majority data type.
    """
    for column in tqdm(df.columns, desc="Processing columns"):
        non_null_values = df[column].dropna()

        dt_count = (
            non_null_values.apply(pd.to_datetime, errors="coerce").notnull().sum()
        )
        str_count = non_null_values.apply(type).eq(str).sum()
        int_count = non_null_values.apply(type).eq(int).sum()
        float_count = non_null_values.apply(type).eq(float).sum()

        total_valid = dt_count + str_count + int_count + float_count
        if total_valid == 0:
            logger.warning(f"No valid data types found in column '{column}'")
            continue

        dt_percent = dt_count / total_valid
        str_percent = str_count / total_valid
        int_percent = int_count / total_valid
        float_percent = float_count / total_valid

        majority_dtype = max(dt_percent, str_percent, int_percent, float_percent)

        if dt_percent > 0.5:
            logger.info(f"Casting column '{column}' to datetime...")
            df[column] = pd.to_datetime(df[column], errors="ignore")
        else:
            logger.info(f"Casting column '{column}' to majority datatype...")
            if majority_dtype == dt_percent:
                df[column] = pd.to_datetime(df[column], errors="ignore")
            elif majority_dtype == str_percent:
                df[column] = df[column].astype(str, errors="ignore")
            elif majority_dtype == int_percent:
                df[column] = df[column].astype(int, errors="ignore")
            elif majority_dtype == float_percent:
                df[column] = df[column].astype(float, errors="ignore")

    return df
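
# Illustrative sketch (not part of the module): cleaning a hypothetical
# mixed-type DataFrame before ingestion; inspect the resulting dtypes to see
# which majority type each column was cast to.
#
#   mixed_df = pd.DataFrame(
#       {
#           "dose": [10, 20, "30", 40],
#           "unit": ["mg", "mg", 5, "ml"],
#       }
#   )
#   cleaned_df = handle_inconsistent_dtypes(mixed_df)
#   print(cleaned_df.dtypes)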


def guess_datetime_columns(df: pd.DataFrame, threshold: float = 0.5) -> List[str]:
    """Guesses which columns in a DataFrame are datetime columns.

    It iterates through each column and attempts to parse its values as
    datetimes. If the percentage of parsable values exceeds a given threshold,
    the column is considered a datetime column.

    Args:
        df: The DataFrame to analyze.
        threshold: The minimum percentage of values in a column that must be
            parsable as datetime for it to be considered a datetime column.
            Defaults to 0.5.

    Returns:
        A list of column names that are likely to be datetime columns.
    """
    datetime_columns = []

    for column in tqdm(df.columns, desc="Processing Columns"):
        parse_count = 0
        total_count = 0

        for value in df[column]:
            total_count += 1

            # Skip parsing if the value is NaN
            if pd.isna(value):
                continue

            # Only attempt to parse string values as datetimes
            if isinstance(value, str):
                try:
                    pd.to_datetime(value)
                    parse_count += 1
                except ValueError:
                    continue

        # Guard against empty columns before computing the parse ratio
        if total_count > 0 and parse_count / total_count >= threshold:
            datetime_columns.append(column)

    return datetime_columns
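
# Illustrative sketch (not part of the module): only string values are tested
# against pd.to_datetime, so the hypothetical "visit_date" column below should
# be reported while the numeric "score" column is not.
#
#   frame = pd.DataFrame(
#       {
#           "visit_date": ["2021-03-01", "2021-04-15", None],
#           "score": [1, 2, 3],
#           "comment": ["stable", "improving", "discharged"],
#       }
#   )
#   print(guess_datetime_columns(frame, threshold=0.5))  # expected: ["visit_date"]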


def get_guess_datetime_column(
    df: pd.DataFrame, threshold: float = 0.2
) -> Optional[str]:
    """Finds the single column most likely to be a datetime column.

    This function iterates through all columns and calculates the ratio of
    values that can be parsed as a datetime. It returns the name of the column
    with the highest ratio, provided that ratio is above the specified
    threshold.

    Args:
        df: The DataFrame to analyze.
        threshold: The minimum percentage of values that must be parsable as
            datetime for a column to be considered. Defaults to 0.2.

    Returns:
        The name of the column most likely to contain datetimes, or None if no
        column meets the threshold.
    """
    highest_ratio = 0
    highest_column = None

    for column in tqdm(df.columns, desc="Processing Columns"):
        parse_count = 0
        total_count = 0

        for value in df[column]:
            total_count += 1

            # Skip parsing if the value is NaN
            if pd.isna(value):
                continue

            # Only attempt to parse string values as datetimes
            if isinstance(value, str):
                try:
                    pd.to_datetime(value)
                    parse_count += 1
                except ValueError:
                    continue

        if total_count > 0:
            parse_ratio = parse_count / total_count
            if parse_ratio >= threshold and parse_ratio > highest_ratio:
                highest_ratio = parse_ratio
                highest_column = column

    return highest_column
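
# Illustrative sketch (not part of the module): picking the single most
# datetime-like column from a hypothetical DataFrame; with the default 0.2
# threshold the "updatetime" column below should be returned.
#
#   frame = pd.DataFrame(
#       {
#           "updatetime": ["2019-05-01 10:00", "2019-06-01 11:30"],
#           "note": ["seen in clinic", "discharged home"],
#       }
#   )
#   print(get_guess_datetime_column(frame))  # expected: "updatetime"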