天天看點

大資料實戰之環境搭建(十)

  1. Html5, 雲計算,移動網際網路,大資料你知道多少,如果不知道多少,請抓緊時間學習。

今天要說的是消息隊列ActiveMQ,這個和cassandra一樣也是apache的産品,開源的,支援很多用戶端,比如java,C#,ruby,python等。ActiveMQ 是一個完全支援JMS和J2EE規範的 JMS Provider實作。在介紹jms之前,先要介紹一下它的規範,俗話說無規矩不成方圓,就像序列化和反序列化一樣,大家要遵循一定的規則才能互動。

JMS的基本構件有

(1).連接配接工廠, 是用戶端用來建立連接配接的對象,ActiveMQConnectionFactory類。

(2). 連接配接,JMS Connection封裝了用戶端和JMS提供者之間的一個虛拟連接配接,ActiveMQConnectio類。

(3). 會話,JMS Session是生産者和消費者之間Produce message和Consume message的上下文。

會話用于建立消息的生産者,消費者和消息。這個上下文保持了會話的事務性,即發送消息和接受消息被組合到了一個事務中。

(4). 目的地,這個目的地表明了生産消息的目标和消費消息的目标,Destination對象。

JMS規範中定義了兩種消息傳遞域,一種是點對點,一種是釋出/訂閱。點對點的特點是一個消息隻能有一個消費者,消費者和生産者之間沒有時間上的相關性,無論消費者在生産者發送消息的時候是否是出于運作狀态,他都可以提取到消息。訂閱釋出模式的特點是一個生産者可以有多個消費者,這種模式存在時間上的相關性,消費者隻能消費自從他訂閱之後的消息,之前的消息是不能消費的。但是JMS規範允許客戶建立持久訂閱,即使是在生産者在消費者發送消息的時候處于一個未激活的狀态,等他激活以後,依然可以消費未激活之前生産者發送的消息

(5).生産者,是由會話建立的一個對象,它主要職責是将消息發送到目的地

(6).消費者,是由會話建立的一個對象,它主要是接收生産者發送到目的地的消息

消費者可以同步消費和異步消費。

OK,概念就先講到這裡,我們看一下環境

首先從apache 網站上下載下傳ActiveMQ linux版本

大資料實戰之環境搭建(十)

在這裡我下載下傳的版本是5.8.0。OK我們把它放到FTP Server上,在CentOS上拷貝至opt檔案夾。

大資料實戰之環境搭建(十)

ok我們用指令解壓

大資料實戰之環境搭建(十)

解壓完成後,直接運作啟動腳本

[root@bogon opt]# ls
apache-activemq-5.8.0  apache-activemq-5.8.0-bin.tar.gz
[root@bogon opt]# cd apache-activemq-5.8.0
[root@bogon apache-activemq-5.8.0]# ls
activemq-all-5.8.0.jar  docs     NOTICE           webapps-demo
bin                     example  README.txt       WebConsole-README.txt
conf                    lib      user-guide.html
data                    LICENSE  webapps
[root@bogon apache-activemq-5.8.0]# cd bin
[root@bogon bin]# ls
activemq        activemq.jar  linux-x86-32  macosx
activemq-admin  diag          linux-x86-64  wrapper.jar
[root@bogon bin]# sh activemq      

OK,開始啟動,到這一步啟動成功

大資料實戰之環境搭建(十)

我們看一下它的管理界面,預設端口是8161,在conf目錄下的jetty.xml中

<property name="connectors">
<list>
<bean id="Connector" class="org.eclipse.jetty.server.nio.SelectChannelConnector">
<property name="port" value="8161" />
</bean>
<!--
Enable this connector if you wish to use https with web console
-->
<!--
<bean id="SecureConnector" class="org.eclipse.jetty.server.ssl.SslSelectChannelConnector">
<property name="port" value="8162" />
<property name="keystore" value="file:${activemq.conf}/broker.ks" />
<property name="password" value="password" />
</bean>
-->
</list>
</property>      

OK,我們在浏覽器中輸入Http://localhost:8161/admin,提示輸入使用者名和密碼,使用者名:admin,密碼admin,

大資料實戰之環境搭建(十)

這個使用者名和密碼可以在conf/jetty-realm.properties檔案中去修改。

大資料實戰之環境搭建(十)

OK,管理界面如下

大資料實戰之環境搭建(十)

因為哥一直做.net開發,是以還是以.net來做個demo,來做個消息的發送可接收。首先需要下載下傳.net用戶端。去網站上找到下載下傳.net 用戶端的地方

大資料實戰之環境搭建(十)
大資料實戰之環境搭建(十)

在這裡我下載下傳的是1.5.3版本,因為1.6.0版本連接配接有問題。下載下傳下來之後,我們使用.net 4.0版本

大資料實戰之環境搭建(十)

OK,建立一個solution,建立兩個WinForm的工程,如下

大資料實戰之環境搭建(十)

注意要引用上面的兩個dll,這兩個dll分别在build和lib下

大資料實戰之環境搭建(十)

我們看一下代碼,沒什麼好解釋的,先看生産者代碼

using Apache.NMS;
using Apache.NMS.ActiveMQ;
using Apache.NMS.ActiveMQ.Commands;
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Drawing;
using System.Linq;
using System.Text;
using System.Windows.Forms;
namespace ActiveMQSender
{
public partial class FrmMsgSender : Form
{
IConnectionFactory factory;
IConnection connection;
ISession session;
IDestination destination;
IMessageProducer prod;
ITextMessage streamMessage;
~FrmMsgSender()
{
session.Close();
connection.Close();
}
public FrmMsgSender()
{
InitializeComponent();
this.Init();
}
private void Init()
{
factory = new ConnectionFactory("tcp://192.168.192.128:61616/");
connection = factory.CreateConnection("admin", "admin");
session = connection.CreateSession();
destination = new ActiveMQTopic("Bruce Test");
prod = session.CreateProducer(destination);
streamMessage = prod.CreateTextMessage();
}
private void btnSend_Click(object sender, EventArgs e)
{
streamMessage.Text=txtSendMsg.Text.Trim();
prod.Send(streamMessage,MsgDeliveryMode.NonPersistent, MsgPriority.Normal, TimeSpan.MinValue);
}
}
}      

再看消費者代碼

using Apache.NMS;
using Apache.NMS.ActiveMQ;
using Apache.NMS.ActiveMQ.Commands;
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Drawing;
using System.Linq;
using System.Text;
using System.Windows.Forms;
namespace FrmMsgReceiver
{
public partial class FrmMsgReceiver : Form
{
IConnectionFactory factory;
IConnection connection;
ISession session;
IDestination destination;
IMessageConsumer consumer;
~FrmMsgReceiver()
{
session.Close();
connection.Close();
}
public FrmMsgReceiver()
{
InitializeComponent();
}
private void Init()
{
factory = new ConnectionFactory("tcp://192.168.192.128:61616/");
connection = factory.CreateConnection("admin", "admin");
session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
destination = new ActiveMQTopic("Bruce Test");
consumer = session.CreateConsumer(destination);
}
private void FrmMsgReceiver1_Load(object sender, EventArgs e)
{
this.Init();
IMessage streamMessage = consumer.Receive();
this.txtReceiveMsg.Text = (streamMessage as ActiveMQStreamMessage).ReadString();
}
}
}      

OK,就是這麼簡單,我也不寫運作過程了,啟動VMPlayer後,機器的記憶體已經使用了90%了,可憐的dell 1420。

大資料實戰之環境搭建(十)

繼續閱讀