Building a Multi-Agent AI System from scratch for Medical Text Processing

Plaban Nayak
Published in The AI Forum
12 min read · Nov 16, 2024

Introduction

In the rapidly evolving field of AI, multi-agent systems are gaining traction for their ability to handle complex tasks through collaboration. This article explores the architecture, workflow, and future scope of a multi-agent AI application designed to process medical texts. The system uses Streamlit for the frontend and the Llama 3.2 3B model (Ollama tag llama3.2:3b) served through Ollama. It includes agents for summarizing medical texts, writing research articles, and redacting Protected Health Information (PHI), each paired with a validator agent that scores the output.

When deciding whether to build a multi-agent system from scratch or to use an existing agentic framework such as CrewAI, AutoGen, or OpenAI Swarm, it’s important to weigh the benefits and trade-offs of each approach. The advantages of each are summarized below.

Building a Multi-Agent System from Scratch

Benefits:

1. Customization and Flexibility:

  • Tailored Solutions: Building from scratch allows for a highly customized solution tailored to specific needs and requirements. You have full control over the architecture, design, and implementation details.
  • Flexibility: You can implement unique features and functionalities that may not be supported by existing frameworks.

2. Deep Understanding:

  • In-Depth Knowledge: Developing a system from the ground up provides a deep understanding of the underlying mechanics and architecture, which can be beneficial for troubleshooting and optimization.
  • Learning Opportunity: It’s a great learning experience for developers, enhancing their skills in system design, architecture, and AI.

3. No Dependency on External Frameworks:

  • Independence: You are not tied to the limitations or updates of a third-party framework, which can sometimes introduce breaking changes or deprecate features.
  • Security and Privacy: You have full control over data handling and security measures, which is crucial for sensitive applications like medical data processing.

4. Optimized Performance:

  • Performance Tuning: You can optimize the system for specific performance metrics, such as speed or resource usage, without the overhead that might come with a generic framework.

5. Cost Efficiency:

  • No Licensing Fees: Avoid potential costs associated with using commercial frameworks, which might charge for advanced features or enterprise support.

Using Agentic Frameworks (CrewAI, AutoGen, OpenAI Swarm)

Benefits:

1. Rapid Development:

  • Time-Saving: Frameworks provide pre-built components and functionalities, significantly reducing development time.
  • Proven Solutions: Leverage tried-and-tested solutions that have been optimized and debugged by a community or company.

2. Scalability:

  • Built-In Scalability: Many frameworks are designed to handle scalability issues, making it easier to expand the system as needed.
  • Cloud Integration: Often come with built-in support for cloud services, facilitating deployment and scaling.

3. Community and Support:

  • Active Community: Access to a community of developers and users who can provide support, share best practices, and contribute to the framework’s development.
  • Documentation and Tutorials: Comprehensive documentation and tutorials can help accelerate the learning curve.

4. Advanced Features:

  • State-of-the-Art Capabilities: Frameworks often include advanced features like natural language processing, machine learning integration, and multi-agent coordination out of the box.
  • Continuous Updates: Benefit from continuous improvements and updates from the framework’s maintainers.

5. Interoperability:

  • Integration with Other Tools: Frameworks often provide easy integration with other tools and services, enhancing the system’s capabilities.

Architecture of the Workflow

The architecture of this application is modular, consisting of several key components:

  • Frontend (Streamlit): Provides an intuitive web interface for user interaction. Users can select tasks, input data, and view results.
  • Agent Manager: Acts as the central coordinator, delegating tasks to the appropriate main agents and their corresponding validator agents.

Main Agents:

  • Summarize Agent: Generates summaries of medical texts.
  • Write Article Agent: Creates drafts of research articles.
  • Redact Data Agent: Masks PHI in medical data.

Validator Agents:

  • Summarize Validator Agent: Assesses the quality of summaries.
  • Refiner Agent: Enhances drafts for better quality.
  • Redact PHI Validator Agent: Ensures all PHI has been properly masked.

Logger:

  • Records all interactions, inputs, outputs, and errors for monitoring and debugging.

Technology Stack Used for Implementation

The application is built using a combination of modern technologies that ensure efficiency, scalability, and ease of use:

  • Streamlit: A powerful and easy-to-use framework for building web applications in Python. It provides an intuitive interface for users to interact with the AI system, making it accessible even to those without technical expertise.
  • Ollama: A tool for running large language models such as Llama, Mistral, and Gemma locally. Its Python client lets the agents send prompts to the model and read back completions.
  • Python: The primary programming language used for developing the application. Python’s rich ecosystem of libraries and frameworks makes it ideal for AI and web development.
  • asyncio: Python’s standard-library module for writing concurrent code with the async/await syntax. The agents expose async methods, so each model call can be awaited and independent calls can be run concurrently when needed.
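To make the asyncio point concrete, here is a minimal sketch of how a blocking model call can be awaited without stalling the event loop, and how several independent calls can be run together. The function blocking_model_call is a hypothetical stand-in for a synchronous LLM client call; it is not part of the application.

```python
import asyncio
import time
from typing import List


def blocking_model_call(prompt: str) -> str:
    """Hypothetical stand-in for a synchronous LLM client call."""
    time.sleep(0.1)  # simulate inference latency
    return f"response to: {prompt}"


async def complete(prompt: str) -> str:
    # Run the blocking call in a worker thread so the event loop stays free.
    return await asyncio.to_thread(blocking_model_call, prompt)


async def run_many(prompts: List[str]) -> List[str]:
    # gather() returns results in the same order as its arguments.
    return await asyncio.gather(*(complete(p) for p in prompts))


results = asyncio.run(run_many(["a", "b", "c"]))
print(results)
```

asyncio.to_thread requires Python 3.9 or later. Whether concurrent requests actually speed things up depends on how the model server schedules them.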

Workflow

The workflow of the application is as follows:

  • User Interaction: Users interact with the system via the Streamlit interface, selecting tasks and providing input data.
  • Task Delegation: The Agent Manager receives the task request and delegates it to the appropriate main agent.
  • Processing: The main agent builds a prompt from the input data and sends it to the Llama 3.2 model to generate the desired output.
  • Validation: The output is then passed to the corresponding validator agent, which evaluates the quality and accuracy of the results. A validation score out of 5 is provided for quick assessment.
  • Result Display: The final validated content is displayed to the user, with options to export the results or return to the main page.
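The delegate, process, validate sequence above can be sketched in a few lines. The two agent classes here are hypothetical stubs that need no model, used only to show the control flow; the real agents appear in the Code Implementation section.

```python
import asyncio
from typing import Any, Dict


class FirstSentenceSummarizer:
    """Hypothetical main agent: keeps only the first sentence."""

    async def process(self, text: str) -> Dict[str, Any]:
        return {"summary": text.split(".")[0].strip() + "."}


class LengthValidator:
    """Hypothetical validator: accepts summaries shorter than the input."""

    async def validate(self, text: str, output: Dict[str, Any]) -> Dict[str, Any]:
        ok = 0 < len(output["summary"]) < len(text)
        return {"is_valid": ok, "feedback": f"Score: {5 if ok else 1}/5"}


async def run_task(main_agent, validator, input_data) -> Dict[str, Any]:
    # Step 1: the main agent produces the output.
    result = await main_agent.process(input_data)
    # Step 2: the validator sees both the input and the output.
    validation = await validator.validate(input_data, result)
    return {"result": result, "validation": validation}


outcome = asyncio.run(
    run_task(
        FirstSentenceSummarizer(),
        LengthValidator(),
        "Aspirin reduces fever. It also thins the blood.",
    )
)
print(outcome)
```

The returned dictionary has the same two keys, result and validation, that the Streamlit frontend reads when it displays the output and the score.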

Code Implementation

Folder structure

medical_ai_agents/
├── agents/
│ ├── __init__.py
│ ├── base_agent.py
│ ├── main_agents.py
│ └── validator_agents.py
├── core/
│ ├── __init__.py
│ ├── agent_manager.py
│ └── logger.py
├── utils/
│ ├── __init__.py
│ └── ollama_utils.py
├── app.py
└── requirements.txt

Install required dependencies

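pip install -r requirements.txt

The code imports two third-party packages, streamlit and ollama, so requirements.txt needs at least those. An Ollama server must also be running locally with the model available (ollama pull llama3.2:3b).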

Create Agents

base_agent.py

import asyncio
from abc import ABC, abstractmethod
from typing import Any, Dict

import ollama


class BaseAgent(ABC):
    """Common base class: stores the model name and wraps the Ollama chat call."""

    def __init__(self, model_name: str = "llama3.2:3b"):
        self.model_name = model_name

    async def get_completion(self, prompt: str) -> str:
        try:
            # ollama.chat is a blocking call; run it in a worker thread so
            # the event loop is not stalled while the model generates.
            response = await asyncio.to_thread(
                ollama.chat,
                model=self.model_name,
                messages=[{'role': 'user', 'content': prompt}],
            )
            return response['message']['content']
        except Exception as e:
            raise RuntimeError(f"Error getting completion: {e}") from e


class MainAgent(BaseAgent):
    @abstractmethod
    async def process(self, input_data: Any) -> Dict[str, Any]:
        pass


class ValidatorAgent(BaseAgent):
    # The result holds a bool ("is_valid") and a str ("feedback").
    @abstractmethod
    async def validate(self, input_data: Any, output_data: Any) -> Dict[str, Any]:
        pass

main_agents.py

from typing import Any, Dict
from .base_agent import MainAgent


class SummarizeAgent(MainAgent):
    async def process(self, input_data: str) -> Dict[str, Any]:
        prompt = f"Summarize the following medical text:\n\n{input_data}"
        summary = await self.get_completion(prompt)
        return {"summary": summary}


class WriteArticleAgent(MainAgent):
    async def process(self, input_data: Dict[str, str]) -> Dict[str, Any]:
        prompt = f"""Write a research article with the following:
        Topic: {input_data['topic']}
        Key points: {input_data['key_points']}"""
        article = await self.get_completion(prompt)
        return {"article": article}


class SanitizeDataAgent(MainAgent):
    async def process(self, input_data: str) -> Dict[str, Any]:
        prompt = """Mask all Protected Health Information (PHI) in the following text.
        Replace with appropriate masks:
        - Patient names with [PATIENT_NAME]
        - Doctor/Provider names with [PROVIDER_NAME]
        - Dates with [DATE]
        - Locations/Addresses with [LOCATION]
        - Phone numbers with [PHONE]
        - Email addresses with [EMAIL]
        - Medical record numbers with [MRN]
        - Social Security numbers with [SSN]
        - Device identifiers with [DEVICE_ID]
        - Any other identifying numbers with [ID]
        - Physical health conditions with [HEALTH_CONDITION]
        - Medications with [MEDICATION]
        - Lab results with [LAB_RESULT]
        - Vital signs with [VITAL_SIGN]
        - Procedures with [PROCEDURE]

        Text to mask:\n\n""" + input_data
        sanitized_data = await self.get_completion(prompt)
        return {"sanitized_data": sanitized_data}
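The masking step relies entirely on the model following the prompt. As a hedged illustration, a deterministic pattern check can act as a backstop for a few identifier types with a fixed shape. The patterns and the function name find_residual_phi below are illustrative assumptions; they cover only US-style SSNs, phone numbers, and email addresses, and are nowhere near a complete PHI detector.

```python
import re
from typing import Dict, List

# Illustrative patterns only; real PHI detection needs far broader coverage.
PHI_PATTERNS = {
    "SSN": re.compile(r"\b\d{3}-\d{2}-\d{4}\b"),
    "EMAIL": re.compile(r"[\w.+-]+@[\w-]+(?:\.[\w-]+)+"),
    "PHONE": re.compile(r"\b\d{3}[-. ]\d{3}[-. ]\d{4}\b"),
}


def find_residual_phi(masked_text: str) -> Dict[str, List[str]]:
    """Return any pattern matches that survived masking, keyed by label."""
    hits: Dict[str, List[str]] = {}
    for label, pattern in PHI_PATTERNS.items():
        found = pattern.findall(masked_text)
        if found:
            hits[label] = found
    return hits


leaky = "Patient [PATIENT_NAME], SSN 123-45-6789, email jane.doe@example.org, phone [PHONE]"
clean = "Patient [PATIENT_NAME] seen on [DATE], phone [PHONE]."
print(find_residual_phi(leaky))
print(find_residual_phi(clean))
```

An empty result does not prove the text is free of PHI. It only means none of these specific patterns matched, so a check like this complements the model-based validator rather than replacing it.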

validator_agents.py

import re
from typing import Any, Dict
from .base_agent import ValidatorAgent


def _is_valid(feedback: str) -> bool:
    # A plain '"valid" in text' test is also true for "invalid",
    # so match the explicit "Status: valid" line instead.
    return re.search(r"status:\W*valid\b", feedback, re.IGNORECASE) is not None


class SummarizeValidatorAgent(ValidatorAgent):
    async def validate(self, input_data: str, output_data: Dict[str, Any]) -> Dict[str, Any]:
        prompt = f"""Evaluate if this summary accurately represents the original text:
        Original: {input_data}
        Summary: {output_data['summary']}

        Provide:
        1. A score out of 5 (where 5 is perfect)
        2. 'valid' or 'invalid'
        3. Brief explanation

        Format: Score: X/5\nStatus: valid/invalid\nExplanation: ..."""

        result = await self.get_completion(prompt)
        return {"is_valid": _is_valid(result), "feedback": result}


class RefinerAgent(ValidatorAgent):
    async def validate(self, input_data: Dict[str, str], output_data: Dict[str, Any]) -> Dict[str, Any]:
        prompt = f"""Review this research article for quality and accuracy:
        Article: {output_data['article']}

        Provide:
        1. A score out of 5 (where 5 is perfect)
        2. 'valid' or 'invalid'
        3. Brief explanation

        Format: Score: X/5\nStatus: valid/invalid\nExplanation: ..."""

        result = await self.get_completion(prompt)
        return {"is_valid": _is_valid(result), "feedback": result}


class SanitizeValidatorAgent(ValidatorAgent):
    async def validate(self, input_data: str, output_data: Dict[str, Any]) -> Dict[str, Any]:
        prompt = f"""Verify if all Protected Health Information (PHI) has been properly masked in this text:
        Masked text: {output_data['sanitized_data']}

        Check for any unmasked:
        - Patient names
        - Doctor/Provider names
        - Dates
        - Locations/Addresses
        - Phone numbers
        - Email addresses
        - Medical record numbers
        - Social Security numbers
        - Device identifiers
        - Other identifying numbers
        - Physical health conditions
        - Medications
        - Lab results
        - Vital signs
        - Procedures

        Provide:
        1. A score out of 5 (where 5 means all PHI properly masked)
        2. 'valid' or 'invalid'
        3. List any found unmasked PHI

        Format: Score: X/5\nStatus: valid/invalid\nFindings: ..."""

        result = await self.get_completion(prompt)
        return {"is_valid": _is_valid(result), "feedback": result}
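Validators that ask the model for a fixed "Score: X/5 / Status: valid/invalid" reply need to parse that reply carefully, because the substring "valid" also occurs inside "invalid". The sketch below shows one way to extract both fields; parse_feedback and the two regular expressions are hypothetical names introduced here, and the parser assumes the model keeps to the requested format.

```python
import re
from typing import Optional, Tuple

SCORE_RE = re.compile(r"score\s*:\s*(\d+)\s*/\s*5", re.IGNORECASE)
# \W* tolerates quotes or markdown asterisks around the status word.
STATUS_RE = re.compile(r"status\s*:\W*(invalid|valid)\b", re.IGNORECASE)


def parse_feedback(feedback: str) -> Tuple[Optional[int], bool]:
    """Return (score, is_valid); score is None if no 'Score: X/5' is found."""
    score_match = SCORE_RE.search(feedback)
    status_match = STATUS_RE.search(feedback)
    score = int(score_match.group(1)) if score_match else None
    # Anything other than an explicit "valid" status counts as invalid.
    is_valid = bool(status_match) and status_match.group(1).lower() == "valid"
    return score, is_valid


good = parse_feedback("Score: 4/5\nStatus: valid\nExplanation: faithful")
bad = parse_feedback("Score: 2/5\nStatus: invalid\nFindings: a name remains")
unstructured = parse_feedback("Looks fine to me.")
print(good, bad, unstructured)
```

Treating a missing or malformed status as invalid is a deliberate choice in this sketch: for PHI redaction in particular, failing closed is safer than assuming success.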

Core Functions

agent_manager.py

from typing import Dict, Any
from agents.main_agents import SummarizeAgent, WriteArticleAgent, SanitizeDataAgent
from agents.validator_agents import SummarizeValidatorAgent, RefinerAgent, SanitizeValidatorAgent
from core.logger import Logger


class AgentManager:
    def __init__(self):
        self.logger = Logger()

        # Initialize main agents
        self.summarize_agent = SummarizeAgent()
        self.write_article_agent = WriteArticleAgent()
        self.sanitize_agent = SanitizeDataAgent()

        # Initialize validator agents
        self.summarize_validator = SummarizeValidatorAgent()
        self.refiner_agent = RefinerAgent()
        self.sanitize_validator = SanitizeValidatorAgent()

    async def process_task(self, task_type: str, input_data: Any) -> Dict[str, Any]:
        try:
            self.logger.log_input(task_type, input_data)

            if task_type == "summarize":
                result = await self.summarize_agent.process(input_data)
                validation = await self.summarize_validator.validate(input_data, result)

            elif task_type == "write_article":
                result = await self.write_article_agent.process(input_data)
                validation = await self.refiner_agent.validate(input_data, result)

            elif task_type == "sanitize":
                result = await self.sanitize_agent.process(input_data)
                validation = await self.sanitize_validator.validate(input_data, result)
            else:
                raise ValueError(f"Unknown task type: {task_type}")

            self.logger.log_output(task_type, result, validation)
            return {"result": result, "validation": validation}

        except Exception as e:
            self.logger.log_error(task_type, str(e))
            raise
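As the number of tasks grows, the if/elif chain in process_task can be replaced by a lookup table that pairs each task name with its main agent and validator. The following is a sketch of that alternative, not the article's implementation; RegistryManager and the two stub agents are hypothetical names, and logging is left out for brevity.

```python
import asyncio
from typing import Any, Dict, Tuple


class UpperAgent:
    """Hypothetical main agent used only to exercise the registry."""

    async def process(self, input_data: str) -> Dict[str, Any]:
        return {"text": input_data.upper()}


class NonEmptyValidator:
    """Hypothetical validator: passes any non-empty output."""

    async def validate(self, input_data: Any, output_data: Dict[str, Any]) -> Dict[str, Any]:
        return {"is_valid": bool(output_data["text"]), "feedback": "checked"}


class RegistryManager:
    def __init__(self) -> None:
        # task name -> (main agent, validator)
        self._registry: Dict[str, Tuple[Any, Any]] = {}

    def register(self, task_type: str, main_agent: Any, validator: Any) -> None:
        self._registry[task_type] = (main_agent, validator)

    async def process_task(self, task_type: str, input_data: Any) -> Dict[str, Any]:
        try:
            main_agent, validator = self._registry[task_type]
        except KeyError:
            raise ValueError(f"Unknown task type: {task_type}") from None
        result = await main_agent.process(input_data)
        validation = await validator.validate(input_data, result)
        return {"result": result, "validation": validation}


manager = RegistryManager()
manager.register("shout", UpperAgent(), NonEmptyValidator())
outcome = asyncio.run(manager.process_task("shout", "hello"))
print(outcome)
```

With this layout, adding a task means one register call instead of a new branch, at the cost of a little indirection when reading the code.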

logger.py

import logging
from datetime import datetime
from typing import Any, Dict


class Logger:
    def __init__(self):
        # Write log records both to a file and to the console.
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('medical_ai_agents.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)

    def log_input(self, task_type: str, input_data: Any):
        # Only the event is logged, not the input text itself.
        self.logger.info(f"Task: {task_type} - Input received at {datetime.now()}")

    def log_output(self, task_type: str, result: Dict[str, Any], validation: Dict[str, Any]):
        self.logger.info(f"Task: {task_type} - Output generated at {datetime.now()}")
        self.logger.info(f"Validation result: {validation['is_valid']}")

    def log_error(self, task_type: str, error_message: str):
        self.logger.error(f"Task: {task_type} - Error: {error_message}")

Streamlit Application

app.py

import streamlit as st
import asyncio
from core.agent_manager import AgentManager

# Set page configuration with custom theme
st.set_page_config(
    page_title="Medical AI Agents",
    layout="wide",
    initial_sidebar_state="expanded"
)

# Custom CSS for styling
st.markdown("""
<style>
.main-header {
font-size: 2.5rem;
color: white;
text-align: center;
padding: 1.5rem;
margin-bottom: 1rem;
font-weight: bold;
background: linear-gradient(120deg, #1E88E5 0%, #1565C0 100%);
border-radius: 10px;
box-shadow: 0 2px 10px rgba(30,136,229,0.2);
}
.sub-header {
font-size: 1.8rem;
color: #0D47A1;
padding: 0.5rem 0;
border-bottom: 2px solid #1E88E5;
margin-bottom: 1rem;
}
.task-container {
background-color: #F8F9FA;
padding: 2rem;
border-radius: 10px;
box-shadow: 0 2px 4px rgba(0,0,0,0.1);
}
.result-box {
background-color: white;
padding: 1.5rem;
border-radius: 8px;
border-left: 4px solid #1E88E5;
margin: 1rem 0;
}
.validation-box {
padding: 1rem;
border-radius: 8px;
margin-top: 1rem;
}
.stButton>button {
background-color: #1E88E5;
color: white;
border-radius: 25px;
padding: 0.5rem 2rem;
border: none;
box-shadow: 0 2px 4px rgba(0,0,0,0.1);
transition: all 0.3s ease;
}
.stButton>button:hover {
background-color: #1565C0;
box-shadow: 0 4px 8px rgba(0,0,0,0.2);
transform: translateY(-2px);
}
.stTextArea>div>div {
border-radius: 8px;
border: 2px solid #E3F2FD;
}
.sidebar-content {
padding: 1rem;
background-color: #F8F9FA;
border-radius: 8px;
}
</style>
""", unsafe_allow_html=True)

@st.cache_resource
def get_agent_manager():
    return AgentManager()


def show_results_page(result_data):
    st.markdown("<h1 class='main-header'>Final Validated Content</h1>", unsafe_allow_html=True)

    # Add a subheader based on the content type
    if "summary" in result_data["result"]:
        st.markdown("<h2 class='sub-header'>Medical Text Summary</h2>", unsafe_allow_html=True)
    elif "article" in result_data["result"]:
        st.markdown("<h2 class='sub-header'>Research Article</h2>", unsafe_allow_html=True)
    elif "sanitized_data" in result_data["result"]:
        st.markdown("<h2 class='sub-header'>Redacted PHI Content</h2>", unsafe_allow_html=True)

    # Display content in a styled box
    st.markdown("<div class='result-box'>", unsafe_allow_html=True)
    if "summary" in result_data["result"]:
        st.write(result_data["result"]["summary"])
    elif "article" in result_data["result"]:
        st.write(result_data["result"]["article"])
    elif "sanitized_data" in result_data["result"]:
        st.write(result_data["result"]["sanitized_data"])
    st.markdown("</div>", unsafe_allow_html=True)

    # Action buttons in columns
    col1, col2, col3 = st.columns([1, 1, 1])
    with col2:
        # Export button
        if st.button("📥 Export Results"):
            export_data = ""
            if "summary" in result_data["result"]:
                export_data = result_data["result"]["summary"]
            elif "article" in result_data["result"]:
                export_data = result_data["result"]["article"]
            elif "sanitized_data" in result_data["result"]:
                export_data = result_data["result"]["sanitized_data"]

            st.download_button(
                label="💾 Download Content",
                data=export_data,
                file_name="final_content.txt",
                mime="text/plain"
            )

    with col3:
        # Return button
        if st.button("🏠 Return to Main Page"):
            st.session_state.show_results = False
            st.rerun()

def main():
    # Sidebar styling
    with st.sidebar:
        st.markdown("<h2 style='text-align: center; color: #1E88E5;'>Tasks</h2>", unsafe_allow_html=True)
        st.markdown("<div class='sidebar-content'>", unsafe_allow_html=True)
        task_type = st.radio(
            "Task",  # Hidden below because a custom header is used instead
            ["summarize", "write_article", "Redact PHI"],
            format_func=lambda x: {
                "summarize": "📝 Summarize Medical Text",
                "write_article": "📚 Write Research Article",
                "Redact PHI": "🔒 Redact PHI"
            }[x],
            label_visibility="collapsed"
        )
        st.markdown("</div>", unsafe_allow_html=True)

    # Main content - Single header for the entire page
    st.markdown("<h1 class='main-header'>Medical Multi-Agent System</h1>", unsafe_allow_html=True)

    # Initialize session state
    if 'show_results' not in st.session_state:
        st.session_state.show_results = False
    if 'result_data' not in st.session_state:
        st.session_state.result_data = None

    if st.session_state.show_results:
        show_results_page(st.session_state.result_data)
        return

    agent_manager = get_agent_manager()

    # Task containers with consistent styling
    st.markdown("<div class='task-container'>", unsafe_allow_html=True)

    if task_type == "summarize":
        st.markdown("<h2 class='sub-header'>📝 Summarize Medical Text</h2>", unsafe_allow_html=True)
        input_text = st.text_area("Enter medical text to summarize", height=200)
        col1, col2 = st.columns(2)

        with col1:
            if st.button("🔄 Generate Summary"):
                with st.spinner("Processing..."):
                    result = asyncio.run(agent_manager.process_task("summarize", input_text))
                    st.session_state.result_data = result
                st.markdown("<div class='result-box'>", unsafe_allow_html=True)
                st.subheader("Summary")
                st.write(result["result"]["summary"])
                st.markdown("</div>", unsafe_allow_html=True)
                st.markdown("<div class='validation-box'>", unsafe_allow_html=True)
                st.subheader("Validation")

                # Extract and display score
                feedback = result["validation"]["feedback"]
                if "Score:" in feedback:
                    score = feedback.split("Score:")[1].split("\n")[0].strip()
                    st.markdown(f"""
                        <div style='background-color: #E3F2FD; padding: 1rem; border-radius: 8px; margin-bottom: 1rem;'>
                        <h3 style='margin: 0; color: #1565C0; text-align: center;'>Validation Score: {score}</h3>
                        </div>
                        """, unsafe_allow_html=True)

                st.write(feedback)
                st.markdown("</div>", unsafe_allow_html=True)

        with col2:
            if st.session_state.result_data and st.button("👁️ View Edited Content"):
                st.session_state.show_results = True
                st.rerun()

    elif task_type == "write_article":
        st.markdown("<h2 class='sub-header'>📚 Write Research Article</h2>", unsafe_allow_html=True)
        topic = st.text_input("Enter research topic")
        key_points = st.text_area("Enter key points (one per line)", height=150)
        col1, col2 = st.columns(2)

        with col1:
            if st.button("📝 Generate Article"):
                with st.spinner("Processing..."):
                    input_data = {"topic": topic, "key_points": key_points}
                    result = asyncio.run(agent_manager.process_task("write_article", input_data))
                    st.session_state.result_data = result
                st.markdown("<div class='result-box'>", unsafe_allow_html=True)
                st.subheader("Article")
                st.write(result["result"]["article"])
                st.markdown("</div>", unsafe_allow_html=True)
                st.markdown("<div class='validation-box'>", unsafe_allow_html=True)
                st.subheader("Validation")

                # Extract and display score
                feedback = result["validation"]["feedback"]
                if "Score:" in feedback:
                    score = feedback.split("Score:")[1].split("\n")[0].strip()
                    st.markdown(f"""
                        <div style='background-color: #E3F2FD; padding: 1rem; border-radius: 8px; margin-bottom: 1rem;'>
                        <h3 style='margin: 0; color: #1565C0; text-align: center;'>Validation Score: {score}</h3>
                        </div>
                        """, unsafe_allow_html=True)

                st.write(feedback)
                st.markdown("</div>", unsafe_allow_html=True)

        with col2:
            if st.session_state.result_data and st.button("👁️ View Edited Content"):
                st.session_state.show_results = True
                st.rerun()

    elif task_type == "Redact PHI":
        st.markdown("<h2 class='sub-header'>🔒 Redact Protected Health Information (PHI)</h2>", unsafe_allow_html=True)
        input_text = st.text_area("Enter medical text to redact PHI", height=200)
        col1, col2 = st.columns(2)

        with col1:
            if st.button("🔐 Redact PHI"):
                with st.spinner("Processing..."):
                    # The UI label "Redact PHI" maps to the manager's "sanitize" task
                    result = asyncio.run(agent_manager.process_task("sanitize", input_text))
                    st.session_state.result_data = result
                st.markdown("<div class='result-box'>", unsafe_allow_html=True)
                st.subheader("Redacted Text")
                st.write(result["result"]["sanitized_data"])
                st.markdown("</div>", unsafe_allow_html=True)
                st.markdown("<div class='validation-box'>", unsafe_allow_html=True)
                st.subheader("Validation")

                # Extract and display score
                feedback = result["validation"]["feedback"]
                if "Score:" in feedback:
                    score = feedback.split("Score:")[1].split("\n")[0].strip()
                    st.markdown(f"""
                        <div style='background-color: #E3F2FD; padding: 1rem; border-radius: 8px; margin-bottom: 1rem;'>
                        <h3 style='margin: 0; color: #1565C0; text-align: center;'>Validation Score: {score}</h3>
                        </div>
                        """, unsafe_allow_html=True)

                st.write(feedback)
                st.markdown("</div>", unsafe_allow_html=True)

        with col2:
            if st.session_state.result_data and st.button("👁️ View Edited Content"):
                st.session_state.show_results = True
                st.rerun()

    st.markdown("</div>", unsafe_allow_html=True)


if __name__ == "__main__":
    main()
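app.py repeats two small lookups in several places: choosing which key of the result holds the text, and pulling the score out of the validator feedback. A possible refactoring, shown here as a sketch with hypothetical helper names (pick_content, extract_score), moves both into plain functions that can be tested without Streamlit. Because the score is interpolated into HTML rendered with unsafe_allow_html=True, the sketch also escapes it first.

```python
import html
from typing import Any, Dict

# Keys produced by the three main agents, in lookup order.
CONTENT_KEYS = ("summary", "article", "sanitized_data")


def pick_content(result: Dict[str, Any]) -> str:
    """Return the generated text regardless of which agent produced it."""
    for key in CONTENT_KEYS:
        if key in result:
            return result[key]
    return ""


def extract_score(feedback: str) -> str:
    """Return the text after 'Score:' on its line, HTML-escaped, or ''."""
    if "Score:" not in feedback:
        return ""
    raw = feedback.split("Score:")[1].split("\n")[0].strip()
    # Model output is untrusted; escape it before embedding in HTML.
    return html.escape(raw)


print(pick_content({"article": "Draft text"}))
print(extract_score("Score: 4/5\nStatus: valid\nExplanation: ok"))
```

Each of the three task branches and the results page could then call these helpers instead of repeating the if/elif chain and the split logic.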

Run the Streamlit application

streamlit run app.py

Streamlit starts a local server and prints the following:

You can now view your Streamlit app in your browser.

Local URL: http://localhost:8501

Screenshots of the running application (captions only):

Summarize Medical Text Task

  • Streamlit UI
  • Summary generated
  • Validation results for the generated summary
  • View validated content
  • Click Export Results to download the results

Write Research Article Task

  • Generated article
  • Validation result
  • View final edited content

Redact PHI Task

  • Redacted text
  • Validation score of the redacted text
  • View final edited content

Future Scope for Improvement

While the current system works end to end, there are several areas for future enhancement:

  • Enhanced Validation Metrics: Incorporating more detailed validation metrics and feedback mechanisms could improve the reliability of the outputs.
  • Additional Agents: Expanding the system to include more specialized agents, such as those for data visualization or predictive analytics, could broaden its applicability.
  • User Feedback Loop: Implementing a feedback loop where users can provide input on the quality of the outputs could help refine the models over time.
  • Scalability: Optimizing the system for scalability to handle larger datasets and more concurrent users would be beneficial as the application grows.
  • Integration with Other Tools: Integrating with other medical data processing tools or databases could enhance the system’s utility in real-world applications.

Conclusion

This multi-agent AI system demonstrates how collaborating agents, each paired with a validator, can process complex medical texts. By running Llama 3.2 locally through Ollama and providing a user-friendly interface through Streamlit, the application offers a practical solution for tasks such as summarization, article writing, and PHI redaction. As AI continues to evolve, systems like this can play an important role in how we handle and interpret medical data.

This article was prepared from resources available online. The experiment described here is in no way proprietary.
