Java HTTP协议收发MQ 消息代码实例详解
1.准备环境
在工程的 POM 文件中添加 HTTP Java 客户端的依赖。
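<dependency>
    <groupId>org.eclipse.jetty</groupId>
    <artifactId>jetty-client</artifactId>
    <version>9.3.4.RC1</version>
</dependency>
<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>ons-client</artifactId>
    <version>1.1.11</version>
</dependency>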
2.运行代码配置(user.properties)
您需要设置配置文件(user.properties)的相关内容,具体请参考申请MQ资源。
#您在控制台创建的Topic
Topic=xxx
#公测url
URL=http://publictest-rest.ons.aliyun.com
#阿里云身份验证码
Ak=xxx
#阿里云身份验证密钥
Sk=xxx
#MQ控制台创建的ProducerID
ProducerID=xxx
#MQ控制台创建的ConsumerID
ConsumerID=xxx
说明:URL 中的 Key、Tag 以及 POST 请求的 Content-Type 没有任何限制,只要确保 Key 和 Tag 相同唯一即可,也可以将它们放在 user.properties 里面。
3.HTTP发送消息示例代码
您可以按以下说明设置相应参数并测试HTTP消息发送功能。
packagecom.aliyun.openservice.ons.http.demo;
importjava.nio.charset.Charset;
importjava.util.Date;
importjava.util.Properties;
importorg.eclipse.jetty.client.HttpClient;
importorg.eclipse.jetty.client.api.ContentProvider;
importorg.eclipse.jetty.client.api.ContentResponse;
importorg.eclipse.jetty.client.api.Request;
importorg.eclipse.jetty.client.util.StringContentProvider;
importcom.aliyun.openservices.ons.api.impl.authority.AuthUtil;
publicclassHttpProducer{
publicstaticStringSIGNATURE="Signature";
publicstaticStringNUM="num";
publicstaticStringCONSUMERID="ConsumerID";
publicstaticStringPRODUCERID="ProducerID";
publicstaticStringTIMEOUT="timeout";
publicstaticStringTOPIC="Topic";
publicstaticStringAK="AccessKey";
publicstaticStringBODY="body";
publicstaticStringMSGHANDLE="msgHandle";
publicstaticStringTIME="time";
publicstaticvoidmain(String[]args)throwsException{
HttpClienthttpClient=newHttpClient();
httpClient.setMaxConnectionsPerDestination(1);
httpClient.start();
Propertiesproperties=newProperties();
properties.load(HttpProducer.class.getClassLoader().getResourceAsStream("user.properties"));
Stringtopic=properties.getProperty("Topic");//请在user.properties配置您的Topic
Stringurl=properties.getProperty("URL");//公测集群配置为http://publictest-rest.ons.aliyun.com/
Stringak=properties.getProperty("Ak");//请在user.properties配置您的Ak
Stringsk=properties.getProperty("Sk");//请在user.properties配置您的Sk
Stringpid=properties.getProperty("ProducerID");//请在user.properties配置您的ProducerID
Stringdate=String.valueOf(newDate().getTime());
Stringsign=null;
Stringbody="helloonshttp";
StringNEWLINE="\n";
StringsignString;
for(inti=0;i<10;i++){
date=String.valueOf(newDate().getTime());
Requestreq=httpClient.POST(url+"message/?topic="+topic+"&time="+date+"&tag=http"+"&key=http");
ContentProvidercontent=newStringContentProvider(body);
req.content(content);
signString=topic+NEWLINE+pid+NEWLINE+MD5.getInstance().getMD5String(body)+NEWLINE+date;
System.out.println(signString);
sign=AuthUtil.calSignature(signString.getBytes(Charset.forName("UTF-8")),sk);
req.header(SIGNATURE,sign);
req.header(AK,ak);
req.header(PRODUCERID,pid);
ContentResponseresponse;
response=req.send();
System.out.println("sendmsg:"+response.getStatus()+response.getContentAsString());
}
}
}
4.HTTP接收消息示例代码
请按以下说明设置相应参数并测试HTTP消息接收功能。
packagecom.aliyun.openservice.ons.http.demo;
importjava.nio.charset.Charset;
importjava.util.Date;
importjava.util.List;
importjava.util.Properties;
importorg.eclipse.jetty.client.HttpClient;
importorg.eclipse.jetty.client.api.ContentProvider;
importorg.eclipse.jetty.client.api.ContentResponse;
importorg.eclipse.jetty.client.api.Request;
importorg.eclipse.jetty.client.util.StringContentProvider;
importorg.eclipse.jetty.http.HttpMethod;
importcom.alibaba.fastjson.JSON;
importcom.aliyun.openservice.ons.mqtt.demo.MqttProducer;
importcom.aliyun.openservices.ons.api.impl.authority.AuthUtil;
publicclassHttpConsumer{
publicstaticStringSIGNATURE="Signature";
publicstaticStringNUM="num";
publicstaticStringCONSUMERID="ConsumerID";
publicstaticStringPRODUCERID="ProducerID";
publicstaticStringTIMEOUT="timeout";
publicstaticStringTOPIC="Topic";
publicstaticStringAK="AccessKey";
publicstaticStringBODY="body";
publicstaticStringMSGHANDLE="msgHandle";
publicstaticStringTIME="time";
publicstaticvoidmain(String[]args)throwsException{
HttpClienthttpClient=newHttpClient();
httpClient.setMaxConnectionsPerDestination(1);
httpClient.start();
Propertiesproperties=newProperties();
properties.load(HttpConsumer.class.getClassLoader().getResourceAsStream("user.properties"));
Stringtopic=properties.getProperty("Topic");//请在user.properties配置您的topic
Stringurl=properties.getProperty("URL");//公测集群配置为http://publictest-rest.ons.aliyun.com/
Stringak=properties.getProperty("Ak");//请在user.properties配置您的Ak
Stringsk=properties.getProperty("Sk");//请在user.properties配置您的Sk
Stringcid=properties.getProperty("ConsumerID");//请在user.properties配置您的ConsumerID
Stringdate=String.valueOf(newDate().getTime());
Stringsign=null;
StringNEWLINE="\n";
StringsignString;
System.out.println(NEWLINE+NEWLINE);
while(true){
try{
date=String.valueOf(newDate().getTime());
Requestreq=httpClient.POST(url+"message/?topic="+topic+"&time="+date+"&num="+32);
req.method(HttpMethod.GET);
ContentResponseresponse;
signString=topic+NEWLINE+cid+NEWLINE+date;
sign=AuthUtil.calSignature(signString.getBytes(Charset.forName("UTF-8")),sk);
req.header(SIGNATURE,sign);
req.header(AK,ak);
req.header(CONSUMERID,cid);
longstart=System.currentTimeMillis();
response=req.send();
System.out.println("getcost:"+(System.currentTimeMillis()-start)/1000
+""+response.getStatus()+""+response.getContentAsString());
Listlist=null;
if(response.getContentAsString()!=null&&!response.getContentAsString().isEmpty()){
list=JSON.parseArray(response.getContentAsString(),SimpleMessage.class);
}
if(list==null||list.size()==0){
Thread.sleep(100);
continue;
}
System.out.println("sizeis:"+list.size());
for(SimpleMessagesimpleMessage:list){
date=String.valueOf(newDate().getTime());
System.out.println("receivemsg:"+simpleMessage.getBody()+"borntime"+simpleMessage.getBornTime());
req=httpClient.POST(url+"message/?msgHandle="+simpleMessage.getMsgHandle()+"&topic="+topic+"&time="+date);
req.method(HttpMethod.DELETE);
signString=topic+NEWLINE+cid+NEWLINE+simpleMessage.getMsgHandle()+NEWLINE+date;
sign=AuthUtil.calSignature(signString.getBytes(Charset.forName("UTF-8")),sk);
req.header(SIGNATURE,sign);
req.header(AK,ak);
req.header(CONSUMERID,cid);
response=req.send();
System.out.println("deletemsg:"+response.toString());
}
Thread.sleep(100);
}catch(Exceptione){
e.printStackTrace();
}
}
}
}
5.HTTP示例程序工具类
(1)消息封装类:SimpleMessage.java
packagecom.aliyun.openservice.ons.http.demo;
/**
 * Plain bean holding one message as returned by the MQ HTTP endpoint.
 *
 * <p>It exposes a getter/setter pair for each field so that a JSON library can populate
 * it by property name.
 */
public class SimpleMessage {
    /** Message payload. */
    private String body;
    /** Message identifier. */
    private String msgId;
    /** Value of the {@code bornTime} property, kept as the raw string. */
    private String bornTime;
    /** Handle that is sent back in the delete request for this message. */
    private String msgHandle;
    /** Value of the {@code reconsumeTimes} property. */
    private int reconsumeTimes;
    /** Message tag. */
    private String tag;

    public void setTag(String tag) {
        this.tag = tag;
    }

    public String getTag() {
        return tag;
    }

    public int getReconsumeTimes() {
        return reconsumeTimes;
    }

    public void setReconsumeTimes(int reconsumeTimes) {
        this.reconsumeTimes = reconsumeTimes;
    }

    public void setMsgHandle(String msgHandle) {
        this.msgHandle = msgHandle;
    }

    public String getMsgHandle() {
        return msgHandle;
    }

    public String getBody() {
        return body;
    }

    public void setBody(String body) {
        this.body = body;
    }

    public String getMsgId() {
        return msgId;
    }

    public void setMsgId(String msgId) {
        this.msgId = msgId;
    }

    public String getBornTime() {
        return bornTime;
    }

    public void setBornTime(String bornTime) {
        this.bornTime = bornTime;
    }
}
(2)字符串签名类:MD5.java
packagecom.aliyun.openservice.ons.http.demo;
importjava.io.UnsupportedEncodingException;
importjava.nio.charset.Charset;
importjava.security.MessageDigest;
importjava.sql.SQLException;
importjava.util.Date;
importjava.util.HashMap;
importjava.util.Map;
importjava.util.concurrent.ConcurrentHashMap;
importjava.util.concurrent.locks.ReentrantLock;
importorg.slf4j.LoggerFactory;
publicclassMD5{
privatestaticfinalorg.slf4j.Loggerlog=LoggerFactory.getLogger(MD5.class);
privatestaticchar[]digits={'0','1','2','3','4','5','6','7','8','9','a','b','c','d','e','f'};
privatestaticMaprDigits=newHashMap(16);
static{
for(inti=0;i>>4];
out[j++]=digits[0x0F&bt[i]];
}
if(log.isDebugEnabled()){
log.debug("[hash]"+newString(out));
}
returnnewString(out);
}
publicbyte[]string2bytes(Stringstr){
if(null==str){
thrownewNullPointerException("Argumentisnotallowedempty");
}
if(str.length()!=32){
thrownewIllegalArgumentException("Stringlengthmustequals32");
}
byte[]data=newbyte[16];
char[]chs=str.toCharArray();
for(inti=0;i<16;++i){
inth=rDigits.get(chs[i*2]).intValue();
intl=rDigits.get(chs[i*2+1]).intValue();
data[i]=(byte)((h&0x0F)<<4|l&0x0F);
}
returndata;
}
}
希望本篇文章对您有所帮助
声明:本文内容来源于网络,版权归原作者所有,内容由互联网用户自发贡献自行上传,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任。如果您发现有涉嫌版权的内容,欢迎发送邮件至:czq8825#qq.com(发邮件时,请将#更换为@)进行举报,并提供相关证据,一经查实,本站将立刻删除涉嫌侵权内容。