获取ActiveMQ.DLQ的死信消息
使用ActiveMQ的持久消息后,大量死信消息会堆积在ActiveMQ.DLQ队列,如果数量增大,会对MQ的内存和磁盘造成一些影响。
DLQ队列中的消息来源,这里没有完全去测试,目前来看有客户端消费出现异常或者被事务回滚的消息。
DLQ队列可以通过MQ配置来禁用,这样就不会堆积到DLQ队列,但对于这部分信息的状态就无法获取,好一点的方式是DLQ队列保留,增加对DLQ队列的处理措施,例如获取队列内的数据,找出原因,并做补救,如果通过MQ传递可靠消息,这个保留处理的步骤应该是必须的,另一方面是从消费端的应用上解决出现死信消息的原因。
DLQ是一个队列,通过普通的队列消费方式就可以拿到其中的消息,消息本身是一个ActiveMQObjectMessage.
通过Spring-JMS来处理这部分消息,spring的配置:
<bean class="org.springframework.jms.listener.SimpleMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactoryConsumer"/> <property name="destination" ref="dlqQueues"/> <property name="concurrentConsumers" value="5" /> <property name="messageListener"> <bean class="com.winxuan.mq.dlq.ReceiveDLQ"/> </property> </bean>
剩下就是实现一个messagelistener接口的类:
package com.winxuan.mq.dlq;
import java.lang.reflect.Method;
import java.sql.Date;
import java.text.SimpleDateFormat;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import org.springframework.remoting.support.RemoteInvocation;
/**
* @author HideHai
*
*/
public class ReceiveDLQ implements MessageListener{
@Override
public void onMessage(Message message) {
System.out.println(String.format("消息类型:%s",message.getClass()) );
if(message instanceof ObjectMessage){
processObjectMessage((ObjectMessage) message);
}
System.out.println(String.format("================================================================"));
}
private void processObjectMessage(ObjectMessage objectMessage){
try {
System.out.println(String.format("消息编号:%s", objectMessage.getJMSMessageID()) );
System.out.println(String.format("消息目标队列:%s", objectMessage.getJMSDestination()) );
System.out.println(String.format("消息关联编号:%s", objectMessage.getJMSCorrelationID()) );
System.out.println(String.format("消息时间:%s", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(
new Date(objectMessage.getJMSTimestamp()))));
Enumeration<Object> enumeration = objectMessage.getPropertyNames();
while(enumeration.hasMoreElements()){
String pKey = enumeration.nextElement().toString();
System.out.println(String.format("%s:%s",pKey,objectMessage.getStringProperty(pKey)));
}
Map<String,String> nonMap = getNonPublicMethod(objectMessage);
for(Entry<String,String> entry:nonMap.entrySet()){
System.out.println(String.format("%s:%s",entry.getKey(),entry.getValue()));
}
Object object = objectMessage.getObject();
if(object instanceof RemoteInvocation){ //使用Spring封装后的远程方法
RemoteInvocation invocation = (RemoteInvocation) object;
System.out.println(String.format("消息调用方法名称:%s,参数值:", invocation.getMethodName()));
Object[] args = invocation.getArguments();
Class[] clazz = invocation.getParameterTypes();
if(args != null){
int argsLenth = args.length;
for(int i = 0 ; i < argsLenth;i++){
System.out.println(String.format("类型: %s - 内容:%s",clazz[i],args[i]));
}
}
}
} catch (JMSException e) {
e.printStackTrace();
} catch (IllegalArgumentException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
}
private Map<String,String> getNonPublicMethod(ObjectMessage objectMessage) throws Exception{
Map<String, String> map = new HashMap<String, String>();
Method[] methods = objectMessage.getClass().getMethods();
String[] name = new String[methods.length];
// Field.setAccessible(methods, true);
for (int i = 0; i < name.length; i++) {
name[i] = methods[i].getName();
// System.out.println(name[i]);
if(name[i].equals("getOriginalDestination")){ //原始目的地
map.put("originalDestination", methods[i].invoke(objectMessage).toString());
break;
}else if(name[i].equals("getJMSTimestamp")){
}
}
return map;
}
}
将消息转换为ObjectMessage获取其中的属性.
通过Debug,发现Message类中是有OriginalDestination属性的,此属性包含了原始的目的信息,在分析DLQ时,这个数据时很有用的,这部分信息目前是protected,需要通过反射获取.类似的还有进/出队列的时间等..
result:
消息类型:class org.apache.activemq.command.ActiveMQObjectMessage 消息编号:ID:n2.host.com-58921-1397145694637-0:2:1:1:28752 消息目标队列:queue://ActiveMQ.DLQ 消息关联编号:null 消息时间:2014-04-13 21:55:39 originalExpiration:0 dlqDeliveryFailureCause:java.lang.Throwable: TopicSubDiscard. ID:ID:n1.host.com-4277-1397145903677-0:1:1:1 originalDestination:topic://distribution.product 消息调用方法名称:updateStock,参数值: 类型: long - 内容:1200839005 类型: int - 内容:75