Description
Company: Perplexity
The Challenge
You need to build a task management system for AI agents. The system handles tasks that have different states (like READY or BLOCKED) and dependencies (some tasks wait for others).
The interview starts with a simple problem, but it gets harder. Important: As you add new features, you must make sure the old features still work (Regression Testing).
Starting Code:
The interviewer usually gives you this skeleton:
TaskStatus: A list of states (READY, IN_PROGRESS, SUCCEEDED, FAILED, BLOCKED).Task: A class representing one job.ToDoList: A class to manage all the tasks.
Part 1: Basic Features
What You Need to Build
Create the basic tools to manage tasks:
add_task(task_id, description)- Make a new task.
- Set its status to
READYby default. - If the ID already exists, raise an error.
get_task(task_id)- Find and return a task using its ID.
- If the task is missing, raise an error.
update_status(task_id, new_status)- Change the status of a task.
- Raise an error if the task is missing.
Solution Approach
We use a dictionary (HashMap) to store tasks for fast O(1) access.
from enum import Enum
from typing import Dict, List, Optional, Set
from dataclasses import dataclass, field
class TaskStatus(Enum):
READY = "READY"
IN_PROGRESS = "IN_PROGRESS"
SUCCEEDED = "SUCCEEDED"
FAILED = "FAILED"
BLOCKED = "BLOCKED"
@dataclass
class Task:
task_id: str
description: str
status: TaskStatus = TaskStatus.READY
dependencies: Set[str] = field(default_factory=set) # Used in Part 2
def __repr__(self):
return f"Task({self.task_id}, {self.status.value})"
class ToDoList:
def __init__(self):
self.tasks: Dict[str, Task] = {}
# Track which tasks depend on a specific task
self.dependents: Dict[str, Set[str]] = {}
def add_task(self, task_id: str, description: str) -> Task:
"""Create a new task."""
if task_id in self.tasks:
raise ValueError(f"Task {task_id} already exists")
task = Task(task_id=task_id, description=description)
self.tasks[task_id] = task
return task
def get_task(self, task_id: str) -> Task:
"""Find a task by ID."""
if task_id not in self.tasks:
raise ValueError(f"Task {task_id} does not exist")
return self.tasks[task_id]
def update_status(self, task_id: str, new_status: TaskStatus) -> None:
"""Change a task's status."""
task = self.get_task(task_id)
task.status = new_status
Part 1 Test Cases
def test_part1():
todo_list = ToDoList()
# Test adding a task
task1 = todo_list.add_task("task1", "First task")
assert task1.status == TaskStatus.READY
assert task1.task_id == "task1"
# Test finding a task
retrieved = todo_list.get_task("task1")
assert retrieved.task_id == "task1"
# Test updating status
todo_list.update_status("task1", TaskStatus.IN_PROGRESS)
assert todo_list.get_task("task1").status == TaskStatus.IN_PROGRESS
todo_list.update_status("task1", TaskStatus.SUCCEEDED)
assert todo_list.get_task("task1").status == TaskStatus.SUCCEEDED
# Test errors
try:
todo_list.add_task("task1", "Duplicate")
assert False, "Should raise error for duplicate ID"
except ValueError:
pass
try:
todo_list.get_task("nonexistent")
assert False, "Should raise error for nonexistent task"
except ValueError:
pass
print("Part 1 tests passed!")
# test_part1()
Part 2: Handling Dependencies
What You Need to Build
Now, tasks can depend on other tasks. This adds complexity:
- Update
add_taskto handle dependencies:- You can now pass a list of parent task IDs.
- If any parent task is not
SUCCEEDED, the new task starts asBLOCKED. - If all parents are
SUCCEEDED(or there are no parents), it starts asREADY.
- Automatic Status Updates:
- Unblocking: When a parent task finishes (
SUCCEEDED), check its children. If a child's parents are all done, change that child fromBLOCKEDtoREADY. - Cascading Failure: If a parent task fails (
FAILED), all its children (and their children) must strictly becomeFAILEDtoo.
- Unblocking: When a parent task finishes (
Solution Approach
We need two maps:
tasks:task_id->Taskobject.dependents:parent_id->Setofchild_ids(reverse lookup). This helps us find children quickly when a parent updates.
class ToDoList:
def __init__(self):
self.tasks: Dict[str, Task] = {}
# Track which tasks depend on each task (reverse mapping)
self.dependents: Dict[str, Set[str]] = {}
def add_task(self, task_id: str, description: str,
dependencies: List[str] = None) -> Task:
"""Add a new task with optional dependencies."""
if task_id in self.tasks:
raise ValueError(f"Task {task_id} already exists")
# Check if dependency IDs exist
dep_set = set(dependencies or [])
for dep_id in dep_set:
if dep_id not in self.tasks:
raise ValueError(f"Dependency {dep_id} does not exist")
# Set status based on parent tasks
initial_status = TaskStatus.READY
if dep_set:
# Are all parents SUCCEEDED?
all_succeeded = all(
self.tasks[dep_id].status == TaskStatus.SUCCEEDED
for dep_id in dep_set
)
if not all_succeeded:
initial_status = TaskStatus.BLOCKED
# Create the task
task = Task(
task_id=task_id,
description=description,
status=initial_status,
dependencies=dep_set
)
self.tasks[task_id] = task
# Update the reverse map so parents know their children
for dep_id in dep_set:
if dep_id not in self.dependents:
self.dependents[dep_id] = set()
self.dependents[dep_id].add(task_id)
return task
def get_task(self, task_id: str) -> Task:
"""Get a task by its ID."""
if task_id not in self.tasks:
raise ValueError(f"Task {task_id} does not exist")
return self.tasks[task_id]
def update_status(self, task_id: str, new_status: TaskStatus) -> None:
"""Update task status and trigger effects on children."""
task = self.get_task(task_id)
old_status = task.status
task.status = new_status
# Check for side effects
if new_status == TaskStatus.SUCCEEDED:
self._try_unblock_dependents(task_id)
elif new_status == TaskStatus.FAILED:
self._cascade_failure(task_id)
def _try_unblock_dependents(self, task_id: str) -> None:
"""
When a task succeeds, check if children can be unblocked.
A child becomes READY only if ALL its parents are SUCCEEDED.
"""
if task_id not in self.dependents:
return
for dependent_id in self.dependents[task_id]:
dependent_task = self.tasks[dependent_id]
# Only check if currently BLOCKED
if dependent_task.status != TaskStatus.BLOCKED:
continue
# Check if all dependencies are SUCCEEDED
all_deps_succeeded = all(
self.tasks[dep_id].status == TaskStatus.SUCCEEDED
for dep_id in dependent_task.dependencies
)
if all_deps_succeeded:
dependent_task.status = TaskStatus.READY
def _cascade_failure(self, task_id: str) -> None:
"""
When a task fails, mark all children as FAILED recursively.
"""
if task_id not in self.dependents:
return
for dependent_id in self.dependents[task_id]:
dependent_task = self.tasks[dependent_id]
# Only update if not already FAILED
if dependent_task.status != TaskStatus.FAILED:
dependent_task.status = TaskStatus.FAILED
# Recursively fail the children of this task
self._cascade_failure(dependent_id)
Part 2 Test Cases
def test_part2_dependencies():
todo_list = ToDoList()
# Chain: task1 -> task2 -> task3
task1 = todo_list.add_task("task1", "First task")
assert task1.status == TaskStatus.READY
# Task2 waits for task1
task2 = todo_list.add_task("task2", "Second task", dependencies=["task1"])
assert task2.status == TaskStatus.BLOCKED, "Should be BLOCKED because task1 isn't done"
# Task3 waits for task2
task3 = todo_list.add_task("task3", "Third task", dependencies=["task2"])
assert task3.status == TaskStatus.BLOCKED
print("✓ Tasks start blocked correctly")
# Test Unblocking: task1 finishes -> task2 becomes READY
todo_list.update_status("task1", TaskStatus.SUCCEEDED)
assert todo_list.get_task("task2").status == TaskStatus.READY, "Task2 should be READY"
assert todo_list.get_task("task3").status == TaskStatus.BLOCKED, "Task3 stays BLOCKED"
print("✓ Unblocking works")
# Test Failure: task2 fails -> task3 fails
todo_list.update_status("task2", TaskStatus.FAILED)
assert todo_list.get_task("task3").status == TaskStatus.FAILED, "Task3 should fail too"
print("✓ Failure cascade works")
# Test Multiple Dependencies
todo_list2 = ToDoList()
t1 = todo_list2.add_task("t1", "Task 1")
t2 = todo_list2.add_task("t2", "Task 2")
# t3 waits for BOTH t1 and t2
t3 = todo_list2.add_task("t3", "Task 3", dependencies=["t1", "t2"])
assert t3.status == TaskStatus.BLOCKED
# Only t1 finishes - t3 should wait
todo_list2.update_status("t1", TaskStatus.SUCCEEDED)
assert todo_list2.get_task("t3").status == TaskStatus.BLOCKED
# Both finish - t3 runs
todo_list2.update_status("t2", TaskStatus.SUCCEEDED)
assert todo_list2.get_task("t3").status == TaskStatus.READY
print("✓ Multiple dependencies work")
print("Part 2 tests passed!")
# test_part2_dependencies()
Part 3: Searching and Filtering
What You Need to Build
Add methods to find specific types of tasks:
get_tasks_by_status: Find all tasks with a specific status (like "FAILED").get_ready_tasks: Helper to find everything ready to run.get_blocked_tasks: Helper to find stuck tasks.
Solution Approach
Iterate through the tasks values and filter them.
class ToDoList:
# ... keep previous methods ...
def get_tasks_by_status(self, status: TaskStatus) -> List[Task]:
"""Get all tasks that match a status."""
return [
task for task in self.tasks.values()
if task.status == status
]
def get_ready_tasks(self) -> List[Task]:
"""Get all READY tasks."""
return self.get_tasks_by_status(TaskStatus.READY)
def get_blocked_tasks(self) -> List[Task]:
"""Get all BLOCKED tasks."""
return self.get_tasks_by_status(TaskStatus.BLOCKED)
def get_task_dependencies_info(self, task_id: str) -> dict:
"""Debug helper: see parents and children of a task."""
task = self.get_task(task_id)
return {
"task_id": task_id,
"status": task.status,
"dependencies": list(task.dependencies),
"dependency_statuses": {
dep_id: self.tasks[dep_id].status.value
for dep_id in task.dependencies
},
"dependents": list(self.dependents.get(task_id, set()))
}
Part 4: Adding Tags and Priority
What You Need to Build
Add more ways to organize tasks:
- Tags: Labels like "Backend" or "Cleanup".
- Priority: A number (higher numbers = more important).
- Sorting: Return tasks in the correct order for execution (Topological Sort).
Solution Approach
Update the Task class and the add_task method. Use default values so we don't break the code from Part 1, 2, and 3.
@dataclass
class Task:
task_id: str
description: str
status: TaskStatus = TaskStatus.READY
dependencies: Set[str] = field(default_factory=set)
tags: Set[str] = field(default_factory=set) # NEW FIELD
priority: int = 0 # NEW FIELD (higher = more important)
Update add_task:
class ToDoList:
# ... keep previous methods ...
def add_task(self, task_id: str, description: str,
dependencies: List[str] = None,
tags: Set[str] = None,
priority: int = 0) -> Task:
"""Add task with optional tags and priority."""
if task_id in self.tasks:
raise ValueError(f"Task {task_id} already exists")
# Check dependencies
dep_set = set(dependencies or [])
for dep_id in dep_set:
if dep_id not in self.tasks:
raise ValueError(f"Dependency {dep_id} does not exist")
# Set status
initial_status = TaskStatus.READY
if dep_set:
all_succeeded = all(
self.tasks[dep_id].status == TaskStatus.SUCCEEDED
for dep_id in dep_set
)
if not all_succeeded:
initial_status = TaskStatus.BLOCKED
# Create task with ALL fields
task = Task(
task_id=task_id,
description=description,
status=initial_status,
dependencies=dep_set,
tags=tags or set(),
priority=priority
)
self.tasks[task_id] = task
# Update the reverse map so parents know their children
for dep_id in dep_set:
if dep_id not in self.dependents:
self.dependents[dep_id] = set()
self.dependents[dep_id].add(task_id)
return task
def get_tasks_by_tag(self, tag: str) -> List[Task]:
"""Find tasks with a specific tag."""
return [
task for task in self.tasks.values()
if tag in task.tags
]
def get_ready_tasks_sorted_by_priority(self) -> List[Task]:
"""Get READY tasks, highest priority first."""
ready_tasks = self.get_ready_tasks()
return sorted(ready_tasks, key=lambda t: t.priority, reverse=True)
def get_topological_order(self) -> List[str]:
"""
Return task IDs in an order where parents come before children.
Uses Kahn's algorithm.
"""
# Count how many dependencies each task has
in_degree = {task_id: len(task.dependencies)
for task_id, task in self.tasks.items()}
# Start with tasks that have 0 dependencies
queue = [task_id for task_id, degree in in_degree.items() if degree == 0]
result = []
while queue:
# Process a free task
current = queue.pop(0)
result.append(current)
# Tell children that one parent is done
for dependent_id in self.dependents.get(current, set()):
in_degree[dependent_id] -= 1
if in_degree[dependent_id] == 0:
queue.append(dependent_id)
# If result length != total tasks, we have a cycle (infinite loop)
if len(result) != len(self.tasks):
raise ValueError("Circular dependency detected")
return result
Important Coding Tips
1. Don't Break Old Features
This is critical. When you added priority in Part 4, if you changed the arguments for add_task without default values (= 0), the code from Part 1 would crash. Always check backward compatibility.
2. Validating Transitions
You might want to ensure tasks don't skip logical steps. For example, a SUCCEEDED task shouldn't suddenly become READY again.
# Allowed status changes
VALID_TRANSITIONS = {
TaskStatus.READY: {TaskStatus.IN_PROGRESS, TaskStatus.FAILED},
TaskStatus.IN_PROGRESS: {TaskStatus.SUCCEEDED, TaskStatus.FAILED},
TaskStatus.BLOCKED: {TaskStatus.READY, TaskStatus.FAILED},
TaskStatus.SUCCEEDED: set(), # Final state
TaskStatus.FAILED: set(), # Final state
}
def update_status(self, task_id: str, new_status: TaskStatus) -> None:
task = self.get_task(task_id)
old_status = task.status
# Check if this change is allowed
if new_status not in VALID_TRANSITIONS.get(old_status, set()):
raise ValueError(
f"Invalid transition from {old_status.value} to {new_status.value}"
)
task.status = new_status
# Run side effects (Unblock or Cascade Failure)
if new_status == TaskStatus.SUCCEEDED:
self._try_unblock_dependents(task_id)
elif new_status == TaskStatus.FAILED:
self._cascade_failure(task_id)
3. Avoiding Infinite Loops
In a real system, you must check for circular dependencies (e.g., Task A waits for Task B, and Task B waits for Task A). This usually requires a graph search algorithm.
Time and Space Complexity
Time Complexity
add_task:O(D). Depends on the number of dependencies (D).get_task:O(1). Fast lookup.update_status:- Unblocking:
O(C × D). Depends on the number of children (C) and how many parents they have. - Failure Cascade:
O(N). Worst case, one failure could affect every other task in the list.
- Unblocking:
get_topological_order:O(N + E). Standard graph traversal time (N = tasks, E = dependency links).
Space Complexity
- Total:
O(N + E). We store every task and every dependency link.
Common Mistakes
1. Failing to Recurse
When a task fails, you must fail its children. But don't forget to fail the children's children too. You need a recursive function for this.
Correct Way:
def _cascade_failure(self, task_id: str):
for dep_id in self.dependents[task_id]:
self.tasks[dep_id].status = TaskStatus.FAILED
self._cascade_failure(dep_id) # <-- This line is crucial!
2. Forgetting Edge Cases
- What if the dependency list is empty?
- What if a task has no children?
- What if multiple parents finish at the same time?
3. Breaking Previous Tests
If you add tags and priority in Part 4, make sure add_task("id", "desc") still works. If you force the user to provide tags, you broke the API.
Related Practice Problems
- LeetCode 207: Course Schedule (Detecting cycles).
- LeetCode 210: Course Schedule II (Topological Sort).
- Design Task Scheduler: Very similar logic.