DeepInterview

ToDo List with Task Dependencies (OOD)

Coding2025/11

Description

Company: Perplexity

The Challenge

You need to build a task management system for AI agents. The system handles tasks that have different states (like READY or BLOCKED) and dependencies (some tasks wait for others).

The interview starts with a simple problem, but it gets harder. Important: As you add new features, you must make sure the old features still work (Regression Testing).

Starting Code:

The interviewer usually gives you this skeleton:

  • TaskStatus: A list of states (READY, IN_PROGRESS, SUCCEEDED, FAILED, BLOCKED).
  • Task: A class representing one job.
  • ToDoList: A class to manage all the tasks.

Part 1: Basic Features

What You Need to Build

Create the basic tools to manage tasks:

  • add_task(task_id, description)
    • Make a new task.
    • Set its status to READY by default.
    • If the ID already exists, raise an error.
  • get_task(task_id)
    • Find and return a task using its ID.
    • If the task is missing, raise an error.
  • update_status(task_id, new_status)
    • Change the status of a task.
    • Raise an error if the task is missing.

Solution Approach

We use a dictionary (HashMap) to store tasks for fast O(1) access.

from enum import Enum
from typing import Dict, List, Optional, Set
from dataclasses import dataclass, field

class TaskStatus(Enum):
    READY = "READY"
    IN_PROGRESS = "IN_PROGRESS"
    SUCCEEDED = "SUCCEEDED"
    FAILED = "FAILED"
    BLOCKED = "BLOCKED"

@dataclass
class Task:
    task_id: str
    description: str
    status: TaskStatus = TaskStatus.READY
    dependencies: Set[str] = field(default_factory=set)  # Used in Part 2

    def __repr__(self):
        return f"Task({self.task_id}, {self.status.value})"

class ToDoList:
    def __init__(self):
        self.tasks: Dict[str, Task] = {}
        # Track which tasks depend on a specific task
        self.dependents: Dict[str, Set[str]] = {}

    def add_task(self, task_id: str, description: str) -> Task:
        """Create a new task."""
        if task_id in self.tasks:
            raise ValueError(f"Task {task_id} already exists")

        task = Task(task_id=task_id, description=description)
        self.tasks[task_id] = task
        return task

    def get_task(self, task_id: str) -> Task:
        """Find a task by ID."""
        if task_id not in self.tasks:
            raise ValueError(f"Task {task_id} does not exist")
        return self.tasks[task_id]

    def update_status(self, task_id: str, new_status: TaskStatus) -> None:
        """Change a task's status."""
        task = self.get_task(task_id)
        task.status = new_status

Part 1 Test Cases

def test_part1():
    todo_list = ToDoList()

    # Test adding a task
    task1 = todo_list.add_task("task1", "First task")
    assert task1.status == TaskStatus.READY
    assert task1.task_id == "task1"

    # Test finding a task
    retrieved = todo_list.get_task("task1")
    assert retrieved.task_id == "task1"

    # Test updating status
    todo_list.update_status("task1", TaskStatus.IN_PROGRESS)
    assert todo_list.get_task("task1").status == TaskStatus.IN_PROGRESS

    todo_list.update_status("task1", TaskStatus.SUCCEEDED)
    assert todo_list.get_task("task1").status == TaskStatus.SUCCEEDED

    # Test errors
    try:
        todo_list.add_task("task1", "Duplicate")
        assert False, "Should raise error for duplicate ID"
    except ValueError:
        pass

    try:
        todo_list.get_task("nonexistent")
        assert False, "Should raise error for nonexistent task"
    except ValueError:
        pass

    print("Part 1 tests passed!")

# test_part1()

Part 2: Handling Dependencies

What You Need to Build

Now, tasks can depend on other tasks. This adds complexity:

  • Update add_task to handle dependencies:
    • You can now pass a list of parent task IDs.
    • If any parent task is not SUCCEEDED, the new task starts as BLOCKED.
    • If all parents are SUCCEEDED (or there are no parents), it starts as READY.
  • Automatic Status Updates:
    • Unblocking: When a parent task finishes (SUCCEEDED), check its children. If a child's parents are all done, change that child from BLOCKED to READY.
    • Cascading Failure: If a parent task fails (FAILED), all its children (and their children) must strictly become FAILED too.

Solution Approach

We need two maps:

  • tasks: task_id -> Task object.
  • dependents: parent_id -> Set of child_ids (reverse lookup). This helps us find children quickly when a parent updates.
