처음부터 시작하는 Django 데이터 적재(4)

June
None
Published in
32 min readFeb 25, 2023
출처: https://www.datachannel.co/blogs/what-is-etl-and-how-the-etl-process-works

안녕하세요. 휴먼스케이프 june입니다.

이번시간엔 임상연구의 적재 단계를 나누고, 신규 임상연구를 빠르게 업데이트 하는 방법에 대해 알아보도록 하겠습니다.

프로젝트를 진행한 코드는 깃허브에 올라가 있으니 참고하시기 바랍니다.

임상연구는 다음과 같은 과정으로 적재됩니다.

clinicaltrialgs.gov에서 임상연구 데이터를 가져옴
-> 정의한 model에 맞게 데이터 convert
-> 저장된 임상연구 번역

우리는 control_status_type이라는 컬럼을 추가해 현재 임상연구의 적재 단계를 나타내도록 하고, 각 단계를 중간저장 할 수 있도록 수정할 예정입니다.

class Study(models.Model):
nct_id = models.CharField(verbose_name="임상연구 번호", max_length=50)
control_status_type = models.CharField(max_length=50, verbose_name="임상연구 적재 상태", null=True, blank=True)
original_data = models.TextField(verbose_name="원본 데이터", null=True, blank=True)
results_first_submitted_date = models.DateField(verbose_name="최초 제출 날짜", null=True, blank=True)
last_update_submitted_date = models.DateField(verbose_name="최근 수정 날짜", null=True, blank=True)
start_date = models.DateField(verbose_name="임상연구 시작 날짜", null=True, blank=True)
completion_date = models.DateField(verbose_name="임상연구 종료 날짜", null=True, blank=True)
title = models.TextField(verbose_name="제목", null=True, blank=True)
overall_status = models.CharField(max_length=50, verbose_name="진행 상태", null=True, blank=True)
phase = models.CharField(max_length=50, verbose_name="임상 단계", null=True, blank=True)
enrollment = models.IntegerField(verbose_name="대상자 수", null=True, blank=True)
original_study = models.ForeignKey('self', null=True, blank=True, related_name='translated_studies', verbose_name="원본 임상연구(study) 고유번호", on_delete=models.CASCADE)
locale = models.CharField(max_length=2, verbose_name="언어코드", null=True, blank=True)

class Meta:
unique_together = ('original_study', 'locale',)

control_status_type과 원본 임상연구 데이터를 저장하는 original_data 컬럼을 추가했습니다.

여기서 control_status_type은 정해진 값(예: convert/20, translate/50)이니 choices로 정해줍니다.

class ControlStatusType(models.IntegerChoices):
CONVERT_READY = 20, 'CONVERT_READY'
TRANSLATE_READY = 50, 'TRANSLATE_READY'
COMPLETED = 100, 'COMPLETED'
control_status_type = models.CharField(max_length=50, verbose_name="임상연구 적재 상태", null=True, blank=True, choices=ControlStatusType.choices)

이제 적재 함수에서 중간저장 코드를 추가해줍니다.


def save_all_studies():
"""
clinicaltrials.gov 에서 제공하는 API 에서 전체 임상 연구 목록을 저장하는 메소드
"""
studies_num = get_studies_num()
loaded_studies_num = int(ConfigurationVariable.objects.get_or_create(name='loaded_studies_num', defaults={'value': 1})[0].value)
with tqdm(total=studies_num, initial=loaded_studies_num) as progress_bar:
for start in range(loaded_studies_num, studies_num, 100):
end = start + 99
studies = get_studies(start, end)
for original_data in studies:
# save
with transaction.atomic():
study = Study(original_data=original_data, control_status_type=ControlStatusType.CONVERT_READY)
study.save()
try:
# convert
with transaction.atomic():
study_serializer = StudySerializer(data=convert_study(original_data), instance=study)
study_serializer.is_valid(raise_exception=True)
study_serializer.save()

# translate
with transaction.atomic():
translated_study_serializer = StudySerializer(data=translate_study(study_serializer.instance))
translated_study_serializer.is_valid(raise_exception=True)
translated_study_serializer.save()
study.control_status_type = ControlStatusType.COMPLETED
study.save()
except ValidationError as e:
if e.detail.get('nct_id', None) is not None and e.detail['nct_id'][0].code == 'unique':
continue
raise e
finally:
progress_bar.update(1)
ConfigurationVariable.objects.filter(name='loaded_studies_num').update(value=progress_bar.n)

