← 返回博客列表
算法优化

数组数字频率统计算法优化 - 原地标记与计数策略|OAVOService 面试助攻

2026-01-10

问题背景

在数据分析和系统监控中,频率统计是一个基础且重要的操作。特别是在资源受限的嵌入式系统或高并发场景下,如何高效地统计给定范围内数字的出现频率,直接影响系统的性能表现。

OAVOService 面试助攻团队 在长期的大厂算法面试辅导中发现,频率统计问题经常出现在 Google、Amazon、Microsoft 等公司的编程面试中,是考察候选人对空间时间权衡、数组操作技巧的经典题目。

问题描述

Problem 3 — Print Frequency of Numbers from 1 to N

给定一个包含 N 个整数的数组,数组中只包含 1 到 N 的整数。某些数字可能出现多次,某些数字可能不出现。

编写代码打印出 1 到 N 每个数字在数组中出现的次数。

示例:

PrintFrequency(new int[] {3, 1, 2});
输出:
1 : 1
2 : 1
3 : 1

PrintFrequency(new int[] {2, 4, 4, 6, 6, 6});
输出:
1 : 0
2 : 1
3 : 0
4 : 2
5 : 0
6 : 3

核心考察点:

解法分析

解法一:标准计数数组(推荐)

def print_frequency_standard(arr):
    """
    标准计数数组解法
    时间复杂度: O(n)
    空间复杂度: O(n)
    """
    if not arr:
        return
    
    n = len(arr)
    # 创建计数数组,索引0对应数字1
    count = [0] * n
    
    # 统计每个数字的频率
    for num in arr:
        if 1 <= num <= n:  # 边界检查
            count[num - 1] += 1
    
    # 打印结果
    for i in range(n):
        print(f"{i + 1} : {count[i]}")

# 优化版本:支持任意范围
def print_frequency_range(arr, min_val, max_val):
    """
    支持任意数字范围的频率统计
    """
    count = [0] * (max_val - min_val + 1)
    
    for num in arr:
        if min_val <= num <= max_val:
            count[num - min_val] += 1
    
    for i in range(len(count)):
        print(f"{i + min_val} : {count[i]}")

解法二:原地标记法(进阶优化)

def print_frequency_inplace(arr):
    """
    原地标记法 - O(1) 额外空间
    时间复杂度: O(n)
    空间复杂度: O(1)
    
    核心思想:使用数组元素的正负号来标记访问状态
    """
    if not arr:
        return
    
    n = len(arr)
    
    # 第一步:标记出现的数字
    for i in range(n):
        # 获取当前数字(可能已被标记为负数)
        num = abs(arr[i])
        
        if 1 <= num <= n:
            # 将对应位置标记为负数(表示该数字出现过)
            if arr[num - 1] > 0:
                arr[num - 1] = -arr[num - 1]
    
    # 第二步:统计频率
    # 重新遍历,计算每个数字的出现次数
    frequency = [0] * n
    for num in arr:
        abs_num = abs(num)
        if 1 <= abs_num <= n:
            frequency[abs_num - 1] += 1
    
    # 第三步:恢复原数组(可选)
    for i in range(n):
        arr[i] = abs(arr[i])
    
    # 打印结果
    for i in range(n):
        print(f"{i + 1} : {frequency[i]}")

解法三:哈希表法(通用)

from collections import defaultdict, Counter

def print_frequency_hash(arr):
    """
    哈希表解法 - 适用于任意数字范围
    时间复杂度: O(n)
    空间复杂度: O(k),k为不同数字的个数
    """
    if not arr:
        return
    
    # 方法1:使用Counter
    frequency = Counter(arr)
    n = len(arr)
    
    for i in range(1, n + 1):
        print(f"{i} : {frequency.get(i, 0)}")
    
    # 方法2:手动计数
    # count_map = defaultdict(int)
    # for num in arr:
    #     count_map[num] += 1
    
    # for i in range(1, n + 1):
    #     print(f"{i} : {count_map[i]}")

高级优化策略

1. 分块处理大数组

