实现多级缓存的策略方法
保证多级缓存数据一致性是一个复杂的任务,尤其是在分布式和高并发环境中。以下是一些常见的方法和策略,可以帮助实现多级缓存的数据一致性
1. 缓存失效策略
1.1 主动失效
在更新数据库时,主动使相关缓存失效。
步骤:
- 更新数据库
- 删除或失效缓存
public class CacheService { private LocalCache localCache; private RedisCache redisCache; private Database database;
public void updateData(Data data) { database.update(data);
String cacheKey = "data:" + data.getId(); localCache.delete(cacheKey); redisCache.delete(cacheKey); } }
|
1.2 延迟双删(Lazy Delete)
在更新数据库前后都删除缓存,确保缓存数据的准确性。
步骤:
- 更新前删除缓存
- 更新数据库
- 更新后再删除一次缓存
public class CacheService { private LocalCache localCache; private RedisCache redisCache; private Database database;
public void updateData(Data data) { String cacheKey = "data:" + data.getId(); localCache.delete(cacheKey); redisCache.delete(cacheKey); database.update(data);
try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } localCache.delete(cacheKey); redisCache.delete(cacheKey); } }
|
2. 双写一致性策略
2.1 先更新数据库,再更新缓存
步骤:
- 更新数据库
- 更新缓存
public class CacheService { private LocalCache localCache; private RedisCache redisCache; private Database database;
public void updateData(Data data) { database.update(data);
String cacheKey = "data:" + data.getId(); localCache.set(cacheKey, data); redisCache.set(cacheKey, data); } }
|
2.2 先删除缓存,再更新数据库
public class CacheService { private LocalCache localCache; private RedisCache redisCache; private Database database;
public void updateData(Data data) { String cacheKey = "data:" + data.getId(); localCache.delete(cacheKey); redisCache.delete(cacheKey);
database.update(data); } }
|
3. 缓存更新策略
3.1 写通过(Write-through)
在写入数据库时,同时更新缓存。
步骤:
- 更新数据库
- 更新缓存
public class CacheService { private LocalCache localCache; private RedisCache redisCache; private Database database;
public void updateData(Data data) { database.update(data);
String cacheKey = "data:" + data.getId(); localCache.set(cacheKey, data); redisCache.set(cacheKey, data); } }
|
3.2 写回(Write-back)
先更新缓存,定期将缓存数据写回数据库。
步骤:
- 更新缓存
- 定期将缓存数据写回数据库
public class CacheService { private LocalCache localCache; private RedisCache redisCache; private Database database;
public void updateData(Data data) { String cacheKey = "data:" + data.getId(); localCache.set(cacheKey, data); redisCache.set(cacheKey, data);
scheduleWriteBack(data); }
private void scheduleWriteBack(Data data) { } }
|
4. 分布式锁
使用分布式锁(如 Redis 的 RedLock)来确保多节点环境下的一致性。
步骤:
- 获取分布式锁
- 更新数据库和缓存
- 释放分布式锁
import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool;
public class CacheService { private LocalCache localCache; private RedisCache redisCache; private Database database; private JedisPool jedisPool;
public void updateDataWithLock(Data data) { String lockKey = "lock:data:" + data.getId(); try (Jedis jedis = jedisPool.getResource()) { String lock = acquireLock(jedis, lockKey, 10000); if (lock != null) { try { updateData(data); } finally { releaseLock(jedis, lockKey, lock); } } } }
private void updateData(Data data) { database.update(data);
String cacheKey = "data:" + data.getId(); localCache.set(cacheKey, data); redisCache.set(cacheKey, data); }
private String acquireLock(Jedis jedis, String lockKey, int timeout) { }
private void releaseLock(Jedis jedis, String lockKey, String lock) { } }
|
5. 异步更新
使用异步机制,在更新数据库后异步更新缓存。
步骤:
- 更新数据库
- 异步更新缓存
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors;
public class CacheService { private LocalCache localCache; private RedisCache redisCache; private Database database; private ExecutorService executorService = Executors.newFixedThreadPool(10);
public void updateData(Data data) { database.update(data);
String cacheKey = "data:" + data.getId(); executorService.submit(() -> updateCache(cacheKey, data)); }
private void updateCache(String cacheKey, Data data) { localCache.set(cacheKey, data); redisCache.set(cacheKey, data); } }
|
6. 使用消息队列
通过消息队列通知其他缓存节点更新缓存。这里举例使用rabbitmq
简单实现,也可以用其他mq
或者Canal
实现
步骤:
- 更新数据库
- 发送缓存失效消息到消息队列
- 各缓存节点订阅消息队列,接收到消息后失效缓存
import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel;
public class RabbitMQConfig { public static final String QUEUE_NAME = "cache_invalidation_queue";
public static Channel createChannel() throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); return connection.createChannel(); } }
public class CacheService { private LocalCache localCache; private RedisCache redisCache; private Database database; private Channel channel;
public CacheService() throws Exception { this.channel = RabbitMQConfig.createChannel(); channel.queueDeclare(RabbitMQConfig.QUEUE_NAME, false, false, false, null); }
public void updateData(Data data) throws Exception { database.update(data);
String cacheKey = "data:" + data.getId();
channel.basicPublish("", RabbitMQConfig.QUEUE_NAME, null, cacheKey.getBytes()); } }
import com.rabbitmq.client.*;
public class CacheInvalidationListener { private LocalCache localCache; private RedisCache redisCache; private Channel channel;
public CacheInvalidationListener() throws Exception { this.channel = RabbitMQConfig.createChannel(); channel.queueDeclare(RabbitMQConfig.QUEUE_NAME, false, false, false, null); }
public void startListener() throws Exception { Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String cacheKey = new String(body, "UTF-8"); localCache.delete(cacheKey); redisCache.delete(cacheKey); } }; channel.basicConsume(RabbitMQConfig.QUEUE_NAME, true, consumer); }
public static void main(String[] args) throws Exception { CacheInvalidationListener listener = new CacheInvalidationListener(); listener.startListener(); } }
|
TIPS:异步更新和消息队列思想的区别
异步更新:适用于单节点或小规模系统,依赖于应用内部的异步处理机制,较为简单但在分布式环境中扩展性差。
使用消息队列:适用于分布式和大规模系统,依赖于外部消息队列系统,在多个节点之间确保数据一致性,更具扩展性但实现和运维复杂度较高。
结论
通过结合使用主动失效、延迟双删、双写一致性、写通过、写回、分布式锁和异步更新等策略,可以有效地保证多级缓存的数据一致性。选择合适的策略取决于具体的应用场景和需求O(∩_∩)O