Storm Study Notes, Part 2


The previous notes covered some of Storm's basic concepts. In this one we build a log collection system in which Storm acts as a pipe and filter: it reads data from Redis and indexes that data.

(Diagram: architecture evolution. Logstash agents → Redis → Storm (LogSpout → LogRulesBolt → IndexerBolt) → ElasticSearch → Kibana)

This is an evolution of the architecture: the first form alone already gives us basic log collection and display; inserting Redis and Storm improves the system's performance and scalability.
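
At the front of this pipeline, a Logstash agent writes events into a Redis list. As a rough sketch (the list key "logstash.test" comes from the spout code below; the host value is an assumption), a Logstash 1.4 output section could look like this:

output {
  redis {
    host      => "127.0.0.1"     # assumption: Redis running locally
    data_type => "list"          # push each event onto a Redis list
    key       => "logstash.test" # the list the Storm spout pops from
  }
}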

Dependencies:

  • JDK 1.6.x
  • Logstash 1.4.2
  • ElasticSearch 0.9
  • Kibana 3.1.2
  • Redis
  • Storm
  • ZeroMQ
  • jzmq
  • Zookeeper

Processing logic classes we need to implement ourselves:

LogSpout: continuously reads log messages from Redis, wraps them, and emits them to the stream.

import java.util.Map;

import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;
import org.json.simple.JSONObject;
import org.json.simple.JSONValue;

import redis.clients.jedis.Jedis;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

public class LogSpout extends BaseRichSpout {

	private static final long serialVersionUID = 1L;
	private static final Logger LOG = Logger.getLogger(LogSpout.class);

	private Jedis jedis;
	private SpoutOutputCollector collector;

	@Override
	public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
		// Jedis is not serializable, so the connection is opened here on the worker,
		// not in the constructor (2000 ms connection timeout).
		jedis = new Jedis(Conf.DEFAULT_JEDIS_HOST, Integer.parseInt(Conf.DEFAULT_JEDIS_PORT), 2000);
		this.collector = collector;
	}

	@Override
	public void nextTuple() {
		// Pop one log line from the Redis list that the Logstash agent pushes to.
		String content = jedis.rpop("logstash.test");
		if (StringUtils.isEmpty(content)) {
			// Nothing available: back off for a second instead of busy-polling.
			Utils.sleep(1000);
		} else {
			JSONObject jsonObject = (JSONObject) JSONValue.parse(content);
			LogEntry entry = new LogEntry(jsonObject);
			collector.emit(new Values(entry));
		}
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields(LogEntry.LOG_ENTRY));
	}
}
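
LogEntry is a project class that these notes do not show. Purely as an assumption-laden sketch, here is the minimal shape the rest of the code relies on: construction from the parsed Logstash JSON, a filter flag the rules can set, and toJson() for indexing. The @source/@message field names are guesses based on the Logstash 1.x event format:

import java.io.Serializable;

import org.json.simple.JSONObject;

// Hypothetical minimal LogEntry; the real class will carry more Logstash fields.
public class LogEntry implements Serializable {

	private static final long serialVersionUID = 1L;

	// Field name the spout and bolts use to declare and read the tuple.
	public static final String LOG_ENTRY = "LogEntry";

	private String source;
	private String message;
	private boolean filter = false; // set by a Drools rule to drop the entry

	public LogEntry(JSONObject json) {
		// "@source" and "@message" are standard Logstash 1.x event keys (assumption).
		this.source = (String) json.get("@source");
		this.message = (String) json.get("@message");
	}

	public boolean isFilter() { return filter; }
	public void setFilter(boolean filter) { this.filter = filter; }
	public String getMessage() { return message; }
	public void setMessage(String message) { this.message = message; }

	public JSONObject toJson() {
		JSONObject json = new JSONObject();
		json.put("@source", source);
		json.put("@message", message);
		return json;
	}
}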

LogRulesBolt: receives LogSpout's output from the stream and applies Drools rules to adjust and filter the content.

import java.util.Map;

import org.apache.log4j.Logger;
import org.drools.KnowledgeBase;
import org.drools.KnowledgeBaseFactory;
import org.drools.builder.KnowledgeBuilder;
import org.drools.builder.KnowledgeBuilderFactory;
import org.drools.builder.ResourceType;
import org.drools.io.ResourceFactory;
import org.drools.runtime.StatelessKnowledgeSession;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class LogRulesBolt extends BaseRichBolt {

	private static final long serialVersionUID = 1L;
	public static final Logger LOG = Logger.getLogger(LogRulesBolt.class);

	private StatelessKnowledgeSession ksession;
	private OutputCollector collector;

	@Override
	public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
		this.collector = collector;
		// Compile the Drools rule file from the classpath into a stateless session.
		// Building it here (not in the constructor) keeps the bolt serializable.
		KnowledgeBuilder kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
		kbuilder.add(ResourceFactory.newClassPathResource("/Syslog.drl", getClass()), ResourceType.DRL);
		if (kbuilder.hasErrors()) {
			LOG.error(kbuilder.getErrors().toString());
		}
		KnowledgeBase kbase = KnowledgeBaseFactory.newKnowledgeBase();
		kbase.addKnowledgePackages(kbuilder.getKnowledgePackages());
		ksession = kbase.newStatelessKnowledgeSession();
	}

	@Override
	public void execute(Tuple input) {
		LogEntry entry = (LogEntry) input.getValueByField(LogEntry.LOG_ENTRY);
		if (entry == null) {
			LOG.info("Received null or incorrect value from tuple");
			return;
		}
		// The rules may modify the entry in place and/or mark it as filtered.
		ksession.execute(entry);
		if (!entry.isFilter()) {
			LOG.debug("Emitting from Rules Bolt");
			collector.emit(new Values(entry));
		}
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields(LogEntry.LOG_ENTRY));
	}
}
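
The rule file /Syslog.drl is not shown in the notes. For illustration only, a Drools 5 rule that marks DEBUG-level entries to be filtered out might look like this (package and field names match the hypothetical LogEntry sketch above, not any real project code):

package com.winxuan.storm.log.rules

import com.winxuan.storm.log.LogEntry  // assumption: adjust to LogEntry's real package

rule "Drop DEBUG messages"
    when
        entry : LogEntry( message matches ".*DEBUG.*" )
    then
        entry.setFilter( true ); // LogRulesBolt drops entries whose filter flag is set
end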

IndexerBolt: receives LogRulesBolt's output from the stream and calls the remote ElasticSearch cluster to index it. Its key methods:

	@Override
	public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
		// Note: the correct ElasticSearch setting key is "cluster.name"
		// (the original "cluster_name" is silently ignored by the client).
		Settings settings = ImmutableSettings.settingsBuilder()
				.put("client.transport.sniff", true)
				.put("cluster.name", "elasticsearch").build();
		client = new TransportClient(settings)
				.addTransportAddress(new InetSocketTransportAddress("10.100.15.177", 9300));
		this.collector = collector;
	}

	@Override
	public void execute(Tuple input) {
		LogEntry entry = (LogEntry) input.getValueByField(LogEntry.LOG_ENTRY);
		if (entry == null) {
			LOG.fatal("Received null");
			return;
		}
		String toBeIndexed = entry.toJson().toJSONString();
		if (!StringUtils.isEmpty(toBeIndexed)) {
			LOG.info(toBeIndexed);
			// Synchronous index request; actionGet() blocks until ES responds.
			IndexResponse response = client.prepareIndex(INDEX_NAME, INDEX_TYPE)
					.setSource(toBeIndexed)
					.execute().actionGet();
			if (response == null || response.getId() == null) {
				LOG.error(String.format("Failed to index tuple: %s", input.toString()));
			} else {
				// Emit the entry together with the id of the ES document just created.
				collector.emit(new Values(entry, response.getId()));
			}
		}
	}

LogTopology: ties the processing steps together (fragments):

private TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("logSpout", new LogSpout(), 1);
builder.setBolt("logRules", new LogRulesBolt(), 1).shuffleGrouping("logSpout");
builder.setBolt("indexer", new IndexerBolt(), 1).shuffleGrouping("logRules");

StormSubmitter.submitTopology(name, conf, builder.createTopology());
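
These are only fragments. A minimal, self-contained LogTopology could look like the sketch below; the local-mode branch is an assumption added for testing and is not part of the original notes:

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;

public class LogTopology {

	public static void main(String[] args) throws Exception {
		TopologyBuilder builder = new TopologyBuilder();
		builder.setSpout("logSpout", new LogSpout(), 1);
		builder.setBolt("logRules", new LogRulesBolt(), 1).shuffleGrouping("logSpout");
		builder.setBolt("indexer", new IndexerBolt(), 1).shuffleGrouping("logRules");

		Config conf = new Config();

		if (args.length > 0) {
			// Cluster mode: the first argument (e.g. "logStorm") becomes the topology name.
			StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
		} else {
			// Local mode for testing: runs the whole topology in-process.
			LocalCluster cluster = new LocalCluster();
			cluster.submitTopology("logStorm-local", conf, builder.createTopology());
		}
	}
}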

Notes:

  • The JDK version used to compile the code must match the Storm deployment environment.
  • The application's dependencies must be packaged into the jar that is submitted.
  • Jar dependencies that Storm itself already ships must be isolated with scope=provided (see the Maven sketch after this list).
  • For jars whose versions conflict with Storm's dependencies, Storm's version takes precedence.
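
For the scope=provided point, a typical Maven dependency entry keeps the Storm jars out of the fat jar. This is a sketch; the groupId and version here match the pre-Apache Storm era but are assumptions for your build:

<dependency>
    <groupId>storm</groupId>
    <artifactId>storm</artifactId>
    <version>0.8.2</version>
    <!-- provided: available at compile time, but not packaged into the jar,
         because the Storm cluster supplies these classes at runtime -->
    <scope>provided</scope>
</dependency>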

Submit command:

./storm jar ../wx/hh.storm.jar com.winxuan.storm.log.LogTopology logStorm
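
Here hh.storm.jar is the packaged topology jar with its dependencies, com.winxuan.storm.log.LogTopology is the main class, and the trailing logStorm argument is presumably passed through to StormSubmitter.submitTopology as the topology name.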

Once submitted, Storm keeps invoking LogSpout's nextTuple method, which fetches log messages under the configured key from Redis and sleeps for 1 s whenever there is nothing to read. Each message read is wrapped in the custom log object and emitted into the cluster's stream; downstream, the rules bolt and the indexer bolt each take the tuples meant for them from the stream to filter the data and index it.

Data that passes through successfully is indexed into ElasticSearch. Through Kibana's graphical interface we can define a dashboard and see the log messages collected by the agents, with charts and full-text search.

(Screenshot: Kibana dashboard displaying the collected logs)
