歡迎來到Linux教程網
Linux教程網
Linux教程網
Linux教程網
Linux教程網 >> Linux基礎 >> Linux教程 >> JMS之ActiveMQ Linux下安裝與應用實例

JMS之ActiveMQ Linux下安裝與應用實例

日期:2017/2/28 13:58:46   编辑:Linux教程

JMS之ActiveMQ Linux下安裝與應用實例

1.下載activeMQ安裝包,拷貝到/activeMQ目錄下

apache-activemq-5.10.0-bin.tar.gz,下載地址http://activemq.apache.org/download.html

2.解壓文件到運行目錄

[root@linuxidc softs]# tar -xzvf /server/apache-activemq-5.10.0-bin.tar.gz

3.為了方便管理,重命名

[root@linuxidc softs]# mv apache-activemq-5.10.0 activemq-5.10.0

[root@linuxidc softs]# cd activemq-5.10.0/
[root@linuxidc activemq-5.10.0]# ll
total 6304
-rwxr-xr-x 1 root root 6371237 Jun 5 2014 activemq-all-5.10.0.jar
drwxr-xr-x 5 root root 4096 Jan 11 23:31 bin
drwxr-xr-x 2 root root 4096 Jan 11 23:31 conf
drwxr-xr-x 2 root root 4096 Jan 11 23:31 data
drwxr-xr-x 2 root root 4096 Jan 11 23:31 docs
drwxr-xr-x 8 root root 4096 Jan 11 23:31 examples
drwxr-xr-x 6 root root 4096 Jan 11 23:31 lib
-rw-r--r-- 1 root root 40580 Jun 5 2014 LICENSE
-rw-r--r-- 1 root root 3334 Jun 5 2014 NOTICE
-rw-r--r-- 1 root root 2610 Jun 5 2014 README.txt
drwxr-xr-x 7 root root 4096 Jan 11 23:31 webapps
drwxr-xr-x 3 root root 4096 Jan 11 23:31 webapps-demo
[root@linuxidc activemq-5.10.0]# cd bin/
[root@linuxidc bin]# ll
total 152
-rwxr-xr-x 1 root root 22126 Jun 5 2014 activemq
-rwxr-xr-x 1 root root 5665 Jun 5 2014 activemq-admin
-rw-r--r-- 1 root root 15954 Jun 5 2014 activemq.jar
-rwxr-xr-x 1 root root 6189 Jun 5 2014 diag
drwxr-xr-x 2 root root 4096 Jan 11 23:31 linux-x86-32
drwxr-xr-x 2 root root 4096 Jan 11 23:31 linux-x86-64
drwxr-xr-x 2 root root 4096 Jan 11 23:31 macosx
-rwxr-xr-x 1 root root 83820 Jun 5 2014 wrapper.jar

4.啟動服務

[root@linuxidc bin]# ./activemq start
INFO: Using default configuration
(you can configure options in one of these file: /etc/default/activemq /root/.activemqrc)

INFO: Invoke the following command to create a configuration file
./activemq setup [ /etc/default/activemq | /root/.activemqrc ]

INFO: Using java '/softs/jdk1.6.0_30/bin/java'
INFO: Starting - inspect logfiles specified in logging.properties and log4j.properties to get details
INFO: pidfile created : '/softs/activemq-5.10.0/data/activemq-linuxidc.pid' (pid '28962')

5.查看是否啟動成功

[root@linuxidc bin]#
[root@linuxidc bin]# ps -ef | grep activemq
root 28962 1 32 23:32 pts/0 00:00:04 /softs/jdk1.6.0_30/bin/java -Xms1G -Xmx1G -Djava.util.logging.config.file=logging.properties -Djava.security.auth.login.config=/softs/activemq-5.10.0/conf/login.config -Dcom.sun.management.jmxremote -Djava.awt.headless=true -Djava.io.tmpdir=/softs/activemq-5.10.0/tmp -Dactivemq.classpath=/softs/activemq-5.10.0/conf; -Dactivemq.home=/softs/activemq-5.10.0 -Dactivemq.base=/softs/activemq-5.10.0 -Dactivemq.conf=/softs/activemq-5.10.0/conf -Dactivemq.data=/softs/activemq-5.10.0/data -jar /softs/activemq-5.10.0/bin/activemq.jar start
root 29011 28898 0 23:32 pts/0 00:00:00 grep activemq
[root@linuxidc bin]#
[root@linuxidc bin]#

