import time
from collections import defaultdict
from typing import Any, Dict, List, Optional


class HistoryManager:
    """In-memory store of per-user query/response history.

    Entries are kept in ``user_histories``, a ``defaultdict(list)`` keyed by
    user ID. Each entry is a dict with the keys ``'timestamp'`` (the
    ``time.time()`` value at insertion), ``'query'`` and ``'response'``,
    appended in insertion order (oldest first).

    The class is meant to be used as a singleton: obtain the shared instance
    through :meth:`get_instance` rather than calling the constructor.
    """

    # Default age, in seconds, after which an entry counts as expired.
    DEFAULT_EXPIRATION_SECONDS = 1800

    _instance: Optional["HistoryManager"] = None

    @classmethod
    def get_instance(cls) -> "HistoryManager":
        """Return the shared HistoryManager, creating it on first use."""
        if cls._instance is None:
            cls._instance = cls()
        return cls._instance

    def __init__(self) -> None:
        """Initialise an empty history store.

        Raises:
            RuntimeError: If the shared instance has already been created
                through :meth:`get_instance`.
        """
        if self._instance is not None:
            raise RuntimeError("This class is a singleton!")
        self.user_histories: Dict[Any, List[Dict[str, Any]]] = defaultdict(list)
        self.expiration_time = self.DEFAULT_EXPIRATION_SECONDS

    def add_history(self, user_id, query, response) -> None:
        """Append a query/response pair to ``user_id``'s history.

        The entry is stamped with the current ``time.time()`` value.
        """
        timestamp = time.time()
        # NOTE: this writes the full query and response text to stdout.
        print(f"adding history {user_id} {query} {response}")
        self.user_histories[user_id].append(
            {'timestamp': timestamp, 'query': query, 'response': response}
        )

    def get_history(self, user_id, limit=10, offset=0) -> List[Dict[str, Any]]:
        """Return up to ``limit`` entries for ``user_id``, oldest first.

        The window is counted from the newest entry: ``offset`` newest
        entries are skipped, then the ``limit`` entries before them are
        returned in insertion order.

        Args:
            user_id: Key of the user whose history is requested.
            limit: Maximum number of entries to return. Values <= 0 yield
                an empty list.
            offset: Number of newest entries to skip. Negative values are
                treated as 0.

        Returns:
            A new list holding the selected entry dicts; empty when the user
            is unknown or the window falls outside the stored history.
        """
        # .get() avoids creating an empty list for unknown users, which a
        # plain defaultdict lookup would do.
        histories = self.user_histories.get(user_id)
        if not histories or limit <= 0:
            return []

        # Use explicit non-negative indices: negative-index slicing breaks
        # at zero (``histories[-0:]`` is the whole list, not an empty one).
        end = len(histories) - max(offset, 0)
        if end <= 0:
            return []
        start = max(end - limit, 0)
        return histories[start:end]

    def cleanup_histories(self) -> None:
        """Drop every user whose entries have all expired.

        A user is removed only when each of their entries is older than
        ``expiration_time`` seconds; if any entry is still fresh, the user's
        whole history (including older entries) is kept. Users with an empty
        history list are removed as well.
        """
        print("Cleaning")
        current_time = time.time()
        # Collect the keys first so the dict is not mutated while iterating.
        expired_keys = [
            user_id
            for user_id, histories in self.user_histories.items()
            if all(
                (current_time - entry['timestamp']) > self.expiration_time
                for entry in histories
            )
        ]
        print("Start Cleaning")
        for user_id in expired_keys:
            del self.user_histories[user_id]
            print(f"Removed history for user ID: {user_id}")  # Debugging line