class ToDoList:
    def __init__(self):
        self.tasks: Dict[str, Task] = {}
        # Track which tasks depend on each task (reverse mapping)
        self.dependents: Dict[str, Set[str]] = {}

    def add_task(self, task_id: str, description: str,
                 dependencies: List[str] = None) -> Task:
        """Add a new task with optional dependencies."""
        if task_id in self.tasks:
            raise ValueError(f"Task {task_id} already exists")

        # Check if dependency IDs exist
        dep_set = set(dependencies or [])
        for dep_id in dep_set:
            if dep_id not in self.tasks:
                raise ValueError(f"Dependency {dep_id} does not exist")

        # Set status based on parent tasks
        initial_status = TaskStatus.READY
        if dep_set:
            # Are all parents SUCCEEDED?
            all_succeeded = all(
                self.tasks[dep_id].status == TaskStatus.SUCCEEDED
                for dep_id in dep_set
            )
            if not all_succeeded:
                initial_status = TaskStatus.BLOCKED

        # Create the task
        task = Task(
            task_id=task_id,
            description=description,
            status=initial_status,
            dependencies=dep_set
        )
        self.tasks[task_id] = task

        # Update the reverse map so parents know their children
        for dep_id in dep_set:
            if dep_id not in self.dependents:
                self.dependents[dep_id] = set()
            self.dependents[dep_id].add(task_id)

        return task

    def get_task(self, task_id: str) -> Task:
        """Get a task by its ID."""
        if task_id not in self.tasks:
            raise ValueError(f"Task {task_id} does not exist")
        return self.tasks[task_id]

    def update_status(self, task_id: str, new_status: TaskStatus) -> None:
        """Update task status and trigger effects on children."""
        task = self.get_task(task_id)
        old_status = task.status
        task.status = new_status

        # Check for side effects
        if new_status == TaskStatus.SUCCEEDED:
            self._try_unblock_dependents(task_id)
        elif new_status == TaskStatus.FAILED:
            self._cascade_failure(task_id)

    def _try_unblock_dependents(self, task_id: str) -> None:
        """
        When a task succeeds, check if children can be unblocked.
        A child becomes READY only if ALL its parents are SUCCEEDED.
        """
        if task_id not in self.dependents:
            return

        for dependent_id in self.dependents[task_id]:
            dependent_task = self.tasks[dependent_id]

            # Only check if currently BLOCKED
            if dependent_task.status != TaskStatus.BLOCKED:
                continue

            # Check if all dependencies are SUCCEEDED
            all_deps_succeeded = all(
                self.tasks[dep_id].status == TaskStatus.SUCCEEDED
                for dep_id in dependent_task.dependencies
            )

            if all_deps_succeeded:
                dependent_task.status = TaskStatus.READY

    def _cascade_failure(self, task_id: str) -> None:
        """
        When a task fails, mark all children as FAILED recursively.
        """
        if task_id not in self.dependents:
            return

        for dependent_id in self.dependents[task_id]:
            dependent_task = self.tasks[dependent_id]

            # Only update if not already FAILED
            if dependent_task.status != TaskStatus.FAILED:
                dependent_task.status = TaskStatus.FAILED
                # Recursively fail the children of this task
                self._cascade_failure(dependent_id)

Part 2 Test Cases

def test_part2_dependencies():
    todo_list = ToDoList()

    # Chain: task1 -> task2 -> task3
    task1 = todo_list.add_task("task1", "First task")
    assert task1.status == TaskStatus.READY

    # Task2 waits for task1
    task2 = todo_list.add_task("task2", "Second task", dependencies=["task1"])
    assert task2.status == TaskStatus.BLOCKED, "Should be BLOCKED because task1 isn't done"

    # Task3 waits for task2
    task3 = todo_list.add_task("task3", "Third task", dependencies=["task2"])
    assert task3.status == TaskStatus.BLOCKED

    print("✓ Tasks start blocked correctly")

    # Test Unblocking: task1 finishes -> task2 becomes READY
    todo_list.update_status("task1", TaskStatus.SUCCEEDED)
    assert todo_list.get_task("task2").status == TaskStatus.READY, "Task2 should be READY"
    assert todo_list.get_task("task3").status == TaskStatus.BLOCKED, "Task3 stays BLOCKED"

    print("✓ Unblocking works")

    # Test Failure: task2 fails -> task3 fails
    todo_list.update_status("task2", TaskStatus.FAILED)
    assert todo_list.get_task("task3").status == TaskStatus.FAILED, "Task3 should fail too"

    print("✓ Failure cascade works")

    # Test Multiple Dependencies
    todo_list2 = ToDoList()
    t1 = todo_list2.add_task("t1", "Task 1")
    t2 = todo_list2.add_task("t2", "Task 2")
    # t3 waits for BOTH t1 and t2
    t3 = todo_list2.add_task("t3", "Task 3", dependencies=["t1", "t2"])

    assert t3.status == TaskStatus.BLOCKED

    # Only t1 finishes - t3 should wait
    todo_list2.update_status("t1", TaskStatus.SUCCEEDED)
    assert todo_list2.get_task("t3").status == TaskStatus.BLOCKED

    # Both finish - t3 runs
    todo_list2.update_status("t2", TaskStatus.SUCCEEDED)
    assert todo_list2.get_task("t3").status == TaskStatus.READY

    print("✓ Multiple dependencies work")
    print("Part 2 tests passed!")

