Establish Apache ActiveMQ from Scratch

Abstract: Apache ActiveMQ is an open source message broker which fosters the communication from more than one client or server. This is a tutorial. This tutorial tells you how to establish ActiveMQ environment in your computer and write a producer and a consumer to communicate HelloWorld with C++.

Apache ActiveMQ是Apache软件基金会所研发的开放源代码消息中间件,是一个纯Java程序,因此只需要操作系统支持Java虚拟机,ActiveMQ便可运行。这篇博文将介绍如何在自己的电脑上搭建起ActiveMQ的环境并完成一个消息发送者和消息接收者互相通信HelloWorld的一个C++演示程序。
注:以下均为64位的安装过程,32位同理。

1 安装Java SDK

首先先确认自己的电脑有没有安装Java SDK,如果没安装Java SDK会报错,做法如下(如果已安装则跳过下面步骤):

  • 下载jre-8u45-windows-x64.exe(链接)和java_ee_sdk-6u4-jdk7-windows-x64.exe(链接
  • 安装JRE(Java SE Runtime Environment)jre-8u45-windows-x64.exe,安装路径选默认:C:\Program Files\Java\jre1.8.0_144
  • 安装J2EEjava_ee_sdk-6u4-jdk7-windows-x64.exe,假如其被放在E盘下,打开CMD,进入安装程序所在目录 > e:回车
  • 关键步骤:java_ee_sdk-6u4-jdk7-windows-x64.exe -j "C:\Program Files\Java\jre1.8.0_144"
  • 安装开始,可以选择自己设置的安装路径,例如D:\glassfish3,其它选项均可选择默认进行安装
  • 安装SDK成功。

2 ActiveMQ服务端的安装

2.1 下载

首先下载apache-activemq-5.15.0-bin.zip压缩文件,官方下载地址:http://activemq.apache.org/download.html
解压文件到本地目录下,从它的目录来说,还是很简单的:

  • bin存放的是脚本文件
  • conf存放的是基本配置文件
  • data存放的是日志文件
  • docs存放的是说明文档
  • examples存放的是简单的实例
  • lib存放的是activemq所需jar包
  • webapps用于存放项目的目录

2.2 启动

找到\bin\win64目录下的activemq.bat文件,双击运行即可。

2.3 测试

ActiveMQ默认使用的TCP连接端口是61616, 通过查看该端口的信息可以测试ActiveMQ是否成功启动:netstat -an|find “61616”

1
2
C:\Documents and Settings\Administrator>netstat -an|find "61616" 
TCP 0.0.0.0:61616 0.0.0.0:0 LISTENING

2.4 监控

ActiveMQ默认启动时,启动了内置的jetty服务器,提供一个用于监控ActiveMQ的admin应用。
admin:http://127.0.0.1:8161/admin/,用户名和密码都是admin。
至此,服务端启动完毕。停止服务器,只需要按着Ctrl+Shift+C,之后输入Y即可。

3 依赖库的编译

首先编译依赖库:

libapr-1.dlllibapr-1.liblibapriconv-1.liblibaprutil-1.liblibcppunit.liblibactivemq-cpp.lib ,一共1个dll文件、5个lib文件,而且生成有先后顺序,生成方法如下。

3.1 生成libapr

下载apr-1.6.2-win32-src.zip ,解压缩后用cmake转成vs2010 x64的工程,把生成出来的apr.h文件拷回解压缩的include文件夹,这时候的include文件夹也是之后生成库文件需要依赖的头文件,命名为apr

打开cmake生成的后的文件夹里面APR.sln ,选择好所需要的配置(如Release)和平台(如x64),编译libapr-1工程,生成libapr-1.dlllibapr-1.lib

3.2 生成libapriconv

下载apr-iconv-1.2.1-win32-src-r2.zip,解压缩打开apriconv.dsp,并转成vs2010版本。

apr文件夹拷进include文件夹,对工程属性进行配置,选择好所需要的配置(如Release)和平台(如x64),编译apriconv工程,生成apriconv-1.lib

3.3 生成libaprutil

下载apr-util-1.6.0-win32-src.zip,解压缩打开aprutil.dsp,并转成vs2010版本。

apr文件夹拷进include文件夹,对工程属性进行配置,选择好所需要的配置(如Release)和平台(如x64),编译aprutil工程,生成aprutil-1.lib

3.4 生成libcppunit

下载cppunit-1.12.1.tar.gz,解压缩打开\src\cppunit\cppunit.dsp,并转成vs2010版本。

选择好所需要的配置(如Release)和平台(如x64),编译cppunit工程,生成cppunit.lib ,为了统一名字更名为libcppunit.lib

3.5 生成libactivemq

下载activemq-cpp-library-3.9.4-src.zip,解压缩打开\vs2010-build\activemq-cpp.sln

将libapr、libapriconv、libaprutil、libcppunit自带的头文件文件夹拷到vs2010-build\include文件夹下,对工程属性进行配置,选择好所需要的配置(如Release)和平台(如x64),编译activemq-cpp工程,生成libactivemq-cpp.lib

4 写出一个通过ActiveMQ通信的HelloWorld

新建一个vs2010的工程,在inc文件夹下拷贝libapr、libapriconv、libaprutil、libcppunit自带的头文件文件夹,以及libactivemq自带头文件文件夹中的examples和main文件夹,一共6个文件夹。

在\lib\x64文件夹下拷贝libapr-1.liblibapriconv-1.liblibaprutil-1.liblibcppunit.liblibactivemq-cpp.lib ,一共5个文件。

在\bin\x64文件夹下拷贝libapr-1.dll (注:\bin\x86平台不需要此dll)。

在工程中新建main.cpp,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// START SNIPPET: demo

#include <activemq/library/ActiveMQCPP.h>
#include <decaf/lang/Thread.h>
#include <decaf/lang/Runnable.h>
#include <decaf/util/concurrent/CountDownLatch.h>
#include <decaf/lang/Integer.h>
#include <decaf/lang/Long.h>
#include <decaf/lang/System.h>
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <activemq/util/Config.h>
#include <cms/Connection.h>
#include <cms/Session.h>
#include <cms/TextMessage.h>
#include <cms/BytesMessage.h>
#include <cms/MapMessage.h>
#include <cms/ExceptionListener.h>
#include <cms/MessageListener.h>
#include <stdlib.h>
#include <stdio.h>
#include <iostream>
#include <memory>

using namespace activemq::core;
using namespace decaf::util::concurrent;
using namespace decaf::util;
using namespace decaf::lang;
using namespace cms;
using namespace std;

class HelloWorldProducer : public Runnable {
private:

Connection* connection;
Session* session;
Destination* destination;
MessageProducer* producer;
int numMessages;
bool useTopic;
bool sessionTransacted;
std::string brokerURI;

private:

HelloWorldProducer(const HelloWorldProducer&);
HelloWorldProducer& operator=(const HelloWorldProducer&);

public:

HelloWorldProducer(const std::string& brokerURI, int numMessages, bool useTopic = false, bool sessionTransacted = false) :
connection(NULL),
session(NULL),
destination(NULL),
producer(NULL),
numMessages(numMessages),
useTopic(useTopic),
sessionTransacted(sessionTransacted),
brokerURI(brokerURI) {
}

virtual ~HelloWorldProducer(){
cleanup();
}

void close() {
this->cleanup();
}

virtual void run() {

try {

// Create a ConnectionFactory
auto_ptr<ConnectionFactory> connectionFactory(
ConnectionFactory::createCMSConnectionFactory(brokerURI));

// Create a Connection
connection = connectionFactory->createConnection();
connection->start();

// Create a Session
if (this->sessionTransacted) {
session = connection->createSession(Session::SESSION_TRANSACTED);
} else {
session = connection->createSession(Session::AUTO_ACKNOWLEDGE);
}

// Create the destination (Topic or Queue)
if (useTopic) {
destination = session->createTopic("TEST.FOO");
} else {
destination = session->createQueue("TEST.FOO");
}

// Create a MessageProducer from the Session to the Topic or Queue
producer = session->createProducer(destination);
producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT);

// Create the Thread Id String
string threadIdStr = Long::toString(Thread::currentThread()->getId());

// Create a messages
string text = (string) "Hello world! from thread " + threadIdStr;

for (int ix = 0; ix < numMessages; ++ix) {
std::auto_ptr<TextMessage> message(session->createTextMessage(text));
message->setIntProperty("Integer", ix);
printf("Sent message #%d from thread %s\n", ix + 1, threadIdStr.c_str());
producer->send(message.get());
}

} catch (CMSException& e) {
e.printStackTrace();
}
}

private:

void cleanup() {

if (connection != NULL) {
try {
connection->close();
} catch (cms::CMSException& ex) {
ex.printStackTrace();
}
}

// Destroy resources.
try {
delete destination;
destination = NULL;
delete producer;
producer = NULL;
delete session;
session = NULL;
delete connection;
connection = NULL;
} catch (CMSException& e) {
e.printStackTrace();
}
}
};

