티스토리 뷰

Getting started with RabbitMQ.

대용량 메세지 처리를 위한 오픈소스들이 즐비한 시점에 여러 오픈소스를 찾다 RabbitMQ에

대한 Reference가 잘 되어있는 듯하여 소개하고자 한다.

그전에 MessageQueue에 대해 간단히 소개하자면,  요즘과 같은 대용량 메세지 처리 시 

쏟아지는 데이터를 처리하는데 한계가 있기 때문에 MessageQueue 시스템을 이용하여,

대량의 메세지를 Queue에 저장 하여 처리하도록 설계된 시스템이다.


대표적인 MessageQueue로 kafka, RabbitMQ, ActiveMQ, ZeroMQ, JMS 등이 있다.


RabbitMQ의 특징

  • Robust messaging for applications
  • Easy to use
  • Runs on all major operating systems
  • Supports a huge number of developer platforms

  • Open source and commercially supported

어플리케이션에 특화된 강력한 메세징 처리, 쉬운사용법, 다양한 메이저 OS지원 , 다양한 

개발 플랫폼 지원, 그리고 OpenSource 라는점.

Rabbit MQ는 다양한 플랫폼과  Queue 방식을 지원한다.

기본적인 구조는 아래의 그림과 같다.

[그림1] RabbitMQ 기본 구조


Producer에서 생산한 Message를 hello라는 Queue에 넣고, Consumer는 Queue에 

발생한 데이터를 소비하는 형태의 일반적인 Message Queue방식이다.

여기에 메세지를  Queue에 Routing하는 Exchange 컨셉을 이해한다면

AMQP(Advanced Message Queuing Protocol) 기반의  RabbitMQ 에 대한 동작 원리를 

이해 했다고 볼 수 있다.

Exchange는 메세지를 Queue로 분배하는 Routing 기능을 수행하며, Exchange Type을

통해 여러 메세지 분배 알고리즘을 지정할 수 있다.

[그림 1] RabbitMQ 메세지 처리 구조 설계


또한 Exchange,  Queue 설정에 따라 지속, 삭제 등의 설정을 지정 할 수 있다.

Exchange Type 종류

- Direct     : routing key를 Queue와 1:N으로 매칭하는 방법이다.

