Understanding clean architecture in Python — Deep dive on the code

devil_!234@
8 min readMar 18, 2023

--

Clean Architecture

In this article, I will take you through a working example of clean architecture in Python. The underlying concepts and design are language agnostic and can be applied with any language.

Topic — We will build a sample use case of creating a Customer. The Customer has the following attributes :-

  1. First Name
  2. Last Name
  3. Email
  4. Phone

The core of Clean architecture design is made up of the following :-

  1. Domain layer
  2. Use Case layer
  3. Repository layer

Let’s start with the Domain layer. In the above example, our Customer is a domain and we need to create a domain entity for the customer.

Domain — We will use Pydantic to build our domain entity. Using Pydantic is purely a preference and it can be replaced with a dataclass or a simple Python class

Packages required

pytest==7.2.1
pytest-cov==4.0.0
pytest-django==4.5.2
pytest-factoryboy==2.5.1
pydantic==1.10.5
djangorestframework==3.14.0
django==4.1.7
email-validator==1.3.1

customer/domain/entities.py

import re
import uuid

from pydantic import BaseModel, EmailStr, validator

from domain.exceptions import InvalidPhoneNumberException


class Customer(BaseModel):
id: uuid.UUID = Field(default_factory=lambda: uuid.uuid4())
first_name: str
last_name: str
email: EmailStr
phone: str

@validator("phone")
def phone_validation(cls, v):
# For reference - https://stackoverflow.com/questions/70414211/pydantic-custom-data-type-for-phone-number
# -value-error-missing
regex = r"^(\+)[1-9][0-9\-\(\)\.]{9,15}$"
if v and not re.search(regex, v, re.I):
raise InvalidPhoneNumberException("Invalid Phone Number.")
return v

customer/domain/exceptions.py

class InvalidPhoneNumberException(Exception):
...

So, we have our domain entity for Customer set up. We can write a quick test to check if its working as expected.

customer/tests/domain/test_entities.py

import pytest
from pydantic import ValidationError

from customer.domain.entities import Customer
from customer.domain.exceptions import InvalidPhoneNumberException

def test_customer_create():
customer = Customer(
first_name="test",
last_name="user",
email="xyz@example.com",
phone="+25499919191919"
)

assert customer
assert customer.first_name == "test"
assert customer.last_name == "user"
assert customer.email == "xyz@example.com"
assert customer.phone == "+25499919191919"


def test_customer_create_throws_invalid_email_exception():

with pytest.raises(ValidationError) as exc:
Customer(
first_name="test",
last_name="user",
email="xyz@exampl",
phone="+25499919191919"
)
assert exc.value.errors()[0]["loc"][0] == "email"
assert exc.value.errors()[0]["msg"] == "value is not a valid email address"


def test_customer_create_throws_invalid_phone_exception():
with pytest.raises(InvalidPhoneNumberException) as exc:
Customer(
first_name="test",
last_name="user",
email="xyz@exampl",
phone="+25499919"
)
assert isinstance(exc.value, InvalidPhoneNumberException)

Good. We are done with our first step. We have an entity set up and we have also added our tests.

Repository — This is what is going to interact with your infrastructure layer. Remember the repository structure should be infrastructure agnostic. Let me explain with a bit of code.

customer/repo/base.py


class AbstractCustomerRepository(ABC):
@abstractmethod
def insert(self, customer: CustomerEntity) -> Optional[CustomerEntity]:
...

@abstractmethod
def update(self, customer: CustomerEntity) -> CustomerEntity:
...

@abstractmethod
def get_by_id(self, customer_id) -> Optional[CustomerEntity]:
...

@abstractmethod
def delete(self, customer_id):
...

@abstractmethod
def list(self) -> Optional[Sequence[CustomerEntity]]:
...

customer/repo/customer.py

class CustomerRepository(AbstractCustomerRepository):

def insert(self, obj: CustomerEntity) -> CustomerEntity:
pass

def update(self, obj: CustomerEntity) -> CustomerEntity:
pass

