问题背景
在数据分析和系统监控中,频率统计是一个基础且重要的操作。特别是在资源受限的嵌入式系统或高并发场景下,如何高效地统计给定范围内数字的出现频率,直接影响系统的性能表现。
OAVOService 面试助攻团队 在长期的大厂算法面试辅导中发现,频率统计问题经常出现在 Google、Amazon、Microsoft 等公司的编程面试中,是考察候选人对空间时间权衡、数组操作技巧的经典题目。
问题描述
Problem 3 — Print Frequency of Numbers from 1 to N
给定一个包含 N 个整数的数组,数组中只包含 1 到 N 的整数。某些数字可能出现多次,某些数字可能不出现。
编写代码打印出 1 到 N 每个数字在数组中出现的次数。
示例:
PrintFrequency(new int[] {3, 1, 2});
输出:
1 : 1
2 : 1
3 : 1
PrintFrequency(new int[] {2, 4, 4, 6, 6, 6});
输出:
1 : 0
2 : 1
3 : 0
4 : 2
5 : 0
6 : 3
核心考察点:
- 数组遍历与计数算法
- 空间复杂度优化技巧
- 原地操作的数组修改策略
- 边界条件处理
解法分析
解法一:标准计数数组(推荐)
def print_frequency_standard(arr):
"""
标准计数数组解法
时间复杂度: O(n)
空间复杂度: O(n)
"""
if not arr:
return
n = len(arr)
# 创建计数数组,索引0对应数字1
count = [0] * n
# 统计每个数字的频率
for num in arr:
if 1 <= num <= n: # 边界检查
count[num - 1] += 1
# 打印结果
for i in range(n):
print(f"{i + 1} : {count[i]}")
# 优化版本:支持任意范围
def print_frequency_range(arr, min_val, max_val):
"""
支持任意数字范围的频率统计
"""
count = [0] * (max_val - min_val + 1)
for num in arr:
if min_val <= num <= max_val:
count[num - min_val] += 1
for i in range(len(count)):
print(f"{i + min_val} : {count[i]}")
解法二:原地标记法(进阶优化)
def print_frequency_inplace(arr):
"""
原地标记法 - O(1) 额外空间
时间复杂度: O(n)
空间复杂度: O(1)
核心思想:使用数组元素的正负号来标记访问状态
"""
if not arr:
return
n = len(arr)
# 第一步:标记出现的数字
for i in range(n):
# 获取当前数字(可能已被标记为负数)
num = abs(arr[i])
if 1 <= num <= n:
# 将对应位置标记为负数(表示该数字出现过)
if arr[num - 1] > 0:
arr[num - 1] = -arr[num - 1]
# 第二步:统计频率
# 重新遍历,计算每个数字的出现次数
frequency = [0] * n
for num in arr:
abs_num = abs(num)
if 1 <= abs_num <= n:
frequency[abs_num - 1] += 1
# 第三步:恢复原数组(可选)
for i in range(n):
arr[i] = abs(arr[i])
# 打印结果
for i in range(n):
print(f"{i + 1} : {frequency[i]}")
解法三:哈希表法(通用)
from collections import defaultdict, Counter
def print_frequency_hash(arr):
"""
哈希表解法 - 适用于任意数字范围
时间复杂度: O(n)
空间复杂度: O(k),k为不同数字的个数
"""
if not arr:
return
# 方法1:使用Counter
frequency = Counter(arr)
n = len(arr)
for i in range(1, n + 1):
print(f"{i} : {frequency.get(i, 0)}")
# 方法2:手动计数
# count_map = defaultdict(int)
# for num in arr:
# count_map[num] += 1
# for i in range(1, n + 1):
# print(f"{i} : {count_map[i]}")
高级优化策略
1. 分块处理大数组
def print_frequency_chunked(arr, chunk_size=1000):
"""
分块处理大数组,减少内存峰值
适用于内存受限场景
"""
if not arr:
return
n = len(arr)
total_count = [0] * n
# 分块处理
for start in range(0, len(arr), chunk_size):
end = min(start + chunk_size, len(arr))
chunk = arr[start:end]
# 处理当前块
chunk_count = [0] * n
for num in chunk:
if 1 <= num <= n:
chunk_count[num - 1] += 1
# 累加到总计数
for i in range(n):
total_count[i] += chunk_count[i]
# 打印结果
for i in range(n):
print(f"{i + 1} : {total_count[i]}")
2. 并行处理优化
from concurrent.futures import ThreadPoolExecutor
import threading
def print_frequency_parallel(arr, num_threads=4):
"""
多线程并行处理
适用于大数组的频率统计
"""
if not arr:
return
n = len(arr)
chunk_size = len(arr) // num_threads
results = [None] * num_threads
lock = threading.Lock()
def process_chunk(thread_id, start_idx, end_idx):
chunk_count = [0] * n
for i in range(start_idx, end_idx):
num = arr[i]
if 1 <= num <= n:
chunk_count[num - 1] += 1
with lock:
results[thread_id] = chunk_count
# 启动线程
with ThreadPoolExecutor(max_workers=num_threads) as executor:
futures = []
for i in range(num_threads):
start = i * chunk_size
end = len(arr) if i == num_threads - 1 else (i + 1) * chunk_size
future = executor.submit(process_chunk, i, start, end)
futures.append(future)
# 等待所有线程完成
for future in futures:
future.result()
# 合并结果
total_count = [0] * n
for chunk_result in results:
if chunk_result:
for i in range(n):
total_count[i] += chunk_result[i]
# 打印结果
for i in range(n):
print(f"{i + 1} : {total_count[i]}")
3. 流式处理大文件
def print_frequency_streaming(file_path):
"""
流式处理大文件中的数字频率
适用于无法一次性加载到内存的大数据集
"""
frequency = {}
max_num = 0
with open(file_path, 'r') as file:
for line in file:
numbers = map(int, line.strip().split())
for num in numbers:
frequency[num] = frequency.get(num, 0) + 1
max_num = max(max_num, num)
# 打印结果
for i in range(1, max_num + 1):
print(f"{i} : {frequency.get(i, 0)}")
性能基准测试
import time
import random
def benchmark_frequency_methods():
"""
性能基准测试
"""
# 生成测试数据
test_sizes = [1000, 10000, 100000]
for size in test_sizes:
arr = [random.randint(1, size) for _ in range(size)]
print(f"\n测试数组大小: {size}")
# 测试标准计数法
start_time = time.time()
print_frequency_standard(arr.copy())
standard_time = time.time() - start_time
# 测试原地标记法
start_time = time.time()
print_frequency_inplace(arr.copy())
inplace_time = time.time() - start_time
# 测试哈希表法
start_time = time.time()
print_frequency_hash(arr.copy())
hash_time = time.time() - start_time
print(f"标准计数法: {standard_time:.4f}s")
print(f"原地标记法: {inplace_time:.4f}s")
print(f"哈希表法: {hash_time:.4f}s")
OAVOService 面试技巧
常见面试变形
- 限制空间复杂度 - 要求 O(1) 空间解法
- 处理负数/零 - 扩展到任意整数范围
- Top-K频率 - 只输出出现次数最多的K个数字
- 实时更新 - 支持动态增删数字
面试讨论要点
- 算法选择依据 - 根据数据特征选择合适算法
- 边界条件处理 - 空数组、重复元素、越界值
- 扩展性考虑 - 如何支持更大的数据规模
- 实际应用场景 - 日志分析、用户行为统计等
复杂度对比
| 方法 | 时间复杂度 | 空间复杂度 | 适用场景 |
|---|---|---|---|
| 标准计数 | O(n) | O(n) | 数字范围已知且不大 |
| 原地标记 | O(n) | O(1) | 空间受限,数字范围小 |
| 哈希表 | O(n) | O(k) | 数字范围大,稀疏分布 |
| 分块处理 | O(n) | O(n/块数) | 内存受限的大数组 |
OAVOService 专业面试助攻服务 - 我们深度解析各类算法面试题目,为学员提供 Google、Amazon、Microsoft、Meta 等顶级公司的实时面试辅助。凭借丰富的面试经验和算法优化技巧,助你在编程面试中脱颖而出。
如果你正在准备算法面试,欢迎了解我们的 定制算法面试训练方案 —— 从基础数据结构到高级优化技巧,全面提升你的编程面试竞争力。
标签: 频率统计, 数组算法, 空间优化, 原地操作, 算法面试, OAVOService