redis报错:read error on connection的两种原因分析
最近线上php模块偶现 read error on connection;具体报错日志如下
Uncaught exception 'RedisException' with message 'read error on connection'
通过分析和学习之后,发现两种原因可能导致 phpredis 返回 'read error on connection':
执行超时使用已经断开的连接下面将对这两种情况进行具体的分析。
一、执行超时超时又可以分两种情况:一种是客户端设置的超时时间过短导致的;另外一种是客户端未设置超时时间,但是服务端执行时间超过了默认超时时间设置。
1.1 模拟复现1.1.1 客户端设置超时时间过短测试环境的 get 操作 执行耗时约 0.1ms 数量级;因此客户端设置执行超时时间为0.01ms, 测试脚本如下:
<?php$rds=new Redis();try { $ret=$rds->pconnect("127.0.0.1", 6390); if ($ret==false) { echo "Connect return false"; exit; } //设置超时时间为 0.1ms $rds->setOption(3,0.0001); $rds->get("aa");} catch (Exception $e) { var_dump ($e);}
手动执行该脚本会捕获'read error on connection'异常;
image1.1.2 客户端未设置超时时间,使用默认超时时间客户端未设置超时时间,但是在命令执行的过程中,超时达到php设置的默认值,详见 phpredis subscribe超时问题及解决 分析
1.2 原因分析1.2.1 strace 分析通过strace 查看执行过程可以发现发送 get aa 指令后,poll 想要拉取 POLLIN 事件的时候等待超时:
image1.2.2 代码逻辑分析php连接redis 使用的是phpredis扩展,在phpredis源码中全文搜索 'read error on connection' 可以发现 此错误位于 phpredis/library.c 文件的 redis_sock_gets 函数,详见 phpredis ;
phpredis 的 library.c 文件的 redis_sock_gets 函数
/* * Processing for variant reply types (think EVAL) */PHP_REDIS_API intredis_sock_gets(RedisSock *redis_sock, char *buf, int buf_size, size_t *line_size){ // Handle EOF if(-1==redis_check_eof(redis_sock, 0)) { return -1; } if(php_stream_get_line(redis_sock->stream, buf, buf_size, line_size) ==NULL) { char *errmsg=NULL; if (redis_sock->port < 0) { spprintf(&errmsg, 0, "read error on connection to %s", ZSTR_VAL(redis_sock->host)); } else { spprintf(&errmsg, 0, "read error on connection to %s:%d", ZSTR_VAL(redis_sock->host), redis_sock->port); } // Close our socket redis_sock_disconnect(redis_sock, 1); // Throw a read error exception REDIS_THROW_EXCEPTION(errmsg, 0); efree(errmsg); return -1; } /* We don't need \r\n */ *line_size-=2; buf[*line_size]='\0'; /* Success! */ return 0;}
附: 这个msg 看着比线上的msg 多了 host 和 port , 是因为最近合并分支的原因,如图
image从源码中可以发现如果php_stream_get_line读取stream数据为NUll的时候就会抛出read error on connection这个错误。那么什么时候php_stream_get_line会返回NULL呢, 对应于php源码的php-src/main/streams/streams.c 文件 , 详见php-src;
/* If buf==NULL, the buffer will be allocated automatically and will be of an * appropriate length to hold the line, regardless of the line length, memory * permitting */PHPAPI char *_php_stream_get_line(php_stream *stream, char *buf, size_t maxlen, size_t *returned_len){ size_t avail=0; size_t current_buf_size=0; size_t total_copied=0; int grow_mode=0; char *bufstart=buf; if (buf==NULL) { grow_mode=1; } else if (maxlen==0) { return NULL; } /* * If the underlying stream operations block when no new data is readable, * we need to take extra precautions. * * If there is buffered data available, we check for a EOL. If it exists, * we pass the data immediately back to the caller. This saves a call * to the read implementation and will not block where blocking * is not necessary at all. * * If the stream buffer contains more data than the caller requested, * we can also avoid that costly step and simply return that data. */ for (;;) { avail=stream->writepos - stream->readpos; if (avail > 0) { size_t cpysz=0; char *readptr; const char *eol; int done=0; readptr=(char*)stream->readbuf + stream->readpos; eol=php_stream_locate_eol(stream, NULL); if (eol) { cpysz=eol - readptr + 1; done=1; } else { cpysz=avail; } if (grow_mode) { /* allow room for a NUL. If this realloc is really a realloc * (ie: second time around), we get an extra byte. In most * cases, with the default chunk size of 8K, we will only * incur that overhead once. When people have lines longer * than 8K, we waste 1 byte per additional 8K or so. * That seems acceptable to me, to avoid making this code * hard to follow */ bufstart=erealloc(bufstart, current_buf_size + cpysz + 1); current_buf_size +=cpysz + 1; buf=bufstart + total_copied; } else { if (cpysz >=maxlen - 1) { cpysz=maxlen - 1; done=1; } } memcpy(buf, readptr, cpysz); stream->position +=cpysz; stream->readpos +=cpysz; buf +=cpysz; maxlen -=cpysz; total_copied +=cpysz; if (done) { break; } } else if (stream->eof) { break; } else { /* XXX: Should be fine to always read chunk_size */ size_t toread; if (grow_mode) { toread=stream->chunk_size; } else { toread=maxlen - 1; if (toread > stream->chunk_size) { toread=stream->chunk_size; } } php_stream_fill_read_buffer(stream, toread); if (stream->writepos - stream->readpos==0) { break; } } } if (total_copied==0) { if (grow_mode) { assert(bufstart==NULL); } return NULL; } buf[0]='\0'; if (returned_len) { *returned_len=total_copied; } return bufstart;}
从 php_stream_get_line方法中可以看出 只有 bufstart=NULL的时候才会返回NULL,bufstart=NULL说明并未在buf缓冲和stream中接收到任何数据,包括终止符。
1.3 解决方案客户端设置合理的超时时间,有两种方式:
1.3.1 int_set1.3.2 setOptionini_set('default_socket_timeout', -1);
$redis->setOption(Redis::OPT_READ_TIMEOUT, -1);
注: -1均表示不超时,也可以将超时设置为自己希望的时间, 前面复现时就是设为为0.01ms
二、重新使用已经断开的连接使用已经断开的连接也有可能导致 'read error on connection', 这里需要区分 'Connection closed' 和 'Connection lost'。
2.1 连接断开2.1.1 Connection closed测试脚本如下,客户端主动关闭连接,但是下文接着使用该断开的链接,然后抛出异常返回 connection closed
<?php$rds=new Redis();try { $ret=$rds->pconnect("127.0.0.1", 6390); if ($ret==false) { echo "Connect return false"; exit; } $rds->close(); var_dump($rds->get("aa"));} catch (Exception $e) { var_dump ($e);}
测试结果如下:
image2.1.2 Connection lost参考Work around PHP bug of liveness checking 编写测试脚本 test.php 如下,连接上redis之后,在执行命令前kill redis 进程:
<?php$rds=new Redis();try { $ret=$rds->pconnect("127.0.0.1", 6390); if ($ret==false) { echo "Connect return false"; exit; } echo "Press any key to continue ..."; fgetc(STDIN); var_dump($rds->get("aa"));} catch (Exception $e) { var_dump ($e);}
如果
执行步骤如下
终端执行 php test.php 脚本另开一个终端 kill redis 进程第一个终端任意输入、回车此时会出现 'Connection lost'
image2.1.3 read error on connection连接上redis之后,不断执行命令的过程中,如果连接断开,会返回 read error on connection。测试脚本如下:
<?php$rds=new Redis();try { $ret=$rds->pconnect("127.0.0.1", 6390); if ($ret==false) { echo "Connect return false"; exit; } while(1){ $rds->get("aa"); }} catch (Exception $e) { var_dump ($e);}
如果
执行步骤如下
终端执行 php test.php 脚本另开一个终端 kill redis 进程此时抛出异常:
image或者新打开终端连接上redis服务端,执行client kill ,如下:
image正在执行的php脚本同样会捕获该异常read error on connection。
2.2 php-fpm & pconnect在cli 模式下, 通过php通过 pconnect 连接redis服务端,虽然业务代码,显示调用close, 但是实际上该连接并未断开,fpm 会维护到redis 的连接,下个请求再次执行pconnect 的时候并不会真正请求redis 建立连接。这样同样会带来一个问题,假如这个连接已经断开了,下个请求可能直接使用上个断开的连接,对此,phpredis 在其源码也有注释,详见php-src
image因此php-fpm reuse 一个断开的连接可能导致此类错误。
此种情况最简单的解决方案就是改长链接为短链接了
三、小结网上有很多关于 执行超时及其解决方案的分析,但是对于连接断开重新使用的分析较少,故此分析之,一方面用作记录,另一方面希望能够给面临同样问题的小伙伴一点帮助。
四、参考[1] redis read error on connection和Redis server went away错误排查
[2] Work around PHP bug of liveness checking
[3] phpredis subscribe超时问题及解决
[4] php-src
[5] phpredis
</pre>
作者 | Mark_MMXI
来源 | CSDN博客,责编 | 夕颜
出品 | CSDN(ID:CSDNnews)
缓存的存在是为了在高并发情形下,缓解DB压力,提高业务系统体验。业务系统访问数据,先去缓存中进行查询,假如缓存存在数据直接返回缓存数据,否则就去查询数据库再返回值。
Redis是一种缓存工具,是一种缓存解决方案,但是引入Redis又有可能出现缓存穿透、缓存击穿、缓存雪崩等问题。本文就对缓存雪崩问题进行较深入剖析,并通过场景模型加深理解,基于场景使用对应的解决方案尝试解决。
缓存原理及Redis解决方案
首先,我们来看一下缓存的工作原理图:
Redis 本质上是一个 Key-Value 类型的内存数据库。因为是纯内存操作,Redis 的性能非常出色,每秒可以处理超过 10、万次读写操作。Redis 还有一个优势就是是支持保存多种数据结构,例如 String、List、Set、Sorted Set、hash等。
缓存雪崩
2.1 缓存雪崩解释
缓存雪崩的情况是说,当某一时刻发生大规模的缓存失效的情况,比如你的缓存服务宕机了,DB直接负载大量请求压力导致挂掉。
2.2 模拟缓存雪崩
按照缓存雪崩的解释,其实我们要模拟,只需要达到以下几个点:
同一时刻大规模缓存失效。
失效的时刻有大量的查询请求冲击DB
@Test
public void testQuery{
ExecutorService es=Executors.newFixedThreadPool(10);
int loop=1000;
int init=2000;
//查询1k个key放进缓存
for (int i=init; i < loop+init; i++) {
userService.queryById(i);
}
//缓存过期时间为1s,等待1s同时过期
try {
Thread.sleep(1000);
}catch (Exception e){
e.printStackTrace;
}
//开始了使用多线程疯狂查询
for (int i=0; i < 100; i++) {
es.execute( -> {
for (int k=init; k < loop+init; k++) {
userService.queryById(k);
}
});
}
}
为了加快崩坏的速度,把数据库的最大连接数调整成5,同时增大数据库表的数据量达到百万级别。
然后执行测试程序,很快程序就报错并停止,详细错误如下:
Exception in thread "pool-1-thread-12" org.springframework.data.redis.RedisSystemException: Redis exception; nested exception is io.lettuce.core.RedisException: Connection is closed
at org.springframework.data.redis.connection.lettuce.LettuceExceptionConverter.convert(LettuceExceptionConverter.java:74)
at org.springframework.data.redis.connection.lettuce.LettuceExceptionConverter.convert(LettuceExceptionConverter.java:41)
at org.springframework.data.redis.PassThroughExceptionTranslationStrategy.translate(PassThroughExceptionTranslationStrategy.java:44)
at org.springframework.data.redis.FallbackExceptionTranslationStrategy.translate(FallbackExceptionTranslationStrategy.java:42)
at org.springframework.data.redis.connection.lettuce.LettuceConnection.convertLettuceAccessException(LettuceConnection.java:270)
at org.springframework.data.redis.connection.lettuce.LettuceStringCommands.convertLettuceAccessException(LettuceStringCommands.java:799)
at org.springframework.data.redis.connection.lettuce.LettuceStringCommands.get(LettuceStringCommands.java:68)
at org.springframework.data.redis.connection.DefaultedRedisConnection.get(DefaultedRedisConnection.java:260)
at org.springframework.data.redis.cache.DefaultRedisCacheWriter.lambda$get$1(DefaultRedisCacheWriter.java:109)
at org.springframework.data.redis.cache.DefaultRedisCacheWriter.execute(DefaultRedisCacheWriter.java:242)
at org.springframework.data.redis.cache.DefaultRedisCacheWriter.get(DefaultRedisCacheWriter.java:109)
at org.springframework.data.redis.cache.RedisCache.lookup(RedisCache.java:88)
at org.springframework.cache.support.AbstractValueAdaptingCache.get(AbstractValueAdaptingCache.java:58)
at org.springframework.cache.interceptor.AbstractCacheInvoker.doGet(AbstractCacheInvoker.java:73)
at org.springframework.cache.interceptor.CacheAspectSupport.findInCaches(CacheAspectSupport.java:554)
at org.springframework.cache.interceptor.CacheAspectSupport.findCachedItem(CacheAspectSupport.java:519)
at org.springframework.cache.interceptor.CacheAspectSupport.execute(CacheAspectSupport.java:401)
at org.springframework.cache.interceptor.CacheAspectSupport.execute(CacheAspectSupport.java:345)
at org.springframework.cache.interceptor.CacheInterceptor.invoke(CacheInterceptor.java:61)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:747)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:689)
at com.example.demo.user.service.impl.UserServiceImpl$EnhancerBySpringCGLIB$ba6638d2.queryById(<generated>)
at com.example.demo.DemoApplicationTests$1.run(DemoApplicationTests.java:55)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: io.lettuce.core.RedisException: Connection is closed
at io.lettuce.core.protocol.DefaultEndpoint.validateWrite(DefaultEndpoint.java:195)
at io.lettuce.core.protocol.DefaultEndpoint.write(DefaultEndpoint.java:137)
at io.lettuce.core.protocol.CommandExpiryWriter.write(CommandExpiryWriter.java:112)
2020-03-08 22:31:14.432 ERROR 37892 --- [eate-1895102622] com.alibaba.druid.pool.DruidDataSource : create connection SQLException, url: jdbc:mysql://localhost:3306/redis_demo?useUnicode=true&characterEncoding=UTF-8&allowMultiQueries=true&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=UTC, errorCode 1040, state 08004
java.sql.SQLNonTransientConnectionException: Data source rejected establishment of connection, message from server: "Too many connections"
主要问题出在数据库连接已经满了,无法获取数据库连接进行查询,这个现象是就是缓存雪崩的效果。
‘
2.3 解决缓存雪崩
2.3.1 分析雪崩场景
用图来说,实际上就是没有了redis这层担着上层流量压力
其实从这张图来看,对于我们一般的应用,客户端去访问应用到数据库的整个链路过程,其实在面临大流量的时候,我们一般是以"倒三角"模型进行流量缓冲,什么是“倒三角”模型
通过"倒三角"模型,按照并发需要优化系统,在面临雪崩这种情形,可以按照“倒三角”模型进行优化,注意雪崩是理论上没办法彻底解决的,可能到最终得提高硬件配置。
2.3.1 雪崩优化方案
经过分析得解决雪崩方案:
1.随机缓存过期时间,能一定程度缓解雪崩
2.使用锁或队列、设置过期标志更新缓存
3.添加本地缓存实现多级缓存
4.添加熔断降级限流,缓冲压力
2.3.1.1 随机缓存时间
随机缓存时间意在避免大量热点key同时失效。
接下来,我们基于Redis+SpringBoot+SpringCache基础项目搭建这个项目继续进行实践。
由于是使用了SpringCache,我们最优的方案就是直接在@Cacheable等注解上面加参数,比如像表达式之类的,让数据放进缓存的时候按照表达式/参数值定义过期时间。
因此我们先查看原有的RedisCache是怎么样的put逻辑
RedisCacheManager创建Cache
protected RedisCache createRedisCache(String name, @able RedisCacheConfiguration cacheConfig) {
return new RedisCache(name, this.cacheWriter, cacheConfig !=? cacheConfig : this.defaultCacheConfig);
}
打开RedisCache.class,查看put 方法如下:
public void put(Object key, @able Object value) {
Object cacheValue=this.preProcessCacheValue(value);
if (!this.isAllowValues && cacheValue==) {
throw new IllegalArgumentException(String.format("Cache '%s' does not allow '' values. Avoid storing via '@Cacheable(unless=\"#result==\")' or configure RedisCache to allow '' via RedisCacheConfiguration.", this.name));
} else {
this.cacheWriter.put(this.name, this.createAndConvertCacheKey(key), this.serializeCacheValue(cacheValue), this.cacheConfig.getTtl);
}
}
这里this.cacheConfig.getTtl 就是缓存的过期时间,可以看到数据的缓存过期时间是从全局缓存配置里面获取的过期时间配置的,而我需要实现的是让某个cache下每个key随机时间过期,因此我们需要改动这里 this.cacheConfig.getTtl,我们在createRedisCache的时候改变这个值就行了。
1. 基于java动态执行字符串代码,返回过期时间。
实现基于Spring.expression的ExpressService
/**
* @title: ExpressUtil
* @projectName redisdemo
* @description: 动态执行字符串代码
* @author lps
* @date 2020/3/912:01
*/
@Slf4j
public class ExpressService {
private ExpressionParser spelExpressionParser;
private ParserContext parserContext;
// 表达式解析上下文
private StandardEvaluationContext evaluationContext;
public static enum ExpressType {
/**
* ${}表达式格式
*/
TYPE_FIRST,
/**
* #{}表达式格式
*/
TYPE_SECOND
}
private static final String PRE_TYPE_1="${";
private static final String PRE_TYPE_2="#{";
private static final String SUF_STR="}";
private ExpressService(String pre, String suf) {
spelExpressionParser=new SpelExpressionParser;
log.debug("表达式前缀={},表达式后缀={}", pre, suf);
evaluationContext=new StandardEvaluationContext;
// 增加map解析方案
evaluationContext.addPropertyAccessor(new MapAccessor);
parserContext=new TemplateParserContext(pre, suf);
}
/**
*
* <p>
* 创建表达式处理服务对象 默认为创建#{}格式表达式 通过ExpressType指定表达式格式,现有两种${}和#{}
* </p>
*
*
* @param type
* 表达式格式类型
* @return 表达式解析对象
*/
public static ExpressService createExpressService(ExpressType type) {
if (type==ExpressType.TYPE_FIRST) {
log.debug("生成表达式,表达式前缀={}", PRE_TYPE_1);
return new ExpressService(PRE_TYPE_1, SUF_STR);
} else {
return new ExpressService(PRE_TYPE_2, SUF_STR);
}
}
public Object expressParse(String express, Object data) throws Exception {
log.debug("解析表达式信息={}", express);
Expression expression=spelExpressionParser.parseExpression(express, this.parserContext);
return expression.getValue(evaluationContext, data);
}
}
测试调用:
@Test
public void testExpress{
ExpressService express=ExpressService.createExpressService;
try {
//固定超时时间
System.out.println("ttl="+express.expressParse("#{60}", ));
//调用方法生成随机过期时间
System.out.println("ttl="+express.expressParse("#{T(org.apache.commons.lang3.RandomUtils).nextInt(60,200)}", ));
} catch (Exception e) {
e.printStackTrace;
}
}
2. 设计name拼接ttl规则
由于createRedisCache只有两个参数name以及cacheConfig,而只有name是对于单个cache来说的,cacheConfig是对于全局cache来说,因此我们需要设计name参数中指定cache的name以及过期时间的规则。
name赋值规则:name|ttlFun
eg:
@Cacheable(cacheName="test|#{T(org.apache.commons.lang3.RandomUtils).nextInt(60,200)}")
@Cacheable(cacheName="test|#{60}")
3. 编写解析name代码
/**
* 分隔符|
*/
private static final String SEPERATE_LINE="|";
public MyRedisCacheManager(RedisCacheWriter cacheWriter, RedisCacheConfiguration defaultCacheConfiguration) {
super(cacheWriter, defaultCacheConfiguration);
}
protected RedisCache createRedisCache(String name, @able RedisCacheConfiguration cacheConfig) {
// ``name赋值规则:name|ttlFun ``
if(name.contains(SEPERATE_LINE)){
String cacheName=name.substring(0,name.indexOf(SEPERATE_LINE));
String expression=name.substring(name.indexOf(SEPERATE_LINE)+1);
try{
ExpressService express=ExpressService.createExpressService;
long ttl=Long.parseLong(express.expressParse(expression, ).toString);
cacheConfig=cacheConfig.entryTtl(Duration.ofSeconds(ttl));
return super.createRedisCache(cacheName, cacheConfig);
}catch (Exception e){
e.printStackTrace;
return super.createRedisCache(name, cacheConfig);
}
}
return super.createRedisCache(name, cacheConfig);
}
4. 修改CacheConfig
将原本的RedisManager替换成#3编写的MyRedisManager
/**
* 配置缓存管理器
*/
@Bean
public CacheManager cacheManager(RedisConnectionFactory factory) {
//关键点,spring cache 的注解使用的序列化都从这来,没有这个配置的话使用的jdk自己的序列化,实际上不影响使用,只是打印出来不适合人眼识别
RedisCacheConfiguration cacheConfig=RedisCacheConfiguration.defaultCacheConfig
// 将 key 序列化成字符串
.serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(new StringRedisSerializer))
// 将 value 序列化成 json
.serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(new GenericJackson2JsonRedisSerializer))
// 设置缓存过期时间,单位秒
.entryTtl(Duration.ofSeconds(cacheExpireTime))
// 不缓存空值
.disableCachingValues;
/* RedisCacheManager redisCacheManager=RedisCacheManager.builder(factory)
.cacheDefaults(cacheConfig)
.build;*/
//修改RedisCacheManager 为MyRedisCacheManager
MyRedisCacheManager redisCacheManager=new MyRedisCacheManager(RedisCacheWriter.nonLockingRedisCacheWriter(factory), cacheConfig);
return redisCacheManager;
}
5. 测试
编写单元测试
@Test
public void testQueryIdWithExpress{
Assert.assertNot(userService.queryById(3333));
}
重新定义查询的cache定义
@Override
@Cacheable( value="ca1|#{60}",key="#id" ,unless="#result==")
// @Cacheable( value="ca1|#{T(org.apache.commons.lang3.RandomUtils).nextInt(100,200)}",key="#id" ,unless="#result==")
public User queryById(int id) {
return this.userDao.queryById(id);
}
当value=ca1|#{60}的时候,通过查看Redis的TTL 剩余为58s
当value=ca1|#{T(org.apache.commons.lang3.RandomUtils).nextInt(100,200)}的时候,随机100-220范围内秒数,通过查看Redis的TTL 剩余为107s
这时候使用random的方式就可以实现随机过期时间了,随机数最好选择符合高斯(正态)分布的会比较好。
new Random.nextGaussian
2.3.1.2 互斥锁排队
业界比价普遍的一种做法,即根据key获取value值为空时,锁上,从数据库中load数据后再释放锁。若其它线程获取锁失败,则等待一段时间后重试。这里要注意,分布式环境中要使用分布式锁,单机的话用普通的锁(synchronized、Lock)就够了。
这样做思路比较清晰,也从一定程度上减轻数据库压力,但是锁机制使得逻辑的复杂度增加,吞吐量也降低了,有点治标不治本。
1.使用setnx的方式设置互斥锁
public User queryById(int id) {
try {
if (redisTemplate.hasKey(id+"")) {
return (User) redisTemplate.opsForValue.get(id+"");
} else {
//获取锁
if(lock(id+"")){
// 数据库查询
User user=userDao.queryById(id);
redisTemplate.opsForValue.set(id+"",user, Duration.ofSeconds(3000));
//释放锁
redisTemplate.delete(LOCK_PREFIX + "id");
}
}
} catch (Exception e) {
e.printStackTrace;
}
return (User) redisTemplate.opsForValue.get(""+id);
}
private static String LOCK_PREFIX="prefix";
private static long LOCK_EXPIRE=3000;
/**
* 互斥锁实现
*/
public boolean lock(String key) {
String lock=LOCK_PREFIX + key;
return (Boolean) redisTemplate.execute((RedisCallback) connection -> {
long expireAt=System.currentTimeMillis + LOCK_EXPIRE + 1;
//SETNX
Boolean acquire=connection.setNX(lock.getBytes, String.valueOf(expireAt).getBytes);
if (acquire) {
return true;
} else {
byte value=connection.get(lock.getBytes);
if (Objects.non(value) && value.length > 0) {
long expireTime=Long.parseLong(new String(value));
//判断锁是否过期
if (expireTime < System.currentTimeMillis) {
byte oldValue=connection.getSet(lock.getBytes, String.valueOf(System.currentTimeMillis + LOCK_EXPIRE + 1).getBytes);
return Long.parseLong(new String(oldValue)) < System.currentTimeMillis;
}
}
}
return false;
});
}
2.3.1.3 设置过期标志更新缓存
定时更新缓存,阻塞部分请求,达到缓冲作用,也可以设置key永不过期
2.3.1.4 多级缓存
这个方案主要在redis宕机,或者key在更新进缓存的中间,可以响应业务应用,减轻压力
2.3.1.5 熔断降级限流
这个方案是直接在业务应用之上进行请求流量控制,减轻下层压力
原文链接:https://blog.csdn.net/qq_28540443/article/details/104746655
发表评论