java实现memcache服务器的示例代码
什么是Memcache?
Memcache集群环境下缓存解决方案
Memcache是一个高性能的分布式的内存对象缓存系统,通过在内存里维护一个统一的巨大的hash表,它能够用来存储各种格式的数据,包括图像、视频、文件以及数据库检索的结果等。简单的说就是将数据调用到内存中,然后从内存中读取,从而大大提高读取速度。
Memcache是danga的一个项目,最早是LiveJournal服务的,最初为了加速LiveJournal访问速度而开发的,后来被很多大型的网站采用。
Memcached是以守护程序方式运行于一个或多个服务器中,随时会接收客户端的连接和操作
为什么会有Memcache和memcached两种名称?
其实Memcache是这个项目的名称,而memcached是它服务器端的主程序文件名,知道我的意思了吧。一个是项目名称,一个是主程序文件名,在网上看到了很多人不明白,于是混用了。
Memcached是高性能的,分布式的内存对象缓存系统,用于在动态应用中减少数据库负载,提升访问速度。Memcached由DangaInteractive开发,用于提升LiveJournal.com访问速度的。LJ每秒动态页面访问量几千次,用户700万。Memcached将数据库负载大幅度降低,更好的分配资源,更快速访问。
这篇文章将会涉及以下内容:
- JavaSocket多线程服务器
- JavaIO
- Concurrency
- Memcache特性和协议
Memcache
Memcacheisanin-memorykey-valuestoreforsmallchunksofarbitrarydata(strings,objects)fromresultsofdatabasecalls,APIcalls,orpagerendering.
即内存缓存数据库,是一个键值对数据库。该数据库的存在是为了将从其他服务中获取的数据暂存在内存中,在重复访问时可以直接从命中的缓存中返回。既加快了访问速率,也减少了其他服务的负载。这里将实现一个单服务器版本的Memcache,并且支持多个客户端的同时连接。
客户端将与服务器建立telnet连接,然后按照Memcache协议与服务器缓存进行交互。这里实现的指令为get,set和del。先来看一下各个指令的格式
set
set属于存储指令,存储指令的特点时,第一行输入基本信息,第二行输入其对应的value值。
set
[noreply]\r\n
\r\n
如果存储成功,将会返回STORED,如果指令中包含noreply属性,则服务器将不会返回信息。
该指令中每个域的内容如下:
- key:键
- flags:16位无符号整数,会在get时随键值对返回
- exptime:过期时间,以秒为单位
- bytes:即将发送的value的长度
- noreply:是否需要服务器响应,为可选属性
如果指令不符合标准,服务器将会返回ERROR。
get
get属于获取指令,该指令特点如下:
get
*\r\n
它支持传入多个key的值,如果缓存命中了一个或者多个key,则会返回相应的数据,并以END作为结尾。如果没有命中,则返回的消息中不包含该key对应的值。格式如下:
VALUE\r\n \r\n VALUE \r\n \r\n END del
删除指令,该指令格式如下:
del[noreply]\r\n
如果删除成功,则返回DELETED\r\n,否则返回NOT_FOUND。如果有noreply参数,则服务器不会返回响应。
JAVASOCKET
JAVASOCKET需要了解的只是包括TCP协议,套接字,以及IO流。这里就不详细赘述,可以参考我的这系列文章,也建议去阅读JAVANetworkProgramming。一书。
代码实现
这里贴图功能出了点问题,可以去文末我的项目地址查看类图。
这里采用了指令模式和工厂模式实现指令的创建和执行的解耦。指令工厂将会接收commandLine并且返回一个Command实例。每一个Command都拥有execute方法用来执行各自独特的操作。这里只贴上del指令的特殊实现。
/**
*各种指令
*目前支持get,set,delete
*
*以及自定义的
*error,end
*/
publicinterfaceCommand{
/**
*执行指令
*@paramreader
*@paramwriter
*/
voidexecute(Readerreader,Writerwriter);
/**
*获取指令的类型
*@return
*/
CommandTypegetType();
}
/**
*指令工厂单一实例
*/
publicclassCommandFactory{
privatestaticCommandFactorycommandFactory;
privatestaticCache- memcache;
privateCommandFactory(){}
publicstaticCommandFactorygetInstance(Cache
- cache){
if(commandFactory==null){
commandFactory=newCommandFactory();
memcache=cache;
}
returncommandFactory;
}
/**
*根据指令的类型获取Command
*@paramcommandLine
*@return
*/
publicCommandgetCommand(StringcommandLine){
if(commandLine.matches("^set.*$")){
returnnewSetCommand(commandLine,memcache);
}elseif(commandLine.matches("^get.*$")){
returnnewGetCommand(commandLine,memcache);
}elseif(commandLine.matches("^del.*$")){
returnnewDeleteCommand(commandLine,memcache);
}elseif(commandLine.matches("^end$")){
returnnewEndCommand(commandLine);
}else{
returnnewErrorCommand(commandLine,ErrorCommand.ErrorType.ERROR);
}
}
}
/**
*删除缓存指令
*/
publicclassDeleteCommandimplementsCommand{
privatefinalStringcommand;
privatefinalCache- cache;
privateStringkey;
privatebooleannoReply;
publicDeleteCommand(finalStringcommand,finalCache
- cache){
this.command=command;
this.cache=cache;
initCommand();
}
privatevoidinitCommand(){
if(this.command.contains("noreply")){
noReply=true;
}
String[]info=command.split("");
key=info[1];
}
@Override
publicvoidexecute(Readerreader,Writerwriter){
BufferedWriterbfw=(BufferedWriter)writer;
Itemitem=cache.delete(key);
if(!noReply){
try{
if(item==null){
bfw.write("NOT_FOUND\r\n");
}else{
bfw.write("DELETED\r\n");
}
bfw.flush();
}catch(IOExceptione){
try{
bfw.write("ERROR\r\n");
bfw.flush();
}catch(IOExceptione1){
e1.printStackTrace();
}
e.printStackTrace();
}
}
}
@Override
publicCommandTypegetType(){
returnCommandType.SEARCH;
}
}
然后是实现内存服务器,为了支持先进先出功能,这里使用了LinkedTreeMap作为底层实现,并且重写了removeOldest方法。同时还使用CacheManager的后台线程及时清除过期的缓存条目。
publicclassMemcacheimplementsCache- { privateLoggerlogger=Logger.getLogger(Memcache.class.getName()); //利用LinkedHashMap实现LRU privatestaticLinkedHashMap
cache; privatefinalintmaxSize; //负载因子 privatefinalfloatDEFAULT_LOAD_FACTOR=0.75f; publicMemcache(finalintmaxSize){ this.maxSize=maxSize; //确保cache不会在达到maxSize之后自动扩容 intcapacity=(int)Math.ceil(maxSize/DEFAULT_LOAD_FACTOR)+1; this.cache=newLinkedHashMap (capacity,DEFAULT_LOAD_FACTOR,true){ @Override protectedbooleanremoveEldestEntry(Map.Entry eldest){ if(size()>maxSize){ logger.info("缓存数量已经达到上限,会删除最近最少使用的条目"); } returnsize()>maxSize; } }; //实现同步访问 Collections.synchronizedMap(cache); } publicsynchronizedbooleanisFull(){ returncache.size()>=maxSize; } @Override publicItemget(Stringkey){ Itemitem=cache.get(key); if(item==null){ logger.info("缓存中key:"+key+"不存在"); returnnull; }elseif(item!=null&&item.isExpired()){//如果缓存过期则删除并返回null logger.info("从缓存中读取key:"+key+"value:"+item.getValue()+"已经失效"); cache.remove(key); returnnull; } logger.info("从缓存中读取key:"+key+"value:"+item.getValue()+"剩余有效时间"+item.remainTime()); returnitem; } @Override publicvoidset(Stringkey,Itemvalue){ logger.info("向缓存中写入key:"+key+"value:"+value); cache.put(key,value); } @Override publicItemdelete(Stringkey){ logger.info("从缓存中删除key:"+key); returncache.remove(key); } @Override publicintsize(){ returncache.size(); } @Override publicintcapacity(){ returnmaxSize; } @Override publicIterator >iterator(){ returncache.entrySet().iterator(); } }
/**
*缓存管理器
*后台线程
*将cache中过期的缓存删除
*/
publicclassCacheManagerimplementsRunnable{
privateLoggerlogger=Logger.getLogger(CacheManager.class.getName());
//缓存
publicCache- cache;
publicCacheManager(Cache
- cache){
this.cache=cache;
}
@Override
publicvoidrun(){
while(true){
Iterator
>itemIterator=cache.iterator();
while(itemIterator.hasNext()){
Map.Entryentry=itemIterator.next();
Itemitem=entry.getValue();
if(item.isExpired()){
logger.info("key:"+entry.getKey()+"value"+item.getValue()+"已经过期,从数据库中删除");
itemIterator.remove();
}
}
try{
//每隔5秒钟再运行该后台程序
TimeUnit.SECONDS.sleep(5);
}catch(InterruptedExceptione){
e.printStackTrace();
}
}
}
}
最后是实现一个多线程的Socket服务器,这里就是将ServerSocket绑定到一个接口,并且将accept到的Socket交给额外的线程处理。
/**
*服务器
*/
publicclassIOServerimplementsServer{
privatebooleanstop;
//端口号
privatefinalintport;
//服务器线程
privateServerSocketserverSocket;
privatefinalLoggerlogger=Logger.getLogger(IOServer.class.getName());
//线程池,线程容量为maxConnection
privatefinalExecutorServiceexecutorService;
privatefinalCache- cache;
publicIOServer(intport,intmaxConnection,Cache
- cache){
if(maxConnection<=0)thrownewIllegalArgumentException("支持的最大连接数量必须为正整数");
this.port=port;
executorService=Executors.newFixedThreadPool(maxConnection);
this.cache=cache;
}
@Override
publicvoidstart(){
try{
serverSocket=newServerSocket(port);
logger.info("服务器在端口"+port+"上启动");
while(true){
try{
Socketsocket=serverSocket.accept();
logger.info("收到"+socket.getLocalAddress()+"的连接");
executorService.submit(newSocketHandler(socket,cache));
}catch(IOExceptione){
e.printStackTrace();
}
}
}catch(IOExceptione){
logger.log(Level.WARNING,"服务器即将关闭...");
e.printStackTrace();
}finally{
executorService.shutdown();
shutDown();
}
}
/**
*服务器是否仍在运行
*@return
*/
publicbooleanisRunning(){
return!serverSocket.isClosed();
}
/**
*停止服务器
*/
publicvoidshutDown(){
try{
if(serverSocket!=null){
serverSocket.close();
}
}catch(IOExceptione){
e.printStackTrace();
}
}
}
/**
*处理各个客户端的连接
*在获得end指令后关闭连接s
*/
publicclassSocketHandlerimplementsRunnable{
privatestaticLoggerlogger=Logger.getLogger(SocketHandler.class.getName());
privatefinalSocketsocket;
privatefinalCache- cache;
privatebooleanfinish;
publicSocketHandler(Sockets,Cache
- cache){
this.socket=s;
this.cache=cache;
}
@Override
publicvoidrun(){
try{
//获取socket输入流
finalBufferedReaderreader=newBufferedReader(newInputStreamReader(socket.getInputStream()));
//获取socket输出流
finalBufferedWriterwriter=newBufferedWriter(newOutputStreamWriter(socket.getOutputStream()));
CommandFactorycommandFactory=CommandFactory.getInstance(cache);
while(!finish){
finalStringcommandLine=reader.readLine();
logger.info("ip:"+socket.getLocalAddress()+"指令:"+commandLine);
if(commandLine==null||commandLine.trim().isEmpty()){
continue;
}
//使用指令工厂获取指令实例
finalCommandcommand=commandFactory.getCommand(commandLine);
command.execute(reader,writer);
if(command.getType()==CommandType.END){
logger.info("请求关闭连接");
finish=true;
}
}
}catch(IOExceptione){
e.printStackTrace();
logger.info("关闭来自"+socket.getLocalAddress()+"的连接");
}finally{
try{
if(socket!=null){
socket.close();
}
}catch(IOExceptione){
e.printStackTrace();
}
}
}
}
项目地址请戳这里,如果觉得还不错的话,希望能给个星哈><
参考资料
memcached官网
memcache协议
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。