
Google VO 常考系统设计与编码结合的题型,这道题考察在 UDP 不可靠传输场景下,如何实现一个能按序号严格连续输出数据的 Sequencer 组件。题目背景来自日志系统、实时流处理、分布式 replication 等真实场景。
题目背景
UDP 传输不保证顺序,数据包可能乱序、重复甚至丢失。上游持续发送数据片段,每个片段包含 (data, seq_num)。
需要在接收端实现一个 Sequencer 组件,满足:
- 输出严格按
seq_num从小到大连续输出。 - 若中间序号未到达,缓存后续数据,等缺失数据到达后再一并输出。
思路讲解
核心数据结构:哈希表 + next_seq 指针
- 用一个
dict缓存已收到但暂时无法输出的片段,key = seq_num,value = data。 - 维护变量
next_seq,表示当前期望输出的下一个序号。
每次收到新片段的处理流程:
- 将
(seq_num, data)存入哈希表(去重:若已存在则忽略)。 - 检查
next_seq是否在哈希表中。 - 若存在:输出该数据,删除哈希表中该条目,
next_seq += 1,重复步骤 2。 - 若不存在:等待,不输出任何数据。
关键优势:
- 哈希表查询 O(1),每个片段最多存取各一次。
- 无需对缓存排序,逻辑简洁。
代码实现
class Sequencer:
def __init__(self):
self.buffer = {} # seq_num -> data
self.next_seq = 0 # 下一个期望输出的序号
self.output = [] # 已有序输出的数据列表
def receive(self, seq_num: int, data: str):
"""接收一个数据片段"""
# 去重:已输出或已缓存则忽略
if seq_num < self.next_seq or seq_num in self.buffer:
return
self.buffer[seq_num] = data
# 尝试连续输出
while self.next_seq in self.buffer:
self.output.append(self.buffer.pop(self.next_seq))
self.next_seq += 1
def get_output(self):
return self.output
Dry Run
接收顺序:(2, "C") → (0, "A") → (1, "B") → (3, "D")
| 操作 | buffer | next_seq | output |
|---|---|---|---|
| receive(2, "C") | {2:"C"} | 0 | [] |
| receive(0, "A") | {2:"C", 0:"A"} → 输出0,1? | 0 | 输出"A",next=1;1不在buffer,停止 |
| receive(1, "B") | {2:"C", 1:"B"} → 输出1,2 | 1 | 输出"B",next=2;输出"C",next=3 |
| receive(3, "D") | {3:"D"} → 输出3 | 3 | 输出"D",next=4 |
最终有序输出:A → B → C → D ✓
复杂度分析
- 时间:每个数据包 O(1) 接收,O(1) 输出(均摊)。总体 O(n)。
- 空间:O(k),k 为当前缓存中等待的乱序包数量(最坏 O(n))。
Follow-up:丢包处理策略
面试官通常会追问:如果某个 seq 永远不到达怎么办?
策略一:超时重传请求(ARQ)
记录每个缓存包的等待时间。若 next_seq 对应的包等待超过阈值 timeout,向发送端发送重传请求(NACK)。
import time
class SequencerWithTimeout(Sequencer):
def __init__(self, timeout=5.0):
super().__init__()
self.wait_start = None # 开始等待 next_seq 的时间
self.timeout = timeout
def receive(self, seq_num, data):
super().receive(seq_num, data)
if self.next_seq not in self.buffer:
if self.wait_start is None:
self.wait_start = time.time()
else:
self.wait_start = None
def check_timeout(self):
if self.wait_start and time.time() - self.wait_start > self.timeout:
return self.next_seq # 返回需要重传的 seq
return None
策略二:跳过丢失序号(Skip & Forward)
若等待时间超过阈值且无法重传(如直播场景),直接将 next_seq 推进到下一个已缓存的序号,用占位符标记缺失数据,保证系统继续推进。
def skip_missing(self):
"""跳过丢失的 seq,推进 next_seq"""
if not self.buffer:
return
# 推进到最小的已缓存序号
next_available = min(self.buffer.keys())
self.output.append(None) # 占位符,表示丢失
self.next_seq = next_available
# 继续尝试输出
while self.next_seq in self.buffer:
self.output.append(self.buffer.pop(self.next_seq))
self.next_seq += 1
策略对比
| 策略 | 适用场景 | 优点 | 缺点 |
|---|---|---|---|
| 超时重传(ARQ) | 日志、replication | 数据完整 | 延迟增加 |
| 跳过丢失(Skip) | 直播、实时音视频 | 低延迟 | 数据不完整 |
扩展讨论
- 重复包:代码中
if seq_num < self.next_seq or seq_num in self.buffer已处理去重。 - 滑动窗口:可加入窗口大小限制,防止缓冲区无限增长(类似 TCP 接收窗口)。
- 多线程:生产环境中
receive和check_timeout在不同线程,需加锁保护buffer和next_seq。
💬 需要 VO 辅助?
Google、Amazon、TikTok、微软、Meta、Uber 等方向 VO 实时辅助持续进行中。
微信:Coding0201
#转码 #Google面试 #26ng #sde找工 #西雅图找工 #VO辅助 #面经
📬 联系方式
Email: [email protected]
Telegram: @OAVOProxy