HAProxy를 통한 RabbitMQ MQTT의 로드밸런싱

1. 개요
RabbitMQ MQTT서버를 리눅스(moramlinux)와 Windows 2012 Server(MoramDBSvr)에 설치하여 클러스터링을 걸어놓은 상태이다. 이를 이용하여 리눅스(mosamlinux)에 HAProxy를 설치하여 아래처럼 4530 포트로 요청이 들어왔을때 192.168.0.206 또는 192.168.0.207 서버로 전달하기 위하여 기술한 내용이다.

2. 설치

add-apt-repository ppa:vbernat/haproxy-1.6
apt-get update
apt-get dist-upgrade
apt-get install haproxy

3. 환경설정
/etc/haproxy/haproxy.cfg 파일을 열어 수정한다.

global
        log /dev/log    local0
        log /dev/log    local1 notice
        chroot /var/lib/haproxy
        stats socket /run/haproxy/admin.sock mode 660 level admin
        stats timeout 30s
        user haproxy
        group haproxy
        daemon

        # Default SSL material locations
        ca-base /etc/ssl/certs
        crt-base /etc/ssl/private

        # Default ciphers to use on SSL-enabled listening sockets.
        # For more information, see ciphers(1SSL). This list is from:
        #  https://hynek.me/articles/hardening-your-web-servers-ssl-ciphers/
        ssl-default-bind-ciphers ECDH+AESGCM:DH+AESGCM:ECDH+AES256:DH+AES256:ECDH+AES128:DH+AES:ECDH+3DES:DH+3DES:RSA+AESGCM:RSA+AES:RSA+3DES:!aNULL:!MD5:!DSS
        ssl-default-bind-options no-sslv3

        maxconn    4096 # 최대연결수


defaults
        log     global
        #mode    http
        mode    tcp
        #option  httplog
        option  tcplog
        option  dontlognull
        option  redispatch
        retries 3

        timeout connect 5000
        timeout client  50000
        timeout server  50000


listen  rabbitmq
        bind 192.168.0.207:4530
        mode tcp
        balance roundrobin
        server rabbitmq-mosaicdbsvr 192.168.0.206:1883 check inter 5s rise 2 fall 3
        server rabbitmq-moramlinux 192.168.0.207:1883 check inter 5s rise 2 fall 3

4. 리스타드

service haproxy restart

RabbitMQ MQTT 설치 및 환경설정

1. 다운로드 및 설치
a. 얼랑(erlang) 설치
http://www.erlang.org/downloads 에서 다운로드 하여 설치

1. 다운로드 및 설치
1) 얼랑(erlang) 설치
http://www.erlang.org/downloads 에서 다운로드 하여 설치
rabbitmq01

2) RabbitMq 설치
https://www.rabbitmq.com/download.html 에서 다운로드 하여 설치
rabbitmq02

2. 관리콘솔 활성화
웹을 통해 MQ의 상태 등을 관리하기 위한 관리콘솔로써,
설치폴더의 C:\Program Files\RabbitMQ Server\rabbitmq_server-3.6.5\sbin에서

rabbitmq-plugins enable rabbitmq_management 

을 입력하여 활성화

rabbitmq03

반영은 RabbitMQ 서비스를 재시작해야 한다.
관리자권한으로 명령프롬프트를 띄워 아래와 같이 입력

C:\Program Files\RabbitMQ Server\rabbitmq_server-3.6.5\sbin\rabbitmq-service stop
C:\Program Files\RabbitMQ Server\rabbitmq_server-3.6.5\sbin\rabbitmq-service start

그리고 관리콘솔로 로그인하기 위하여 웹상에서 아래와 같이 입력하면 된다.

http://localhost:15672/

기본 ID/PASSWORD는 guest/guest 임

3. 방화벽 설정
아래 포트를 방화벽에서 Open한다.
– 5672 : AMQP 포트
– 15672 : 웹 관리 콘솔
– 4369 : EPMD (Erlang Port Mapper Daemon)
– 1883 : MQTT 접속 포트

rabbitmq04

4. 사용자 계정 생성
1) http://localhost:15672/ 에 guest/guest 로 로그인
2) Admin > Users 메뉴로 이동하여 계정 등록
3) Admin 계정 등록
rabbitmq05

Guest 계정을 없애고 admin을 등록하기 위해 위와 같이 입력
참고: 모든권한을 주려면 Tags 선택시 Admin을 클릭해야 한다.

4) 서비스 계정 등록
아래와 같이 서비스로만 사용할 계정은 Tags가 아무것도 없도록 한다.

rabbitmq06

