Java多线程编程实战之模拟大量数据同步
背景
最近对于Java多线程做了一段时间的学习,笔者一直认为,学习东西就是要应用到实际的业务需求中的。否则要么无法深入理解,要么硬生生地套用技术只是达到炫技的效果。
不过笔者仍旧认为自己对于多线程掌握不够熟练,不敢轻易应用到生产代码中。这就按照平时工作中遇到的实际问题,脑补了一个很可能存在的业务场景:
已知某公司管理着1000个微信服务号,每个服务号有1w~50w粉丝不等。假设该公司每天都需要将所有微信服务号的粉丝数据通过调用微信API的方式更新到本地数据库。
需求分析
对此需求进行分析,主要存在以下问题:
- 单个服务号获取粉丝id,只能每次1w按顺序拉取
- 微信的API对于服务商的并发请求数量有限制
单个服务号获取粉丝id,只能每次1w按顺序拉取。这个问题决定了单个公众号在拉取粉丝id上,无法分配给多个线程执行。
微信的API对于服务商的并发请求数量有限制。这点最容易被忽略,如果我们同时有过多的请求,则会导致接口被封禁。这里可以通过信号量来控制同时执行的线程数量。
为了尽快完成数据同步,根据实际情况:整个数据同步可分为读数据和写数据两个部分。读数据是通过API获取,走网络IO,速度较慢;写数据是写到数据库,速度较快。所以得出结论:需要分配较多的线程进行读数据,较少的线程进行写数据。
设计要点
首先,我们需要确定开启多少个线程(在生产中往往是使用线程池),线程数量需要根据服务器性能来决定,这里我们定为40个读取数据线程(将1000个公众号分为40份,分别在40个线程中执行),1个写入数据线程。(具体开多少个线程,取决于线程池的容量,以及可以分配给此业务的数量。具体的数字需要根据实际情况测试得出,比服务器阈值低一些较好。当然,配置允许范围内越大越好)
其次,考虑到微信对于API并发请求的限制,需要限制同时执行的线程数,使用java.util.concurrent.Semaphore进行控制,这里我们限制为20个(具体的信号量凭证数,取决于同一时间能够执行的线程,跟API限制,服务器性能有关)。
然后,我们需要知道数据何时读取、写入完毕,以控制程序逻辑以及终止程序,这里我们使用java.util.concurrent.CountDownLatch进行控制。
最后,我们需要一个数据结构,用来在多个线程中共享处理的数据,此处同步数据的场景非常适合使用队列,这里我们使用线程安全的java.util.concurrent.ConcurrentLinkedQueue来进行处理。(需要注意的是,在实际开发中,队列不能够无限制地增长,这将会很快消耗掉内存,我们需要根据实际情况对队列长度做控制。例如,可以通过控制读取线程数和写入线程数的比例来控制队列的长度)
模拟代码
由于本文重点关注多线程的使用,模拟代码只体现多线程操作的方法。代码里添加了大量的注释,方便各位读者阅读理解。
JDK:1.8
importjava.util.Arrays; importjava.util.List; importjava.util.Queue; importjava.util.concurrent.ConcurrentLinkedQueue; importjava.util.concurrent.CountDownLatch; importjava.util.concurrent.Semaphore; importjava.util.concurrent.TimeUnit; /** *N个线程向队列添加数据 *一个线程消费队列数据 */ publicclassQueueTest{ privatestaticListdata=Arrays.asList("a","b","c","d","e"); privatestaticfinalintOFFER_COUNT=40;//开启的线程数量 privatestaticSemaphoresemaphore=newSemaphore(20);//同一时间执行的线程数量(大多用于控制API调用次数或数据库查询连接数) publicstaticvoidmain(String[]args)throwsInterruptedException{ Queue queue=newConcurrentLinkedQueue<>();//处理队列,需要处理的数据,放置到此队列中 CountDownLatchofferLatch=newCountDownLatch(OFFER_COUNT);//offer线程latch,每完成一个,latch减一,lacth的count为0时表示offer处理完毕 CountDownLatchpollLatch=newCountDownLatch(1);//poll线程latch,latch的count为0时,表示poll处理完毕 RunnableofferRunnable=()->{ try{ semaphore.acquire();//信号量控制 }catch(InterruptedExceptione){ e.printStackTrace(); } try{ for(Stringdatum:data){ queue.offer(datum); TimeUnit.SECONDS.sleep(2);//模拟取数据很慢的情况 } }catch(InterruptedExceptione){ e.printStackTrace(); }finally{ //在finally中执行latch.countDown()以及信号量释放,避免因异常导致没有正常释放 offerLatch.countDown(); semaphore.release(); } }; RunnablepollRunnable=()->{ intcount=0; try{ while(offerLatch.getCount()>0||queue.size()>0){//只要offer的latch未执行完,或queue仍旧有数据,则继续循环 Stringpoll=queue.poll(); if(poll!=null){ System.out.println(poll); count++; } //无论是否poll到数据,均暂停一小段时间,可降低CPU消耗 TimeUnit.MILLISECONDS.sleep(100); } System.out.println("totalcount:"+count); }catch(InterruptedExceptione){ e.printStackTrace(); }finally{ //在finally中执行latch.countDown(),避免因异常导致没有正常释放 pollLatch.countDown(); } }; //启动线程(生产环境中建议使用线程池) newThread(pollRunnable).start();//启动一个poll线程 for(inti=0;i 到这里,本文结束。以上是笔者脑补的一个常见需求的解决方案。
注意:多线程编程对实际环境和需求有很大的依赖,需要根据实际的需求情况对各个参数做调整。实际在使用中,需要尽量模拟生产环境的数据情况来进行测试,对服务器执行期间的并发数,CPU、内存、网络IO、磁盘IO做好观察。并适当地调低并发数,以给服务器留有处理其他请求的余量。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。