6.停止服務

[root@linuxidc data]#
[root@linuxidc data]# kill 28962
[root@linuxidc data]#
[root@linuxidc data]# ps -ef | grep activemq
root 29078 28898 0 23:42 pts/0 00:00:00 grep activemq
[root@linuxidc data]#

到此環境准備成功

demo應用

package com.wzh.activemq;

import java.io.Serializable;

public class User implements Serializable{

private static final long serialVersionUID = 1L;

private String username ;

private String password ;

public User(String username,String password){
this.username = username ;
this.password = password ;
}

public String getUsername() {
return username;
}

public void setUsername(String username) {
this.username = username;
}

public String getPassword() {
return password;
}

public void setPassword(String password) {
this.password = password;
}

@Override
public String toString() {
// TODO Auto-generated method stub
return "[username="+username+",password="+password+"]" ;
}

}

點對點:
生產者:

package com.wzh.activemq;

import java.io.Serializable;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class P2PMessageProducer {

protected String username = ActiveMQConnection.DEFAULT_USER;
protected String password = ActiveMQConnection.DEFAULT_PASSWORD;
//protected String brokerURL = "tcp://127.0.0.1:61616";
protected String brokerURL = "tcp://120.24.85.167:61616";

protected static transient ConnectionFactory factory;
protected transient Connection connection;

public static void main(String[] args) {

try {
new P2PMessageProducer().sendObjectMessage(new User("wzh","q123456"));
new P2PMessageProducer().sendMapMessage();
new P2PMessageProducer().sendTextMessage("海,你好");
} catch (Exception e) {
e.printStackTrace();
}
}

public P2PMessageProducer() {

try {
factory = new ActiveMQConnectionFactory(username, password,
brokerURL);
connection = factory.createConnection();
connection.start();
} catch (JMSException jmse) {
close();
}
}

/**
* 初始化連接信息
*/
public P2PMessageProducer(String username, String password, String brokerURL)
throws JMSException {
this.username = username;
this.password = password;
this.brokerURL = brokerURL;

factory = new ActiveMQConnectionFactory(username, password, brokerURL);
connection = factory.createConnection();
try {
connection.start();
} catch (JMSException jmse) {
connection.close();
throw jmse;
}
}

/**
* 關閉連接
*/
public void close() {
try {
if (connection != null) {
connection.close();
}
} catch (JMSException e) {
e.printStackTrace();
}
}


protected void sendObjectMessage(Serializable serializable) throws JMSException {
Session session = null;
try {

session = connection.createSession(Boolean.TRUE,
Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("MessageQueue");
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

Message message = session.createObjectMessage(serializable);

producer.send(message);

session.commit();

} catch (JMSException e) {
try {
session.rollback() ;
} catch (JMSException e1) {
e1.printStackTrace();
}
throw e ;
} finally {
close();
}

}


protected void sendTextMessage(String text) throws JMSException {
Session session = null;
try {

session = connection.createSession(Boolean.TRUE,
Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("MessageQueue");
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

Message message = session.createTextMessage(text);

producer.send(message);
session.commit();

} catch (JMSException e) {
try {
session.rollback() ;
} catch (JMSException e1) {
e1.printStackTrace();
}
throw e ;
} finally {
close();
}

}

protected void sendMapMessage() throws JMSException {
Session session = null;
try {

session = connection.createSession(Boolean.TRUE,
Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("MessageQueue");
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

MapMessage message = session.createMapMessage();
message.setString("stock", "string");
message.setDouble("price", 11.14);
producer.send(message);

session.commit();

} catch (JMSException e) {
try {
session.rollback() ;
} catch (JMSException e1) {
e1.printStackTrace();
}
throw e ;
} finally {
close();
}

}

}

消費者:

package com.wzh.activemq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class P2PMessageConsumer {

protected String username = ActiveMQConnection.DEFAULT_USER;
protected String password = ActiveMQConnection.DEFAULT_PASSWORD;
//protected String brokerURL = "tcp://127.0.0.1:61616";
protected String brokerURL = "tcp://120.24.85.167:61616";

protected static transient ConnectionFactory factory;
protected transient Connection connection;

public static void main(String[] args) {
P2PMessageConsumer consumer = new P2PMessageConsumer();
consumer.receiveMessage();
}

public P2PMessageConsumer() {

try {
factory = new ActiveMQConnectionFactory(username, password,
brokerURL);
connection = factory.createConnection();
connection.start();
} catch (JMSException jmse) {
close();
}
}

public P2PMessageConsumer(String username, String password, String brokerURL)
throws JMSException {
this.username = username;
this.password = password;
this.brokerURL = brokerURL;

factory = new ActiveMQConnectionFactory(username, password, brokerURL);
connection = factory.createConnection();
try {
connection.start();
} catch (JMSException jmse) {
connection.close();
throw jmse;
}
}

public void close() {
try {
if (connection != null) {
connection.close();
}
} catch (JMSException e) {
e.printStackTrace();
}
}

protected void receiveMessage() {
Session session = null;
try {

session = connection.createSession(Boolean.FALSE,
Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("MessageQueue");
MessageConsumer consumer = session.createConsumer(destination);

while (true) {
Message message = consumer.receive();

if (null != message) {

if (message instanceof ObjectMessage) {
System.out.println("deal ObjectMessage....");
dealObjectMessage((ObjectMessage) message);
} else if (message instanceof MapMessage) {
System.out.println("deal MapMessage....");
dealMapMessage((MapMessage) message);
} else if (message instanceof TextMessage) {
System.out.println("deal TextMessage....");
dealTextMessage((TextMessage) message);
}

} else {
break;
}

}

} catch (Exception e) {
e.printStackTrace();
} finally {
if (session != null) {
try {
session.commit();
} catch (JMSException e) {
e.printStackTrace();
}
}

}

}

/**
*
* 處理 TextMessage消息
*
* @throws JMSException
*/
private void dealTextMessage(TextMessage message) throws JMSException {
String text = message.getText();
System.out.println("text = " + text);

}

/**
*
* 處理 MapMessage消息
*
* @throws JMSException
*/
private void dealMapMessage(MapMessage message) throws JMSException {
String stack = message.getString("stock");
Double price = message.getDouble("price");
System.out.println("stock = " + stack + " , price =" + price);
}

/**
* 處理ObjectMessage消息
*/
private void dealObjectMessage(ObjectMessage message) throws JMSException {

User user = (User) message.getObject();
System.out.println(user.toString());

}

}

發布與訂閱:
-----------------------
消息發布者

package com.wzh.activemq;

import java.io.Serializable;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Publish {

protected String username = ActiveMQConnection.DEFAULT_USER;
protected String password = ActiveMQConnection.DEFAULT_PASSWORD;
//protected String brokerURL = "tcp://127.0.0.1:61616";
protected String brokerURL = "tcp://120.24.85.167:61616";

protected static transient ConnectionFactory factory;
protected transient Connection connection;

public static void main(String[] args) {
try {
new Publish().sendObjectMessage(new User("wzh","q123456"));
new Publish().sendMapMessage();
new Publish().sendTextMessage("海,你好");
} catch (Exception e) {
e.printStackTrace();
}
}

public Publish() {

try {
factory = new ActiveMQConnectionFactory(username, password,
brokerURL);
connection = factory.createConnection();
connection.start();
} catch (JMSException jmse) {
close();
}
}

public Publish(String username, String password, String brokerURL)
throws JMSException {
this.username = username;
this.password = password;
this.brokerURL = brokerURL;

factory = new ActiveMQConnectionFactory(username, password, brokerURL);
connection = factory.createConnection();
try {
connection.start();
} catch (JMSException jmse) {
connection.close();
throw jmse;
}
}

public void close() {
try {
if (connection != null) {
connection.close();
}
} catch (JMSException e) {
e.printStackTrace();
}
}

protected void sendObjectMessage(Serializable serializable) throws JMSException {
Session session = null;
try {

session = connection.createSession(Boolean.TRUE,
Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("MessageTopic");
MessageProducer producer = session.createProducer(topic);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

Message message = session.createObjectMessage(serializable);

producer.send(message);

session.commit();

} catch (JMSException e) {
try {
session.rollback() ;
} catch (JMSException e1) {
e1.printStackTrace();
}
throw e ;
} finally {
close();
}

}


protected void sendTextMessage(String text) throws JMSException {
Session session = null;
try {

session = connection.createSession(Boolean.TRUE,
Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("MessageTopic");
MessageProducer producer = session.createProducer(topic);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

Message message = session.createTextMessage(text);

producer.send(message);
session.commit();

} catch (JMSException e) {
try {
session.rollback() ;
} catch (JMSException e1) {
e1.printStackTrace();
}
throw e ;
} finally {
close();
}

}

protected void sendMapMessage() throws JMSException {
Session session = null;
try {

session = connection.createSession(Boolean.TRUE,
Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("MessageTopic");
MessageProducer producer = session.createProducer(topic);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

MapMessage message = session.createMapMessage();
message.setString("stock", "string");
message.setDouble("price", 11.14);
producer.send(message);

session.commit();

} catch (JMSException e) {
try {
session.rollback() ;
} catch (JMSException e1) {
e1.printStackTrace();
}
throw e ;
} finally {
close();
}

}
}

消息訂閱者:

package com.wzh.activemq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Subscriber {

protected String username = ActiveMQConnection.DEFAULT_USER;
protected String password = ActiveMQConnection.DEFAULT_PASSWORD;
//protected String brokerURL = "tcp://127.0.0.1:61616";
protected String brokerURL = "tcp://120.24.85.167:61616";

protected static transient ConnectionFactory factory;
protected transient Connection connection;

public static void main(String[] args) {
Subscriber consumer = new Subscriber();
consumer.receiveMessage();
}

public Subscriber() {

try {
factory = new ActiveMQConnectionFactory(username, password,
brokerURL);
connection = factory.createConnection();
connection.start();
} catch (JMSException jmse) {
close();
}
}

public Subscriber(String username, String password, String brokerURL)
throws JMSException {
this.username = username;
this.password = password;
this.brokerURL = brokerURL;

factory = new ActiveMQConnectionFactory(username, password, brokerURL);
connection = factory.createConnection();
try {
connection.start();
} catch (JMSException jmse) {
connection.close();
throw jmse;
}
}

public void close() {
try {
if (connection != null) {
connection.close();
}
} catch (JMSException e) {
e.printStackTrace();
}
}

protected void receiveMessage() {
Session session = null;
try {

session = connection.createSession(Boolean.FALSE,
Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("MessageTopic");
MessageConsumer consumer = session.createConsumer(topic);

consumer.setMessageListener(new MessageListener() {

@Override
public void onMessage(Message message) {

if (message instanceof ObjectMessage) {
System.out.println("deal ObjectMessage....");
dealObjectMessage((ObjectMessage) message);
} else if (message instanceof MapMessage) {
System.out.println("deal MapMessage....");
dealMapMessage((MapMessage) message);
} else if (message instanceof TextMessage) {
System.out.println("deal TextMessage....");
dealTextMessage((TextMessage) message);
}

}
}) ;

} catch (Exception e) {
e.printStackTrace();
} finally {
/*if (session != null) {
try {
session.commit();
} catch (JMSException e) {
e.printStackTrace();
}
}*/

}

}

/**
*
* 處理 TextMessage消息
*
* @throws JMSException
*/
private void dealTextMessage(TextMessage message) {
try {
String text = message.getText();
System.out.println("text = " + text);
} catch (JMSException e) {
e.printStackTrace();
}

}

/**
*
* 處理 MapMessage消息
*
* @throws JMSException
*/
private void dealMapMessage(MapMessage message){
try {
String stack = message.getString("stock");
Double price = message.getDouble("price");
System.out.println("stock = " + stack + " , price =" + price);
} catch (JMSException e) {
e.printStackTrace();
}
}

/**
* 處理ObjectMessage消息
*/
private void dealObjectMessage(ObjectMessage message){

try {
User user = (User) message.getObject();
System.out.println(user.toString());
} catch (JMSException e) {
e.printStackTrace();
}

}

}

推薦閱讀:

Linux系統下ActiveMQ 安裝 http://www.linuxidc.com/Linux/2012-03/55623.htm

Ubuntu下的ACTIVEMQ服務器 http://www.linuxidc.com/Linux/2008-07/14587.htm

CentOS 6.5啟動ActiveMQ報錯解決 http://www.linuxidc.com/Linux/2015-08/120898.htm

Spring+JMS+ActiveMQ+Tomcat實現消息服務 http://www.linuxidc.com/Linux/2011-10/44632.htm

Linux環境下面ActiveMQ端口號設置和WEB端口號設置 http://www.linuxidc.com/Linux/2012-01/51100.htm

Copyright © Linux教程網 All Rights Reserved