본문으로 바로가기

[JMS] ActiveMQ 메시지 소비자(Consumer)

category JAVA 2019. 2. 18. 19:27
반응형
package activemq;

import java.io.IOException;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.xml.bind.Marshaller.Listener;

import org.apache.activemq.ActiveMQConnectionFactory;


public class MQConsumer implements MessageListener, ExceptionListener {
	
	private boolean run;
	private int maximumMessage;
	private Session session;
	private Destination destination;
	private MessageConsumer MConsumer;
	private String route = "";
	private String user = ActiveMQConnectionFactory.DEFAULT_USER;
	private String password = ActiveMQConnectionFactory.DEFAULT_PASSWORD;
	private String url = "";	
    private long receiveTimeOut;

	public void run() {		
		try {
			run = true;
			
			Connection conn = null;
			System.out.println("연결된 주소 : " + url );
			System.out.println(route);
			
			
			//ActiveMQ 연결팩토리 생성
			ActiveMQConnectionFactory CF = new ActiveMQConnectionFactory(url);
			conn = CF.createConnection();
			//연결시작
			conn.setExceptionListener(this);
			conn.start();
			
			/**
			 * AUTO.ACKNOWLEDGE
			 * 자동으로 수신확인
			 */
			//세션 연결 
			session = conn.createSession(false, session.AUTO_ACKNOWLEDGE);
			//목적지설정
			destination = session.createTopic(route);
			
			//메시지 컨슈머가 널일 때
			MessageConsumer consumer = null;
			consumer = session.createConsumer(destination);
			if(maximumMessage > 0) {
				 try {
					Consume(conn, session, consumer);
				} catch (IOException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}
			if(session == null) {
				System.out.println("연결X");
			}
			else {
				System.out.println("연결 성공");
				if(receiveTimeOut == 0) {
					 consumer.setMessageListener(this);
				}
			}
		}catch(JMSException e){
				e.printStackTrace();
			}
	}		
			
			//메시지 수신 메소드
			public void onMessage(Message message) {
				try {
				if(message instanceof TextMessage) {
					TextMessage textMessage = (TextMessage) message;
					System.out.println("Receive : " + textMessage.getText() + " ");
				}else {
				
					/**
					 * AUTO.ACKNOWLEDGE
					 * 자동으로 수신확인
					 * CLIENT_ACKNOWLEDGE
					 * 클라이언트 확인(소비확인)
					 */
				} if (session.AUTO_ACKNOWLEDGE == Session.CLIENT_ACKNOWLEDGE) {
	                message.acknowledge();
	            }

			}catch(JMSException e) {
				e.printStackTrace();
			 }	
			}	

			public synchronized void onException(JMSException e) {
				
			}
			 protected void Consume(Connection conn, Session session, MessageConsumer consumer) throws JMSException, IOException {
			        
			        
			       if(run) {
			    	   for(int i = 0 ; i<=10;) {
			            Message message = consumer.receive(1000);
			            System.out.println(message);
			            if (message != null) {
			                i++;
			                onMessage(message);
			            }
			            }
			        }
			    }
			
	
public void Close(Connection conn, Session session, MessageConsumer consumer) throws JMSException {
	
	 conn.close();
	 session.close();
	 consumer.close();
	 
}
public static void main(String[] args) {
	
	MQConsumer mqc = new MQConsumer();
	mqc.run();
	
	
	
}
}
반응형