def get_by_id(self, customer_id) -> Optional[CustomerEntity]:
pass

def delete(self, customer_id) -> None:
pass

def list(self) -> Optional[Sequence[CustomerEntity]]:
pass

So, we have a base repository interface where all the basic operations are defined. Then we define a concrete class which implements the abstract repo interface. In our case, we are going to save our customer record in our Django model. Let’s create a customer model.

customer/models.py

class TimeStampedModel(models.Model):
id = models.UUIDField(max_length=34, primary_key=True)
class Meta:
abstract = True

class Customer(TimeStampedModel):
first_name = models.CharField(max_length=120)
last_name = models.CharField(max_length=120)
email = models.EmailField(unique=True)
phone = models.CharField(max_length=20, unique=True)

def __str__(self):
return f"{self.first_name} {self.last_name}"

Let’s write our implementation for the repository

We will implement the insert operation for the customer

customer/repo/customer.py

class CustomerRepository(AbstractCustomerRepository):

def insert(self, obj: CustomerEntity) -> CustomerEntity:
customer = Customer.objects.create(**obj.dict())
return CustomerEntity(**customer.__dict__)

Now, lets implement the get_by_id operation for the customer

class CustomerRepository(AbstractCustomerRepository):

def insert(self, obj: CustomerEntity) -> CustomerEntity:
customer = Customer.objects.create(**obj.dict())
return CustomerEntity(**customer.__dict__)

def get_by_id(self, customer_id) -> CustomerEntity:
if customer := Customer.objects.filter(id=customer_id).first():
return CustomerEntity(**customer.__dict__)
else:
return None

So, we have the insert and get_by_id operation implemented. Let’s write a test for the methods

Let’s create a factory class for the customer

customer/tests/factories.py

class CustomerFactory(factory.django.DjangoModelFactory):
id = factory.LazyAttribute(lambda s: uuid.uuid4())
first_name = factory.Faker("pystr")
last_name = factory.Faker("pystr")
email = factory.Faker("email")
phone = "+9551370038"

class Meta:
model = Customer

Let’s register this factory in customer/tests/conftest.py

from factories import CustomerFactory
register(CustomerFactory, name="customer_factory")

So, we have added a factory class and registered the factory class in conftest.py

Let’s create a fixture for customer_repo because we will use it in our tests.

customer/tests/conftest.py

from customer.repo.customer import CustomerRepository
@pytest.fixture
def customer_repo():
return CustomerRepository()

Now, our boilerplate code is ready for us to create the tests for the repo.

customer/tests/repo/test_customer_repo.py

import pytest

from customer.domain.entities import CustomerEntity

pytestmark = pytest.mark.django_db


def test_customer_repository_insert(customer_factory, customer_repo):
customer_data = customer_factory.build()
customer_entity = CustomerEntity(**customer_data.__dict__)
customer_repo.insert(customer_entity)
customer_obj = customer_repo.get_by_id(customer_entity.id)
assert customer_obj.id == customer_entity.id

def test_customer_repository_get_by_id(customer_factory, customer_repo):
customer_data = customer_factory()
customer = customer_repo.get_by_id(customer_data.id)
assert customer
assert customer.id == customer_data.id
assert customer.email == customer_data.email

So, we have added the tests for insert and get_by_id. Next, we will implement the update

class CustomerRepository(AbstractCustomerRepository):

def insert(self, obj: CustomerEntity) -> CustomerEntity:
customer = Customer.objects.create(**obj.dict())
return CustomerEntity(**customer.__dict__)

def update(self, obj: CustomerEntity):
Customer.objects.filter(id=obj.id).update(**obj.dict())
customer = Customer.objects.get(id=obj.id)
return CustomerEntity(**customer.__dict__)

def get_by_id(self, customer_id) -> Optional[CustomerEntity]:
if customer := Customer.objects.filter(id=customer_id).first():
return CustomerEntity(**customer.__dict__)
else:
return None

Now, let’s write a test for the update

def test_customer_repository_update(customer_factory, customer_repo):
customer_data = customer_factory()
assert customer_data.first_name != "Test_1"
assert customer_data.last_name != "User_1"
assert customer_data.email != "abc@example.com"