# test_part2_dependencies()

Part 3: Searching and Filtering

What You Need to Build

Add methods to find specific types of tasks:

  • get_tasks_by_status: Find all tasks with a specific status (like "FAILED").
  • get_ready_tasks: Helper to find everything ready to run.
  • get_blocked_tasks: Helper to find stuck tasks.

Solution Approach

Iterate through the tasks values and filter them.

class ToDoList:
    # ... keep previous methods ...

    def get_tasks_by_status(self, status: TaskStatus) -> List[Task]:
        """Get all tasks that match a status."""
        return [
            task for task in self.tasks.values()
            if task.status == status
        ]

    def get_ready_tasks(self) -> List[Task]:
        """Get all READY tasks."""
        return self.get_tasks_by_status(TaskStatus.READY)

    def get_blocked_tasks(self) -> List[Task]:
        """Get all BLOCKED tasks."""
        return self.get_tasks_by_status(TaskStatus.BLOCKED)

    def get_task_dependencies_info(self, task_id: str) -> dict:
        """Debug helper: see parents and children of a task."""
        task = self.get_task(task_id)

        return {
            "task_id": task_id,
            "status": task.status,
            "dependencies": list(task.dependencies),
            "dependency_statuses": {
                dep_id: self.tasks[dep_id].status.value
                for dep_id in task.dependencies
            },
            "dependents": list(self.dependents.get(task_id, set()))
        }

Part 4: Adding Tags and Priority

What You Need to Build

Add more ways to organize tasks:

  • Tags: Labels like "Backend" or "Cleanup".
  • Priority: A number (higher numbers = more important).
  • Sorting: Return tasks in the correct order for execution (Topological Sort).

Solution Approach

Update the Task class and the add_task method. Use default values so we don't break the code from Part 1, 2, and 3.

@dataclass
class Task:
    task_id: str
    description: str
    status: TaskStatus = TaskStatus.READY
    dependencies: Set[str] = field(default_factory=set)
    tags: Set[str] = field(default_factory=set)  # NEW FIELD
    priority: int = 0  # NEW FIELD (higher = more important)

Update add_task:

class ToDoList:
    # ... keep previous methods ...

    def add_task(self, task_id: str, description: str,
                 dependencies: List[str] = None,
                 tags: Set[str] = None,
                 priority: int = 0) -> Task:
        """Add task with optional tags and priority."""
        if task_id in self.tasks:
            raise ValueError(f"Task {task_id} already exists")

        # Check dependencies
        dep_set = set(dependencies or [])
        for dep_id in dep_set:
            if dep_id not in self.tasks:
                raise ValueError(f"Dependency {dep_id} does not exist")

        # Set status
        initial_status = TaskStatus.READY
        if dep_set:
            all_succeeded = all(
                self.tasks[dep_id].status == TaskStatus.SUCCEEDED
                for dep_id in dep_set
            )
            if not all_succeeded:
                initial_status = TaskStatus.BLOCKED

        # Create task with ALL fields
        task = Task(
            task_id=task_id,
            description=description,
            status=initial_status,
            dependencies=dep_set,
            tags=tags or set(),
            priority=priority
        )
        self.tasks[task_id] = task

        # Update the reverse map so parents know their children
        for dep_id in dep_set:
            if dep_id not in self.dependents:
                self.dependents[dep_id] = set()
            self.dependents[dep_id].add(task_id)

        return task

    def get_tasks_by_tag(self, tag: str) -> List[Task]:
        """Find tasks with a specific tag."""
        return [
            task for task in self.tasks.values()
            if tag in task.tags
        ]

    def get_ready_tasks_sorted_by_priority(self) -> List[Task]:
        """Get READY tasks, highest priority first."""
        ready_tasks = self.get_ready_tasks()
        return sorted(ready_tasks, key=lambda t: t.priority, reverse=True)

    def get_topological_order(self) -> List[str]:
        """
        Return task IDs in an order where parents come before children.
        Uses Kahn's algorithm.
        """
        # Count how many dependencies each task has
        in_degree = {task_id: len(task.dependencies)
                     for task_id, task in self.tasks.items()}

        # Start with tasks that have 0 dependencies
        queue = [task_id for task_id, degree in in_degree.items() if degree == 0]
        result = []

        while queue:
            # Process a free task
            current = queue.pop(0)
            result.append(current)

            # Tell children that one parent is done
            for dependent_id in self.dependents.get(current, set()):
                in_degree[dependent_id] -= 1
                if in_degree[dependent_id] == 0:
                    queue.append(dependent_id)

        # If result length != total tasks, we have a cycle (infinite loop)
        if len(result) != len(self.tasks):
            raise ValueError("Circular dependency detected")

        return result

