Java API方式调用Kafka各种协议的方法
众所周知,Kafka自己实现了一套二进制协议(binaryprotocol)用于各种功能的实现,比如发送消息,获取消息,提交位移以及创建topic等。具体协议规范参见:Kafka协议 这套协议的具体使用流程为:
1.客户端创建对应协议的请求
2.客户端发送请求给对应的broker
3.broker处理请求,并发送response给客户端
虽然Kafka提供的大量的脚本工具用于各种功能的实现,但很多时候我们还是希望可以把某些功能以编程的方式嵌入到另一个系统中。这时使用JavaAPI的方式就显得异常地灵活了。本文我将尝试给出JavaAPI底层框架的一个范例,同时也会针对“创建topic”和“查看位移”这两个主要功能给出对应的例子。需要提前说明的是,本文给出的范例并没有考虑Kafka集群开启安全的情况。另外Kafka的KIP4应该一直在优化命令行工具以及各种管理操作,有兴趣的读者可以关注这个KIP。
本文中用到的API依赖于kafka-clients,所以如果你使用Maven构建的话,请加上:
org.apache.kafka kafka-clients 0.10.2.0
如果是gradle,请加上:
compilegroup:'org.apache.kafka',name:'kafka-clients',version:'0.10.2.0'
底层框架
/**
*发送请求主方法
*@paramhost目标broker的主机名
*@paramport目标broker的端口
*@paramrequest请求对象
*@paramapiKey请求类型
*@return序列化后的response
*@throwsIOException
*/
publicByteBuffersend(Stringhost,intport,AbstractRequestrequest,ApiKeysapiKey)throwsIOException{
Socketsocket=connect(host,port);
try{
returnsend(request,apiKey,socket);
}finally{
socket.close();
}
}
/**
*发送序列化请求并等待response返回
*@paramsocket连向目标broker的socket
*@paramrequest序列化后的请求
*@return序列化后的response
*@throwsIOException
*/
privatebyte[]issueRequestAndWaitForResponse(Socketsocket,byte[]request)throwsIOException{
sendRequest(socket,request);
returngetResponse(socket);
}
/**
*发送序列化请求给socket
*@paramsocket连向目标broker的socket
*@paramrequest序列化后的请求
*@throwsIOException
*/
privatevoidsendRequest(Socketsocket,byte[]request)throwsIOException{
DataOutputStreamdos=newDataOutputStream(socket.getOutputStream());
dos.writeInt(request.length);
dos.write(request);
dos.flush();
}
/**
*从给定socket处获取response
*@paramsocket连向目标broker的socket
*@return获取到的序列化后的response
*@throwsIOException
*/
privatebyte[]getResponse(Socketsocket)throwsIOException{
DataInputStreamdis=null;
try{
dis=newDataInputStream(socket.getInputStream());
byte[]response=newbyte[dis.readInt()];
dis.readFully(response);
returnresponse;
}finally{
if(dis!=null){
dis.close();
}
}
}
/**
*创建Socket连接
*@paramhostName目标broker主机名
*@paramport目标broker服务端口,比如9092
*@return创建的Socket连接
*@throwsIOException
*/
privateSocketconnect(StringhostName,intport)throwsIOException{
returnnewSocket(hostName,port);
}
/**
*向给定socket发送请求
*@paramrequest请求对象
*@paramapiKey请求类型,即属于哪种请求
*@paramsocket连向目标broker的socket
*@return序列化后的response
*@throwsIOException
*/
privateByteBuffersend(AbstractRequestrequest,ApiKeysapiKey,Socketsocket)throwsIOException{
RequestHeaderheader=newRequestHeader(apiKey.id,request.version(),"client-id",0);
ByteBufferbuffer=ByteBuffer.allocate(header.sizeOf()+request.sizeOf());
header.writeTo(buffer);
request.writeTo(buffer);
byte[]serializedRequest=buffer.array();
byte[]response=issueRequestAndWaitForResponse(socket,serializedRequest);
ByteBufferresponseBuffer=ByteBuffer.wrap(response);
ResponseHeader.parse(responseBuffer);
returnresponseBuffer;
}
有了这些方法的铺垫,我们就可以创建具体的请求了。
创建topic
/**
*创建topic
*由于只是样例代码,有些东西就硬编码写到程序里面了(比如主机名和端口),各位看官自行修改即可
*@paramtopicNametopic名
*@parampartitions分区数
*@paramreplicationFactor副本数
*@throwsIOException
*/
publicvoidcreateTopics(StringtopicName,intpartitions,shortreplicationFactor)throwsIOException{
Maptopics=newHashMap<>();
//插入多个元素便可同时创建多个topic
topics.put(topicName,newCreateTopicsRequest.TopicDetails(partitions,replicationFactor));
intcreationTimeoutMs=60000;
CreateTopicsRequestrequest=newCreateTopicsRequest.Builder(topics,creationTimeoutMs).build();
ByteBufferresponse=send("localhost",9092,request,ApiKeys.CREATE_TOPICS);
CreateTopicsResponse.parse(response,request.version());
}
查看位移
/**
*获取某个consumergroup下的某个topic分区的位移
*@paramgroupIDgroupid
*@paramtopictopic名
*@paramparititon分区号
*@throwsIOException
*/
publicvoidgetOffsetForPartition(StringgroupID,Stringtopic,intparititon)throwsIOException{
TopicPartitiontp=newTopicPartition(topic,parititon);
OffsetFetchRequestrequest=newOffsetFetchRequest.Builder(groupID,singletonList(tp))
.setVersion((short)2).build();
ByteBufferresponse=send("localhost",9092,request,ApiKeys.OFFSET_FETCH);
OffsetFetchResponseresp=OffsetFetchResponse.parse(response,request.version());
OffsetFetchResponse.PartitionDatapartitionData=resp.responseData().get(tp);
System.out.println(partitionData.offset);
}
/** *获取某个consumergroup下所有topic分区的位移信息 *@paramgroupIDgroupid *@return(topic分区-->分区信息)的map *@throwsIOException */ publicMapgetAllOffsetsForGroup(StringgroupID)throwsIOException{ OffsetFetchRequestrequest=newOffsetFetchRequest.Builder(groupID,null).setVersion((short)2).build(); ByteBufferresponse=send("localhost",9092,request,ApiKeys.OFFSET_FETCH); OffsetFetchResponseresp=OffsetFetchResponse.parse(response,request.version()); returnresp.responseData(); }
okay,上面就是“创建topic”和“查看位移”的样例代码,各位看官可以参考着这两个例子构建其他类型的请求。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。