
Applied Intuition Software Engineer Onsite Coding Questions

8+ questions from real Applied Intuition Software Engineer Onsite Coding rounds, reported by candidates who interviewed there.

8 Questions · 7 Topic Areas · 10+ Sources

What does the Applied Intuition Onsite Coding round test?

The Applied Intuition onsite coding round is the core technical evaluation. Software Engineer candidates typically see 2-3 algorithm and data structure problems. Problems range from medium to hard difficulty, and interviewers evaluate both correctness and code quality.


Applied Intuition Software Engineer Onsite Coding Questions

LeetCode #317: Shortest Distance from All Buildings. Difficulty: Hard. Topics: Array, Breadth-First Search, Matrix. Asked at Applied Intuition in the last 6 months.

LeetCode #609: Find Duplicate File in System. Difficulty: Medium. Topics: Array, Hash Table, String. Asked at Applied Intuition in the last 6 months.

## Problem

You are asked to design a system that ingests 10 TB of application logs per day, processes them for anomaly detection and aggregation, and serves dashboards with sub-second query latency. Walk through the architecture.

```
Requirements:
- Ingest: 10 TB/day, 100k events/sec peak
- Processing: real-time aggregation + batch anomaly detection
- Query: dashboard queries over last 7 days, p99 < 500ms
- Retention: 90 days hot, 3 years cold
- Fault tolerance: no data loss
```

**Proposed architecture:**

```
Producers -> Kafka (partitioned by service) -+-> Flink (real-time agg) -> Redis (hot counters)
                                             |
                                             +-> S3 (raw parquet, partitioned by date) -> Spark (batch anomaly, daily)
                                                                                       -> Trino/Presto (ad-hoc SQL)
Dashboard -> Druid (pre-agg OLAP)
```

## Follow-ups

1. Why partition Kafka by service rather than by timestamp? What are the tradeoffs?
2. How do you handle late-arriving events in the Flink streaming layer?
3. What compaction and partitioning strategy on S3 enables fast Trino queries?
4. How would you implement exactly-once semantics end-to-end from Kafka to the database?
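A quick capacity check grounds a design discussion like this one. The sketch below derives the sustained byte rate from the stated 10 TB/day and sizes a Kafka partition count; the 10 MB/s per-partition figure and the 3x peak factor are illustrative assumptions, not given by the prompt.

```python
import math

# Back-of-envelope capacity math from the stated requirements.
BYTES_PER_TB = 10**12
SECONDS_PER_DAY = 86_400

daily_bytes = 10 * BYTES_PER_TB                      # 10 TB/day
avg_rate_mb_s = daily_bytes / SECONDS_PER_DAY / 1e6
print(f"sustained ingest: {avg_rate_mb_s:.0f} MB/s")  # ~116 MB/s

# Assume peak throughput is ~3x sustained, and that one Kafka
# partition comfortably handles ~10 MB/s (both assumed figures).
peak_mb_s = avg_rate_mb_s * 3
partitions = math.ceil(peak_mb_s / 10)
print(f"suggested Kafka partitions: >= {partitions}")
```

Interviewers tend to reward showing this arithmetic early: it justifies the partition count and makes the sub-second query budget concrete.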

## Problem

Design a `JobMonitor` that tracks the lifecycle of asynchronous jobs. Jobs can be registered, updated with status, and queried. Alert (log to a provided callback) when a job fails or has not completed within a timeout period.

```python
from typing import Callable
import time

class JobMonitor:
    def __init__(self, alert_callback: Callable[[str, str], None]):
        """alert_callback(job_id, reason) is called on failure/timeout."""
        pass

    def register(self, job_id: str, timeout_sec: int) -> None:
        """Register a new job with a timeout."""
        pass

    def update_status(self, job_id: str, status: str) -> None:
        """status: 'running', 'completed', 'failed'"""
        pass

    def check_timeouts(self, current_time: float | None = None) -> None:
        """Call periodically; alerts on any job past its timeout."""
        pass

    def get_status(self, job_id: str) -> dict:
        pass
```

```
monitor = JobMonitor(alert_callback=lambda jid, reason: print(f"{jid}: {reason}"))
monitor.register("job1", timeout_sec=30)
monitor.update_status("job1", "running")
# ... 31 seconds later ...
monitor.check_timeouts()  # -> prints "job1: timeout"
```

## Follow-ups

1. How do you efficiently find all timed-out jobs without scanning all registered jobs every check?
2. How would you persist job state so the monitor survives process restarts?
3. Extend to support job dependencies: job B should not start until job A completes.
4. How would you expose this as a REST API with endpoints for registration, status updates, and querying?

## Problem

Implement a key-value store with get, set, and optionally delete operations.

## Likely LeetCode equivalent

Similar to LC 146 LRU Cache design.

## Tags

coding, hash_table, design, onsite
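The prompt leaves eviction unspecified; following the LC 146 comparison, a sketch that bounds memory with LRU eviction (the `capacity` parameter is our assumption, not part of the prompt):

```python
from collections import OrderedDict

class KVStore:
    def __init__(self, capacity: int = 1024):
        self._cap = capacity
        self._data: OrderedDict = OrderedDict()  # insertion order tracks recency

    def get(self, key):
        if key not in self._data:
            return None
        self._data.move_to_end(key)              # mark as most recently used
        return self._data[key]

    def set(self, key, value) -> None:
        if key in self._data:
            self._data.move_to_end(key)
        self._data[key] = value
        if len(self._data) > self._cap:
            self._data.popitem(last=False)       # evict least recently used

    def delete(self, key) -> bool:
        return self._data.pop(key, None) is not None
```

`OrderedDict` gives O(1) get/set/delete; in an interview it is worth saying you could also build the doubly linked list plus hash map by hand, which is what LC 146 is really testing.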

## Problem

You are given a sequence of 2D waypoints that define a road lane as a polyline. Given a query point `P`, determine which segment of the polyline it belongs to (i.e., which consecutive pair of waypoints `(W[i], W[i+1])` is closest to `P`), and return the index `i`.

```python
def find_lane_segment(waypoints: list[tuple[float, float]],
                      point: tuple[float, float]) -> int:
    # waypoints: [(x0,y0), (x1,y1), ...], len >= 2
    # Returns index i such that segment (waypoints[i], waypoints[i+1]) is closest to point
    pass
```

**Example:**

```
waypoints = [(0,0), (10,0), (10,10), (20,10)]
point = (11, 3) -> 1  # closest to segment (10,0)-(10,10)
point = (5, 2)  -> 0  # closest to segment (0,0)-(10,0)
```

## Follow-ups

1. How do you compute the perpendicular (orthogonal) distance from a point to a finite line segment vs. an infinite line?
2. What if multiple segments are equidistant? How do you break ties?
3. How would you extend this to 3D waypoints for autonomous vehicle path tracking?
4. If you need to handle thousands of query points per second, what spatial index structure would you use to speed up segment lookup?

## Problem

Two players alternate turns on an N x M grid. Each cell is either empty (`.`) or a mine (`*`). A player places their token on the grid and must move it exactly one step (up, down, left, right) on their turn. A player loses if they step on a mine or have no valid moves. Both play optimally. Given the starting position and whose turn it is, determine whether the first player wins or loses.

```python
def mine_game(grid: list[str], start_r: int, start_c: int) -> str:
    # Returns "WIN" if first player wins, "LOSE" otherwise
    pass
```

**Example:**

```
grid = [
  "...",
  ".*.",
  "..."
]
start_r, start_c = 0, 0 -> "WIN"

grid = [".*", ".."], start_r=0, start_c=0 -> "WIN"  # first player can avoid the mine
```

## Follow-ups

1. What game-theory concept underpins this problem, and what is the time complexity of the minimax approach?
2. How would you use memoization to avoid recomputing states you have already visited?
3. What if a player can move 1 OR 2 steps in a single turn? How does the state space change?
4. Can you detect cycles (infinite games) and handle them as a draw rather than a win/loss?
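Note that both sample grids let the token circle among safe cells forever, so their expected outputs only make sense if an unending game counts as a win for the first player (they never lose). A sketch under that reading, using retrograde analysis over the game graph: positions are labeled WIN/LOSE from the terminal states backward, and anything left unlabeled is a cycle the mover can stall in, which directly addresses follow-up 4.

```python
from collections import deque

def mine_game(grid, start_r, start_c):
    # Assumption (inferred from the examples): an infinite game is not a
    # loss, so "WIN" means the first player can avoid ever losing.
    rows, cols = len(grid), len(grid[0])
    safe = {(r, c) for r in range(rows) for c in range(cols) if grid[r][c] == '.'}
    if (start_r, start_c) not in safe:
        return "LOSE"                       # placed directly on a mine

    def moves(p):
        r, c = p
        return [(r + dr, c + dc) for dr, dc in ((1, 0), (-1, 0), (0, 1), (0, -1))
                if (r + dr, c + dc) in safe]

    # Retrograde analysis: the move set is identical for both players,
    # so the token's position alone identifies the state.
    status = {}                             # pos -> "LOSE"/"WIN" for the player to move
    remaining = {p: len(moves(p)) for p in safe}
    queue = deque(p for p in safe if remaining[p] == 0)
    for p in queue:
        status[p] = "LOSE"                  # no safe move: forced onto a mine or stuck
    while queue:
        p = queue.popleft()
        for q in moves(p):                  # grid adjacency is symmetric
            if q in status:
                continue
            if status[p] == "LOSE":
                status[q] = "WIN"           # q can move the opponent into a losing spot
                queue.append(q)
            else:
                remaining[q] -= 1
                if remaining[q] == 0:       # every safe move from q hands the win away
                    status[q] = "LOSE"
                    queue.append(q)
    # Unlabeled positions are draws: the player to move can stall forever.
    return "LOSE" if status.get((start_r, start_c)) == "LOSE" else "WIN"
```

This runs in O(N*M) time and space, and the same machinery extends to the 1-or-2-step variant in follow-up 3 by widening `moves`.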

## Problem

You are given a list of GPS readings for a vehicle: each reading has a timestamp (in seconds) and a position `(x, y)` in meters. Compute the instantaneous velocity (m/s) at each interior point using central differences, and return the average speed over the full trace.

```python
from dataclasses import dataclass

@dataclass
class GPSReading:
    t: float  # seconds
    x: float
    y: float

def compute_velocities(readings: list[GPSReading]) -> tuple[list[float], float]:
    # Returns (instantaneous_speeds, average_speed)
    # instantaneous_speeds[0] and [-1] should use forward/backward differences
    pass
```

**Example:**

```
readings = [
    GPSReading(0, 0, 0),
    GPSReading(1, 3, 4),    # distance from prev = 5m
    GPSReading(3, 9, 12),   # distance from prev = 10m over 2s
]
-> speeds = [5.0, 5.0, 5.0], avg = 5.0
```

## Follow-ups

1. What is the difference between central, forward, and backward finite-difference approximations for velocity?
2. GPS readings often have noise. What smoothing technique would you apply before computing velocities?
3. How would you compute acceleration from the velocity array?
4. If timestamps are not uniformly spaced, how does that affect the finite-difference formula?
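A sketch of the finite-difference solution. One interpretation choice is hedged in the comments: "average speed" is taken as total path length divided by total elapsed time, which matches the sample output (the prompt could also mean the mean of the instantaneous speeds).

```python
import math
from dataclasses import dataclass

@dataclass
class GPSReading:
    t: float  # seconds
    x: float
    y: float

def compute_velocities(readings):
    n = len(readings)

    def chord(a, b):
        return math.hypot(b.x - a.x, b.y - a.y)

    speeds = []
    for i in range(n):
        if i == 0:
            a, b = readings[0], readings[1]            # forward difference
        elif i == n - 1:
            a, b = readings[n - 2], readings[n - 1]    # backward difference
        else:
            a, b = readings[i - 1], readings[i + 1]    # central difference
        speeds.append(chord(a, b) / (b.t - a.t))

    # "Average speed" taken as total path length / total time (matches the example).
    total_dist = sum(chord(readings[i], readings[i + 1]) for i in range(n - 1))
    avg = total_dist / (readings[-1].t - readings[0].t)
    return speeds, avg
```

Because each difference divides by the actual `b.t - a.t`, non-uniform timestamps (follow-up 4) are already handled; the central difference just becomes a weighted average of the two neighboring intervals.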