customer_entity = CustomerEntity(**customer_data.__dict__)
customer_entity.first_name = "Test_1"
customer_entity.last_name = "User_1"
customer_entity.email = "abc@example.com"

customer = customer_repo.update(customer_entity)
assert customer
assert customer.id == customer_entity.id
assert customer.first_name == "Test_1"
assert customer_entity.last_name == "User_1"
assert customer_entity.email == "abc@example.com"

The last two implementation that we have is to get all customers and delete a customer. Let’s dive in

class CustomerRepository(AbstractCustomerRepository):

def insert(self, obj: CustomerEntity) -> CustomerEntity:
customer = Customer.objects.create(**obj.dict())
return CustomerEntity(**customer.__dict__)

def update(self, obj: CustomerEntity) -> CustomerEntity:
Customer.objects.filter(id=obj.id).update(**obj.dict())
customer = Customer.objects.get(id=obj.id)
return CustomerEntity(**customer.__dict__)

def get_by_id(self, customer_id) -> Optional[CustomerEntity]:
if customer := Customer.objects.filter(id=customer_id).first():
return CustomerEntity(**customer.__dict__)
else:
return None

def delete(self, customer_id) -> None:
Customer.objects.get(id=customer_id).delete()

def list(self) -> Optional[Sequence[CustomerEntity]]:
customers = Customer.objects.all()
return [CustomerEntity(**customer.__dict__) for customer in customers]

Now, let’s add a tests for the delete and list methods.

def test_customer_repository_delete(customer_factory, customer_repo):
customer_data = customer_factory()
customer_repo.delete(customer_data.id)
customer = customer_repo.get_by_id(customer_data.id)
assert not customer


def test_customer_repository_list(customer_factory, customer_repo):
customer_factory(phone="+9551370037")
customer_factory(phone="+9551370038")
customer_factory(phone="+9551370039")

customers = customer_repo.list()
assert customers
assert len(customers) == 3

Final look at the repo and tests.

customer/repo/customer.py

class CustomerRepository(AbstractCustomerRepository):

def insert(self, obj: CustomerEntity) -> CustomerEntity:
customer = Customer.objects.create(**obj.dict())
return CustomerEntity(**customer.__dict__)

def update(self, obj: CustomerEntity):
Customer.objects.filter(id=obj.id).update(**obj.dict())
customer = Customer.objects.get(id=obj.id)
return CustomerEntity(**customer.__dict__)

def get_by_id(self, customer_id) -> Optional[CustomerEntity]:
if customer := Customer.objects.filter(id=customer_id).first():
return CustomerEntity(**customer.__dict__)
else:
return None

def delete(self, customer_id) -> None:
Customer.objects.get(id=customer_id).delete()

def list(self) -> Optional[Sequence[CustomerEntity]]:

customers = Customer.objects.all()
return [CustomerEntity(**customer.__dict__) for customer in customers]

customer/tests/repo/test_customer_repo.py

def test_customer_repository_insert(customer_factory, customer_repo):
customer_data = customer_factory.build()
customer_entity = CustomerEntity(**customer_data.__dict__)
customer_repo.insert(customer_entity)
customer_obj = customer_repo.get_by_id(customer_entity.id)
assert customer_obj
assert customer_obj.id == customer_entity.id


def test_customer_repository_get_by_id(customer_factory, customer_repo):
customer_data = customer_factory()
customer = customer_repo.get_by_id(customer_data.id)
assert customer
assert customer.id == customer_data.id
assert customer.email == customer_data.email


def test_customer_repository_update(customer_factory, customer_repo):
customer_data = customer_factory()
assert customer_data.first_name != "Test_1"
assert customer_data.last_name != "User_1"
assert customer_data.email != "abc@example.com"

customer_entity = CustomerEntity(**customer_data.__dict__)
customer_entity.first_name = "Test_1"
customer_entity.last_name = "User_1"
customer_entity.email = "abc@example.com"

