Apache airflow의 task group retry 야매구현
문제
https://github.com/apache/airflow/issues/21867
SubDag를 대체하는 개념으로 TaskGroup이 추가되었는데, TaskGroup을 실행하다가 뭔가 실패했을 때, 해당 TaskGroup을 재시도하도록 하는 게 목표다.
아이디어
그냥 TaskGroup을 여러개 놓고, 이전에 성공한 적이 있으면 이후 시도는 skip하도록 branch를 하면 되겠지?
난관
branch task에서 다음에 실행할 task를 판단할 때, 해당 task의 task id는 task의 함수명 외에도 task가 속한 group의 id를 prefix로 갖게 된다. 다만 해당 group의 id는 경우에 따라 일련번호가 추가로 붙기도 하는 등, 내가 강제로 지정할 수 있는 물건이 아니다. 예를 들면:
some_group.first_attempt
next_group.first_attempt
과 같은 식으로 first_attemp에 해당하지만 다른 TaskGroup에 속한 task가 여러 개 존재할 수 있다는 점이다.
해결
핵심은… TaskInstance에서 현재 task의 full task id를 가져다가 뒷부분만 갈아끼우는 방식을 써서 현재 Group 내에서만 유효한 task가 지정될 수 있도록 하라는 초 야매 사기를 치라는 것이다.
from typing import Optional
import datetime
from airflow.decorators import dag, task, task_group
from airflow.operators.empty import EmptyOperator
from airflow.utils.trigger_rule import TriggerRule
from airflow.models import TaskInstance
@dag(start_date=datetime.datetime(2021, 1, 1), schedule=None, schedule_interval=None)
def example_branch():
@task
def always_false(ignore):
return False
def make_outer_group(outer_group_id, ignore):
@task_group(group_id=outer_group_id)
def make_group(input):
skip_everything = EmptyOperator(task_id="skip_everything")
first_attempt = EmptyOperator(task_id="first_attempt")
@task.branch
def branch_by_previous(prev_group_res, task_instance: TaskInstance):
print(task_instance.task_id)
print("prev result: ", prev_group_res)
if prev_group_res:
return task_instance.task_id.replace(
".branch_by_previous", ".skip_everything"
)
else:
return task_instance.task_id.replace(
".branch_by_previous", ".first_attempt"
)
join = EmptyOperator(task_id="join", trigger_rule=TriggerRule.ONE_SUCCESS)
@task
def log(val):
print(val)
return val
l1 = log(1)
l2 = log(2)
skip_everything >> l1
first_attempt >> l2
[l1, l2] >> join
l3 = log(3)
join >> l3
branch_by_previous(input) >> [skip_everything, first_attempt]
return l3
return make_group(ignore)
def attempts(name, dep):
token = always_false(dep)
attempt1 = make_outer_group(f"{name}_attempt1", token)
attempt2 = make_outer_group(f"{name}_attempt2", attempt1)
return attempt2
g1 = attempts("g1", None)
g2 = attempts("g2", g1)
return g2
example_branch()
결론

결과
된다. 근데 하지마라.