Building a Serverless Application with AWS Lambda and Qdrant for Semantic Search

Benito Martin
10 min readJun 17, 2024

--

Author: Benito Martin

In this post, I’ll explain how to build a serverless application to perform semantic search over academic papers using AWS Lambda and Qdrant. I used LangChain and OpenAI’s embeddings to create vector representations of document chunks and store them in Qdrant. A simple shell script helps build and push the Docker image to AWS ECR and deploy it as an AWS Lambda function. After testing the Lambda function, I created an API Gateway endpoint and built a Streamlit application to interact with our Lambda function.

The complete code can be found in this repository.

Let’s dive in!!!

Prerequisites

Before we start, ensure you have the following:

  1. An AWS account with necessary permissions to create Lambda functions and ECR repositories: AWSLambda_FullAccess, AmazonEC2ContainerRegistryFullAccess
  2. One Qdrant cluster and the corresponding API Key and URL
  3. OpenAI API key
  4. Docker installed for building and pushing the Lambda Function and Docker image

Step 1: Setting Up Environment Variables

Create a .env file in your project directory with the following content:

OPENAI_API_KEY=your_openai_api_key
QDRANT_API_KEY=your_qdrant_api_key
QDRANT_URL=your_qdrant_url
COLLECTION_NAME=your_collection_name
AWS_ACCOUNT_ID=your_aws_account_id
AWS_REGION=your_aws_region
REPOSITORY_NAME=your_ecr_repository_name
LAMBDA_FUNCTION_NAME=your_lambda_function_name
LAMBDA_ROLE_ARN=your_lambda_role_arn
API_ENDPOINT=your_api_gateway_endpoint

The LAMBDA_ROLE_ARN is previously created in the AWS account. The API_ENDPOINT will be provided once you create the endpoint, which will be explained in Step 7.

Step 2: Creating the Vector Store

File name: create_vector_store.py

To create the vector store, we first need to load our academic paper. Fortunately, all papers follow the same structure, so by selecting the corresponding paper number, the following code will fetch it and save it locally on your computer.

def download_pdf_paper_from_arxiv(paper_number):

url = f"https://arxiv.org/pdf/{paper_number}.pdf"
res = requests.get(url)
pdf_path = f"{paper_number}.pdf"
with open(pdf_path, 'wb') as f:
f.write(res.content)
return pdf_path

Next, we need to check if the collection exists in our cluster. If it doesn’t, it will be created.

def create_collection_if_not_exists(client, collection_name):

try:
collections = client.get_collections()
if collection_name not in [col.name for col in collections.collections]:
client.create_collection(
collection_name=collection_name,
vectors_config={
"content": VectorParams(size=1536, distance=Distance.COSINE)
}
)
print(f"Collection '{collection_name}' created.")
else:
print(f"Collection '{collection_name}' already exists.")
except ResponseHandlingException as e:
print(f"Error checking or creating collection: {e}")

All academic papers contain the same structure, so each document that is loaded will contain the page content, source, and page. As metadata, I selected these three and added an additional unique identifier to easily find the specific chunk.

def chunked_metadata(data, client, collection_name):

chunked_metadata = []

for item in data:
content = item.page_content

id = str(uuid4())
source = item.metadata["source"]
page = item.metadata["page"]

content_vector = embedding.embed_documents([content])[0]
vector_dict = {"content": content_vector}

payload = {
"page_content": content,
"metadata": {
"id": id,
"page_content": content,
"source": source,
"page": page,
}
}

metadata = PointStruct(id=id, vector=vector_dict, payload=payload)
chunked_metadata.append(metadata)

if chunked_metadata:
client.upsert(
collection_name=collection_name,
wait=True,
points=chunked_metadata
)

print(f"{len(chunked_metadata)} Chunked metadata upserted.")

Now it’s time to trigger the function by selecting the corresponding paper number, and the chunks will be ready to be retrieved in the cluster collection.

python create_vector_store.py - paper_number 1706.03762
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Process an ArXiv paper and upsert metadata into Qdrant.")
parser.add_argument("--paper_number", required=True, help="The ArXiv paper number (e.g., 1706.03762)")
args = parser.parse_args()

paper_number = args.paper_number

# Download and process the paper
pdf_path = download_pdf_paper_from_arxiv(paper_number)

# Load documents
loader = PyPDFLoader(pdf_path)

# Embed and store documents in Qdrant
embedding = OpenAIEmbeddings(openai_api_key=OPENAI_API_KEY)

# Initialize Qdrant client
client = QdrantClient(
url=QDRANT_URL,
api_key=QDRANT_API_KEY
)

# Create collection if it does not exist
create_collection_if_not_exists(client, COLLECTION_NAME)

# Split documents
text_splitter = RecursiveCharacterTextSplitter(chunk_size=512, chunk_overlap=50)
document = loader.load_and_split(text_splitter)

