import os
import shutil
from typing import List, Dict

from langchain.schema.document import Document

from src.utils.utils import prepare_helpscout_data


def remove_directory(dir_path: str) -> None:
    """Recursively delete *dir_path* and everything inside it.

    Raises FileNotFoundError if *dir_path* does not exist (same as the
    original hand-rolled os.walk implementation, whose final os.rmdir
    raised on a missing directory).
    """
    # shutil.rmtree performs the same bottom-up remove-files-then-dirs
    # walk the original implemented manually; shutil was already imported
    # but unused.
    shutil.rmtree(dir_path)


def process_xlsx(content: str, file_name: str, db_type: str, root_dir: str,
                 text_splitter, databases) -> Dict[str, List[Document]]:
    """Persist raw xlsx content to disk, convert it with
    prepare_helpscout_data, and split the resulting JSON into Documents.

    Args:
        content: File bytes carried in a str; round-tripped back to bytes
            via latin-1. (NOTE(review): the original comment claimed the
            content was base64-encoded, but the code latin-1-encodes it —
            confirm against the caller.)
        file_name: Original upload name; also used as the temp file name.
        db_type: Passed through into each Document's metadata.
        root_dir: Directory in which the temporary xlsx file is written.
        text_splitter: Splitter with a split_documents([...]) method.
        databases: Passed through into each Document's metadata.

    Returns:
        Mapping of record id -> list of Document chunks, or an empty dict
        when processing fails (best-effort contract preserved from the
        original implementation).
    """
    temp_xlsx_path = os.path.join(root_dir, file_name)
    with open(temp_xlsx_path, "wb") as f:
        f.write(content.encode('latin1'))

    json_path, dir_path = prepare_helpscout_data(temp_xlsx_path)
    documents: Dict[str, List[Document]] = {}
    try:
        print(f"received json path: {json_path}")
        print(f"received dir path: {dir_path}")
        documents = get_docs_from_json(json_path, file_name, db_type,
                                       text_splitter, databases)
        print("docs received")
    except Exception as e:
        # Best-effort: log and fall through to return {} as before.
        print(e)
    finally:
        # The original duplicated this cleanup in both the success path
        # and the except branch; a single finally runs it exactly once.
        os.remove(temp_xlsx_path)
        remove_directory(dir_path)
        print("deleted files")
    return documents


def get_docs_from_json(file_path: str, file_name, db_type: str, text_splitter,
                       database) -> Dict[str, List[Document]]:
    """Load exported email records from a JSON file and build Documents.

    Each record becomes a list of Document chunks keyed by its string id.
    Records with pageContent longer than 1500 characters are run through
    *text_splitter*; shorter records become a single Document. Every
    page_content is prefixed with the record's Summary.

    Args:
        file_path: Path to the JSON file produced by prepare_helpscout_data.
        file_name: Stored in each Document's metadata under "title".
        db_type: Stored in each Document's metadata.
        text_splitter: Splitter with a split_documents([...]) method.
        database: Stored in each Document's metadata.

    Returns:
        Mapping of record id (str) -> list of Document chunks.
    """
    import pandas as pd  # local import kept from the original
    print("getting docs from json")
    email_data = pd.read_json(file_path)
    print("json loaded successfully")

    documents: Dict[str, List[Document]] = {}
    for _, row in email_data.iterrows():
        tags_str = ', '.join(row['tags'])
        id_str = str(row['id'])
        # Single metadata template shared by both branches (the original
        # duplicated this dict with identical keys in each branch).
        metadata = {
            "database": database,
            "db_type": db_type,
            "title": file_name,
            "type": "xlsx",
            'Summary': row['Summary'],
            'Subject': str(row['subject']),
            'createdAt': row['createdAt'],
            'tags': tags_str,
            "web_url": "null",
            "parent": "null",
            "source": "null",
        }
        docs: List[Document] = []
        if len(row['pageContent']) > 1500:
            doc = Document(page_content=row['pageContent'],
                           metadata=dict(metadata))
            chunks = text_splitter.split_documents([doc])
            for chunk_no, chunk in enumerate(chunks, start=1):
                # Prefix each chunk with the record summary for context.
                chunk.page_content = f"{row['Summary']}: \n {chunk.page_content}"
                chunk.metadata['chunk_no'] = str(chunk_no)  # store as string
                docs.append(chunk)
        else:
            docs.append(Document(
                page_content=f"{row['Summary']}: \n {row['pageContent']}",
                metadata=dict(metadata),
            ))
        documents[id_str] = docs

    # Normalize empty-string metadata values to the literal "null"
    # (presumably so a downstream store does not reject empty fields —
    # TODO confirm against the vector-store writer).
    for doc_list in documents.values():
        for chunk in doc_list:
            for key, value in chunk.metadata.items():
                if value == '':
                    print(f"Key: {key}, Value: {value}")
                    chunk.metadata[key] = "null"
    return documents