InterviewDB Question

Agents Coding: Design a Task Orchestration System for Concurrent Agents

Question Details

Problem

Build a simple agent orchestration framework. Each agent runs a task (a callable) and may spawn sub-tasks. Tasks have dependencies — a task cannot start until all its dependencies complete. The system should run independent tasks in parallel (simulated with threads or asyncio) and return results in topological order.

python
class AgentOrchestrator:
    def add_task(self, task_id: str, fn: callable, deps: list[str]) -> None: ...
    def run_all(self) -> dict[str, any]: ...

Example

orchestrator = AgentOrchestrator()
orchestrator.add_task("fetch", lambda: fetch_data(), deps=[])
orchestrator.add_task("parse", lambda: parse(results["fetch"]), deps=["fetch"])
orchestrator.add_task("store", lambda: store(results["parse"]), deps=["parse"])
orchestrator.add_task("notify", lambda: send_alert(), deps=["fetch"])

results = orchestrator.run_all()
# fetch runs first; parse and notify run after fetch; store runs after parse

Follow-ups

  1. How do you detect and report circular dependencies before execution?
  2. What happens if a task raises an exception — do dependent tasks still run?
  3. How would you add timeout and retry logic per task?
  4. How would this design change if tasks are distributed across machines?

Full Details

Problem

Build a simple agent orchestration framework. Each agent runs a task (a callable) and may spawn sub-tasks. Tasks have dependencies — a task cannot start until all its dependencies complete. The system should run independent tasks in parallel (simulated with threads or asyncio) and return results in topological order.

python
class AgentOrchestrator:
    def add_task(self, task_id: str, fn: callable, deps: list[str]) -> None: ...
    def run_all(self) -> dict[str, any]: ...

Example

orchestrator = AgentOrchestrator()
orchestrator.add_task("fetch", lambda: fetch_data(), deps=[])
orchestrator.add_task("parse", lambda: parse(results["fetch"]), deps=["fetch"])
orchestrator.add_task("store", lambda: store(results["parse"]), deps=["parse"])
orchestrator.add_task("notify", lambda: send_alert(), deps=["fetch"])

results = orchestrator.run_all()
# fetch runs first; parse and notify run after fetch; store runs after parse

Follow-ups

  1. How do you detect and report circular dependencies before execution?
  2. What happens if a task raises an exception — do dependent tasks still run?
  3. How would you add timeout and retry logic per task?
  4. How would this design change if tasks are distributed across machines?
Free preview. Unlock all questions →

Topics

Coding Phone