# Upsert documents in vector store
chunked_metadata(document, client, COLLECTION_NAME)

Step 3: Creating the RAG Application

File name: rag_app.py

Now we need to create a function for retrieval and generation of responses using the Qdrant vector store as a retriever and an LLM model, in our case GPT-3.5-turbo.

def rag_retrieve_and_generate(query, collection_name):

# Initialize vector store
vectorstore = Qdrant(client=client,
collection_name=collection_name,
embeddings=embedding,
vector_name="content")

# Define the prompt template
template = """

You are an assistant for question-answering tasks. \
Use the following pieces of retrieved context to answer the question. \
If you don't know the answer, just say that you don't know. \


Question: {question}
Context: {context}

Answer:

"""

# Initialize retriever
retriever = vectorstore.as_retriever()

# Create prompt using the template
prompt = ChatPromptTemplate.from_template(template)

# Initialize the LLM (GPT-3.5-turbo)
llm35 = ChatOpenAI(temperature=0.0,
model="gpt-3.5-turbo",
max_tokens=512)


# Create a retrieval QA chain
qa_d35 = RetrievalQA.from_chain_type(llm=llm35,
chain_type="stuff",
chain_type_kwargs = {"prompt": prompt},
retriever=retriever)

# Invoke the chain with the query to get the result
result = qa_d35.invoke({"query": query})["result"]
return result

This function initializes the vector store, defines a prompt template for the LLM, and sets up the retrieval QA chain. It then invokes the chain with the provided query to retrieve context from Qdrant and generate an answer using GPT-3.5-turbo.

Here is an example of how to use this function:

if __name__ == "__main__":
# Example usage
collection_name = COLLECTION_NAME
query = "What is the attention mechanism?"
print(f"Response: {rag_retrieve_and_generate(query, collection_name)}")

Step 4: Creating the AWS Lambda

File name: lambda_function.py

Next, we need to create an AWS Lambda function that processes incoming events and generates responses using a retrieval and generation method.

Why Lambda?

AWS Lambda is a serverless compute service that allows you to run code without provisioning or managing servers. Lambda functions are event-driven, executing the code in response to events, and can be triggered by various AWS services, in our case, AWS API Gateway. Some other benefits of Lambda functions are:

  • Automatic Scaling: Scales the applications by running code in parallel in response to each event. As default it provides 1'000 concurrent executions but the quota can be increased based on needs.
  • Stateless: Each Lambda function invocation is stateless, meaning each execution is independent.
  • No Server Management: The cloud provider AWS handles all the underlying infrastructure.

For this specific use case, this type of architecture offers numerous benefits:

  1. Handling Simultaneously Requests: Due to simplified automatic scaling, which handle the incoming workload and adjust it accordingly.
  2. Speeding Up Deployment: Rapid deployment and iteration of code, enabling continuous integration and continuous deployment (CI/CD) practices.
  3. Cost Efficiency: You only pay for the compute time your code consumes.
  4. Integration with Other AWS Services: Seamless integration with other AWS services such AWS API Gateway.
  5. Improved Focus on Business Logic: Developers can focus more on writing and optimizing business logic, developing features, and enhancing the application, instead of managing infrastructure.

Lambda Function Code

Now, if we focus on the code, this Lambda function will be responsible for handling HTTP requests (events) made to our REST API, parsing the input, invoking our retrieval and generation function, and returning the response. The lambda_handler function is defined to handle incoming Lambda events. This is the main entry point for the Lambda function. We log the incoming event for debugging purposes.

def lambda_handler(event, context):

# Log the incoming event for debugging
print("Received event:", json.dumps(event))

try:
# Parse the body if it exists
if 'body' in event:
body = json.loads(event['body'])
print(f"Parsed body: {body}")
query = body.get('query', '')
collection_name = body.get('collection_name', COLLECTION_NAME)
else:
# Handle direct invocation with query parameters
query = event.get('query', '')
collection_name = event.get('collection_name', COLLECTION_NAME)

print(f"Query: {query}, Collection Name: {collection_name}")

if not query:
return {
'statusCode': 400,
'body': json.dumps({'error': 'Query parameter is missing'})
}

# Generate the response using the RAG (Retrieve and Generate) method
response = rag_retrieve_and_generate(query, collection_name)
print(f"Response: {response}")

return {
'statusCode': 200,
'body': json.dumps({'response': response})
}
except json.JSONDecodeError as e:
print(f"JSON decode error: {str(e)}")
return {
'statusCode': 400,
'body': json.dumps({'error': 'Invalid JSON format', 'details': str(e)})
}
except Exception as e:
print(f"Error: {str(e)}")
return {
'statusCode': 500,
'body': json.dumps({'error': 'Internal server error', 'details': str(e)})
}

