获取ActiveMQ.DLQ的死信消息

获取ActiveMQ.DLQ的死信消息

使用ActiveMQ的持久消息后,大量死信消息会堆积在ActiveMQ.DLQ队列,如果数量增大,会对MQ的内存和磁盘造成一些影响。

DLQ队列中的消息来源,这里没有完全去测试,目前来看有客户端消费出现异常或者被事务回滚的消息。

DLQ队列可以通过MQ配置来禁用,这样就不会堆积到DLQ队列,但对于这部分信息的状态就无法获取,好一点的方式是DLQ队列保留,增加对DLQ队列的处理措施,例如获取队列内的数据,找出原因,并做补救,如果通过MQ传递可靠消息,这个保留处理的步骤应该是必须的,另一方面是从消费端的应用上解决出现死信消息的原因。

DLQ是一个队列,通过普通的队列消费方式就可以拿到其中的消息,消息本身是一个ActiveMQObjectMessage.

通过Spring-JMS来处理这部分消息,spring的配置:

	<bean class="org.springframework.jms.listener.SimpleMessageListenerContainer">
		<property name="connectionFactory" ref="connectionFactoryConsumer"/>
		<property name="destination" ref="dlqQueues"/>
		<property name="concurrentConsumers" value="5" />
		<property name="messageListener">
			<bean class="com.winxuan.mq.dlq.ReceiveDLQ"/>
		</property>
	</bean>

剩下就是实现一个messagelistener接口的类:

package com.winxuan.mq.dlq;

import java.lang.reflect.Method;
import java.sql.Date;
import java.text.SimpleDateFormat;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;

import org.springframework.remoting.support.RemoteInvocation;

/**
 * @author HideHai
 *
 */
public class ReceiveDLQ implements MessageListener{

	@Override
	public void onMessage(Message message) {
		System.out.println(String.format("消息类型:%s",message.getClass()) );
		if(message instanceof ObjectMessage){
			processObjectMessage((ObjectMessage) message);
		}
		System.out.println(String.format("================================================================"));
	}

	private void processObjectMessage(ObjectMessage objectMessage){
		try {
			System.out.println(String.format("消息编号:%s", objectMessage.getJMSMessageID()) );
			System.out.println(String.format("消息目标队列:%s", objectMessage.getJMSDestination()) );
			System.out.println(String.format("消息关联编号:%s", objectMessage.getJMSCorrelationID()) );
			System.out.println(String.format("消息时间:%s", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(
					new Date(objectMessage.getJMSTimestamp()))));
			Enumeration<Object> enumeration = objectMessage.getPropertyNames();
			while(enumeration.hasMoreElements()){
				String pKey = enumeration.nextElement().toString();
				System.out.println(String.format("%s:%s",pKey,objectMessage.getStringProperty(pKey)));
			}
			Map<String,String> nonMap = getNonPublicMethod(objectMessage);
			for(Entry<String,String> entry:nonMap.entrySet()){
				System.out.println(String.format("%s:%s",entry.getKey(),entry.getValue()));
			}
			Object object = objectMessage.getObject();
			if(object instanceof RemoteInvocation){		//使用Spring封装后的远程方法
				RemoteInvocation invocation = (RemoteInvocation) object;
				System.out.println(String.format("消息调用方法名称:%s,参数值:", invocation.getMethodName()));
				Object[] args = invocation.getArguments();
				Class[]	clazz = invocation.getParameterTypes();
				if(args != null){
					int argsLenth = args.length;
					for(int i = 0 ; i < argsLenth;i++){
						System.out.println(String.format("类型: %s - 内容:%s",clazz[i],args[i]));
					}
				}
			}
		} catch (JMSException e) {
			e.printStackTrace();
		} catch (IllegalArgumentException e) {
			e.printStackTrace();
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	private Map<String,String> getNonPublicMethod(ObjectMessage objectMessage) throws Exception{
		Map<String, String> map = new HashMap<String, String>();
		Method[] methods = objectMessage.getClass().getMethods();
		String[]   name   =   new   String[methods.length]; 
//		Field.setAccessible(methods, true);
		for   (int  i =  0; i  <  name.length; i++)   { 
			name[i]   =   methods[i].getName();
//			System.out.println(name[i]);
			if(name[i].equals("getOriginalDestination")){		//原始目的地
				map.put("originalDestination", methods[i].invoke(objectMessage).toString());
				break;
			}else if(name[i].equals("getJMSTimestamp")){

			}
		}
		return map;
	}

}

将消息转换为ObjectMessage获取其中的属性.

通过Debug,发现Message类中是有OriginalDestination属性的,此属性包含了原始的目的信息,在分析DLQ时,这个数据时很有用的,这部分信息目前是protected,需要通过反射获取.类似的还有进/出队列的时间等..

result:

消息类型:class org.apache.activemq.command.ActiveMQObjectMessage
消息编号:ID:n2.host.com-58921-1397145694637-0:2:1:1:28752
消息目标队列:queue://ActiveMQ.DLQ
消息关联编号:null
消息时间:2014-04-13 21:55:39
originalExpiration:0
dlqDeliveryFailureCause:java.lang.Throwable: TopicSubDiscard. 
ID:ID:n1.host.com-4277-1397145903677-0:1:1:1
originalDestination:topic://distribution.product
消息调用方法名称:updateStock,参数值:
类型: long - 内容:1200839005
类型: int - 内容:75

 

留下回复