在音乐流媒体平台的面试中,播放列表管理系统是一个经典的系统设计题目。本文通过 Spotify 风格的播放列表管理器设计,展示如何处理大规模用户数据和实时推荐,csvosupport 助你掌握音乐平台核心技术
📋 问题背景
设计一个智能播放列表管理系统,支持
- 创建和管理多个播放列
- 歌曲的添加、删除、移
- 播放历史记录
- 基于用户喜好的智能推
- 多设备同
🎯 核心挑战
- 数据结构选择 - 如何高效存储和操作播放列
- 推荐算法 - 基于历史和偏好的歌曲推荐
- 并发控制 - 多设备同时操作的一致
- 性能优化 - 大规模用户的实时响应
💡 系统设计(csvosupport 指导
数据结构设计
from collections import deque, defaultdict
import heapq
class Song:
def __init__(self, song_id, title, artist, duration, genre):
self.id = song_id
self.title = title
self.artist = artist
self.duration = duration
self.genre = genre
self.play_count = 0
self.last_played = None
class Playlist:
def __init__(self, playlist_id, name, owner):
self.id = playlist_id
self.name = name
self.owner = owner
self.songs = [] # 有序列表
self.song_set = set() # 快速查
self.created_at = datetime.now()
self.total_duration = 0
def add_song(self, song, position=None):
if song.id in self.song_set:
return False
if position is None:
self.songs.append(song)
else:
self.songs.insert(position, song)
self.song_set.add(song.id)
self.total_duration += song.duration
return True
def remove_song(self, song_id):
if song_id not in self.song_set:
return False
for i, song in enumerate(self.songs):
if song.id == song_id:
self.songs.pop(i)
self.song_set.remove(song_id)
self.total_duration -= song.duration
return True
return False
def move_song(self, from_pos, to_pos):
if 0 <= from_pos < len(self.songs) and 0 <= to_pos < len(self.songs):
song = self.songs.pop(from_pos)
self.songs.insert(to_pos, song)
return True
return False
class PlaylistManager:
def __init__(self):
self.playlists = {} # {playlist_id: Playlist}
self.user_playlists = defaultdict(list) # {user_id: [playlist_ids]}
self.play_history = deque(maxlen=1000) # 最000首播放记
self.song_database = {} # {song_id: Song}
def create_playlist(self, user_id, name):
playlist_id = self._generate_id()
playlist = Playlist(playlist_id, name, user_id)
self.playlists[playlist_id] = playlist
self.user_playlists[user_id].append(playlist_id)
return playlist_id
def get_user_playlists(self, user_id):
return [self.playlists[pid] for pid in self.user_playlists[user_id]]
智能推荐算法
class RecommendationEngine:
def __init__(self, playlist_manager):
self.manager = playlist_manager
self.user_preferences = defaultdict(lambda: defaultdict(int))
def update_preferences(self, user_id, song):
# 更新用户对流派、艺术家的偏
self.user_preferences[user_id]['genre'][song.genre] += 1
self.user_preferences[user_id]['artist'][song.artist] += 1
def recommend_songs(self, user_id, count=10):
# 基于用户偏好推荐歌曲
user_pref = self.user_preferences[user_id]
# 计算每首歌的推荐分数
scores = []
for song_id, song in self.manager.song_database.items():
score = 0
score += user_pref['genre'].get(song.genre, 0) * 2
score += user_pref['artist'].get(song.artist, 0) * 3
score -= song.play_count * 0.1 # 降低已播放歌曲的权重
scores.append((score, song))
# 返回得分最高的歌曲
scores.sort(reverse=True)
return [song for _, song in scores[:count]]
播放历史和统
class PlaybackTracker:
def __init__(self):
self.history = deque(maxlen=10000)
self.daily_stats = defaultdict(lambda: defaultdict(int))
def record_play(self, user_id, song, timestamp):
self.history.append({
'user_id': user_id,
'song_id': song.id,
'timestamp': timestamp,
'duration': song.duration
})
# 更新统计
date = timestamp.date()
self.daily_stats[date]['total_plays'] += 1
self.daily_stats[date]['total_duration'] += song.duration
# 更新歌曲播放次数
song.play_count += 1
song.last_played = timestamp
def get_most_played(self, user_id, limit=10):
# 获取用户最常播放的歌曲
song_counts = defaultdict(int)
for record in self.history:
if record['user_id'] == user_id:
song_counts[record['song_id']] += 1
# 返回播放次数最多的歌曲
top_songs = heapq.nlargest(
limit,
song_counts.items(),
key=lambda x: x[1]
)
return [(self.manager.song_database[sid], count)
for sid, count in top_songs]
🚀 高级功能
1. 协同过滤推荐
def collaborative_filtering(self, user_id, k=5):
# 找到相似用户
similar_users = self._find_similar_users(user_id, k)
# 聚合相似用户喜欢的歌
recommended_songs = defaultdict(float)
for similar_user, similarity in similar_users:
for song_id, score in self.user_preferences[similar_user].items():
recommended_songs[song_id] += score * similarity
# 过滤掉用户已有的歌曲
user_songs = self._get_user_songs(user_id)
return [song_id for song_id in recommended_songs
if song_id not in user_songs][:10]
2. 多设备同
class SyncManager:
def __init__(self):
self.device_states = {} # {device_id: state}
self.sync_queue = defaultdict(deque)
def sync_playlist(self, user_id, playlist_id, device_id):
# 获取最新状
latest_state = self._get_latest_state(playlist_id)
# 推送到设备
self.device_states[device_id] = latest_state
# 记录同步时间
return {
'playlist': latest_state,
'synced_at': datetime.now(),
'version': self._get_version(playlist_id)
}
def handle_conflict(self, playlist_id, device1_state, device2_state):
# 使用时间戳解决冲
if device1_state['timestamp'] > device2_state['timestamp']:
return device1_state
return device2_state
📊 性能优化
缓存策略
from functools import lru_cache
class CacheManager:
def __init__(self):
self.cache = {}
self.cache_ttl = 300 # 5分钟
@lru_cache(maxsize=1000)
def get_popular_songs(self, genre=None):
# 缓存热门歌曲
cache_key = f"popular_{genre}"
if cache_key in self.cache:
cached_data, timestamp = self.cache[cache_key]
if time.time() - timestamp < self.cache_ttl:
return cached_data
# 重新计算
popular = self._calculate_popular_songs(genre)
self.cache[cache_key] = (popular, time.time())
return popular
💼 csvosupport 如何助力面试
在音乐平台系统设计面试中,csvosupport 提供
系统架构 - 完整的播放列表管理系统设 推荐算法 - 多种推荐策略的实 性能优化 - 缓存和索引优化技 *扩展性讨 - 如何支持亿级用户
想在 Spotify、Apple Music、YouTube Music 等音乐平台的面试中脱颖而出
联系 csvosupport,专业系统设计面试辅助!
*标签 #Spotify #音乐平台 #播放列表 #推荐系统 #系统设计 #VO辅助 #面试辅助 #一亩三分地
需要面试真题? 立刻联系微信 Coding0201,获得真题。