ConfigurationVariable.objects.filter(name='loaded_studies_num').update(value=1)

잘 동작함을 확인했으면 이제 각 단계별 수행 함수, 명령어를 만들어 줍니다.


def save_study_original_datas():
"""
clinicaltrials.gov 에서 제공하는 API 에서 임상 연구 데이터를 저장하는 메소드
"""
studies_num = get_studies_num()
loaded_studies_num = int(ConfigurationVariable.objects.get_or_create(name='loaded_studies_num', defaults={'value': 1})[0].value)
with tqdm(total=studies_num, initial=loaded_studies_num) as progress_bar:
for start in range(loaded_studies_num, studies_num, 100):
end = start + 99
studies = get_studies(start, end)
for original_data in studies:
study = Study(original_data=original_data, control_status_type=ControlStatusType.CONVERT_READY)
study.save()
progress_bar.update(1)
ConfigurationVariable.objects.filter(name='loaded_studies_num').update(value=progress_bar.n)
ConfigurationVariable.objects.filter(name='loaded_studies_num').update(value=1)

def convert_studies():
"""
저장된 original_data를 이용하여 임상 연구 데이터를 저장하는 메소드
"""
studies_count = Study.objects.filter(control_status_type=ControlStatusType.CONVERT_READY).count()
with tqdm(total=studies_count) as progress_bar:
for study in Study.objects.filter(control_status_type=ControlStatusType.CONVERT_READY)[:100]:
with transaction.atomic():
try:
study_serializer = StudySerializer(data=convert_study(study.original_data), instance=study)
study_serializer.is_valid(raise_exception=True)
study_serializer.save()
except:
traceback.print_exc()
finally:
progress_bar.update(1)

def translate_studies():
"""
저장된 임상 연구 데이터를 번역하는 메소드
"""
studies_count = Study.objects.filter(control_status_type=ControlStatusType.TRANSLATE_READY).count()
with tqdm(total=studies_count) as progress_bar:
for study in Study.objects.filter(control_status_type=ControlStatusType.TRANSLATE_READY)[:100]:
with transaction.atomic():
try:
translated_study_serializer = StudySerializer(data=translate_study(study))
translated_study_serializer.is_valid(raise_exception=True)
translated_study_serializer.save()
study.control_status_type = ControlStatusType.COMPLETED
study.save()
except:
traceback.print_exc()
finally:
progress_bar.update(1)
from enum import Enum
from django.core.management.base import BaseCommand
from studies.batch_tasks import save_all_studies, convert_studies, translate_studies, save_study_original_datas

class CommandAction(Enum):
SAVE_ALL_STUDIES = "save_all_studies"
SAVE_ORIGINAL_DATA = "save_original_data"
CONVERT = "convert"
TRANSLATE = "translate"


class Command(BaseCommand):
help = 'clinicaltrials.gov의 전체 임상연구 적재'

actions = {
CommandAction.SAVE_ALL_STUDIES: save_all_studies,
CommandAction.SAVE_ORIGINAL_DATA: save_study_original_datas,
CommandAction.CONVERT: convert_studies,
CommandAction.TRANSLATE: translate_studies,
}

def add_arguments(self, parser):
parser.add_argument(
"--save-all-studies",
action="store_const",
dest="action",
const=CommandAction.SAVE_ALL_STUDIES,
help="save all studies",
)
parser.add_argument(
"--save-original-data",
action="store_const",
dest="action",
const=CommandAction.SAVE_ORIGINAL_DATA,
help="save original data",
)
parser.add_argument(
"--convert",
action="store_const",
dest="action",
const=CommandAction.CONVERT,
help="convert original data",
)
parser.add_argument(
"--translate",
action="store_const",
dest="action",
const=CommandAction.TRANSLATE,
help="translate study",
)

def handle(self, *args, **options):
self.actions[options["action"]]()

python3 manage.py save_studies --convert 를 실행하면 convert가 잘 동작하는걸 확인할 수 있습니다.

지금까지의 코드를 이용해 신나게 임상연구를 적재하고 약 한달 뒤, 신규 임상연구만을 다시 적재해야 하는 상황이 생겼습니다.