class HelloWorldConsumer : public ExceptionListener,
public MessageListener,
public Runnable {

private:

CountDownLatch latch;
CountDownLatch doneLatch;
Connection* connection;
Session* session;
Destination* destination;
MessageConsumer* consumer;
long waitMillis;
bool useTopic;
bool sessionTransacted;
std::string brokerURI;

private:

HelloWorldConsumer(const HelloWorldConsumer&);
HelloWorldConsumer& operator=(const HelloWorldConsumer&);

public:

HelloWorldConsumer(const std::string& brokerURI, int numMessages, bool useTopic = false, bool sessionTransacted = false, int waitMillis = 30000) :
latch(1),
doneLatch(numMessages),
connection(NULL),
session(NULL),
destination(NULL),
consumer(NULL),
waitMillis(waitMillis),
useTopic(useTopic),
sessionTransacted(sessionTransacted),
brokerURI(brokerURI) {
}

virtual ~HelloWorldConsumer() {
cleanup();
}

void close() {
this->cleanup();
}

void waitUntilReady() {
latch.await();
}

virtual void run() {

try {

// Create a ConnectionFactory
auto_ptr<ConnectionFactory> connectionFactory(
ConnectionFactory::createCMSConnectionFactory(brokerURI));

// Create a Connection
connection = connectionFactory->createConnection();
connection->start();
connection->setExceptionListener(this);

// Create a Session
if (this->sessionTransacted == true) {
session = connection->createSession(Session::SESSION_TRANSACTED);
} else {
session = connection->createSession(Session::AUTO_ACKNOWLEDGE);
}

// Create the destination (Topic or Queue)
if (useTopic) {
destination = session->createTopic("TEST.FOO");
} else {
destination = session->createQueue("TEST.FOO");
}

// Create a MessageConsumer from the Session to the Topic or Queue
consumer = session->createConsumer(destination);

consumer->setMessageListener(this);

std::cout.flush();
std::cerr.flush();

// Indicate we are ready for messages.
latch.countDown();

// Wait while asynchronous messages come in.
doneLatch.await(waitMillis);

} catch (CMSException& e) {
// Indicate we are ready for messages.
latch.countDown();
e.printStackTrace();
}
}

// Called from the consumer since this class is a registered MessageListener.
virtual void onMessage(const Message* message) {

static int count = 0;

try {
count++;
const TextMessage* textMessage = dynamic_cast<const TextMessage*> (message);
string text = "";

if (textMessage != NULL) {
text = textMessage->getText();
} else {
text = "NOT A TEXTMESSAGE!";
}

printf("Message #%d Received: %s\n", count, text.c_str());

} catch (CMSException& e) {
e.printStackTrace();
}

// Commit all messages.
if (this->sessionTransacted) {
session->commit();
}

// No matter what, tag the count down latch until done.
doneLatch.countDown();
}

// If something bad happens you see it here as this class is also been
// registered as an ExceptionListener with the connection.
virtual void onException(const CMSException& ex AMQCPP_UNUSED) {
printf("CMS Exception occurred. Shutting down client.\n");
ex.printStackTrace();
exit(1);
}

private:

void cleanup() {
if (connection != NULL) {
try {
connection->close();
} catch (cms::CMSException& ex) {
ex.printStackTrace();
}
}

// Destroy resources.
try {
delete destination;
destination = NULL;
delete consumer;
consumer = NULL;
delete session;
session = NULL;
delete connection;
connection = NULL;
} catch (CMSException& e) {
e.printStackTrace();
}
}
};

