C#实现同Active MQ通讯的方法
本文实例讲述了C#实现同ActiveMQ通讯的方法。分享给大家供大家参考,具体如下:
内容概要:
主要以源码的形式介绍如何用C#实现同ActiveMQ的通讯。本文假设你已经正确安装JDK1.6.x,了解ActiveMQ并有一定的编程基础。
正文:
JMS程序的最终目的是生产和消费的消息能被其他程序使用,JMS的Message是一个既简单又不乏灵活性的基本格式,允许创建不同平台上符合非JMS程序格式的消息。
Message由消息头,属性和消息体三部份组成。
ActiveMQ支持过滤机制,即生产者可以设置消息的属性(Properties),该属性与消费者端的Selector对应,只有消费者设置的selector与消息的Properties匹配,消息才会发给该消费者。Topic和Queue都支持Selector。
示例代码:
usingSystem;
usingSystem.Collections.Generic;
usingSystem.Linq;
usingSystem.Text;
usingSystem.Windows;
usingSystem.Windows.Controls;
usingSystem.Windows.Data;
usingSystem.Windows.Documents;
usingSystem.Windows.Input;
usingSystem.Windows.Media;
usingSystem.Windows.Media.Imaging;
usingSystem.Windows.Navigation;
usingSystem.Windows.Shapes;
usingApache.NMS;
usingSystem.Diagnostics;
usingApache.NMS.Util;
usingSystem.Windows.Threading;
/*
*功能描述:C#使用ActiveMQ示例
*修改次数:2
*最后更新:byKagula,2012-07-31
*
*前提条件:
*[1]apache-activemq-5.4.2
*[2]Apache.NMS.ActiveMQ-1.5.6-bin
*[3]WinXPSP3
*[4]VS2008SP1
*[5]WPF工程With.NETFramework3.5
*
*启动
*
*不带安全控制方式启动
*[你的解压路径]\apache-activemq-5.4.2\bin\activemq.bat
*
*安全方式启动
*添加环境变量:ACTIVEMQ_ENCRYPTION_PASSWORD=activemq
*[你的解压路径]\apache-activemq-5.4.2\bin>activemqxbean:file:../conf/activemq-security.xml
*
*ActiveMQ管理地址
*http://127.0.0.1:8161/admin/
*添加访问"http://127.0.0.1:8161/admin/"的限制
*
*第一步:添加访问限制
*修改D:\apache\apache-activemq-5.4.2\conf\jetty.xml文件
*下面这行编码,原
*<propertyname="authenticate"value="true"/>
*修改为
*<propertyname="authenticate"value="false"/>
*
*第二步:修改登录用户名密码,缺省分别为admin,admin
*D:\apache\apache-activemq-5.4.2\conf\jetty-realm.properties
*
*用户管理(前提:以安全方式启动ActiveMQ)
*
*在[你的解压路径]\apache-activemq-5.4.2\conf\credentials.properties文件中修改默认的用户名密码
*在[你的解压路径]\apache-activemq-5.4.2\conf\activemq-security.xml文件中可以添加新的用户名
*e.g.添加oa用户,密码同用户名。
*<authenticationUserusername="oa"password="oa"groups="users,admins"/>
*
*在[你的解压路径]\apache-activemq-5.4.2\conf\activemq-security.xml文件中你还可以设置指定的Topic或Queue
*只能被哪些用户组read或write。
*
*
*配置C#withWPF项目
*项目的[Application]->[TargetFramework]属性设置为[.NETFramework3.5](这是VS2008WPF工程的默认设置)
*添加[你的解压路径]\Apache.NMS.ActiveMQ-1.5.6-bin\lib\Apache.NMS\net-3.5\Apache.NMS.dll的引用
*Apache.NMS.dll相当于接口
*
*如果是以Debug方式调试
*把[你的解压路径]\Apache.NMS.ActiveMQ-1.5.6-bin\build\net-3.5\debug\目录下的
*Apache.NMS.ActiveMQ.dll文件复制到你项目的Debug目录下
*Apache.NMS.ActiveMQ.dll相当于实现
*
*如果是以Release方式调试
*参考上文,去取Apache.NMS,Release目录下相应的DLL文件,并复制到你项目的Release目录下。
*
*
*参考资料
*[1]《C#调用ActiveMQ官方示例》http://activemq.apache.org/nms/examples.html
*[2]《ActiveMQNMS下载地址》http://activemq.apache.org/nms/activemq-downloads.html
*[3]《ActiveMQ在C#中的应用示例》https://www.nhooo.com/article/87956.htm
*[4]《NMSAPIReference》http://activemq.apache.org/nms/nms-api.html
*/
namespacetestActiveMQSubscriber
{
///<summary>
///InteractionlogicforWindow1.xaml
///</summary>
publicpartialclassWindow1:Window
{
privatestaticIConnectionFactoryconnFac;
privatestaticIConnectionconnection;
privatestaticISessionsession;
privatestaticIDestinationdestination;
privatestaticIMessageProducerproducer;
privatestaticIMessageConsumerconsumer;
protectedstaticITextMessagemessage=null;
publicWindow1()
{
InitializeComponent();
initAMQ("MyFirstTopic");
}
privatevoidinitAMQ(StringstrTopicName)
{
try
{
connFac=newNMSConnectionFactory(newUri("activemq:failover:(tcp://localhost:61616)"));
//新建连接
//connection=connFac.CreateConnection("oa","oa");//设置连接要用的用户名、密码
//如果你要持久“订阅”,则需要设置ClientId,这样程序运行当中被停止,恢复运行时,能拿到没接收到的消息!
connection.ClientId="testinglistener";
connection=connFac.CreateConnection();//如果你是缺省方式启动ActiveMQ服务,则不需填用户名、密码
//创建Session
session=connection.CreateSession();
//发布/订阅模式,适合一对多的情况
destination=SessionUtil.GetDestination(session,"topic://"+strTopicName);
//新建生产者对象
producer=session.CreateProducer(destination);
producer.DeliveryMode=MsgDeliveryMode.NonPersistent;//ActiveMQ服务器停止工作后,消息不再保留
//新建消费者对象:普通“订阅”模式
//consumer=session.CreateConsumer(destination);//不需要持久“订阅”
//新建消费者对象:持久"订阅"模式:
//持久“订阅”后,如果你的程序被停止工作后,恢复运行,
//从第一次持久订阅开始,没收到的消息还可以继续收
consumer=session.CreateDurableConsumer(
session.GetTopic(strTopicName)
,connection.ClientId,null,false);
//设置消息接收事件
consumer.Listener+=newMessageListener(OnMessage);
//启动来自ActiveMQ的消息侦听
connection.Start();
}
catch(Exceptione)
{
//初始化ActiveMQ连接失败,往VS2008的Output窗口写入出错信息!
Debug.WriteLine(e.Message);
}
}
privatevoidSendMsg2Topic_Click(objectsender,RoutedEventArgse)
{
//发送消息
ITextMessagerequest=session.CreateTextMessage(DateTime.Now.ToLocalTime()+""+tbMsg.Text);
producer.Send(request);
}
protectedvoidOnMessage(IMessagereceivedMsg)
{
//接收消息
message=receivedMsgasITextMessage;
//UI线程,显示收到的消息
Dispatcher.Invoke(DispatcherPriority.Normal,newAction(()=>
{
DateTimedt=newDateTime();
ListBoxItemlbi=newListBoxItem();
lbi.Content=DateTime.Now.ToLocalTime()+""+message.Text;
lbR.Items.Add(lbi);
}));
}
}
}
队列通讯方式,消费者例子
usingSystem;
usingSystem.Collections.Generic;
usingSystem.Linq;
usingSystem.Text;
usingApache.NMS;
usingSystem.Diagnostics;
usinglog4net;
usingApache.NMS.Util;
usingSystem.Collections;
namespaceCat8637AutoCallServer
{
publicclassSMTask
{
publicStringCallee{get;set;}
publicStringCheckNumber{get;set;}
publicintDeadline{get;set;}
publicoverrideStringToString()
{
returnString.Format("Callee={0},CheckNumber={1},Deadline={2}",
Callee,CheckNumber,Deadline);
}
}
/*
*负责接收任务,并把任务放在任务等待队列中。
*/
publicclassMQClient
{
privatestaticreadonlyILoglogger=LogManager.GetLogger(typeof(MQClient));
privatestaticIConnectionconnection=null;
privatestaticISessionsession=null;
Queue_voiceSMTasks=newQueue();
publicMQClient()
{
try
{
IConnectionFactoryfactory=newNMSConnectionFactory(newUri("activemq:failover:(tcp://localhost:61616)"));
//新建连接
//connection=connFac.CreateConnection("oa","oa");//设置连接要用的用户名、密码
connection=factory.CreateConnection();
session=connection.CreateSession();
IMessageConsumerconsumer=session.CreateConsumer(session.GetQueue("TaskIssue_VoiceSM"));
consumer.Listener+=newMessageListener(OnMessage);
connection.Start();
}
catch(Exceptionex)
{
Debug.WriteLine(ex.Message);
}
}
protectedvoidOnMessage(IMessagereceivedMsg)
{
IMessagemessage=receivedMsgasITextMessage;
SMTasksmTask=newSMTask();
smTask.Callee=message.Properties["Callee"]asString;
smTask.CheckNumber=message.Properties["Message"]asString;
smTask.Deadline=Convert.ToInt32(message.Properties["deadline"]asString);
logger.Info("Received:"+smTask.ToString());
lock(_voiceSMTasks)
{
_voiceSMTasks.Enqueue(smTask);
}
}
publicSMTaskGetVoiceSMTask()
{
SMTaskresult=null;
lock(_voiceSMTasks)
{
if(_voiceSMTasks.Count>0)
{
result=_voiceSMTasks.Dequeue()asSMTask;
}
}
returnresult;
}
}
}
队列通讯方式,生产者例子
privatevoidSend_Click(objectsender,RoutedEventArgse)
{
try
{
IDestinationdestination=SessionUtil.GetDestination(session,"queue://TaskIssue_VoiceSM");
//新建生产者对象
IMessageProducerproducer=session.CreateProducer(destination);
producer.DeliveryMode=MsgDeliveryMode.NonPersistent;//ActiveMQ服务器停止工作后,消息不再保留
ITextMessagerequest=session.CreateTextMessage();
request.NMSCorrelationID="TestVoiceSM";//这里我填了应用程序的名称。
request.Properties["Callee"]=tbCallee.Text;
request.Properties["Message"]=tbCheckNumber.Text;
request.Properties["deadline"]=tbValidDuration.Text;
producer.Send(request);
}
catch(Exceptionex)
{
//初始化ActiveMQ连接失败,往VS2008的Output窗口写入出错信息!
Debug.WriteLine(ex.Message);
}
}
privatevoidWindow_Closed(objectsender,EventArgse)
{
try
{
if(session==null)
return;
//if(connection==null)
//return;
session.Close();
//connection.Close();
}
catch(Exceptionex)
{
Debug.WriteLine(ex.Message);
}
}
更多关于C#相关内容感兴趣的读者可查看本站专题:《C#窗体操作技巧汇总》、《C#常见控件用法教程》、《WinForm控件用法总结》、《C#程序设计之线程使用技巧总结》、《C#操作Excel技巧总结》、《C#中XML文件操作技巧汇总》、《C#数据结构与算法教程》、《C#数组操作技巧总结》及《C#面向对象程序设计入门教程》
希望本文所述对大家C#程序设计有所帮助。