단순히 구현하자면 get_studies의 start 값을 studies.objects.count()+1 부터 시작하도록 할 수도 있습니다.

하지만 우리는 clinicaltrials.gov가 중간 id값에 임상연구를 끼워넣지 하지 않고 차례차례 추가한다는 보장이 없기 때문에 전체 임상연구를 검색 해 nct_id 기준으로 존재하지 않는 임상연구만 적재하려 합니다.

위의 요구사항을 만족하는 함수 save_new_study_original_datas, save_all_new_studies를 만들어보도록 합시다.

먼저, nct_id기준으로 임상연구의 존재 유무를 판단하기 때문에 original_data에서 nct_id를 추출하는 함수를 만들어줍니다.

def get_nct_id(study):
return study['Study']['ProtocolSection']['IdentificationModule']['NCTId']

이제 original_data를 가져와 study를 만들기 전 nct_id로 검사하는 코드를 추가합니다.

def save_all_new_studies():
"""
clinicaltrials.gov 에서 제공하는 API 에서 전체 임상 연구 중 신규 임상을 저장하는 메소드
"""
studies_num = get_studies_num()
loaded_new_studies_num = int(ConfigurationVariable.objects.get_or_create(name='loaded_new_studies_num', defaults={'value': 1})[0].value)
with tqdm(total=studies_num, initial=loaded_new_studies_num) as progress_bar:
for start in range(1, studies_num, 100):
end = start + 99
studies = get_studies(start, end)
for original_data in studies:
# save
with transaction.atomic():
if Study.objects.filter(nct_id=get_nct_id(original_data)).exists():
progress_bar.update(1)
ConfigurationVariable.objects.filter(name='loaded_new_studies_num').update(value=progress_bar.n)
continue
study = Study(original_data=original_data, control_status_type=ControlStatusType.CONVERT_READY)
study.save()
try:
# convert
with transaction.atomic():
study_serializer = StudySerializer(data=convert_study(original_data), instance=study)
study_serializer.is_valid(raise_exception=True)
study_serializer.save()

# translate
with transaction.atomic():
translated_study_serializer = StudySerializer(data=translate_study(study_serializer.instance))
translated_study_serializer.is_valid(raise_exception=True)
translated_study_serializer.save()
study.control_status_type = ControlStatusType.COMPLETED
study.save()
except:
traceback.print_exc()
finally:
progress_bar.update(1)
ConfigurationVariable.objects.filter(name='loaded_new_studies_num').update(value=progress_bar.n)

ConfigurationVariable.objects.filter(name='loaded_new_studies_num').update(value=1)
def save_new_study_original_datas():
"""
clinicaltrials.gov 에서 제공하는 API 에서 신규 임상 연구 데이터를 저장하는 메소드
"""
studies_num = get_studies_num()
loaded_new_studies_num = int(ConfigurationVariable.objects.get_or_create(name='loaded_new_studies_num', defaults={'value': 1})[0].value)
with tqdm(total=studies_num, initial=loaded_new_studies_num) as progress_bar:
for start in range(1, studies_num, 100):
end = start + 99
studies = get_studies(start, end)
for original_data in studies:
if Study.objects.filter(nct_id=get_nct_id(original_data)).exists():
progress_bar.update(1)
ConfigurationVariable.objects.filter(name='loaded_new_studies_num').update(value=progress_bar.n)
continue
study = Study(original_data=original_data, control_status_type=ControlStatusType.CONVERT_READY)
study.save()
progress_bar.update(1)
ConfigurationVariable.objects.filter(name='loaded_new_studies_num').update(value=progress_bar.n)

ConfigurationVariable.objects.filter(name='loaded_new_studies_num').update(value=1)

신규 임상연구 적재 함수는 완성했으니 이제 save_all_studies 함수를 신규일 경우 생성, 기존 임상연구일 경우 업데이트 하도록 만들어봅시다.

사실 임상연구의 업데이트는 생각보다 어려운 작업이 될 수 있습니다.

단순 업데이트는 간단할 수 있지만, 실시간으로 적재한 데이터를 보여줘야 하는 서비스에서의 업데이트는 작업 단위를 나누지 않고 바로 적재 완료된 데이터를 덮어씌우거나, 적재 완료된 데이터를 복제해 각 적재 단계를 수행한 뒤 기존 적재된 데이터를 업데이트 하는 식으로 진행해야합니다.