int main(int argc AMQCPP_UNUSED, char* argv[] AMQCPP_UNUSED) {

activemq::library::ActiveMQCPP::initializeLibrary();
{
std::cout << "=====================================================\n";
std::cout << "Starting the example:" << std::endl;
std::cout << "-----------------------------------------------------\n";


// Set the URI to point to the IP Address of your broker.
// add any optional params to the url to enable things like
// tightMarshalling or tcp logging etc. See the CMS web site for
// a full list of configuration options.
//
// http://activemq.apache.org/cms/
//
// Wire Format Options:
// =========================
// Use either stomp or openwire, the default ports are different for each
//
// Examples:
// tcp://127.0.0.1:61616 default to openwire
// tcp://127.0.0.1:61616?wireFormat=openwire same as above
// tcp://127.0.0.1:61613?wireFormat=stomp use stomp instead
//
// SSL:
// =========================
// To use SSL you need to specify the location of the trusted Root CA or the
// certificate for the broker you want to connect to. Using the Root CA allows
// you to use failover with multiple servers all using certificates signed by
// the trusted root. If using client authentication you also need to specify
// the location of the client Certificate.
//
// System::setProperty( "decaf.net.ssl.keyStore", "<path>/client.pem" );
// System::setProperty( "decaf.net.ssl.keyStorePassword", "password" );
// System::setProperty( "decaf.net.ssl.trustStore", "<path>/rootCA.pem" );
//
// The you just specify the ssl transport in the URI, for example:
//
// ssl://localhost:61617
//
std::string brokerURI =
"failover:(tcp://localhost:61616"
// "?wireFormat=openwire"
// "&transport.useInactivityMonitor=false"
// "&connection.alwaysSyncSend=true"
// "&connection.useAsyncSend=true"
// "?transport.commandTracingEnabled=true"
// "&transport.tcpTracingEnabled=true"
// "&wireFormat.tightEncodingEnabled=true"
")";

//============================================================
// set to true to use topics instead of queues
// Note in the code above that this causes createTopic or
// createQueue to be used in both consumer an producer.
//============================================================
bool useTopics = false;
bool sessionTransacted = false;
int numMessages = 2000;

long long startTime = System::currentTimeMillis();

HelloWorldProducer producer(brokerURI, numMessages, useTopics);
HelloWorldConsumer consumer(brokerURI, numMessages, useTopics, sessionTransacted);

// Start the consumer thread.
Thread consumerThread(&consumer);
consumerThread.start();

// Wait for the consumer to indicate that its ready to go.
consumer.waitUntilReady();

// Start the producer thread.
Thread producerThread(&producer);
producerThread.start();

// Wait for the threads to complete.
producerThread.join();
consumerThread.join();

long long endTime = System::currentTimeMillis();
double totalTime = (double)(endTime - startTime) / 1000.0;

consumer.close();
producer.close();

std::cout << "Time to completion = " << totalTime << " seconds." << std::endl;
std::cout << "-----------------------------------------------------\n";
std::cout << "Finished with the example." << std::endl;
std::cout << "=====================================================\n";

}
activemq::library::ActiveMQCPP::shutdownLibrary();
}

// END SNIPPET: demo

代码中的参数useTopic可以进行相关配置,从而实现是进行主题式还是序列式的通信。

配置好对工程属性进行配置,选择好所需要的配置(如Release)和平台(如x64),编译工程,启动ActiveMQ后运行工程,即可看到 producer和consumer在进行HelloWorld的通信。