5) 퍼미션 부여
– 위에서 등록한 admin과 testuser 계정에 대하여 퍼미션을 부여하기 위해 해당 계정을 클릭
– Virtual Host를 선택하고 아래 Set Permission을 클릭
– 비밀번호를 넣고 사용자 정보 업데이트
rabbitmq07

6) 명령어를 통한 계정 관리
– 계정생성 : rabbitmqctl add_user [사용자] [비밀번호] – 계정삭제 : rabbitmqctl delete_user [사용자] – 비밀번호 변경 : rabbitmqctl change_password [사용자] [새로운 비밀번호] – 계정리스트 보기 : rabbitmqctl list_users
– 태그부여 : rabbitmqctl set_user_tags [사용자] [태그]

5. MQTT 활성화
다음 명령을 통해 활성화

rabbitmq-plugins enable rabbitmq_mqtt

6. 환경설정 파일
– 해당사용자 계정의 하위 폴더에 있는 (Ex: C:\Users\yomile\AppData\Roaming\RabbitMQ) rabbitmq.config 파일 수정

[{rabbit,        [{tcp_listeners,    [5672]}]},
 {rabbitmq_mqtt, [{tcp_listeners,    [1883]}]}
].

hMailServer에서 데이터 디렉토리 변경

hMailServer에서 데이터(Data) 디렉토리를 변경하기 위해서는
일차적으로 hMailSever 서비스를 중지한다.

그 이후로 설치폴거의 bin 폴더에 있는 hMailServer.ini 을 열어
아래와 같이 수정해 주면 된다.

[Directories]
ProgramFolder=C:\Program Files (x86)\hMailServer
DatabaseFolder=C:\Program Files (x86)\hMailServer\Database
DataFolder=D:\hMailServer\Data
LogFolder=D:\hMailServer\Logs
TempFolder=D:\hMailServer\Temp
EventFolder=C:\Program Files (x86)\hMailServer\Events

전자정부프레임워크(3.5)에서 websocket 사용하기

다음은 전자정부프레임워크 3.5에서 웹소켓을 사용하기 위한 절차임

1. 요구사항
1) 전자정부프레임워크 : 3.5
2) 아파치 톰캣 : 7.0.70 이상(너무 버전이 낮아도 웹소켓을 지원안하므로 유의해야 함)
3) spring framework : 4.0.9.RELEASE
– 전자정부 프레임 3.5 버전에서 사용하는 스프링 버전, 더 높일수 있지만 다른 문제 발생할수 있음. 실제로 스프링 4.1에서는
그간 잘 써왔던 jackson의 지원이 중단되어 다른것으로 대체해야 하는등 문제가 발생한다.
4) JDK : 1.7 이상
5) SockJs : 익스플로러 낮은버전도 지원하기 위하여 사용

2. 메이븐 POM에 추가


4.0.9.RELEASE				
 
	org.springframework 
	spring-context 
	${springframework.version} 
 		

	org.springframework
	spring-core
	${springframework.version}
		

	org.springframework
	spring-beans
	${springframework.version}
		

	org.springframework
	spring-web
	${springframework.version}


	org.springframework
	spring-webmvc
	${springframework.version}



	org.springframework
	spring-websocket
	${springframework.version}

 
	javax.servlet 
	javax.servlet-api 
	3.1.0 
	provided 


	javax.websocket
	javax.websocket-api
	1.0
	
	provided            

3. 웹소켓 설정

@Configuration
@EnableWebMvc
@EnableWebSocket
public class WebSockConfig extends WebMvcConfigurerAdapter implements WebSocketConfigurer
{
	@Override
	public void registerWebSocketHandlers(WebSocketHandlerRegistry wshrRegistry)
	{
		// 웹소켓
		wshrRegistry.addHandler(monHandler(), "/websocket/monHandler.ws")
			.addInterceptors(new MonHandshakeInterceptor());
		
		// SockJs	
		wshrRegistry.addHandler(monHandler(), "/websocket/monHandler.sockjs")
			.addInterceptors(new MonHandshakeInterceptor())
			.withSockJS();
	}

	@Bean
	public MonHandler monHandler()
	{
		return new MonHandler();
	}

    @Override
    public void configureDefaultServletHandling(DefaultServletHandlerConfigurer configurer)
    {
        configurer.enable();
    }
}

4. DispatcherServletInitializer

public class DispatcherServletInitializer extends AbstractAnnotationConfigDispatcherServletInitializer {

	@Override
	protected Class[] getRootConfigClasses()
	{
		return null;
	}

	@Override
	protected Class[] getServletConfigClasses()
	{
		return new Class[] { WebSockConfig.class };
	}

	@Override
	protected String[] getServletMappings()
	{
		return new String[] { "/" };
		//return new String[] {"*.do", "*.json"};
	}

