在当今互联网架构中,Redis 已经远远超越了传统的键值存储角色,成为构建高性能分布式系统的重要基础组件。其丰富的数据结构和高效的原子操作能力,使其非常适合实现如自动完成、分布式锁、任务队列、消息系统等核心功能模块。
例如,在一款支持海量并发、需保证数据一致性的多人在线游戏中,聊天、物品交易、邮件推送等“基础”业务背后,往往面临高并发数据访问、极致性能与可靠性的综合挑战。Redis 在这些场景下凭借其极低延迟和多样的数据结构,为构建可扩展、健壮的中间件和业务组件提供了极大的便利。

自动完成是现代应用不可或缺的功能,用户在找联系人、选商品、输入命令时都少不了它。在 Redis 里,我们可以用两种方法实现自动完成,每种适合不同场景,都有各自的优缺点。
以一家游戏公司的需求为例。假设“龙腾游戏公司”推出了一款火爆的多人在线游戏,玩家们经常会互相聊天。为了让体验更友好,公司希望让每位玩家都可以快速在最近联系过的人里选到目标——也就是实现最近联系人自动完成。
最大的问题是,每个玩家要维护自己独立的最近联系人列表,且这个列表得随时刷新。考虑到用户量巨大,我们必须非常注意节省内存并保证操作效率。
Redis 的 LIST 结构就特别适用。LIST 不仅能保持插入顺序,也很节省内存。最关键的是,Redis 提供了大量高效的列表命令,让我们能非常方便地对联系人做增、删、查等操作。
def add_or_update_contact(conn, user, contact):
# 添加或更新用户的最近联系人列表
contact_list = 'recent:' + user
pipe = conn.pipeline(True)
# 如果联系人已存在,先删除
pipe.lrem(contact_list, contact)
# 将联系人添加到列表开头
pipe.lpush(contact_list, contact)
# 确保列表不超过100个联系人
pipe.ltrim(contact_list, 0, 99)
pipe.execute()这个实现的核心思想是“最近使用优先”。每当玩家与某个联系人聊天时,我们就将这个联系人移到列表的最前面,同时删除列表中可能存在的重复项。通过 LTRIM 命令,我们确保列表永远不会超过100个联系人,从而控制内存使用。
这里使用管道(pipeline)是为了确保操作的原子性。三个命令要么全部成功,要么全部失败,避免了数据不一致的问题。
当玩家开始输入联系人姓名时,我们需要从 Redis 中获取完整的联系人列表,然后在应用层进行前缀匹配。虽然这看起来不够高效,但对于只有100个联系人的小列表来说,这种方法的性能完全可接受。
def get_autocomplete_list(conn, user, prefix):
# 获取匹配前缀的联系人列表
candidates = conn.lrange('recent:' + user, 0, -1)
matches = []
for candidate in candidates:
if candidate.lower().startswith(prefix.lower()):
matches.append(candidate)
return matches最近联系人自动完成功能适合用于联系人数量较少的情况。但如果要处理大规模的数据,比如游戏中某个公会的成员列表(可能包含上千人),就需要更高效的方式来实现。这时候,Redis 的 ZSET(有序集合)就非常适合用来做自动完成。
ZSET 最大的优势在于它不仅能按分数排序,还可以做高效的范围查询。我们可以利用这些特性,非常巧妙地实现前缀匹配,从而高效地完成自动补全功能。
以“龙腾游戏公司”的公会系统为例。每个公会的成员数量可能很多。当玩家尝试给公会成员发送邮件时,只要输入几个字母,系统就能快速列出所有匹配的成员姓名——这正需要一种高效的查询机制才能实现。
def find_prefix_range(prefix):
# 计算前缀匹配的字符范围
valid_characters = '`abcdefghijklmnopqrstuvwxyz{'
# 找到前缀最后一个字符的位置
pos = bisect.bisect_left(valid_characters, prefix[-1:])
# 获取前一个字符作为起始边界
suffix = valid_characters[(pos or 1) - 1]
return prefix[:-1] + suffix + '{', prefix + '{'这个函数的核心思想是利用 ASCII 字符的排序特性。我们知道,在 ASCII 表中,反引号位于字母 a 之前,花括号位于字母 z 之后。通过构造这样的边界值,我们可以在 ZSET 中找到所有以指定前缀开头的字符串。
def autocomplete_on_prefix(conn, guild, prefix):
# 使用 ZSET 实现高效的前缀自动完成
start, end = find_prefix_range(prefix)
identifier = str(uuid.uuid4())
start += identifier
end += identifier
zset_name = 'members:' + guild
# 添加边界标记
conn.zadd(zset_name, {start: 0, end: 0})
pipeline = conn.pipeline(True)
while True:
这个方案虽然实现上稍微复杂一些,但能够很好地解决大数据量下的前缀匹配问题。我们通过在 ZSET 里临时加上边界标记,精准定位出需要匹配的范围,然后批量取出结果。至于用到的 WATCH 命令,就是为了保证整个过程的原子性,防止多个客户端同时操作时出现数据不一致。
在多线程或多进程的场景下,“锁”是用来保证数据不被混乱修改的常用手段。Redis 也可以用来实现分布式锁,让不同服务器上的程序一起安全地操作数据。分布式锁的核心很简单——谁先抢到,谁就能操作,其他人只能等着。
还是拿“龙腾游戏公司”的物品交易市场举例。假如玩家张三和李四同时盯上了同一把价值1000金币的宝剑,如果没有锁,说不定两个玩家都会买到同一把剑,这显然乱套了。
Redis 的 WATCH 命令其实可以用作“乐观锁”,意思是:你先关注数据,如果有人改了数据,再取消这次操作。但是在高并发情况下,大家都在抢热门物品,WATCH 机制就会频繁检测到冲突,不停重试,效率会很低,系统开销也大。
在高负载情况下,WATCH 的重试机制会导致严重的性能问题。当多个玩家同时尝试购买热门物品时,大部分事务都会失败并重试,这不仅浪费了系统资源,还延长了响应时间。
分布式锁提供了一个更直接的解决方案。通过使用 Redis 的 SETNX(SET if Not eXists)命令,我们可以实现一个简单的锁机制。
def acquire_lock(conn, lock_name, acquire_timeout=10):
# 获取分布式锁
identifier = str(uuid.uuid4())
end = time.time() + acquire_timeout
while time.time() < end:
if conn.setnx('lock:' + lock_name, identifier):
return identifier
time.sleep(0.001)
return False这个实现的核心是使用 UUID 作为锁的唯一标识符。UUID 是一个128位的随机数,几乎不可能重复,这确保了只有获得锁的客户端才能释放锁。
def release_lock(conn, lock_name, identifier):
# 释放分布式锁
pipe = conn.pipeline(True)
lock_name = 'lock:' + lock_name
while True:
try:
pipe.watch(lock_name)
if pipe.get(lock_name) == identifier:
pipe.multi()
pipe.delete(lock_name)
pipe.execute()
return True
pipe.unwatch()
break
except redis.exceptions.WatchError:
释放锁的过程同样需要小心处理。我们使用 WATCH 命令来监控锁的状态,确保只有锁的持有者才能释放锁。这防止了其他客户端意外释放不属于自己的锁。
基础锁实现虽然简单,但它没有处理客户端崩溃的情况。如果持有锁的客户端突然崩溃,锁就会永远无法释放,导致其他客户端永远无法获得锁。 为了解决这个问题,我们需要为锁添加超时机制。Redis 的 EXPIRE 命令可以自动删除过期的键,这正是我们需要的功能。
def acquire_lock_with_timeout(conn, lock_name, acquire_timeout=10, lock_timeout=10):
# 获取带超时机制的分布式锁
identifier = str(uuid.uuid4())
lock_timeout = int(math.ceil(lock_timeout))
end = time.time() + acquire_timeout
while time.time() < end:
if conn.setnx(lock_name, identifier):
conn.expire(lock_name, lock_timeout)
return identifier
elif not conn.ttl(lock_name):
conn.expire(lock_name, lock_timeout)
这个实现不仅为成功获取的锁设置了超时,还会检查其他客户端获取的锁是否设置了超时。如果发现某个锁没有设置超时,就会主动为其设置超时,确保锁最终会被释放。
虽然超时机制解决了死锁问题,但也引入了新的风险。如果持有锁的客户端在超时前没有完成操作,锁可能会被其他客户端获取,导致数据不一致。因此,锁的超时时间应该设置得足够长,确保正常操作能够完成。
在物品交易市场中,我们不需要锁定整个市场,只需要锁定正在交易的特定物品。这种细粒度的锁能够显著提高并发性能,允许多个不同的物品同时进行交易。
def purchase_item_with_lock(conn, buyer_id, item_id, seller_id):
# 使用细粒度锁保护物品交易
buyer = "users:%s" % buyer_id
seller = "users:%s" % seller_id
item = "%s.%s" % (item_id, seller_id)
inventory = "inventory:%s" % buyer_id
end_time = time.time() + 30
通过锁定特定的物品而不是整个市场,我们可以实现更高的并发度。多个不同的物品可以同时进行交易,只有相同物品的交易才会相互阻塞。
信号量其实就是一种“限流”的锁,它允许有多个客户端一起访问同一个资源,但这人数是有限制的,不是谁都能随便进。在 Redis 里,我们可以用 ZSET(有序集合)轻松地实现这种计数信号量,非常适合需要精细控制并发数量的场景。
还是拿“龙腾游戏公司”当例子。随着游戏越来越受欢迎,许多第三方开发者想通过 API 获取游戏数据。但如果大家一起调用 API,服务器肯定吃不消。所以,公司规定每个账号同一时刻最多只能有 5 个 API 调用在跑,这样就不会被刷崩溃了。
def 获取信号量(连接, 信号量名, 限制数量, 超时=10):
"""获取计数信号量"""
标识符 = str(uuid.uuid4())
当前时间 = time.time()
管道 = 连接.pipeline(True)
# 清理过期的信号量持有者
管道.zremrangebyscore(信号量名, '-inf', 当前时间 - 超时)
# 尝试获取信号量
管道.zadd(信号量名, 标识符, 当前时间)
管道.zrank(信号量名, 标识符)
if 管道.execute()[-1] < 限制数量:
这个实现使用 ZSET 来存储信号量的持有者,其中成员是唯一的标识符,分数是获取信号量的时间戳。通过检查标识符在 ZSET 中的排名,我们可以确定是否成功获得了信号量。
def release_semaphore(conn, semaphore_name, identifier):
# 释放计数信号量
return conn.zrem(semaphore_name, identifier)释放信号量其实很简单,就是把对应的标识符从 ZSET 里删掉。当你释放了信号量,排队等待的其他客户端就有机会获取到空出来的名额。
前面说的基础信号量虽然好用,但是它依赖于各个客户端的系统时间。如果有的机器时间不准确,可能就会出现“后来居上”或者“插队”的情况,导致分配不公平。为了让信号量分配更公平,我们可以加一个自增计数器,确保谁先来谁先得。
def acquire_fair_semaphore(conn, semaphore_name, limit, timeout=10):
# 获取公平的计数信号量
identifier = str(uuid.uuid4())
owner_zset = semaphore_name + ':owner'
counter_key = semaphore_name + ':counter'
now = time.time()
pipe = conn.pipeline(True)
# 清理过期的信号量持有者
pipe.zremrangebyscore(semaphore_name, '-inf', now - timeout)
# 更新所有者ZSET,只保留有效的持有者
公平信号量使用两个 ZSET:一个用于存储时间戳(处理超时),另一个用于存储计数器值(确保公平性)。通过这种方式,我们可以确保信号量按照请求的顺序进行分配,而不受系统时钟差异的影响。
在实际应用中,有些操作可能需要很长时间才能完成,比如生成复杂的报告或处理大量数据。在这种情况下,我们需要能够刷新信号量,防止它因为超时而丢失。
def refresh_fair_semaphore(conn, semaphore_name, identifier):
# 刷新公平信号量的超时时间
if conn.zadd(semaphore_name, {identifier: time.time()}):
# 如果添加成功,说明标识符已经不存在,信号量已丢失
release_fair_semaphore(conn, semaphore_name, identifier)
return False
return True刷新操作通过更新标识符的时间戳来延长信号量的有效期。如果更新失败,说明标识符已经不存在,信号量已经丢失。
虽然刷新机制很有用,但它也引入了新的竞态条件。多个客户端可能同时尝试获取同一个信号量,导致超过限制数量的客户端获得信号量。为了完全解决这个问题,我们需要结合分布式锁来确保操作的原子性。
任务队列在分布式系统中非常重要,它可以把一些耗时的工作变成异步处理,这样用户不用一直等待,系统的响应速度和吞吐量也都会更高。用 Redis 的 LIST 或 ZSET 就能很方便地实现各种类型的任务队列。
我们先来看最常见的“先进先出(FIFO)”队列。比如在“龙腾游戏公司”,每当玩家完成一次交易,系统都需要给他们发一封通知邮件。但邮件发送速度可能比较慢,如果让玩家一直等着肯定不合适。这时候,我们可以把发邮件这件事丢进任务队列,先让玩家继续游戏,等到有空闲资源时再慢慢发送邮件。
def enqueue_sold_email_task(conn, seller, item, price, buyer):
# 添加邮件任务到队列
data = {
'seller_id': seller,
'item_id': item,
'price': price,
'buyer_id': buyer,
'time': time.time()
}
conn.rpush('queue:email', json.dumps(data))这个函数将邮件任务序列化为 JSON 格式,然后使用 RPUSH 命令添加到队列的末尾。JSON 格式不仅人类可读,而且在大多数编程语言中都有高效的解析库。
def process_sold_email_queue(conn):
# 处理邮件队列中的任务
while not exit_flag:
packed_data = conn.blpop(['queue:email'], 30)
if not packed_data:
continue
to_send = json.loads(packed_data[1])
try:
fetch_data_and_send_sold_email(to_send)
except EmailSendError as err:
log_error("Failed to send sold email", err, to_send)
else
处理函数使用 BLPOP 命令从队列中获取任务。BLPOP 是一个阻塞操作,如果队列为空,它会等待最多30秒。这避免了轮询带来的资源浪费。
在实际应用中,不同类型的任务可能有不同的优先级。比如密码重置邮件应该比营销邮件更早发送。Redis 的 BLPOP 命令支持同时监听多个队列,我们可以利用这个特性来实现优先级队列。
def process_multi_queue_tasks(conn, queue_list, callbacks):
# 处理多个优先级队列中的任务
while not exit_flag:
packed_data = conn.blpop(queue_list, 30)
if not packed_data:
continue
queue_name, task_data = packed_data
func_name, args = json.loads(task_data)
if func_name not in callbacks:
log_error("Unknown callback function %s" % func_name)
continue
通过将高优先级队列放在列表的前面,BLPOP 会优先处理高优先级队列中的任务。只有当高优先级队列为空时,才会处理低优先级队列。
有时候我们需要延迟执行某些任务,比如在特定时间发送提醒邮件,或者在玩家离线一段时间后清理其临时数据。Redis 的 ZSET 同样也非常适合实现延迟任务队列。
def enqueue_delayed_task(conn, queue, func_name, args, delay=0):
# 创建延迟任务
identifier = str(uuid.uuid4())
task_data = json.dumps([identifier, queue, func_name, args])
if delay > 0:
# 延迟执行,添加到ZSET
conn.zadd('delayed:', {task_data: time.time() + delay})
else:
# 立即执行,添加到LIST
conn.rpush('queue:' + queue, task_data)
return identifier延迟任务使用 ZSET 存储,其中分数是执行时间。当时间到达时,任务会被移动到相应的 LIST 队列中等待执行。
def poll_delayed_queue(conn):
# 轮询延迟队列,将到期的任务移动到执行队列
while not exit_flag:
item = conn.zrange('delayed:', 0, 0, withscores=True)
if not item or item[0][1] > time.time():
time.sleep(0.01)
continue
task_data = item[0
轮询函数定期检查延迟队列,将到期的任务移动到执行队列。使用锁确保多个轮询进程不会重复处理同一个任务。
在传统的发布-订阅模式下,客户端必须一直在线,有消息才会被实时推送。但对于手机应用或者网络不好时,这其实不太方便。拉取式消息系统解决了这个问题——客户端可以在自己有空、有网的时候,主动去服务器拉取消息。哪怕之前离线,也不会错过任何消息。
比如在“龙腾游戏公司”的聊天系统里,玩家们希望像用微信一样,随时可以发消息、收消息。即使中途断了网,等下次上线登录后,之前别人给你发的消息也都能收到,不会漏掉。
def send_message(conn, recipient, sender, message):
# 发送消息给指定接收者
message_data = {
'sender': sender,
'msg': message,
'ts': time.time()
}
conn.rpush('mailbox:' + recipient, json.dumps(message_data))每个用户都像拥有一个自己的“收件箱”——这其实是一个列表,别人发给你的消息都会统统放进你的收件箱里。这样做特别简单明了,也能让你无论什么时候上线,都能收到之前别人发来的消息,不会丢。
def fetch_messages(conn, user, count=10):
# 获取用户邮箱中的消息
message_list = conn.lrange('mailbox:' + user, 0, count-1)
if message_list:
conn.ltrim('mailbox:' + user, count, -1)
return [json.loads(message) for message in message_list]获取消息时,我们使用 LTRIM 命令来删除已经读取的消息,避免邮箱无限增长。这种方式既保证了消息的可靠传递,又控制了存储空间的使用。
单接收者消息适用于私聊,但现代应用还需要群聊功能。群聊的挑战在于需要管理多个参与者,并且要确保每个参与者都能收到所有消息。
def create_chat(conn, sender, recipient_list, message, chat_id=None):
# 创建新的群聊会话
chat_id = chat_id or str(conn.incr('ids:chat:'))
recipient_list.append(sender)
recipient_dict = dict((recipient, 0) for recipient in recipient_list)
conn.zadd('chat:' + chat_id, **recipient_dict)
for recipient in recipient_list:
conn.zadd(
群聊功能其实就是用两个有序集合(ZSET)来记录状态:第一个集合用来记住每个群聊都有哪些人,以及每个人上次看到的那条消息的ID;第二个集合则记录每个用户都参加了哪些群聊,以及他们各自看到的最新消息ID。这样每个人上线都能准确知道自己在各个群聊的新消息有哪些。
def send_group_message(conn, chat_id, sender, message):
# 在群聊中发送消息
token = acquire_lock(conn, 'chat:' + chat_id)
if not token:
raise Exception("无法获取锁")
try:
msg_id = conn.incr('ids:' + chat_id)
timestamp = time.time()
packed_msg = json.dumps({
'id': msg_id,
'ts': timestamp,
发送群聊消息时,要先“上锁”,这样能保证每条消息都有唯一且连续的编号。每条消息会被存进一个专门的有序集合(ZSET),消息ID用来排序,这样查找最新的消息就很方便。
def get_pending_messages(conn, recipient):
# 获取用户在所有群聊中的待处理消息
seen_info = conn.zrange('seen:' + recipient, 0, -1, withscores=True)
pipe = conn.pipeline(True)
for chat_id, seen_id in seen_info:
pipe.zrangebyscore('msgs:' + chat_id, seen_id + 1, 'inf')
当你查看消息时,系统会先看看你在每个群聊里上次看到哪一条消息。然后,它会把每个群聊里你还没看的新消息都找出来给你。已经被群里所有人都读过的老消息,系统会自动帮你清理掉,不会一直占用空间。
在分布式系统里,常常会遇到需要把一个文件分发给很多台服务器一起处理的需求。我们以前可能会用 NFS 或 Samba 这样的文件共享工具,它们虽然好用,但碰到网络不稳定、机器经常变动的时候,可能就不太方便了。其实,Redis 也能帮我们简单高效地搞定文件分发。
举个例子,"龙腾游戏公司"最近要分析近两年用户的访问日志。这些日志文件加起来有几十个G,里面一共有超过70亿条记录。如果用传统方法一份份复制,把文件拷贝给每台机器,不仅慢,还特别占空间。
def daily_country_aggregate(conn, log_line):
# 聚合每日的国家级访问数据
if log_line:
fields = log_line.split()
ip_addr = fields[0]
date = fields[1]
country = lookup_city_by_ip(ip_addr)[2]
aggregate_data[date][country] += 1
return
# 处理完一天的数据,写入Redis
for date, aggregate in aggregate_data.items():
conn.zadd('daily:country:' +
通过本地聚合,我们可以将数百万次Redis写入操作减少到几百次。对于国家级别的聚合,我们可以在本地完成所有计算,然后一次性写入Redis。这种方式不仅减少了网络开销,还提高了处理速度。
本地聚合的关键在于理解数据的分布特征。对于国家级别的数据,我们只需要处理约200个不同的值,而城市级别的数据则需要处理数万个不同的值。通过合理选择聚合粒度,我们可以在性能和准确性之间找到平衡。
文件分发系统需要将大文件分块传输,并通知所有处理节点有新文件可用。我们使用Redis的APPEND命令来存储文件内容,使用群聊系统来通知处理节点。
def copy_logs_to_redis(conn, path, channel, num_workers=10, limit=2**30):
# 将日志文件复制到Redis并通知处理节点
bytes_in_redis = 0
wait_queue = deque()
create_group_chat(conn, 'source', map(str, range(num_workers)), '', channel)
num_workers = str(num_workers)
for log_file in sorted(os.listdir(path)):
文件发送过程需要仔细管理内存使用,避免Redis内存溢出。通过监控已处理文件的数量,系统可以及时清理已完成的文件,为新文件腾出空间。
处理节点需要从Redis里拿到文件并开始处理。我们没有一次把整个大文件都读进内存,而是像流水线一样一块一块地读,这样电脑内存不会很快被用完。
def process_logs_from_redis(conn, node_id, callback):
# 从Redis中读取并处理日志文件
while True:
file_data = get_pending_messages(conn, node_id)
for channel, messages in file_data:
for msg in messages:
log_file = msg['message']
if log_file == ':done':
return
elif not log_file:
continue
chunk_reader =
处理函数一行一行地读文件,每次只把需要的一小部分内容放进内存,所以不管文件有多大,内存都不会一下子被用光。一个文件处理好后,会告诉发送端,可以开始清理这个文件了。
def read_lines(conn, key, chunk_reader):
# 从Redis中逐行读取文件内容
buffer = ''
for chunk in chunk_reader(conn, key):
buffer += chunk
pos = buffer.rfind('\n')
if pos >= 0:
for line in buffer[:pos].split('\n'):
yield line + '\n
行读取这个函数会把每次读到的数据片段先存起来,找到最后一个换行符后,把完整的每一行都处理掉。如果最后有半截未读完的行,它会留到下次再一起处理,这样一行就不会被拆开。
def read_chunks_gz(conn, key):
# 从Redis中逐块读取并解压缩gzip文件
buffer = ''
decompressor = None
for chunk in read_chunks(conn, key, 2**17):
if not decompressor:
buffer += chunk
try:
# 解析gzip头部
if buffer[:3] != "\x1f\x8b\x08":
raise
gzip解压缩功能允许我们处理压缩的日志文件,这在存储和传输方面都有很大优势。gzip通常能够将文本文件压缩到原来的20-40%,大大减少了网络传输和存储开销。
通过Redis实现的文件分发系统具有许多优势:它不需要复杂的网络配置,能够处理网络中断和重连,支持动态扩展处理节点,并且可以立即开始处理数据而不需要等待整个文件下载完成。虽然Redis不是专门的文件存储系统,但在适当的场景下,它提供了一个简单而有效的文件分发解决方案。
通过这节课的学习,我们了解了Redis在构建复杂应用组件方面的强大能力。从简单的自动完成功能到复杂的文件分发系统,Redis的各种数据结构为我们提供了灵活而高效的解决方案。 这些组件不仅能够独立使用,更可以相互组合,构建出更加复杂和强大的系统。比如,我们可以将分布式锁与任务队列结合,实现更可靠的任务处理;可以将信号量与消息系统结合,实现流量控制和消息限流。
Redis的真正价值不仅在于其丰富的数据结构,更在于我们如何巧妙地组合这些结构来解决实际问题。掌握这些模式后,你就能够设计出既高效又可靠的分布式系统。