티스토리 뷰
Getting started with RabbitMQ.
대용량 메세지 처리를 위한 오픈소스들이 즐비한 시점에 여러 오픈소스를 찾다 RabbitMQ에
대한 Reference가 잘 되어있는 듯하여 소개하고자 한다.
그전에 MessageQueue에 대해 간단히 소개하자면, 요즘과 같은 대용량 메세지 처리 시
쏟아지는 데이터를 처리하는데 한계가 있기 때문에 MessageQueue 시스템을 이용하여,
대량의 메세지를 Queue에 저장 하여 처리하도록 설계된 시스템이다.
대표적인 MessageQueue로 kafka, RabbitMQ, ActiveMQ, ZeroMQ, JMS 등이 있다.
RabbitMQ의 특징
- Robust messaging for applications
- Easy to use
- Runs on all major operating systems
- Supports a huge number of developer platforms
- Open source and commercially supported
[그림1] RabbitMQ 기본 구조
Producer에서 생산한 Message를 hello라는 Queue에 넣고, Consumer는 Queue에
발생한 데이터를 소비하는 형태의 일반적인 Message Queue방식이다.
여기에 메세지를 Queue에 Routing하는 Exchange 컨셉을 이해한다면
AMQP(Advanced Message Queuing Protocol) 기반의 RabbitMQ 에 대한 동작 원리를
이해 했다고 볼 수 있다.
Exchange는 메세지를 Queue로 분배하는 Routing 기능을 수행하며, Exchange Type을
통해 여러 메세지 분배 알고리즘을 지정할 수 있다.
[그림 1] RabbitMQ 메세지 처리 구조 설계
또한 Exchange, Queue 설정에 따라 지속, 삭제 등의 설정을 지정 할 수 있다.
Exchange Type 종류
- Direct : routing key를 Queue와 1:N으로 매칭하는 방법이다.
- Topic : 와일드카드 형태의 routing key를 Queue에 매칭한다. (* : 한개이상, # 0개이상)
- Fanout : 메세지를 모든큐에 라우팅한다.
- Headers : key-value 형태의 multiple attribute를 이용하여
라우팅을 수행하며 x-match=any인 경우 attribute 한개이상 매칭,
x-match=all일경우 모든 attribute가 매칭되어야 한다.
참조 : https://www.rabbitmq.com/tutorials/amqp-concepts.html
Exchange Type 주요 속성
auto-delete : 모든 큐 사용 후 삭제여부 결정 (true : 삭제, false : 제거)
durable : 서버를 재시작 할 경우 유지상태 설정 (true : 유지, false : 제거)
Queue Type 주요 속성
exclusive : 현재 연결한 client에서만 액세스 가능하며, 클라이언트 연결 종료 시
삭제된다. ( 다른 클라이언트의 연결은 허용되지 않는다. )
auto-delete : 모든 소비자의 사용 종료 후 삭제여부 결정 (true : 삭제, false : 제거)
어플리케이션에서 명시적으로 auto-delete queues의 삭제가
가능하다.
durable : 서버를 재시작 할 경우 유지상태 설정 (true : 유지, false : 제거)
참조 : https://www.rabbitmq.com/amqp-0-9-1-reference.html#exchange.declare.auto-delete
설치 및 구현
설치 및 간단한 Chatting Client & Server 구현 방법을 소개한다.
설치환경
OS : Ubuntu 14.04
Nodejs : 0.10.33
RabbitMQ : 3.2.4
RabbitMQ Server 설치
apt-get을 이용하여 설치한다.
$ sudo apt-get install rabbitmq-server
RabbitMQ Web Management Tool 설치
$ sudo rabbitmq-plugins enable rabbitmq_management
$ sudo service rabbitmq-server restart
구현
Nodejs의 Socket.io를 이용하여 서버 구현 후 테스트를 진행한다.
MessageQueue.js (Server)
/*! * Message Queue * * @author rocksea * @since 2014.11.26 */ var amqp = require('amqp'); var rabbit = amqp.createConnection($config.amqp); var count = 1; var ctag = new Array(); var messageQueue = function(socket){ rabbit.on('ready', function () { socket.on('connection', function (conn) { conn.on('message', function (message) { try { var exchange = 'share-1'; if (message.type == "register") { // Connect to RabbitMQ try { if(conn.exchange === undefined){ conn.exchange = rabbit.exchange(exchange, { type: 'topic', autoDelete: false, durable: false, exclusive: false, confirm: true }); } if(conn.q === undefined){ conn.q = rabbit.queue('share-1-'+message.userId, { durable: true, autoDelete: false, exclusive: false }, function () { //conn.channel = 'share-1.' + message.userId; conn.channel = 'share-1.'+message.userId;
conn.q.bind(conn.exchange, 'share-1.*');
conn.q.bind(conn.exchange, 'share-1.'+message.userId);
//conn.q.bind(conn.exchange, 'share-1.' + message.userId); conn.q.subscribe(function (message) { $logger.debug("[MSG] ---> " + JSON.stringify(message)); conn.write(JSON.stringify(message) + "\n"); }).addCallback(function(ok) { ctag[conn.channel] = ok.consumerTag; }); }); } } catch (err) { $logger.error("Could not create connection to RabbitMQ. \nStack trace -->" + err.stack); } } else if (message.type == "chat") { var reply = { type: 'push', message: message.message, userId: message.userId, //visitorNick: obj.channel, customField1: '', //time: utils.getDateTime(), channel: 'share-1.' + message.userId //channel: 'share-1' }; conn.exchange.publish('share-1.' + message.userId, reply); //conn.exchange.publish('my-queue-*', reply); } } catch (err) { $logger.error("ERROR ----> " + err.stack); } }); conn.on('disconnect', function () { try { // Unsubscribe MQ if(ctag[conn.channel] !== null && ctag[conn.channel] !== undefined){ conn.q.unsubscribe(ctag[conn.channel]); $logger.debug("### unsubscribe : " + ctag[conn.channel]); delete ctag[conn.channel]; } $logger.info("Disconnected Socket [%s]",conn.channel); conn.disconnect(); //sub.queue.destroy(); //해당 queue를 삭제한다. } catch (er) { $logger.error(":::::::: Exception Socket (ON-Disconnect) ::::::::>>>>>>> " + er.stack); } }); }); }); }
index.html (Client)
<html> <head> <title>Socket and Redis in Node.js</title> <script src="/socket.io/socket.io.js"></script> <script src="https://ajax.googleapis.com/ajax/libs/jquery/1.6.4/jquery.min.js"></script> </head> <body> <div id="username"> <input type="text" name="usernameTxt" /> <input type="button" name="setUsername" value="Set Username" /> </div> <div id="sendChat" style="display:none;"> <input type="text" name="chatTxt" /> <input type="button" name="sendBtn" value="Send" /> <input type="button" name="closeBtn" value="Close" /> </div> <br /> <div id="content"></div> <script> $(document).ready(function() { var username = "anonymous"; $('input[name=setUsername]').click(function(){ if($('input[name=usernameTxt]').val() != ""){ username = $('input[name=usernameTxt]').val(); var msg = {type:'register', userId: username, routingKey : 'rocksea' }; socket.json.send(msg); } $('#username').slideUp("slow",function(){ $('#sendChat').slideDown("slow"); }); }); var socket = new io.connect('http://192.168.0.142:3300'); var content = $('#content'); socket.on('connect', function() { console.log("Connected"); }); socket.on('message', function(message){ var msg = $.parseJSON(message); content.append(msg.userId + " : " + msg.message + '<br />'); }) ; socket.on('disconnect', function() { console.log('disconnected'); content.html("<b>Disconnected!</b>"); }); socket.on('connect', function() { var msg = {routingKey : 'rocksea' }; //sub.emit('bindQueue',msg); }); socket.on('data', function(message){ alert("## message : " + message.message); }); $("input[name=sendBtn]").click(function(){ var message = $("input[name=chatTxt]").val(); var msg = {type: 'chat', userId: username, message: message, routingKey : 'rocksea' }; socket.json.send(msg); $("input[name=chatTxt]").val(""); }); $("input[name=closeBtn]").click(function(){ alert('close'); socket.disconnect(); }); }); </script> </body> </html>
Test
채팅 아이디를 등록 한 뒤 메세지 전송 및 수신 테스트 진행.
[그림 2] 아이디 생성 ( Queue 생성 )
[그림 3] 메세지 전송 및 수신
queue의 durable 속성을 true로 설정 후 테스트 진행.
[그림 4] 재접속 테스트
[그림 5] 재접속 후 미수신 메세지 수신
MessageQueue 관리
웹을 이용하여 MessageQueue를 관제 할 수 있다.
( Queue Binding, 삭제, 접속 체널, 전송량 등을 확인 할 수 있다. )
접속 기본 포트 - http://hostname:15672
추후 벤치마킹 테스트 자료도 추가 할 것이다.
'Developer' 카테고리의 다른 글
[Ajax] Crossdomain Problem. (0) | 2014.12.17 |
---|---|
[Python#2] Using Flask For fastest Web Applications. (0) | 2014.12.11 |
[Ubuntu] Upgrading newer version from older version. (0) | 2014.11.11 |
[Bigdata#1] Hadoop Cluster and Hive Installation Guide. (0) | 2014.11.06 |
[Qmail] remove mail queue . (0) | 2014.10.29 |
- Total
- Today
- Yesterday
- mongoDB
- it
- Python Django
- JBOSS
- ubuntu
- 가정법
- Python
- 대명사 구문
- hadoop
- memcached
- 영작
- 해외여행
- 영문법
- 비지니스 영어
- Business English
- hdfs
- redis
- 도덕경
- 다낭
- 여행
- 비교구문
- maven
- k8s
- 조동사
- NGINX
- PostgreSQL
- 베트남
- AWS
- 스페인 여행
- nodejs
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | |||||
3 | 4 | 5 | 6 | 7 | 8 | 9 |
10 | 11 | 12 | 13 | 14 | 15 | 16 |
17 | 18 | 19 | 20 | 21 | 22 | 23 |
24 | 25 | 26 | 27 | 28 | 29 | 30 |