Important Coding Tips

1. Don't Break Old Features

This is critical. When you added priority in Part 4, if you changed the arguments for add_task without default values (= 0), the code from Part 1 would crash. Always check backward compatibility.

2. Validating Transitions

You might want to ensure tasks don't skip logical steps. For example, a SUCCEEDED task shouldn't suddenly become READY again.

# Allowed status changes
VALID_TRANSITIONS = {
    TaskStatus.READY: {TaskStatus.IN_PROGRESS, TaskStatus.FAILED},
    TaskStatus.IN_PROGRESS: {TaskStatus.SUCCEEDED, TaskStatus.FAILED},
    TaskStatus.BLOCKED: {TaskStatus.READY, TaskStatus.FAILED},
    TaskStatus.SUCCEEDED: set(),  # Final state
    TaskStatus.FAILED: set(),  # Final state
}

def update_status(self, task_id: str, new_status: TaskStatus) -> None:
    task = self.get_task(task_id)
    old_status = task.status

    # Check if this change is allowed
    if new_status not in VALID_TRANSITIONS.get(old_status, set()):
        raise ValueError(
            f"Invalid transition from {old_status.value} to {new_status.value}"
        )

    task.status = new_status

    # Run side effects (Unblock or Cascade Failure)
    if new_status == TaskStatus.SUCCEEDED:
        self._try_unblock_dependents(task_id)
    elif new_status == TaskStatus.FAILED:
        self._cascade_failure(task_id)

3. Avoiding Infinite Loops

In a real system, you must check for circular dependencies (e.g., Task A waits for Task B, and Task B waits for Task A). This usually requires a graph search algorithm.

Time and Space Complexity

Time Complexity

  • add_task: O(D). Depends on the number of dependencies (D).
  • get_task: O(1). Fast lookup.
  • update_status:
    • Unblocking: O(C × D). Depends on the number of children (C) and how many parents they have.
    • Failure Cascade: O(N). Worst case, one failure could affect every other task in the list.
  • get_topological_order: O(N + E). Standard graph traversal time (N = tasks, E = dependency links).

Space Complexity

  • Total: O(N + E). We store every task and every dependency link.

Common Mistakes

1. Failing to Recurse

When a task fails, you must fail its children. But don't forget to fail the children's children too. You need a recursive function for this.

Correct Way:

def _cascade_failure(self, task_id: str):
    for dep_id in self.dependents[task_id]:
        self.tasks[dep_id].status = TaskStatus.FAILED
        self._cascade_failure(dep_id)  # <-- This line is crucial!

2. Forgetting Edge Cases

  • What if the dependency list is empty?
  • What if a task has no children?
  • What if multiple parents finish at the same time?

3. Breaking Previous Tests

If you add tags and priority in Part 4, make sure add_task("id", "desc") still works. If you force the user to provide tags, you broke the API.

Related Practice Problems

  • LeetCode 207: Course Schedule (Detecting cycles).
  • LeetCode 210: Course Schedule II (Topological Sort).
  • Design Task Scheduler: Very similar logic.
Loading editor…
OUTPUTLast run results appear here.
No output yet. Click "Run Code" to see results.