from pat2vec.util.filter_methods import (
apply_bloods_data_type_filter,
filter_dataframe_by_fuzzy_terms,
)
from pat2vec.util.helper_functions import get_df_from_db
from pat2vec.util.methods_get import exist_check
import pandas as pd
from sqlalchemy import text
import logging
import os
from typing import Any
[docs]
# Table and schema used for bloods when the storage backend is "database".
_RAW_BLOODS_TABLE = "raw_bloods"
_RAW_BLOODS_SCHEMA = "raw_data"

# Attributes the configuration object must expose before any work is done.
_REQUIRED_DATE_ATTRS = (
    "global_start_year",
    "global_start_month",
    "global_end_year",
    "global_end_month",
)


def _filter_bloods_by_terms(config_obj: Any, batch_target: pd.DataFrame) -> pd.DataFrame:
    """Apply the configured fuzzy term filter for bloods, if one is set.

    Returns ``batch_target`` unchanged when ``data_type_filter_dict`` is None
    or has no ``filter_term_lists["bloods"]`` entry.
    """
    filter_dict = config_obj.data_type_filter_dict
    if filter_dict is None:
        return batch_target

    # Tolerate a missing/None "filter_term_lists" entry instead of raising
    # AttributeError on the chained .get(), which would discard the batch.
    term_lists = filter_dict.get("filter_term_lists") or {}
    filter_term_list = term_lists.get("bloods")
    if filter_term_list is None:
        return batch_target

    if config_obj.verbosity >= 1:
        # The "%s" placeholder is required for the dict to appear in the log.
        logging.info("applying doc type filter to bloods %s", filter_dict)

    return filter_dataframe_by_fuzzy_terms(
        batch_target,
        filter_term_list,
        column_name="basicobs_itemname_analysed",
        verbose=config_obj.verbosity,
    )


def _store_bloods_batch_in_db(
    config_obj: Any,
    batch_target: pd.DataFrame,
    current_pat_client_id_code: str,
    overwrite: bool,
) -> None:
    """Append the batch to the raw bloods table, best effort.

    When ``overwrite`` is true, the patient's existing rows are deleted first
    inside the same transaction. Failures are logged and not re-raised.
    """
    try:
        engine = config_obj.db_engine
        if not engine:
            return

        # SQLite gets a flattened "<schema>_<table>" name and no schema;
        # other engines use the schema-qualified table.
        is_sqlite = engine.name == "sqlite"
        if is_sqlite:
            db_table = f"{_RAW_BLOODS_SCHEMA}_{_RAW_BLOODS_TABLE}"
            db_schema = None
            qualified_name = db_table
        else:
            db_table = _RAW_BLOODS_TABLE
            db_schema = _RAW_BLOODS_SCHEMA
            qualified_name = f"{_RAW_BLOODS_SCHEMA}.{_RAW_BLOODS_TABLE}"

        with engine.begin() as connection:
            if overwrite:
                # Identifiers come from module constants; the patient id is
                # passed as a bound parameter.
                del_query = text(
                    f"DELETE FROM {qualified_name} WHERE client_idcode = :pat_id"
                )
                connection.execute(
                    del_query,
                    {"pat_id": current_pat_client_id_code},
                )
            batch_target.to_sql(
                name=db_table,
                con=connection,
                schema=db_schema,
                if_exists="append",
                index=False,
            )
    except Exception as e:
        logging.error(f"Failed to save bloods batch to DB: {e}")


def get_pat_batch_bloods(
    current_pat_client_id_code: str,
    search_term: str,
    config_obj: Any,
    cohort_searcher_with_terms_and_search: Any,
) -> pd.DataFrame:
    """Retrieves a batch of blood test observations for a patient.

    Stored data is reused when present: rows from the database when
    ``storage_backend == "database"`` and overwriting is disabled, otherwise a
    per-patient CSV under ``config_obj.pre_bloods_batch_path``. If nothing is
    stored, the observations are fetched with the supplied search function,
    filtered, and optionally persisted.

    Args:
        current_pat_client_id_code: The patient's unique identifier.
        search_term: The term to search for (currently unused).
        config_obj: The main configuration object.
        cohort_searcher_with_terms_and_search: The search function to use.

    Returns:
        A DataFrame containing the batch of blood test observations. An empty
        DataFrame is returned if reading, fetching or filtering fails.

    Raises:
        ValueError: If ``config_obj`` is None or lacks the global start/end
            year and month attributes.
    """
    # Validate before touching any attribute so a None config raises the
    # intended ValueError rather than an AttributeError.
    if config_obj is None or not all(
        hasattr(config_obj, attr) for attr in _REQUIRED_DATE_ATTRS
    ):
        raise ValueError("Invalid or missing configuration object.")

    overwrite_stored_pat_observations = config_obj.overwrite_stored_pat_observations

    global_start_year = config_obj.global_start_year
    global_start_month = config_obj.global_start_month
    global_end_year = config_obj.global_end_year
    global_end_month = config_obj.global_end_month
    global_start_day = config_obj.global_start_day
    global_end_day = config_obj.global_end_day
    bloods_time_field = config_obj.bloods_time_field

    use_database = config_obj.storage_backend == "database"

    if use_database:
        if not overwrite_stored_pat_observations:
            try:
                df = get_df_from_db(
                    config_obj,
                    _RAW_BLOODS_SCHEMA,
                    _RAW_BLOODS_TABLE,
                    patient_ids=[current_pat_client_id_code],
                )
            except Exception as e:
                logging.error(
                    f"Error with database backend for bloods for patient {current_pat_client_id_code}: {e}"
                )
                return pd.DataFrame()
            if not df.empty:
                return df
        # Nothing usable is stored (or overwrite was requested): always fetch.
        # The CSV path is irrelevant for this backend.
        batch_obs_target_path = None
        should_fetch = True
    else:
        batch_obs_target_path = os.path.join(
            config_obj.pre_bloods_batch_path, str(current_pat_client_id_code) + ".csv"
        )
        # NOTE(review): an existing CSV is reused even when
        # overwrite_stored_pat_observations is set — confirm that is intended.
        should_fetch = not exist_check(batch_obs_target_path, config_obj)

    try:
        if not should_fetch:
            return pd.read_csv(batch_obs_target_path)

        batch_target = cohort_searcher_with_terms_and_search(
            index_name="basic_observations",
            fields_list=[
                "client_idcode",
                "basicobs_itemname_analysed",
                "basicobs_value_numeric",
                "basicobs_entered",
                "clientvisit_serviceguid",
                "updatetime",
            ],
            term_name=config_obj.client_idcode_term_name,
            entered_list=[current_pat_client_id_code],
            search_string=f"basicobs_value_numeric:* AND "
            f"{bloods_time_field}:[{global_start_year}-{global_start_month}-{global_start_day} TO {global_end_year}-{global_end_month}-{global_end_day}]",
        )

        batch_target = _filter_bloods_by_terms(config_obj, batch_target)
        batch_target = apply_bloods_data_type_filter(config_obj, batch_target)

        # NOTE(review): the gate reads store_pat_batch_docs rather than
        # store_pat_batch_observations — kept as in the original; confirm.
        if config_obj.store_pat_batch_docs or overwrite_stored_pat_observations:
            if use_database:
                _store_bloods_batch_in_db(
                    config_obj,
                    batch_target,
                    current_pat_client_id_code,
                    overwrite_stored_pat_observations,
                )
            else:
                batch_target.to_csv(batch_obs_target_path)

        return batch_target
    except Exception as e:
        logging.error(f"Error retrieving batch blood test-related observations: {e}")
        return pd.DataFrame()