customer = customer_repo.update(customer_entity)
assert customer
assert customer.id == customer_entity.id
assert customer.first_name == "Test_1"
assert customer_entity.last_name == "User_1"
assert customer_entity.email == "abc@example.com"


def test_customer_repository_delete(customer_factory, customer_repo):
customer_data = customer_factory()
customer_repo.delete(customer_data.id)
customer = customer_repo.get_by_id(customer_data.id)
assert not customer


def test_customer_repository_list(customer_factory, customer_repo):
customer_factory(phone="+9551370037")
customer_factory(phone="+9551370038")
customer_factory(phone="+9551370039")

customers = customer_repo.list()
assert customers
assert len(customers) == 3

So far, we have covered two of the key components of clean architecture

  1. Domain.
  2. Repository.

Now, we need to implement the business logic which will be in the use cases. Use case is also similar to services module.

customer/use_cases/base.py

from abc import ABC, abstractmethod

class AbstractCustomerUseCase(ABC):
@abstractmethod
def get_by_id(self, customer_id):
...

@abstractmethod
def insert(self, customer: CustomerEntity):
...

@abstractmethod
def update(self, customer: CustomerEntity):
...

@abstractmethod
def delete(self, customer_id):
...

@abstractmethod
def list(self):
...

Now, let’s create the concrete implementation for this use case for the customer ops.

customer/user_cases/customer.py

class CustomerUseCase(AbstractCustomerUseCase):
def __init__(self, customer_repo):
self.repo = customer_repo

def get_by_id(self, customer_id):
return self.repo.get_by_id(customer_id)

def insert(self, customer: CustomerEntity):
return self.repo.insert(customer)

def update(self, customer: CustomerEntity):
return self.repo.update(customer)

def delete(self, customer_id):
self.repo.delete(customer_id)

def list(self):
return self.repo.list()

Great, let’s add few tests for our use case

customer/tests/use_cases/test_customer.py

pytestmark = pytest.mark.django_db


def test_create_customer_use_case(customer_factory, customer_repo):
customer_use_case = CustomerUseCase(customer_repo=customer_repo)
customer_data = customer_factory.build()
customer_entity = CustomerEntity(**customer_data.__dict__)
customer = customer_use_case.insert(customer_entity)
assert customer
assert customer.first_name == customer_data.first_name
assert customer.email == customer_data.email
assert customer.phone == customer_data.phone


def test_update_customer_use_case(customer_factory, customer_repo):
customer_use_case = CustomerUseCase(customer_repo=customer_repo)
customer_data = customer_factory()
customer_entity = CustomerEntity(**customer_data.__dict__)
customer_entity.first_name = "Test_1"
customer_entity.last_name = "User_1"
customer_entity.email = "abc@example.com"

customer = customer_use_case.update(customer_entity)
assert customer
assert customer.first_name == "Test_1"
assert customer.last_name == "User_1"
assert customer.email == "abc@example.com"


def test_get_customer_use_case(customer_factory, customer_repo):
customer_use_case = CustomerUseCase(customer_repo=customer_repo)
customer_data = customer_factory()
customer = customer_use_case.get_by_id(customer_data.id)
assert customer
assert customer.first_name == customer_data.first_name
assert customer.last_name == customer_data.last_name
assert customer.email == customer_data.email


def test_list_customer_use_case(customer_factory, customer_repo):
customer_use_case = CustomerUseCase(customer_repo=customer_repo)
customer_factory(phone="+9551370037")
customer_factory(phone="+9551370038")
customer_factory(phone="+9551370039")
customer = customer_use_case.list()
assert customer
assert len(customer) == 3


def test_delete_customer_use_case(customer_factory, customer_repo):
customer_use_case = CustomerUseCase(customer_repo=customer_repo)

customer_data = customer_factory()
customer_use_case.delete(customer_data.id)

customer = customer_use_case.get_by_id(customer_data.id)
assert not customer

Awesome, we have completed all the three layers for Clean Architecture so far. We have added the domain layer which is our CustomerEntity. Then we have added the repository layer which is our CustomerRepository and finally, we have added our UseCase layer which is our CustomerUseCase class.