- Topic     : 와일드카드 형태의 routing key를 Queue에 매칭한다. (* : 한개이상, #  0개이상)

- Fanout   : 메세지를 모든큐에 라우팅한다.

- Headers : key-value 형태의 multiple attribute를 이용하여 

                  라우팅을 수행하며 x-match=any인 경우 attribute 한개이상 매칭,

                  x-match=all일경우 모든 attribute가 매칭되어야 한다.

참조 : https://www.rabbitmq.com/tutorials/amqp-concepts.html


Exchange Type 주요 속성

auto-delete :  모든 큐 사용 후 삭제여부 결정 (true : 삭제, false : 제거)

durable :  서버를 재시작 할 경우 유지상태 설정 (true : 유지, false : 제거)


Queue Type 주요 속성

exclusive : 현재 연결한 client에서만 액세스 가능하며, 클라이언트 연결 종료 시 

                삭제된다. ( 다른 클라이언트의 연결은 허용되지 않는다. )

auto-delete :  모든 소비자의 사용 종료 후 삭제여부 결정 (true : 삭제, false : 제거)

                     어플리케이션에서 명시적으로 auto-delete queues의 삭제가

                     가능하다.

durable :  서버를 재시작 할 경우 유지상태 설정 (true : 유지, false : 제거)

참조 : https://www.rabbitmq.com/amqp-0-9-1-reference.html#exchange.declare.auto-delete 


설치 및 구현

설치 및 간단한 Chatting Client & Server 구현 방법을 소개한다.


설치환경

OS : Ubuntu 14.04

Nodejs : 0.10.33

RabbitMQ : 3.2.4


RabbitMQ Server 설치

apt-get을 이용하여 설치한다.

$ sudo apt-get install rabbitmq-server


RabbitMQ  Web Management Tool 설치

$ sudo rabbitmq-plugins enable rabbitmq_management

$ sudo service rabbitmq-server restart


구현

Nodejs의  Socket.io를 이용하여 서버 구현 후 테스트를 진행한다.


MessageQueue.js (Server)

/*! * Message Queue * * @author rocksea * @since 2014.11.26 */ var amqp = require('amqp'); var rabbit = amqp.createConnection($config.amqp); var count = 1; var ctag = new Array(); var messageQueue = function(socket){ rabbit.on('ready', function () { socket.on('connection', function (conn) { conn.on('message', function (message) { try { var exchange = 'share-1'; if (message.type == "register") { // Connect to RabbitMQ try { if(conn.exchange === undefined){ conn.exchange = rabbit.exchange(exchange, { type: 'topic', autoDelete: false, durable: false, exclusive: false, confirm: true }); } if(conn.q === undefined){ conn.q = rabbit.queue('share-1-'+message.userId, { durable: true, autoDelete: false, exclusive: false }, function () { //conn.channel = 'share-1.' + message.userId; conn.channel = 'share-1.'+message.userId;

conn.q.bind(conn.exchange, 'share-1.*');

conn.q.bind(conn.exchange, 'share-1.'+message.userId);

//conn.q.bind(conn.exchange, 'share-1.' + message.userId); conn.q.subscribe(function (message) { $logger.debug("[MSG] ---> " + JSON.stringify(message)); conn.write(JSON.stringify(message) + "\n"); }).addCallback(function(ok) { ctag[conn.channel] = ok.consumerTag; }); }); } } catch (err) { $logger.error("Could not create connection to RabbitMQ. \nStack trace -->" + err.stack); } } else if (message.type == "chat") { var reply = { type: 'push', message: message.message, userId: message.userId, //visitorNick: obj.channel, customField1: '', //time: utils.getDateTime(), channel: 'share-1.' + message.userId //channel: 'share-1' }; conn.exchange.publish('share-1.' + message.userId, reply); //conn.exchange.publish('my-queue-*', reply); } } catch (err) { $logger.error("ERROR ----> " + err.stack); } }); conn.on('disconnect', function () { try { // Unsubscribe MQ if(ctag[conn.channel] !== null && ctag[conn.channel] !== undefined){ conn.q.unsubscribe(ctag[conn.channel]); $logger.debug("### unsubscribe : " + ctag[conn.channel]); delete ctag[conn.channel]; } $logger.info("Disconnected Socket [%s]",conn.channel); conn.disconnect(); //sub.queue.destroy(); //해당 queue를 삭제한다. } catch (er) { $logger.error(":::::::: Exception Socket (ON-Disconnect) ::::::::>>>>>>> " + er.stack); } }); }); }); }


index.html (Client)

<html>
<head>
<title>Socket and Redis in Node.js</title>
<script src="/socket.io/socket.io.js"></script>
<script src="https://ajax.googleapis.com/ajax/libs/jquery/1.6.4/jquery.min.js"></script>
</head>
<body>
<div id="username">
<input type="text" name="usernameTxt" /> <input type="button" name="setUsername" value="Set Username" />
</div>
<div id="sendChat" style="display:none;">
<input type="text" name="chatTxt" /> <input type="button" name="sendBtn" value="Send" /> <input type="button" name="closeBtn" value="Close" />
</div>
<br />
<div id="content"></div>
<script>
$(document).ready(function() {
  var username = "anonymous";
  $('input[name=setUsername]').click(function(){
    if($('input[name=usernameTxt]').val() != ""){
      username = $('input[name=usernameTxt]').val();
      var msg = {type:'register', userId: username, routingKey : 'rocksea' };
      socket.json.send(msg);
    }
    $('#username').slideUp("slow",function(){
      $('#sendChat').slideDown("slow");
    });
  });
  var socket = new io.connect('http://192.168.0.142:3300');
  var content = $('#content');

  socket.on('connect', function() {
    console.log("Connected");
  });

  socket.on('message', function(message){
    var msg = $.parseJSON(message);
    content.append(msg.userId + " : " + msg.message + '<br />');
  }) ;

  socket.on('disconnect', function() {
    console.log('disconnected');
    content.html("<b>Disconnected!</b>");
  });

  socket.on('connect', function() {
    var msg = {routingKey : 'rocksea' };
    //sub.emit('bindQueue',msg);
  });

  socket.on('data', function(message){
    alert("## message : " + message.message);
  });

  $("input[name=sendBtn]").click(function(){
    var message = $("input[name=chatTxt]").val();
    var msg = {type: 'chat', userId: username, message: message, routingKey : 'rocksea' };
    socket.json.send(msg);
    $("input[name=chatTxt]").val("");
  });

  $("input[name=closeBtn]").click(function(){
    alert('close');
    socket.disconnect();
  });
});
</script>
</body>
</html>


Test

채팅 아이디를 등록 한 뒤 메세지 전송 및 수신 테스트 진행.

[그림 2] 아이디 생성 ( Queue 생성 )


[그림 3] 메세지 전송 및 수신


queue의 durable 속성을 true로 설정 후 테스트 진행.

[그림 4] 재접속 테스트


[그림 5] 재접속 후 미수신 메세지  수신


MessageQueue 관리

웹을 이용하여 MessageQueue를 관제 할 수 있다.

( Queue Binding, 삭제, 접속 체널, 전송량 등을 확인 할 수 있다. )

접속 기본 포트 - http://hostname:15672

추후 벤치마킹 테스트 자료도 추가 할 것이다.

댓글