def print_frequency_chunked(arr, chunk_size=1000):
    """
    分块处理大数组,减少内存峰值
    适用于内存受限场景
    """
    if not arr:
        return
    
    n = len(arr)
    total_count = [0] * n
    
    # 分块处理
    for start in range(0, len(arr), chunk_size):
        end = min(start + chunk_size, len(arr))
        chunk = arr[start:end]
        
        # 处理当前块
        chunk_count = [0] * n
        for num in chunk:
            if 1 <= num <= n:
                chunk_count[num - 1] += 1
        
        # 累加到总计数
        for i in range(n):
            total_count[i] += chunk_count[i]
    
    # 打印结果
    for i in range(n):
        print(f"{i + 1} : {total_count[i]}")

2. 并行处理优化

from concurrent.futures import ThreadPoolExecutor
import threading

def print_frequency_parallel(arr, num_threads=4):
    """
    多线程并行处理
    适用于大数组的频率统计
    """
    if not arr:
        return
    
    n = len(arr)
    chunk_size = len(arr) // num_threads
    results = [None] * num_threads
    lock = threading.Lock()
    
    def process_chunk(thread_id, start_idx, end_idx):
        chunk_count = [0] * n
        for i in range(start_idx, end_idx):
            num = arr[i]
            if 1 <= num <= n:
                chunk_count[num - 1] += 1
        
        with lock:
            results[thread_id] = chunk_count
    
    # 启动线程
    with ThreadPoolExecutor(max_workers=num_threads) as executor:
        futures = []
        for i in range(num_threads):
            start = i * chunk_size
            end = len(arr) if i == num_threads - 1 else (i + 1) * chunk_size
            future = executor.submit(process_chunk, i, start, end)
            futures.append(future)
        
        # 等待所有线程完成
        for future in futures:
            future.result()
    
    # 合并结果
    total_count = [0] * n
    for chunk_result in results:
        if chunk_result:
            for i in range(n):
                total_count[i] += chunk_result[i]
    
    # 打印结果
    for i in range(n):
        print(f"{i + 1} : {total_count[i]}")

3. 流式处理大文件

def print_frequency_streaming(file_path):
    """
    流式处理大文件中的数字频率
    适用于无法一次性加载到内存的大数据集
    """
    frequency = {}
    max_num = 0
    
    with open(file_path, 'r') as file:
        for line in file:
            numbers = map(int, line.strip().split())
            for num in numbers:
                frequency[num] = frequency.get(num, 0) + 1
                max_num = max(max_num, num)
    
    # 打印结果
    for i in range(1, max_num + 1):
        print(f"{i} : {frequency.get(i, 0)}")

性能基准测试

import time
import random

def benchmark_frequency_methods():
    """
    性能基准测试
    """
    # 生成测试数据
    test_sizes = [1000, 10000, 100000]
    
    for size in test_sizes:
        arr = [random.randint(1, size) for _ in range(size)]
        
        print(f"\n测试数组大小: {size}")
        
        # 测试标准计数法
        start_time = time.time()
        print_frequency_standard(arr.copy())
        standard_time = time.time() - start_time
        
        # 测试原地标记法
        start_time = time.time()
        print_frequency_inplace(arr.copy())
        inplace_time = time.time() - start_time
        
        # 测试哈希表法
        start_time = time.time()
        print_frequency_hash(arr.copy())
        hash_time = time.time() - start_time
        
        print(f"标准计数法: {standard_time:.4f}s")
        print(f"原地标记法: {inplace_time:.4f}s")
        print(f"哈希表法: {hash_time:.4f}s")

OAVOService 面试技巧

常见面试变形

  1. 限制空间复杂度 - 要求 O(1) 空间解法
  2. 处理负数/零 - 扩展到任意整数范围
  3. Top-K频率 - 只输出出现次数最多的K个数字
  4. 实时更新 - 支持动态增删数字

面试讨论要点

复杂度对比

方法 时间复杂度 空间复杂度 适用场景
标准计数 O(n) O(n) 数字范围已知且不大
原地标记 O(n) O(1) 空间受限,数字范围小
哈希表 O(n) O(k) 数字范围大,稀疏分布
分块处理 O(n) O(n/块数) 内存受限的大数组

OAVOService 专业面试助攻服务 - 我们深度解析各类算法面试题目,为学员提供 Google、Amazon、Microsoft、Meta 等顶级公司的实时面试辅助。凭借丰富的面试经验和算法优化技巧,助你在编程面试中脱颖而出。

如果你正在准备算法面试,欢迎了解我们的 定制算法面试训练方案 —— 从基础数据结构到高级优化技巧,全面提升你的编程面试竞争力。

标签: 频率统计, 数组算法, 空间优化, 原地操作, 算法面试, OAVOService