또, 임상연구의 id를 저장해두고 사용하는 다른 서비스가 있을 경우, id값이 변경되면 안되기 때문에 주의가 필요합니다.

전자의 경우 굳이 설명하지 않아도 지금까지의 내용을 응용해 충분히 만들 수 있기 때문에 후자의 방식을 진행하려 합니다.

업데이트 단계는 다음과 같이 진행됩니다.

clinicaltrials.gov에서 original_data를 가져와 ct_id 비교 -> 신규적재 or 업데이트
-> 업데이트의 경우, 적재된 데이터와 original_data(hash)비교, 같을 경우 업데이트 하지 않음
-> original_data가 다를 경우 기존 임상연구 복제
-> 복제한 임상연구의 control_status_type을 convert_ready로 변경, oirignal_data 업데이트
-> 복제한 임상연구에서 각 적재 단계 진행
-> 모든 적재가 완료되었을 경우 기존 임상연구 삭제, 업데이트된 임상연구로 대체clinicaltrials.gov에서 original_data를 가져와 ct_id 비교 -> 신규적재 or 업데이트

하나하나 차례대로 구현해보도록 합시다.

clinicaltrials.gov에서 original_data를 가져와 ct_id 비교 -> 신규적재 or 업데이트

이는 신규 임상연구 적재 코드와 반대로, exsits일 경우 업데이트 하도록 구현하면 됩니다.

  if Study.objects.filter(nct_id=get_nct_id(original_data)).exists():
study = Study.objects.get(nct_id=get_nct_id(original_data))
study.original_data = original_data
study.save()

업데이트의 경우, 적재된 데이터와 original_data(hash)비교, 같을 경우 업데이트 하지 않음

매 번 두개의 original_data를 비교하는건 효율적이지 못하니, 처음 original_data를 저장할 때 hash를 계산해 저장해두고 비교해두는 방식으로 구현하겠습니다.

hash 방식은 여러가지가 있는데, 보안이 크게 중요하지 않으니 간단한 sha256 방식으로 저장해 비교하도록 하겠습니다.

model에 original_data_hash 필드를 추가해주도록 합시다.

original_data_hash = models.CharField(max_length=64, verbose_name="original_data sha256 hash", null=True, blank=True)

이제 batch_tasks의 각 함수에서 임상연구 적재 시 hash값을 저장하도록 함수를 추가해 사용해줍니다.

def get_original_data_hash(original_data):
return hashlib.sha256(json.dumps(original_data).encode('utf-8')).hexdigest()

original_data가 다를 경우 기존 임상연구 복제

살짝 어려울 수 있는 문제이며, 잘못 구현할 경우 무한의 오버 엔지니어링의 굴레에 빠질 수 있는 문제입니다.

이번 시간에는 study model에서 clone함수를 구현해 사용하도록 합니다.

(humanscape github public repo 중, clone하려는 model의 관계를 모두 탐색하며 clone하는 패키지를 사용해도 됩니다. 해당 패키지에 대한 이슈제보/피드백을 기다리고있습니다.)

먼저 복제된 study에서 원본 study를 가리키기 위한 foreign key field를 생성합니다.

(네이밍 센스가 부족해 기존 original_study의 이름까지 변경했습니다.)

하나의 임상연구에 대한 복제본이 여러개인 경우(동시에 여러 업데이트가 진행되는 경우)는 피해야 하기 때문에 unique constraint를 걸어줍니다.

    translate_from_study = models.ForeignKey('self', null=True, blank=True, related_name='translated_studies', verbose_name="번역 원본 임상연구(study) 고유번호", on_delete=models.CASCADE)
clone_from_study = models.OneToOneField('self', null=True, blank=True, related_name='cloned_study', verbose_name="복제 원본 임상연구(study) 고유번호", on_delete=models.CASCADE)
...
class Meta:
unique_together = ('translate_from_study', 'locale')

이를 Intervention, Eligibility에도 똑같이 적용해줍니다.

(Condition은 ManyToMany관계이기 때문에 clone기능이 필요하지 않습니다.)

이제 study model의 clone 메서드를 구현합니다.

