在现代分布式系统中,我们常常需要将数据同时写入多个存储系统(如数据库+缓存),这种操作被称为"双写"。双写场景下最大的挑战是如何保证不同存储系统间的数据一致性。本文将深入探讨双写一致性问题,并通过Java实战演示解决方案。
常见问题场景:
写入顺序不一致:先写缓存成功但数据库失败
并发冲突:多个线程同时更新同一数据
部分失败:一个存储成功另一个失败
网络延迟:不同存储系统的响应时间差异
基于消息队列的最终一致性方案的实现思路为: 应用服务 → 更新数据库 ↓ 同时发送消息 → 消息队列 → 缓存服务 → Redis
数据库操作与消息发送(原子操作)
java@Service
@RequiredArgsConstructor
public class UserService {
private final UserRepository userRepository;
private final MessageSender messageSender;
@Transactional
public void updateUser(User user) {
// 1. 更新数据库
userRepository.save(user);
// 2. 发送缓存更新消息
messageSender.sendCacheUpdate(new CacheUpdateEvent(
"user:" + user.getId(),
user
));
}
}
消息发送器(保证可靠性)
java@Component
@RequiredArgsConstructor
public class MessageSender {
private final RabbitTemplate rabbitTemplate;
private final TransactionTemplate transactionTemplate;
public void sendCacheUpdate(CacheUpdateEvent event) {
transactionTemplate.execute(status -> {
// 将消息存入本地事务表(略)
saveToLocalTxTable(event);
// 发送到MQ
rabbitTemplate.convertAndSend(
"cache.update.exchange",
"cache.update.key",
event
);
return null;
});
}
}
缓存消费者(幂等处理)
java@Component
@RequiredArgsConstructor
public class CacheConsumer {
private final RedisTemplate<String, Object> redisTemplate;
private final CacheUpdateRecordRepository recordRepository;
@RabbitListener(queues = "cache.update.queue")
public void handleMessage(CacheUpdateEvent event) {
// 幂等检查:防止重复消费
if (recordRepository.existsByMessageId(event.getMessageId())) {
return;
}
try {
// 更新缓存
redisTemplate.opsForValue().set(
event.getKey(),
event.getData(),
Duration.ofMinutes(30)
);
// 记录消费日志
recordRepository.save(new CacheUpdateRecord(event.getMessageId()));
} catch (Exception e) {
// 告警并加入重试队列
sendToRetryQueue(event);
}
}
}
在实体中添加版本号字段,每次更新完版本号叠加,发送消息的同时存储事务消息记录(缓存更新的快照记录),到缓存消费者中进行幂等校验处理,校验后将缓存进行更新,如果更新失败,可抛出异常进行重试,也可进入自定义重试队列中,当重试达到一定次数后,进入死信队列人工处理。
java@Entity
public class User {
@Id
private Long id;
@Version
private Long version; // 乐观锁版本号
// 其他字段...
}
注意
以上仅为demo实现,具体可根据思路自行扩展,例如可以将存储的本地事务消息添加状态,在消费者中可以先查询缓存版本号与当前消息的版本号进行对比,对比不通过则更新缓存,缓存处理成功时,查询一次最新缓存的版本号与事务版本号对比,如果对比成功则对本地事务消息的状态进行更新。
这个时候,定时任务只需要扫描缓存未更新的情况即可,而无需全量扫描。
添加双重保障机制,消息队列保证主流程,通过定时任务兜底检查,如果版本不一致也可以添加补偿缓存数据,保证数据的最终一致性。
同时查询方法添加降级策略,保证服务的高可用。
补偿任务(兜底机制)
java@Scheduled(fixedDelay = 30000)
public void checkInconsistency() {
// 查询最近更新的数据库记录
List<User> recentUsers = userRepository.findUpdatedLast5Minutes();
recentUsers.forEach(user -> {
String key = "user:" + user.getId();
User cachedUser = (User) redisTemplate.opsForValue().get(key);
// 比较数据库与缓存的数据版本
if (cachedUser == null ||
cachedUser.getVersion() < user.getVersion()) {
// 触发缓存更新
redisTemplate.opsForValue().set(key, user);
}
});
}
熔断降级策略
java// 当缓存更新失败时降级
public User getUser(Long id) {
try {
return cacheService.getUser(id);
} catch (Exception e) {
log.warn("缓存降级,查询数据库");
return userRepository.findById(id).orElse(null);
}
}
方案的整体效果上,通过高性能的 RocketMQ 可以保障最终一致性达到秒级。不过会对性能产生一些影响:由于添加了消息发送,会给业务流程操作增加部分耗时。同时依赖 MQ 中间件,要注意消息堆积,消息幂等,消息的可靠性处理
此方案的核心原则:容忍短暂不一致,保证最终一致;优先保障核心存储,非核心系统可降级;始终设计补偿机制。
补偿机制的核心思想是:当主操作成功但辅助操作失败时,通过后续的补偿操作来达到数据一致的状态。在双写场景中,补偿机制通常用于在数据库更新成功但缓存更新失败时,通过定时任务或后台线程来修复缓存的不一致状态。
主操作:先更新数据库,再更新缓存(或发送消息更新缓存)
失败检测:当缓存更新失败时,记录失败操作(记录到补偿表或日志)
补偿任务:定时扫描补偿记录,重试失败的缓存更新操作
重试策略:采用指数退避重试,避免雪崩
最终处理:超过最大重试次数后,发出告警,人工介入
定义基础的补偿框架,通过模板方法模式进行实现
javapublic abstract class CompensableTransaction {
private static final int MAX_RETRY = 3;
public void execute() {
try {
// 执行主操作
doPrimaryAction();
// 执行辅助操作
doSecondaryAction();
} catch (Exception e) {
// 补偿操作
compensate();
}
}
protected abstract void doPrimaryAction();
protected abstract void doSecondaryAction();
protected abstract void doCompensate();
private void compensate() {
int retryCount = 0;
while (retryCount < MAX_RETRY) {
try {
doCompensate();
return; // 补偿成功
} catch (Exception ex) {
retryCount++;
log.warn("补偿操作失败({}/{}): {}", retryCount, MAX_RETRY, ex.getMessage());
sleep(exponentialBackoff(retryCount)); // 指数退避
}
}
// 最终补偿失败
log.error("补偿操作最终失败");
alertAdmin();
}
private long exponentialBackoff(int retryCount) {
return (long) (100 * Math.pow(2, retryCount)); // 100ms, 200ms, 400ms...
}
}
数据库+缓存双写补偿实现
java@Service
@RequiredArgsConstructor
public class UserService extends CompensableTransaction {
private final UserRepository userRepository;
private final RedisTemplate<String, User> redisTemplate;
private final CompensationLogRepository compLogRepo;
@Override
protected void doPrimaryAction() {
// 更新数据库
userRepository.save(user);
}
@Override
protected void doSecondaryAction() {
// 更新缓存
String cacheKey = "user:" + user.getId();
redisTemplate.opsForValue().set(cacheKey, user, Duration.ofMinutes(30));
}
@Override
protected void doCompensate() {
// 补偿操作:删除缓存(防止缓存中存在不一致数据)
String cacheKey = "user:" + user.getId();
redisTemplate.delete(cacheKey);
// 记录补偿日志
compLogRepo.save(new CompensationLog(
"CACHE_DELETE",
cacheKey,
"User update compensation"
));
}
// 业务入口方法
public void updateUserWithCompensation(User user) {
this.user = user;
execute();
}
}
补偿日志实体设计
java@Entity
public class CompensationLog {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
private String operationType; // 操作类型: CACHE_DELETE, DATA_ROLLBACK等
private String targetResource; // 目标资源: user:123, order:456
private String description;
private LocalDateTime createTime = LocalDateTime.now();
private boolean success = false;
// 省略getter/setter
}
在实现以上补偿机制的同时,还要注意补偿失败情下的补偿,做好补偿的监控体系,关注补偿的失败率统计与失败的问题定位,进行人工干预补偿,也可定时扫描补偿
在补偿的过程中,又会存在一些问题
优势:
实现相对简单,不依赖分布式事务框架
系统耦合度低,各服务可独立开发
保证最终一致性,可靠性高
对性能影响小(补偿操作异步执行)
局限性:
数据存在短暂不一致窗口
补偿逻辑需要与业务逻辑同步维护
极端情况下需要人工干预
补偿操作本身可能失败
延迟双删是一种在双写场景中用于保证缓存与数据库一致性的策略。它主要用于解决在更新数据库和缓存时,由于并发操作导致的脏数据问题。
延迟双删是解决缓存与数据库双写一致性的经典策略,其核心流程为:
先删缓存 - 避免旧数据污染
更新数据库 - 完成持久化操作
延迟删除缓存 - 消除并发期间产生的脏数据
先删除缓存 -> 更新数据库 -> 延迟一段时间(比如几百毫秒) -> 再次删除缓存
考虑以下并发场景:
线程A更新数据,先删除缓存,然后准备更新数据库。
此时线程B来读取数据,发现缓存不存在,于是从数据库读取旧数据,并写入缓存。
线程A更新数据库,然后等待一段时间(确保线程B的读操作完成并写入缓存)后再次删除缓存。
这样,线程B写入的旧数据缓存会被第二次删除,后续读取会重新加载最新数据。
业务代码的基础实现
java@Service
@RequiredArgsConstructor
public class UserService {
private final UserRepository userRepository;
private final RedisTemplate<String, Object> redisTemplate;
private final ScheduledExecutorService scheduler =
Executors.newScheduledThreadPool(4);
// 更新操作入口
@Transactional
public void updateUser(User user) {
String cacheKey = "user:" + user.getId();
// 1. 先删除缓存
deleteCache(cacheKey);
// 2. 更新数据库
userRepository.save(user);
// 3. 提交事务后发起延迟双删
scheduler.schedule(() -> {
deleteCacheWithRetry(cacheKey, 3); // 带重试的二次删除
}, 500, TimeUnit.MILLISECONDS); // 延迟500ms
}
// 带重试机制的缓存删除
private void deleteCacheWithRetry(String key, int maxRetry) {
int retryCount = 0;
while (retryCount < maxRetry) {
try {
redisTemplate.delete(key);
log.info("延迟双删成功: {}", key);
return;
} catch (Exception e) {
retryCount++;
log.warn("缓存删除失败({}/{}): {}", retryCount, maxRetry, key);
try {
Thread.sleep(100 * retryCount); // 指数退避
} catch (InterruptedException ignored) {}
}
}
log.error("缓存删除最终失败: {}", key);
}
}
优化版 - 结合版本号校验
java// 实体类增加版本控制
@Entity
public class User {
@Id
private Long id;
@Version
private Long version; // 乐观锁版本号
// 其他字段...
}
// 服务层增强校验
public void updateUserWithVersion(User user) {
String cacheKey = "user:" + user.getId();
// 1. 先删除缓存
redisTemplate.delete(cacheKey);
// 2. 更新数据库(带版本校验)
try {
userRepository.save(user);
} catch (OptimisticLockingFailureException ex) {
// 版本冲突时重新加载数据
User freshUser = userRepository.findById(user.getId()).orElseThrow();
throw new ConcurrentUpdateException("数据已被修改,请刷新重试");
}
// 3. 延迟双删(携带版本信息)
scheduler.schedule(() -> {
Object cached = redisTemplate.opsForValue().get(cacheKey);
if (cached instanceof User) {
User cachedUser = (User) cached;
// 仅当缓存版本较旧时删除
if (cachedUser.getVersion() < user.getVersion()) {
redisTemplate.delete(cacheKey);
}
}
}, 600, TimeUnit.MILLISECONDS);
}
生产级增强方案
java@Component
public class CacheDoubleDeleteManager {
private final RedisTemplate<String, Object> redisTemplate;
private final RedissonClient redissonClient;
private final DelayQueue<DeleteTask> delayQueue = new DelayQueue<>();
@PostConstruct
public void init() {
// 启动处理线程
new Thread(this::processTasks).start();
}
// 添加延迟双删任务
public void scheduleDelete(String key, long delayMs) {
delayQueue.put(new DeleteTask(key, delayMs));
}
// 任务处理核心
private void processTasks() {
while (!Thread.currentThread().isInterrupted()) {
try {
DeleteTask task = delayQueue.take();
RLock lock = redissonClient.getLock(task.key + ":lock");
try {
if (lock.tryLock(1, 5, TimeUnit.SECONDS)) {
redisTemplate.delete(task.key);
}
} finally {
lock.unlock();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
// 延迟任务对象
private static class DeleteTask implements Delayed {
final String key;
final long executeTime;
DeleteTask(String key, long delayMs) {
this.key = key;
this.executeTime = System.currentTimeMillis() + delayMs;
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(executeTime - System.currentTimeMillis(),
TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
return Long.compare(executeTime, ((DeleteTask)o).executeTime);
}
}
}
延迟双删是平衡性能与一致性的有效方案,以可控延迟换取更高吞吐,适用于写少读多、可接受短暂不一致的业务
优势:
有效解决"读请求在更新期间加载旧数据"问题
实现简单,不依赖复杂中间件
性能影响可控(异步化处理)
兼容各种数据库和缓存类型
局限性:
短暂不一致窗口(延迟期间)
删除操作可能失败(需配合重试)
不保证强一致性(最终一致)
延迟时间需根据业务调优
注意
没有完美的解决方案,只有最适合场景的方案。延迟双删在电商库存、用户配置等场景表现优异,但对金融交易等强一致性场景需慎重使用。
方案 | 原理 | 优点 | 缺点 |
---|---|---|---|
基于消息队列 | 通过消息队列保证最终一致性 | 高可靠性,解耦系统 | 实现复杂,依赖中间件 |
补偿机制 | 失败后执行反向操作 | 简单易实现 | 无法保证实时一致性 |
延迟双删策略 | 更新前后删除缓存 | 缓解脏读问题 | 不能完全避免不一致 |
保障双写一致性没有银弹,需要根据业务场景选择合适方案。注意每种方案完善重试机制,最好结合版本控制,每种方案的监控+告警+兜底三位一体必不可少。
本文展示的基于主流的解决方案进行讲述。在实际应用中还需结合监控(如缓存命中率、不一致告警)和压力测试持续优化。
本文作者:柳始恭
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!