	@Override
	protected void customizeRegistration(Dynamic registration)
	{
		registration.setInitParameter("dispatchOptionsRequest", "true");
	}
}

5. 인터셉터
– 인터셉터를 쓴 이유는 사실 SockJs에서 넘긴 파라미터를 처리하기 위해서이다. 즉 아래와 같이 userId를 넘겼을때
받을수 있는곳은 인터셉터이기 때문이다.

wsSockJs = new SockJS("http://localhost:8080/websocket/monHandler.sockjs?userId=yomile");
 

public class MonHandshakeInterceptor extends HttpSessionHandshakeInterceptor 
{
	@Override
	public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map mpAttributes) throws Exception
	{
		ServletServerHttpRequest sshRequest = (ServletServerHttpRequest)request;
        HttpServletRequest hsrRequest =  sshRequest.getServletRequest();

        String strUserId = hsrRequest.getParameter("userId");
        mpAttributes.put("userId", strUserId);
        return super.beforeHandshake(request, response, wsHandler, mpAttributes);
	}

	@Override
	public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception ex)
	{
		super.afterHandshake(request, response, wsHandler, ex);
	}
}

6. 웹소켓 핸들러
– 위의 인테셉터에서 mpAttributes에 파라미터값을 다시 넣어주게 되면 아래 웹소켓 핸들러에서는 WebSocketSession의 getAttributes를
이용하여 값을 빼어 낼수 있게 된다. 결국 JavaScript의 Sockjs에서 넣은 파라미터 값이 최종적으로 여기까지 올수 있게 된다.


//@Component
public class MonHandler extends TextWebSocketHandler 
{
    private static Logger mStatLogger = LoggerFactory.getLogger(MonHandler.class);

	private List mLstSession = null;

	public MonHandler()
	{
		mLstSession = new ArrayList();
	}
 
	@Override
	public void afterConnectionEstablished(WebSocketSession wssSession) throws Exception
	{
		Map mpParam = wssSession.getAttributes();
    	String strUserId = (String)mpParam.get("userId");
		mStatLogger.debug(" 접속, UserId:"+ strUserId +",SessionId:"+ wssSession.getId() +", 연결 IP:" + wssSession.getRemoteAddress().getHostName() );
		mLstSession.add(wssSession);
	}

	@Override
	protected void handleTextMessage(WebSocketSession wssCurSession, TextMessage tmMsg) throws Exception
	{
		mStatLogger.debug("*MonHandler.handleTextMessage");
		mStatLogger.info("{}로 부터 {} 받음", wssCurSession.getId(), tmMsg.getPayload());
		MonVO clsMonVO = MonVO.convert(tmMsg.getPayload());
		
		mStatLogger.debug(" -clsMonVO.getTarget():"+ clsMonVO.getTarget());
		mStatLogger.debug(" -clsMonVO.getMessage():"+ clsMonVO.getMessage());
		
		
		
		Map mpCurParam = wssCurSession.getAttributes();
		String strCurUserId = (String)mpCurParam.get("userId");
		mStatLogger.debug(" - CurUserId:"+ strCurUserId);
		
		
		//연결된 모든 클라이언트에게 메시지 전송
		for(WebSocketSession wssSession : mLstSession)
		{
			Map mpParam = wssSession.getAttributes();
			String strUserId = (String)mpParam.get("userId");
			if(clsMonVO.getTarget().equals(strUserId) == true)
			{
				// 전송대상한테만 메시지를 보낸다.	    	
				wssSession.sendMessage(new TextMessage(clsMonVO.getMessage()));
			}
		}
	}
    
	@Override  
	public void handleTransportError(WebSocketSession wssSession, Throwable exception) throws Exception
	{  
		mStatLogger.debug("*MonHandler.handleTransportError");
		if(wssSession.isOpen() == true) wssSession.close();  
		mLstSession.remove(wssSession);  
	}  
    

	@Override
	public void afterConnectionClosed(WebSocketSession wssSession, CloseStatus status) throws Exception
	{
		mStatLogger.debug("*MonHandler.afterConnectionClosed");
		mStatLogger.info("{} 연결 끊김.", wssSession.getId());
		mLstSession.remove(wssSession);
	}	
}

7. JSP 설정

<%@ page language="java" contentType="text/html; charset=EUC-KR" pageEncoding="EUC-KR"%>


	
		
		test
		
		
		
		
	

 


mysql autocommit 설정

1. autocommit 확인

SELECT @@AUTOCOMMIT; 

+--------------+
| @@autocommit |
+--------------+
|            0 |
+--------------+

2. autocommit 설정

SET AUTOCOMMIT = FALSE;