# study model clone
@transaction.atomic
def clone(self):
cloned_study = self._clone_study()
for translated_study in self.translated_studies.all():
translated_study._clone_study()
return cloned_study

def _clone_study(self):
cloned_study = deepcopy(self)
cloned_study.pk = None
cloned_study.clone_from_study = self
if self.translate_from_study is None:
cloned_study.translate_from_study = None
else:
cloned_study.translate_from_study = self.translate_from_study.cloned_study
cloned_study.save()

for intervention in self.interventions.all():
intervention.clone(cloned_study)

for condition in self.conditions.all():
cloned_study.conditions.add(condition)

for eligibility in self.eligibilities.all():
eligibility.clone(cloned_study)
return cloned_study

# intervention model clone
def clone(self, study):
cloned_intervention = deepcopy(self)
cloned_intervention.clone_from_intervention = self
cloned_intervention.pk = None
cloned_intervention.study = study
if self.translate_from_intervention is not None:
cloned_intervention.translate_from_intervention = self.translate_from_intervention.cloned_intervention
cloned_intervention.save()
return cloned_intervention

# eligibility model clone
def clone(self, study):
cloned_eligibility = deepcopy(self)
cloned_eligibility.pk = None
cloned_eligibility.study = study
cloned_eligibility.clone_from_eligibility = self
if self.translate_from_eligibility is not None:
cloned_eligibility.translate_from_eligibility = self.translate_from_eligibility.clone_from_eligibility
cloned_eligibility.save()
return cloned_eligibility

구현한 clone 메서드를 활용해 임상연구 업데이트를 구현해줍시다.

이 때 이미 업데이트 중인 임상연구일 경우 clone하지 않도록 확인하는 코드를 추가해야합니다.

(unique constraint로 막아두었지만 코드 작성시에도 모든 경우의 수에 신경쓰는 습관을 들이면 여러 대참사를 막을 수 있습니다.)


def update_study_original_data():
"""
clinicaltrials.gov 에서 제공하는 API 에서 임상 연구 데이터를 업데이트 하는 메소드
"""
studies_num = get_studies_num()
updated_studies_num = int(ConfigurationVariable.objects.get_or_create(name='updated_studies_num', defaults={'value': 1})[0].value)
with tqdm(total=studies_num, initial=updated_studies_num) as progress_bar:
for start in range(1, studies_num, 100):
end = start + 99
studies = get_studies(start, end)
for original_data in studies:
study = Study.objects.filter(nct_id=get_nct_id(original_data)).first()
original_data_hash = get_original_data_hash(original_data)
if study is not None and study.original_data_hash != original_data_hash and not Study.objects.filter(clone_from_study=study).exists():
study = Study.objects.get(nct_id=get_nct_id(original_data), translate_from_study__isnull=True).clone()
study.original_data = original_data
study.original_data_hash = original_data_hash
study.control_status_type = ControlStatusType.CONVERT_READY
study.save()
progress_bar.update(1)
ConfigurationVariable.objects.filter(name='updated_studies_num').update(value=progress_bar.n)

ConfigurationVariable.objects.filter(name='updated_studies_num').update(value=1)

구현한 임상연구 업데이트 코드를 save_all_studies에 적용하면 신규 임상연구일 경우 신규 적재, 적재된 임상연구일 경우 업데이트 하는 함수를 구현할 수 있습니다.

(적재가 완료된 후 기존 적재된 데이터를 업데이트 하는 코드는 model instance save 메서드에 hook을 넣어 구현했습니다.)

    def save(self, *args, **kwargs) -> None:
if self.control_status_type == ControlStatusType.COMPLETED and self.clone_from_study is not None:
clone_from_study_id = self.clone_from_study_id
self.translated_studies.all().update(clone_from_study=None)
self.clone_from_study = None
self.save()
Study.objects.filter(id=clone_from_study_id).delete()
return
return super().save(*args, **kwargs)

이 외의 자잘한 수정사항들은 github repository에서 확인해주시기 바랍니다.

여기까지만해도 큰 무리없이 여러 데이터를 적재할 수 있습니다.

다음 시간엔 유료 번역 api를 사용한다면?! 편을 연재할 예정입니다.

업데이트 된 부분을 인식해 해당 부분만 업데이트 하며 convert, translate의 비용을 줄이는 방법에 대해 알아보도록 하겠습니다.

--

--