This Lambda function integrates seamlessly with AWS API Gateway (configured as a REST API) to handle HTTP requests, invoke the retrieval and generation logic, and return the results in a structured format.

Step 5: Creating and Deploying the Docker Image

File name: build_and_deploy.sh

Now we need a shell script to build and push the Docker image to AWS ECR and deploy it as an AWS Lambda function. This step involves several important tasks: building the Docker image, authenticating with AWS ECR, tagging the Docker image, pushing it to the ECR repository, and finally creating or updating the Lambda function with the new Docker image.

#!/bin/bash

# Load environment variables from .env file
set -o allexport
source .env
set -o allexport

# Check if the ECR repository exists, create it if it does not
REPO_EXISTS=$(aws ecr describe-repositories --repository-names ${REPOSITORY_NAME} --region ${AWS_REGION} 2>&1)

if [[ $REPO_EXISTS == *"RepositoryNotFoundException"* ]]; then
echo "Repository ${REPOSITORY_NAME} does not exist. Creating..."
aws ecr create-repository --repository-name ${REPOSITORY_NAME} --region ${AWS_REGION}
else
echo "Repository ${REPOSITORY_NAME} already exists."
fi

# Build Docker image
echo "Building Docker image ${IMAGE_NAME}..."
docker build -t ${IMAGE_NAME} .

# Authenticate Docker to your Amazon ECR registry
echo "Authenticating Docker to ECR..."
aws ecr get-login-password --region ${AWS_REGION} | docker login --username AWS --password-stdin ${AWS_ACCOUNT_ID}.dkr.ecr.${AWS_REGION}.amazonaws.com

# Tag the Docker image
echo "Tagging Docker image..."
docker tag ${IMAGE_NAME}:latest ${AWS_ACCOUNT_ID}.dkr.ecr.${AWS_REGION}.amazonaws.com/${REPOSITORY_NAME}:latest

# Push the Docker image to Amazon ECR
echo "Pushing Docker image to ECR..."
docker push ${AWS_ACCOUNT_ID}.dkr.ecr.${AWS_REGION}.amazonaws.com/${REPOSITORY_NAME}:latest

# Check if the Lambda function exists, create it if it does not
FUNCTION_EXISTS=$(aws lambda get-function --function-name ${LAMBDA_FUNCTION_NAME} --region ${AWS_REGION} 2>&1)

if [[ $FUNCTION_EXISTS == *"ResourceNotFoundException"* ]]; then
echo "Lambda function ${LAMBDA_FUNCTION_NAME} does not exist. Creating..."
aws lambda create-function \
--function-name ${LAMBDA_FUNCTION_NAME} \
--package-type Image \
--code ImageUri=${AWS_ACCOUNT_ID}.dkr.ecr.${AWS_REGION}.amazonaws.com/${REPOSITORY_NAME}:latest \
--role ${LAMBDA_ROLE_ARN} \
--region ${AWS_REGION}
else
echo "Lambda function ${LAMBDA_FUNCTION_NAME} already exists. Updating..."
aws lambda update-function-code --function-name ${LAMBDA_FUNCTION_NAME} --image-uri ${AWS_ACCOUNT_ID}.dkr.ecr.${AWS_REGION}.amazonaws.com/${REPOSITORY_NAME}:latest
fi

echo "Deployment complete."

By automating this process, we ensure a smooth deployment by handling all necessary steps and checks. You should run these commands on the terminal. A specific Dockerfile is available in the repo with all requirements.

chmod +x build_and_deploy.sh
./build_and_deploy.sh
# Dockerfile
FROM public.ecr.aws/lambda/python:3.10

# Copy function code
COPY lambda_function.py ${LAMBDA_TASK_ROOT}
COPY rag_app.py ${LAMBDA_TASK_ROOT}
COPY requirements.txt ${LAMBDA_TASK_ROOT}
COPY .env ${LAMBDA_TASK_ROOT}

# Install dependencies
RUN pip install -r ${LAMBDA_TASK_ROOT}/requirements.txt

# Command to run the Lambda function
CMD ["lambda_function.lambda_handler"]

Step 6: Testing the Lambda Function

File name: test_lambda.py

Before we move forward to the Streamlit app, we need to ensure that our Lambda function is correctly set up and working. The following test script will invoke the AWS Lambda function and print the response.

# Initialize a session using Amazon Lambda
client = boto3.client('lambda', region_name=AWS_REGION)

# Create a payload
payload = {
"query": "Positional Encoding",
"collection_name": COLLECTION_NAME
}

# Log the payload for debugging
print("Payload:", json.dumps(payload, indent=4))

# Invoke the Lambda function
response = client.invoke(
FunctionName=LAMBDA_FUNCTION_NAME,
InvocationType='RequestResponse',
Payload=json.dumps(payload)
)

