spring异步service中处理线程数限制详解
情况简介
spring项目,controller异步调用service的方法,产生大量并发。
具体业务:
前台同时传入大量待翻译的单词,后台业务接收单词,并调用百度翻译接口翻译接收单词并将翻译结果保存到数据库,前台不需要实时返回翻译结果。
处理方式:
controller接收文本调用service中的异步方法,将单词先保存到队列中,再启动2个新线程,从缓存队列中取单词,并调用百度翻译接口获取翻译结果并将翻译结果保存到数据库。
本文主要知识点:
多线程同时(异步)调用方法后,开启新线程,并限制线程数量。
代码如下:
@Service
publicclassLgtsAsyncServiceImpl{
/**logger日志.*/
publicstaticfinalLoggerLOGGER=Logger.getLogger(LgtsAsyncServiceImpl2.class);
privatefinalBlockingQueueque=newLinkedBlockingQueue<>();//待翻译的队列
privatefinalAtomicIntegerthreadCnt=newAtomicInteger(0);//当前翻译中的线程数
privatefinalVectorexistsKey=newVector<>();//保存已入队列的数据
privatefinalintmaxThreadCnt=2;//允许同时执行的翻译线程数
privatestaticfinalintNUM_OF_EVERY_TIME=50;//每次提交的翻译条数
privatestaticfinalStringtranslationFrom="zh";
@Async
publicvoidsaveAsync(Lgtst){
if(Objects.isNull(t)||StringUtils.isAnyBlank(t.getGco(),t.getCode())){
return;
}
offer(t);
save();
return;
}
privatebooleanoffer(Lgtst){
Stringkey=t.getGco()+"-"+t.getCode();
if(!existsKey.contains(key)){
existsKey.add(key);
booleanresult=que.offer(t);
//LOGGER.trace("待翻译文字["+t.getGco()+":"+t.getCode()+"]加入队列结果["+result
//+"],队列中数据总个数:"+que.size());
returnresult;
}
returnfalse;
}
@Autowired
privateLgtsServicelgtsService;
privatevoidsave(){
intcnt=threadCnt.incrementAndGet();//当前线程数+1
if(cnt>maxThreadCnt){
//已启动的线程大于设置的最大线程数直接丢弃
threadCnt.decrementAndGet();//+1的线程数再-回去
return;
}
GwallUseruser=UserUtils.getUser();
Threadthr=newThread(){
publicvoidrun(){
longsleepTime=30000l;
UserUtils.setUser(user);
booleancontinueFlag=true;
intmaxContinueCnt=5;//最大连续休眠次数,连续休眠次数超过最大休眠次数后,while循环退出,当前线程销毁
intcontinueCnt=0;//连续休眠次数
while(continueFlag){//队列不为空时执行
if(Objects.isNull(que.peek())){
try{
if(continueCnt>maxContinueCnt){
//连续休眠次数达到最大连续休眠次数,当前线程将销毁。
continueFlag=false;
continue;
}
//队列为空,准备休眠
Thread.sleep(sleepTime);
continueCnt++;
continue;
}catch(InterruptedExceptione){
//休眠失败,无需处理
e.printStackTrace();
}
}
continueCnt=0;//重置连续休眠次数为0
Listparams=newArrayList<>();
inttotalCnt=que.size();
que.drainTo(params,NUM_OF_EVERY_TIME);
StringBuilderutf8q=newStringBuilder();
Stringcode="";
ListneedRemove=newArrayList<>();
for(Lgtslgts:params){
if(StringUtils.isAnyBlank(code)){
code=lgts.getCode();
}
//移除existsKey中保存的key,以免下面翻译失败时再次加入队列时,加入不进去
Stringkey=lgts.getGco()+"-"+lgts.getCode();
existsKey.remove(key);
if(!code.equalsIgnoreCase(lgts.getCode())){//要翻译的目标语言与当前列表中的第一个不一致
offer(lgts);//重新将待翻译的语言放回队列
needRemove.add(lgts);
continue;
}
utf8q.append(lgts.getGco()).append("\n");
}
params.removeAll(needRemove);
LOGGER.debug("队列中共"+totalCnt+"个,获取"+params.size()+"个符合条件的待翻译内容,编码:"+code);
Stringto="en";
if(StringUtils.isAnyBlank(utf8q,to)){
LOGGER.warn("调用翻译出错,未找到["+code+"]对应的百度编码。");
continue;
}
Mapresult=getBaiduTranslation(utf8q.toString(),translationFrom,to);
if(Objects.isNull(result)||result.isEmpty()){//把没有获取到翻译结果的重新放回队列
for(Lgtslgts:params){
offer(lgts);
}
LOGGER.debug("本次翻译结果为空。");
continue;
}
intsucessCnt=0,ignoreCnt=0;
for(Lgtslgts:params){
lgts.setBdcode(to);
Stringgna=result.get(lgts.getGco());
if(StringUtils.isAnyBlank(gna)){
offer(lgts);//重新将待翻译的语言放回队列
continue;
}
lgts.setStat(1);
lgts.setGna(gna);
intsaveResult=lgtsService.saveIgnore(lgts);
if(0==saveResult){
ignoreCnt++;
}else{
sucessCnt++;
}
}
LOGGER.debug("待翻译个数:"+params.size()+",翻译成功个数:"+sucessCnt+",已存在并忽略个数:"+ignoreCnt);
}
threadCnt.decrementAndGet();//运行中的线程数-1
distory();//清理数据,必须放在方法最后,否则distory中的判断需要修改
}
/**
*如果是最后一个线程,清空队列和existsKey中的数据
*/
privatevoiddistory(){
if(0==threadCnt.get()){
//最后一个线程退出时,执行清理操作
existsKey.clear();
que.clear();
}
}
};
thr.setDaemon(true);//守护线程,如果主线程执行完毕,则此线程会自动销毁
thr.setName("baidufanyi-"+RandomUtils.nextInt(1000,9999));
thr.start();//启动插入线程
}
/**
*百度翻译
*
*@paramutf8q
*待翻译的字符串,需要utf8格式的
*@paramfrom
*百度翻译语言列表中的代码
*参见:http://api.fanyi.baidu.com/api/trans/product/apidoc#languageList
*@paramto
*百度翻译语言列表中的代码
*参见:http://api.fanyi.baidu.com/api/trans/product/apidoc#languageList
*@return翻译结果
*/
privateMapgetBaiduTranslation(Stringutf8q,Stringfrom,Stringto){
Mapresult=newHashMap<>();
StringbaiduurlStr="http://api.fanyi.baidu.com/api/trans/vip/translate";
if(StringUtils.isAnyBlank(baiduurlStr)){
LOGGER.warn("百度翻译API接口URL相关参数为空!");
returnresult;
}
Mapparams=buildParams(utf8q,from,to);
if(params.isEmpty()){
returnresult;
}
StringsendUrl=getUrlWithQueryString(baiduurlStr,params);
try{
HttpClienthttpClient=newHttpClient();
httpClient.setMethod("GET");
StringremoteResult=httpClient.pub(sendUrl,"");
result=convertRemote(remoteResult);
}catch(Exceptione){
LOGGER.info("百度翻译API返回结果异常!",e);
}
returnresult;
}
privateMapconvertRemote(StringremoteResult){
Mapresult=newHashMap<>();
if(StringUtils.isBlank(remoteResult)){
returnresult;
}
JSONObjectjsonObject=JSONObject.parseObject(remoteResult);
JSONArraytrans_result=jsonObject.getJSONArray("trans_result");
if(Objects.isNull(trans_result)||trans_result.isEmpty()){
returnresult;
}
for(Objectobject:trans_result){
JSONObjecttrans=(JSONObject)object;
result.put(trans.getString("src"),trans.getString("dst"));
}
returnresult;
}
privateMapbuildParams(Stringutf8q,Stringfrom,Stringto){
if(StringUtils.isBlank(from)){
from="auto";
}
Mapparams=newHashMap();
StringskStr="sk";
StringappidStr="appid";
if(StringUtils.isAnyBlank(skStr,appidStr)){
LOGGER.warn("百度翻译API接口相关参数为空!");
returnparams;
}
params.put("q",utf8q);
params.put("from",from);
params.put("to",to);
params.put("appid",appidStr);
//随机数
Stringsalt=String.valueOf(System.currentTimeMillis());
params.put("salt",salt);
//签名
Stringsrc=appidStr+utf8q+salt+skStr;//加密前的原文
params.put("sign",MD5Util.md5Encrypt(src).toLowerCase());
returnparams;
}
publicstaticStringgetUrlWithQueryString(Stringurl,Mapparams){
if(params==null){
returnurl;
}
StringBuilderbuilder=newStringBuilder(url);
if(url.contains("?")){
builder.append("&");
}else{
builder.append("?");
}
inti=0;
for(Stringkey:params.keySet()){
Stringvalue=params.get(key);
if(value==null){//过滤空的key
continue;
}
if(i!=0){
builder.append('&');
}
builder.append(key);
builder.append('=');
builder.append(encode(value));
i++;
}
returnbuilder.toString();
}
/**
*对输入的字符串进行URL编码,即转换为%20这种形式
*
*@paraminput
*原文
*@returnURL编码.如果编码失败,则返回原文
*/
publicstaticStringencode(Stringinput){
if(input==null){
return"";
}
try{
returnURLEncoder.encode(input,"utf-8");
}catch(UnsupportedEncodingExceptione){
e.printStackTrace();
}
returninput;
}
}
总结
以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,谢谢大家对毛票票的支持。