So, far we have able to build the components in isolation and test them without any coupling. The use case class for customer takes a repo in the __init__ method and that is what the real magic is. This is where we could easily switch the repos which interacts with out storage at run time if need be. The structure of the base repo dictates any other concrete repo class must implement the methods declared which is why this design is so awesome.

The last and final layer is the views or handlers or routes…

I am going to demonstrate the views but it is pretty straightforward.

Let’s add our serializers…

customer/handlers/serializers.py

class CustomerBaseSerializer(serializers.ModelSerializer):
class Meta:
model = Customer
fields = ["first_name", "last_name", "email", "phone", ]


class CustomerCreateSerializer(CustomerBaseSerializer):
...


class CustomerPutOrPatchSerializer(CustomerBaseSerializer):
...


class CustomerDetailSerializer(CustomerBaseSerializer):
...


class CustomerListSerializer(CustomerBaseSerializer):
class Meta:
model = Customer
fields = CustomerBaseSerializer.Meta.fields + ["id", ]

customer/handlers/web.py

class CustomerAPIView(APIView):
def post(self, request):
customer_serializer = CustomerCreateSerializer(data=request.data)
if not customer_serializer.is_valid():
return Response(customer_serializer.errors, status=status.HTTP_400_BAD_REQUEST)
customer_entity = CustomerEntity(**customer_serializer.validated_data)
customer_repo = CustomerRepository()
customer = CustomerUseCase(customer_repo).insert(customer_entity)
return Response(customer.dict(), status=status.HTTP_201_CREATED)

def put(self, request, customer_id):
customer_repo = CustomerRepository()
customer = CustomerUseCase(customer_repo).get_by_id(customer_id)
if not customer:
return Response(
{"message": "Customer with that Id Doesnot exist"},
status=status.HTTP_400_BAD_REQUEST,
)
else:
customer_serializer = CustomerCreateSerializer(data=request.data)
if not customer_serializer.is_valid():
return Response(customer_serializer.errors, status=status.HTTP_400_BAD_REQUEST)
customer_entity = CustomerEntity(**customer_serializer.validated_data)
customer_entity.id = customer.id
customer_repo = CustomerRepository()
customer = CustomerUseCase(customer_repo).update(customer_entity)
return Response(customer.dict(), status=status.HTTP_200_OK)

def get(self, request, customer_id=None):
customer_repo = CustomerRepository()
if not customer_id:
if customers := CustomerUseCase(customer_repo).list():
customers_dict = [customer.__dict__ for customer in customers]
else:
customers_dict = {}
return Response(
{
"message": customers_dict,
},
status=status.HTTP_200_OK,
)
else:
if customer := CustomerUseCase(customer_repo).get_by_id(customer_id):
return Response(customer.dict(), status=status.HTTP_200_OK)
else:
return Response(
{"message": "Customer with that Id DoesNot exist"},
status=status.HTTP_400_BAD_REQUEST,
)

So, what’s happening in the views/handlers/routes ?

  1. The route or view layer is the presentation layer.
  2. It receives a request data.
  3. The data is passed to the serializers.
  4. The serializer then returns a validated_data which is a dictionary.
  5. The dictionary is then mapped to the domain entity.
  6. The Concrete Customer Use case object is instantiated with a concrete repository instance.
  7. Then, we invoke the right method based on the route method. e.g get_by_id, list, insert or delete.
  8. The use case then invokes the repository method and returns the data.
  9. The returned data is then converted to a dict and passed to the DRF Response class with the right status_code.

Key things to note -

  1. The clean structure creates a clear separation of concern.
  2. It separates the infrastructure concerns from the application code.
  3. Decoupled code is easier to maintain and scale.
  4. Testing each component or layer is easy since it is not coupled with other layers.
  5. Dependency injection and IOC is at the heart of the above implementation.

--

--

devil_!234@

Just a tinkerer who is passionate about software engineering. I share my mistakes and let others learn from my mistakes.