# Print the response
response_payload = json.loads(response['Payload'].read())
print(json.dumps(response_payload, indent=4))

If everything is working correctly, you should see the following output. If there are any issues, the logs on the terminal and AWS CloudWatch will help in debugging and identifying the root cause.

# Output
Payload: {
"query": "Positional Encoding",
"collection_name": "arxiv-collection"
}
{
"statusCode": 200,
"body": "{\"response\": \"Positional encoding is a technique used in models with no recurrence or convolution to inject information about the relative or absolute position of tokens in a sequence. It involves adding positional encodings to input embeddings at the bottoms of encoder and decoder stacks. In this work, sine and cosine functions of different frequencies are used to create positional encodings. The choice of positional encodings can be learned or fixed.\"}"
}

Step 7: Create an API Gateway Endpoint

Once the lambda function is working, we need to create an API Gateway endpoint to interact with your Lambda function. Follow these steps to create the endpoint:

1. Navigate to the API Gateway Console:

  • Go to the AWS Management Console.
  • Open the API Gateway service.

2. Create a New REST API:

  • Click on “Create API”.
  • Select “REST API” and then click “Build”.
  • Choose “New API”.
  • Provide a name for your API.
  • Click “Create API”.

3. Create a Resource:

  • Click on “Create Resource”.
  • Provide a Resource Name (query)
  • Click “Create Resource”.

4. Create a Method:

  • Select the newly created resource (/query).
  • Click on “Actions” and select “Create Method”.
  • Choose “POST” from the dropdown and click the checkmark.
  • In the “Integration type” , select “Lambda Function”.
  • Check the box for “Lambda Proxy integration”.
  • In the “Lambda Function” field, select the name of your Lambda function as per LAMBDA_FUNCTION_NAMEin the .env file
  • Click “Create Method”.
  • You will be prompted to grant API Gateway permission to invoke your Lambda function. Click “OK”.

6. Deploy the API:

  • Click on “Deploy API”.
  • Create a new deployment stage (choose a name, dev).
  • Click “Deploy”.

7. Get the Endpoint URL:

  • After deployment, you will be provided with an Invoke URL (https://<api-id>.execute-api.<region>.amazonaws.com/dev/query), that you shall add in the .env file:
API_ENDPOINT=https://<api-id>.execute-api.<region>.amazonaws.com/dev/query

Step 8: Building the Streamlit App

File name: streamlit_app.py

Finally, let’s build a Streamlit app to interact with our Lambda function and Qdrant collection. The Streamlit app provides a user-friendly interface to enter queries and display the responses from the Lambda function. The previously created API_ENDPOINT is integrated in the file.

def call_lambda(query, collection_name):

headers = {
"Content-Type": "application/json"
}
data = {
"query": query,
"collection_name": collection_name
}
try:
response = requests.post(API_ENDPOINT, headers=headers, data=json.dumps(data))
response.raise_for_status() # Raise an HTTPError for bad responses (4xx and 5xx)
print(f"Response status code: {response.status_code}") # Debugging
print(f"Response content: {response.content}") # Debugging
if response.content:
return response.json() # Attempt to parse JSON response
else:
return {"error": "Empty response"}
except requests.exceptions.HTTPError as http_err:
st.error(f"HTTP error occurred: {http_err}")
st.error(f"Response content: {response.content.decode()}")
except requests.exceptions.RequestException as req_err:
st.error(f"Error occurred: {req_err}")
except json.JSONDecodeError as json_err:
st.error(f"JSON decode error: {json_err}")
st.error(f"Response content: {response.content.decode()}")

# Button to submit the query
if st.button("Submit"):
if query:
with st.spinner('Calling Lambda function...'):
response = call_lambda(query, collection_name)
if response:
st.write("Response:")
st.write(response['response'])
else:
st.error("No response received from the Lambda function.")
else:
st.error("Please enter a query.")

To run the Streamlit app, use the following command in the terminal:

streamlit run streamlit_app.py

Now you can interact with the app and ask questions about the paper!

Author: Benito Martin

Don’t forget to delete the endpoint as this will incur unnecessary costs.

Conclusion

In this post, we walked through building a serverless application using AWS Lambda, AWS API Gateway and Qdrant for semantic search. We created a Python script to download and process academic papers, a Lambda function to handle search queries, and a Streamlit app for user interaction. This architecture allows for scalable and efficient semantic search over large document collections.

If you enjoyed reading this content you can support it by:

  • Clapping and following me on Medium! 👏 👏 👏
  • Follow my Github 🎶 🎷 🎶
  • Staring the repo ⭐⭐⭐
  • Share my content on LinkedIn! 💯💯💯

Happy coding!

--

--

Benito Martin

😎 Passionate Data Scientist, AI & ML Engineer | 👉 Founder Martin Data Solutions https://martindatasol.com/