InterviewDB
Question
Agents Coding: Design a Task Orchestration System for Concurrent Agents
phone
Question Details
Problem
Build a simple agent orchestration framework. Each agent runs a task (a callable) and may spawn sub-tasks. Tasks have dependencies — a task cannot start until all its dependencies complete. The system should run independent tasks in parallel (simulated with threads or asyncio) and return results in topological order.
python
class AgentOrchestrator:
def add_task(self, task_id: str, fn: callable, deps: list[str]) -> None: ...
def run_all(self) -> dict[str, any]: ...
Example
orchestrator = AgentOrchestrator()
orchestrator.add_task("fetch", lambda: fetch_data(), deps=[])
orchestrator.add_task("parse", lambda: parse(results["fetch"]), deps=["fetch"])
orchestrator.add_task("store", lambda: store(results["parse"]), deps=["parse"])
orchestrator.add_task("notify", lambda: send_alert(), deps=["fetch"])
results = orchestrator.run_all()
# fetch runs first; parse and notify run after fetch; store runs after parse
Follow-ups
- How do you detect and report circular dependencies before execution?
- What happens if a task raises an exception — do dependent tasks still run?
- How would you add timeout and retry logic per task?
- How would this design change if tasks are distributed across machines?
Full Details
Problem
Build a simple agent orchestration framework. Each agent runs a task (a callable) and may spawn sub-tasks. Tasks have dependencies — a task cannot start until all its dependencies complete. The system should run independent tasks in parallel (simulated with threads or asyncio) and return results in topological order.
python
class AgentOrchestrator:
def add_task(self, task_id: str, fn: callable, deps: list[str]) -> None: ...
def run_all(self) -> dict[str, any]: ...
Example
orchestrator = AgentOrchestrator()
orchestrator.add_task("fetch", lambda: fetch_data(), deps=[])
orchestrator.add_task("parse", lambda: parse(results["fetch"]), deps=["fetch"])
orchestrator.add_task("store", lambda: store(results["parse"]), deps=["parse"])
orchestrator.add_task("notify", lambda: send_alert(), deps=["fetch"])
results = orchestrator.run_all()
# fetch runs first; parse and notify run after fetch; store runs after parse
Follow-ups
- How do you detect and report circular dependencies before execution?
- What happens if a task raises an exception — do dependent tasks still run?
- How would you add timeout and retry logic per task?
- How would this design change if tasks are distributed across machines?
Free preview. Unlock all questions →
Topics
Coding
Phone
More from Anthropic
Reddit
Anthropic phone screen question
Reddit
Anthropic Technical Interview (55 min CodeSignal) – Anyone done this before?
1p3a
Anthropic Software Engineer Interview OA and Phone Screen
1p3a
Anthropic Onsite Interview: In-Memory Cache Extension Coding Challenge
1p3a
Anthropic Growth SDE Onsite Interview Experience and Preparation