Storm Study Notes, Part 2
The previous note covered Storm's basic concepts; this one builds a log collection system in which Storm acts as a pipe-and-filter stage: it reads data from Redis and indexes it.
This is an evolution of the architecture. The first approach alone already provides basic log collection and display; here Redis and Storm are inserted into the pipeline to improve the system's throughput and scalability.
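To sanity-check this data path before wiring up Logstash, you can push a JSON event onto the Redis list yourself. A minimal sketch using Jedis: the list key logstash.test is the one LogSpout reads below, while the Redis host/port and the event fields are placeholder values.

import redis.clients.jedis.Jedis;

public class PushSampleLog {
    public static void main(String[] args) {
        Jedis jedis = new Jedis("127.0.0.1", 6379); // placeholder host/port
        // The spout RPOPs from this list, so LPUSH gives FIFO ordering.
        String event = "{\"@timestamp\":\"2014-01-01T00:00:00Z\",\"host\":\"agent-1\",\"message\":\"a sample log line\"}";
        jedis.lpush("logstash.test", event);
        jedis.disconnect();
    }
}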
Environment dependencies:
- JDK 1.6.x
- Logstash-1.4.2
- ElasticSearch 0.9
- Kibana-3.1.2
- Redis
- Storm
- ZeroMQ
- Jzmq
- Zookeeper
Processing classes we need to implement ourselves:
LogSpout: continuously reads log messages from Redis, wraps each one in a LogEntry, and emits it to the stream
import java.util.Map;

import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;
import org.json.simple.JSONObject;
import org.json.simple.JSONValue;
import redis.clients.jedis.Jedis;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

public class LogSpout extends BaseRichSpout {
    private static final long serialVersionUID = 1L;
    private static Logger LOG = Logger.getLogger(LogSpout.class);
    private Jedis jedis;
    private SpoutOutputCollector collector;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        // Connect to Redis with a 2s connection timeout.
        jedis = new Jedis(Conf.DEFAULT_JEDIS_HOST, Integer.parseInt(Conf.DEFAULT_JEDIS_PORT), 2000);
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        // Pop one raw JSON event off the list; rpop returns null when the list is empty.
        String content = jedis.rpop("logstash.test");
        if (StringUtils.isEmpty(content)) {
            Utils.sleep(1000); // nothing to read: back off for a second
        } else {
            JSONObject jsonObject = (JSONObject) JSONValue.parse(content);
            LogEntry entry = new LogEntry(jsonObject);
            collector.emit(new Values(entry));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields(LogEntry.LOG_ENTRY));
    }
}
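The LogEntry class referenced above is not listed in this note. Since Storm must be able to serialize whatever a tuple carries, it can be as simple as a serializable wrapper around the parsed JSON event. The sketch below is an assumption consistent with how the spout and bolts use it; the LOG_ENTRY value and the field layout are illustrative.

import java.io.Serializable;
import org.json.simple.JSONObject;

public class LogEntry implements Serializable {
    private static final long serialVersionUID = 1L;
    // Tuple field name used by declareOutputFields/getValueByField (assumed value).
    public static final String LOG_ENTRY = "LogEntry";

    private final JSONObject json;   // the raw parsed event
    private boolean filter = false;  // set to true by a Drools rule to drop the entry

    public LogEntry(JSONObject json) {
        this.json = json;
    }

    public boolean isFilter() {
        return filter;
    }

    public void setFilter(boolean filter) {
        this.filter = filter;
    }

    public JSONObject toJson() {
        return json;
    }
}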
LogRulesBolt: receives LogSpout's output from the stream and applies Drools rules to adjust and filter the content
import java.util.Map;

import org.apache.log4j.Logger;
import org.drools.KnowledgeBase;
import org.drools.KnowledgeBaseFactory;
import org.drools.builder.KnowledgeBuilder;
import org.drools.builder.KnowledgeBuilderFactory;
import org.drools.builder.ResourceType;
import org.drools.io.ResourceFactory;
import org.drools.runtime.StatelessKnowledgeSession;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class LogRulesBolt extends BaseRichBolt {
    private static final long serialVersionUID = 1L;
    public static Logger LOG = Logger.getLogger(LogRulesBolt.class);
    private StatelessKnowledgeSession ksession;
    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        // Compile Syslog.drl from the classpath into a stateless Drools session.
        KnowledgeBuilder kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
        kbuilder.add(ResourceFactory.newClassPathResource("/Syslog.drl", getClass()), ResourceType.DRL);
        if (kbuilder.hasErrors()) {
            LOG.error(kbuilder.getErrors().toString());
        }
        KnowledgeBase kbase = KnowledgeBaseFactory.newKnowledgeBase();
        kbase.addKnowledgePackages(kbuilder.getKnowledgePackages());
        ksession = kbase.newStatelessKnowledgeSession();
    }

    @Override
    public void execute(Tuple input) {
        LogEntry entry = (LogEntry) input.getValueByField(LogEntry.LOG_ENTRY);
        if (entry == null) {
            LOG.info("Received null or incorrect value from tuple");
            return;
        }
        // Run the rules; a rule may mark the entry as filtered (dropped).
        ksession.execute(entry);
        LOG.debug("Filter flag after rules: " + entry.isFilter());
        if (!entry.isFilter()) {
            LOG.debug("Emitting from Rules Bolt");
            collector.emit(new Values(entry));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields(LogEntry.LOG_ENTRY));
    }
}
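The actual adjust/filter logic lives in Syslog.drl, which is not listed here. Because a StatelessKnowledgeSession is driven entirely by single execute() calls, the rules can also be exercised outside Storm. A minimal sketch reusing the same Drools setup as prepare() above, with a made-up sample event:

import org.drools.KnowledgeBase;
import org.drools.KnowledgeBaseFactory;
import org.drools.builder.KnowledgeBuilder;
import org.drools.builder.KnowledgeBuilderFactory;
import org.drools.builder.ResourceType;
import org.drools.io.ResourceFactory;
import org.drools.runtime.StatelessKnowledgeSession;
import org.json.simple.JSONObject;
import org.json.simple.JSONValue;

public class RulesSmokeTest {
    public static void main(String[] args) {
        // Same knowledge base construction as LogRulesBolt.prepare().
        KnowledgeBuilder kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
        kbuilder.add(ResourceFactory.newClassPathResource("/Syslog.drl", RulesSmokeTest.class), ResourceType.DRL);
        if (kbuilder.hasErrors()) {
            throw new RuntimeException(kbuilder.getErrors().toString());
        }
        KnowledgeBase kbase = KnowledgeBaseFactory.newKnowledgeBase();
        kbase.addKnowledgePackages(kbuilder.getKnowledgePackages());
        StatelessKnowledgeSession session = kbase.newStatelessKnowledgeSession();

        // Run one event through the rules; a rule may call setFilter(true).
        LogEntry entry = new LogEntry((JSONObject) JSONValue.parse("{\"message\":\"a sample log line\"}"));
        session.execute(entry);
        System.out.println("filtered: " + entry.isFilter());
    }
}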
IndexerBolt: receives LogRulesBolt's output from the stream and calls the remote ElasticSearch cluster to index it (the INDEX_NAME/INDEX_TYPE constants below are placeholder values; adjust them to your setup)
import java.util.Map;

import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class IndexerBolt extends BaseRichBolt {
    private static final long serialVersionUID = 1L;
    public static Logger LOG = Logger.getLogger(IndexerBolt.class);
    // Placeholder index name and type; adjust to your ElasticSearch setup.
    public static final String INDEX_NAME = "logstorm";
    public static final String INDEX_TYPE = "logentry";
    private Client client;
    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        // The cluster name setting key is "cluster.name".
        Settings settings = ImmutableSettings.settingsBuilder().put("client.transport.sniff", true)
                .put("cluster.name", "elasticsearch").build();
        client = new TransportClient(settings)
                .addTransportAddress(new InetSocketTransportAddress("10.100.15.177", 9300));
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        LogEntry entry = (LogEntry) input.getValueByField(LogEntry.LOG_ENTRY);
        if (entry == null) {
            LOG.fatal("Received null");
            return;
        }
        String toBeIndexed = entry.toJson().toJSONString();
        if (!StringUtils.isEmpty(toBeIndexed)) {
            LOG.info(toBeIndexed);
            IndexResponse response = client.prepareIndex(INDEX_NAME, INDEX_TYPE)
                    .setSource(toBeIndexed).execute().actionGet();
            if (response == null || response.getId() == null) {
                LOG.error(String.format("Failed to index tuple: %s", input.toString()));
            } else {
                // Forward the entry along with its ElasticSearch document id.
                collector.emit(new Values(entry, response.getId()));
            }
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields(LogEntry.LOG_ENTRY, "id")); // "id": the ES document id
    }
}
LogTopology: wires the spout and bolts together and submits the topology
// Wire the pipeline: spout -> rules bolt -> indexer, shuffle-grouped, one executor each.
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("logSpout", new LogSpout(), 1);
builder.setBolt("logRules", new LogRulesBolt(), 1).shuffleGrouping("logSpout");
builder.setBolt("indexer", new IndexerBolt(), 1).shuffleGrouping("logRules");
// "name" is the topology name passed in on the command line (logStorm below).
Config conf = new Config();
StormSubmitter.submitTopology(name, conf, builder.createTopology());
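Before submitting to a real cluster, the same wiring can be exercised in-process with Storm's LocalCluster, which saves the jar-packaging round trip while debugging. A minimal sketch, reusing the builder from the snippet above; the topology name matches the logStorm argument used in the submit command below.

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.utils.Utils;

// ... after the builder.setSpout/setBolt calls above ...
Config conf = new Config();
conf.setDebug(true); // log every emitted tuple
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("logStorm", conf, builder.createTopology());
Utils.sleep(60 * 1000); // let it run for a minute
cluster.killTopology("logStorm");
cluster.shutdown();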
Notes:
- The JDK version used to compile the code must match the JDK on the Storm deployment environment
- The application's own dependencies must be packaged into the submitted jar
- Jars that Storm already ships should be kept out of that package, e.g. via Maven's scope=provided
- When a jar version conflicts with one of Storm's dependencies, use Storm's version
Submit command:
./storm jar ../wx/hh.storm.jar com.winxuan.storm.log.LogTopology logStorm
After submission, Storm keeps calling LogSpout's nextTuple method, which fetches log messages under the given key from Redis and sleeps for 1s whenever there is nothing to read.
Each message that is read is wrapped in the custom LogEntry object and emitted into the Storm cluster's stream; downstream, LogRulesBolt and IndexerBolt each take their own tuples from the stream to filter the data and to index it, respectively.
Entries that pass the rules are indexed into ElasticSearch; through Kibana's graphical interface we can define a custom dashboard and view the log messages collected by the agents, with charting and search.
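To confirm, outside Kibana, that documents are actually landing in the index, a match-all query through the same transport client setup that IndexerBolt uses will do. A minimal sketch; the cluster address and the index name are the placeholder values from above.

import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.index.query.QueryBuilders;

public class CheckIndex {
    public static void main(String[] args) {
        Settings settings = ImmutableSettings.settingsBuilder()
                .put("cluster.name", "elasticsearch").build();
        Client client = new TransportClient(settings)
                .addTransportAddress(new InetSocketTransportAddress("10.100.15.177", 9300));
        // Fetch a handful of recently indexed documents.
        SearchResponse response = client.prepareSearch(IndexerBolt.INDEX_NAME)
                .setQuery(QueryBuilders.matchAllQuery())
                .setSize(5)
                .execute().actionGet();
        System.out.println("total hits: " + response.getHits().getTotalHits());
        client.close();
    }
}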