systemref

DAG-Based Task Execution

H. Maqsood Jun 8, 2026 2 min read dag scheduling pipelines distributed-systems task-execution
A directed acyclic graph defines task dependencies explicitly. Topological sort determines a valid execution order; tasks that do not depend on each other, directly or indirectly, can run in parallel.

A directed acyclic graph (DAG) encodes what must complete before something else can start. Nodes are tasks. Edges are dependencies. The absence of cycles guarantees that a valid execution order exists: no task can end up waiting on itself through a chain of dependencies.

Build systems, data pipelines, and CI workflows all use this model.

The structure

A ──┐
    ├──▶ C ──┐
B ──┘        ├──▶ E
B ──▶ D ─────┘

Task C requires A and B. Task E requires C and D. Task D requires only B. A and B have no dependencies — they can run immediately, in parallel.

In code, represent the graph as a map from each node to its dependencies:

graph = {
  'A': [],
  'B': [],
  'C': ['A', 'B'],
  'D': ['B'],
  'E': ['C', 'D'],
}
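Nothing in this representation forces every dependency to be a key in the map, and a misspelled dependency name would otherwise go unnoticed until execution. A minimal sketch of a check for that, assuming the dict-of-lists shape above (the helper name `missing_dependencies` is hypothetical, not part of any library):

```python
def missing_dependencies(graph):
  """Return dependency names that are referenced but never defined as nodes."""
  missing = set()
  for node, deps in graph.items():
    for d in deps:
      if d not in graph:
        missing.add(d)
  return sorted(missing)

graph = {
  'A': [],
  'B': [],
  'C': ['A', 'B'],
  'D': ['B'],
  'E': ['C', 'D'],
}

print(missing_dependencies(graph))                       # []
print(missing_dependencies({'C': ['A', 'B'], 'A': []}))  # ['B']
```

Running a check like this before sorting keeps a typo in a dependency name from being reported as something else later on.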

Topological sort

To execute the DAG, derive a valid linear ordering where every node appears after its dependencies. Kahn's algorithm does this iteratively: start with nodes that have no dependencies, process them, decrement the in-degree of their dependents, and enqueue newly unblocked nodes.

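from collections import defaultdict, deque

def topo_sort(graph):
  # in_degree[n]: number of dependencies of n that have not been processed yet
  in_degree = {n: 0 for n in graph}
  # dependents[d]: nodes that list d as a dependency (the reverse edges)
  dependents = defaultdict(list)

  for node, deps in graph.items():
    for d in deps:
      in_degree[node] += 1
      dependents[d].append(node)

  # Start with the nodes that have no dependencies at all.
  queue = deque(n for n, deg in in_degree.items() if deg == 0)
  order = []

  while queue:
    n = queue.popleft()
    order.append(n)
    # Processing n unblocks any dependent whose last pending dependency was n.
    for dep in dependents[n]:
      in_degree[dep] -= 1
      if in_degree[dep] == 0:
        queue.append(dep)

  # Nodes on a cycle never reach in-degree 0, so they are missing from order.
  if len(order) != len(graph):
    raise ValueError("Cycle detected")
  return order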

A cycle produces an incomplete ordering — the length check surfaces it.
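The same in-degree bookkeeping can also group nodes into levels, where every node in a level depends only on nodes in earlier levels. What follows is a sketch of a level-by-level variant of Kahn's algorithm, not the function above; the name `topo_levels` is an assumption made for illustration.

```python
from collections import defaultdict

def topo_levels(graph):
  """Group nodes into levels; each level depends only on earlier levels."""
  in_degree = {n: len(deps) for n, deps in graph.items()}
  dependents = defaultdict(list)
  for node, deps in graph.items():
    for d in deps:
      dependents[d].append(node)

  # Level 0 is every node with no dependencies.
  current = [n for n, deg in in_degree.items() if deg == 0]
  levels = []
  while current:
    levels.append(current)
    nxt = []
    for n in current:
      for dep in dependents[n]:
        in_degree[dep] -= 1
        if in_degree[dep] == 0:
          nxt.append(dep)
    current = nxt

  # Nodes on a cycle are never placed in any level.
  if sum(len(level) for level in levels) != len(graph):
    raise ValueError("Cycle detected")
  return levels

graph = {
  'A': [],
  'B': [],
  'C': ['A', 'B'],
  'D': ['B'],
  'E': ['C', 'D'],
}

print(topo_levels(graph))  # [['A', 'B'], ['C', 'D'], ['E']]
```

Nodes within a level have no dependencies on each other, so each level is a batch that could run concurrently. Level-by-level batching is simpler than per-node scheduling but slightly more conservative: here D would wait for A to finish even though it only needs B.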


Parallel execution

Nodes that do not depend on each other, directly or transitively, can execute concurrently. In the example, A and B are independent and start immediately; D can start as soon as B completes, while C has to wait for both A and B.

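import asyncio

async def execute_dag(graph, runners):
  tasks = {}  # node -> asyncio.Task, so each node is scheduled exactly once

  def start(node):
    # Reuse the task if another dependent has already scheduled this node.
    if node not in tasks:
      tasks[node] = asyncio.create_task(run(node))
    return tasks[node]

  async def run(node):
    # Wait for all dependencies, then run the node itself.
    await asyncio.gather(*[start(d) for d in graph.get(node, [])])
    await runners[node]()

  await asyncio.gather(*[start(n) for n in graph])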

asyncio.gather runs independent coroutines concurrently on a single event loop, so this approach suits I/O-bound runners. Tasks block only on their actual dependencies, not on each other.
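An alternative way to express "wait only on your own dependencies" is to give every node a completion signal. The sketch below uses one `asyncio.Event` per node; it is a different technique from the function above, it assumes the graph has already been validated as acyclic, and the names `execute_with_events` and `make_runner` are hypothetical.

```python
import asyncio

async def execute_with_events(graph, runners):
  """Run every node once, after all of its dependencies have finished."""
  done = {n: asyncio.Event() for n in graph}

  async def run(node):
    # Block until each dependency has signalled completion.
    for d in graph[node]:
      await done[d].wait()
    await runners[node]()
    done[node].set()

  await asyncio.gather(*(run(n) for n in graph))

graph = {
  'A': [],
  'B': [],
  'C': ['A', 'B'],
  'D': ['B'],
  'E': ['C', 'D'],
}

async def main():
  log = []

  def make_runner(name):
    async def runner():
      log.append(f"start {name}")
      await asyncio.sleep(0.01)  # stand-in for real work
      log.append(f"end {name}")
    return runner

  runners = {n: make_runner(n) for n in graph}
  await execute_with_events(graph, runners)
  return log

log = asyncio.run(main())
print(log)
```

Each node's coroutine is created exactly once, so a runner cannot be invoked twice no matter how many other nodes depend on it. The log records a start and an end entry per node, with every start appearing after the end entries of that node's dependencies.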


Cycle detection matters

A cycle in the dependency graph produces a deadlock: tasks that each wait on the other never execute. Any DAG runtime should detect cycles when the graph is constructed, not at execution time.

def has_cycle(graph):
  # topo_sort raises ValueError when it cannot order every node.
  try:
    topo_sort(graph)
  except ValueError:
    return True
  return False

Validate before running, not during.
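A yes/no answer says that a cycle exists but not where. A depth-first search can report the offending nodes, which makes the error message actionable. This is a minimal sketch using the usual three-state marking (unvisited, on the current path, finished); the function name `find_cycle` is hypothetical, and the recursion assumes graphs shallow enough for Python's default recursion limit.

```python
def find_cycle(graph):
  """Return one cycle as a list of nodes, or None if the graph is acyclic."""
  WHITE, GRAY, BLACK = 0, 1, 2  # unvisited, on the current path, finished
  color = {n: WHITE for n in graph}
  path = []

  def visit(node):
    color[node] = GRAY
    path.append(node)
    for d in graph.get(node, []):
      state = color.get(d, WHITE)
      if state == GRAY:
        # d is already on the current path, so the path loops back to it.
        return path[path.index(d):] + [d]
      if state == WHITE:
        found = visit(d)
        if found:
          return found
    path.pop()
    color[node] = BLACK
    return None

  for n in graph:
    if color[n] == WHITE:
      found = visit(n)
      if found:
        return found
  return None

acyclic = {'A': [], 'B': [], 'C': ['A', 'B'], 'D': ['B'], 'E': ['C', 'D']}
cyclic = {'A': ['C'], 'B': ['A'], 'C': ['B']}

print(find_cycle(acyclic))  # None
print(find_cycle(cyclic))   # ['A', 'C', 'B', 'A']
```

The returned list follows dependency edges: A depends on C, C depends on B, and B depends on A again.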


When to use it

Use DAG-based execution when:

  • Tasks have explicit, non-trivial dependencies that vary between runs
  • Parallelism must be maximized without manual coordination
  • The execution graph is data-driven (generated at runtime, not hardcoded)

This model underlies tools like Make, Apache Airflow, Bazel, and GitHub Actions. Don't use it for event-driven flows or workflows with cycles — model those as state machines or process nets respectively.