获取ActiveMQ.DLQ的死信消息
使用ActiveMQ的持久消息后,大量死信消息会堆积在ActiveMQ.DLQ队列,如果数量增大,会对MQ的内存和磁盘造成一些影响。
DLQ队列中的消息来源,这里没有完全去测试,目前来看有客户端消费出现异常或者被事务回滚的消息。
DLQ队列可以通过MQ配置来禁用,这样就不会堆积到DLQ队列,但对于这部分信息的状态就无法获取,好一点的方式是DLQ队列保留,增加对DLQ队列的处理措施,例如获取队列内的数据,找出原因,并做补救,如果通过MQ传递可靠消息,这个保留处理的步骤应该是必须的,另一方面是从消费端的应用上解决出现死信消息的原因。
DLQ是一个队列,通过普通的队列消费方式就可以拿到其中的消息,消息本身是一个ActiveMQObjectMessage.
通过Spring-JMS来处理这部分消息,spring的配置:
<bean class="org.springframework.jms.listener.SimpleMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactoryConsumer"/> <property name="destination" ref="dlqQueues"/> <property name="concurrentConsumers" value="5" /> <property name="messageListener"> <bean class="com.winxuan.mq.dlq.ReceiveDLQ"/> </property> </bean>
剩下就是实现一个messagelistener接口的类:
package com.winxuan.mq.dlq; import java.lang.reflect.Method; import java.sql.Date; import java.text.SimpleDateFormat; import java.util.Enumeration; import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.ObjectMessage; import org.springframework.remoting.support.RemoteInvocation; /** * @author HideHai * */ public class ReceiveDLQ implements MessageListener{ @Override public void onMessage(Message message) { System.out.println(String.format("消息类型:%s",message.getClass()) ); if(message instanceof ObjectMessage){ processObjectMessage((ObjectMessage) message); } System.out.println(String.format("================================================================")); } private void processObjectMessage(ObjectMessage objectMessage){ try { System.out.println(String.format("消息编号:%s", objectMessage.getJMSMessageID()) ); System.out.println(String.format("消息目标队列:%s", objectMessage.getJMSDestination()) ); System.out.println(String.format("消息关联编号:%s", objectMessage.getJMSCorrelationID()) ); System.out.println(String.format("消息时间:%s", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format( new Date(objectMessage.getJMSTimestamp())))); Enumeration<Object> enumeration = objectMessage.getPropertyNames(); while(enumeration.hasMoreElements()){ String pKey = enumeration.nextElement().toString(); System.out.println(String.format("%s:%s",pKey,objectMessage.getStringProperty(pKey))); } Map<String,String> nonMap = getNonPublicMethod(objectMessage); for(Entry<String,String> entry:nonMap.entrySet()){ System.out.println(String.format("%s:%s",entry.getKey(),entry.getValue())); } Object object = objectMessage.getObject(); if(object instanceof RemoteInvocation){ //使用Spring封装后的远程方法 RemoteInvocation invocation = (RemoteInvocation) object; System.out.println(String.format("消息调用方法名称:%s,参数值:", invocation.getMethodName())); Object[] args = invocation.getArguments(); Class[] clazz = invocation.getParameterTypes(); if(args != null){ int argsLenth = args.length; for(int i = 0 ; i < argsLenth;i++){ System.out.println(String.format("类型: %s - 内容:%s",clazz[i],args[i])); } } } } catch (JMSException e) { e.printStackTrace(); } catch (IllegalArgumentException e) { e.printStackTrace(); } catch (Exception e) { e.printStackTrace(); } } private Map<String,String> getNonPublicMethod(ObjectMessage objectMessage) throws Exception{ Map<String, String> map = new HashMap<String, String>(); Method[] methods = objectMessage.getClass().getMethods(); String[] name = new String[methods.length]; // Field.setAccessible(methods, true); for (int i = 0; i < name.length; i++) { name[i] = methods[i].getName(); // System.out.println(name[i]); if(name[i].equals("getOriginalDestination")){ //原始目的地 map.put("originalDestination", methods[i].invoke(objectMessage).toString()); break; }else if(name[i].equals("getJMSTimestamp")){ } } return map; } }
将消息转换为ObjectMessage获取其中的属性.
通过Debug,发现Message类中是有OriginalDestination属性的,此属性包含了原始的目的信息,在分析DLQ时,这个数据时很有用的,这部分信息目前是protected,需要通过反射获取.类似的还有进/出队列的时间等..
result:
消息类型:class org.apache.activemq.command.ActiveMQObjectMessage 消息编号:ID:n2.host.com-58921-1397145694637-0:2:1:1:28752 消息目标队列:queue://ActiveMQ.DLQ 消息关联编号:null 消息时间:2014-04-13 21:55:39 originalExpiration:0 dlqDeliveryFailureCause:java.lang.Throwable: TopicSubDiscard. ID:ID:n1.host.com-4277-1397145903677-0:1:1:1 originalDestination:topic://distribution.product 消息调用方法名称:updateStock,参数值: 类型: long - 内容:1200839005 类型: int - 内容:75