Storm Advanced Primitives: Transactional Topology
Published: 2019-06-27


References:

Sample code:

package com.lky.topology;

import java.math.BigInteger;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.Test;

import com.lky.util.FileUtil;
import com.lky.util.RunStorm;

import backtype.storm.Config;
import backtype.storm.coordination.BatchOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.testing.MemoryTransactionalSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBatchBolt;
import backtype.storm.topology.base.BaseTransactionalBolt;
import backtype.storm.transactional.ICommitter;
import backtype.storm.transactional.TransactionAttempt;
import backtype.storm.transactional.TransactionalTopologyBuilder;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/**
 * @Title: TransactionalGlobalCount.java
 * @Package com.lky.topology
 * @Description: Transactional topology (simulates counting messages in real time)
 * @author lky
 * @date 2015-10-25 11:23:12
 * @version V1.0
 */
@SuppressWarnings({ "deprecation", "serial", "rawtypes" })
public class TransactionalGlobalCount {
    private static Log log = LogFactory.getLog(TransactionalGlobalCount.class);

    // Number of tuples taken from each partition per batch.
    public static final int PARTITION_TAKE_PER_BATCH = 3;

    // In-memory source data: partition id -> list of tuples.
    public static final Map<Integer, List<List<Object>>> DATA = new HashMap<Integer, List<List<Object>>>() {
        {
            put(0, new ArrayList<List<Object>>() {
                {
                    add(new Values("cat"));
                    add(new Values("dog"));
                    add(new Values("chicken"));
                    add(new Values("cat"));
                    add(new Values("dog"));
                    add(new Values("apple"));
                }
            });
            put(1, new ArrayList<List<Object>>() {
                {
                    add(new Values("cat"));
                    add(new Values("dog"));
                    add(new Values("apple"));
                    add(new Values("banana"));
                }
            });
            put(2, new ArrayList<List<Object>>() {
                {
                    add(new Values("cat"));
                    add(new Values("cat"));
                    add(new Values("cat"));
                    add(new Values("cat"));
                    add(new Values("cat"));
                    add(new Values("dog"));
                    add(new Values("dog"));
                    add(new Values("dog"));
                    add(new Values("dog"));
                }
            });
        }
    };

    // Stored state: the running count and the id of the transaction that last updated it.
    public static class Value {
        int count = 0;
        BigInteger txid;
    }

    // Stand-in for an external database.
    public static Map<String, Value> DATABASE = new HashMap<String, Value>();
    public static final String GLOBAL_COUNT_KEY = "GLOBAL-COUNT";

    /**
     * @Description: Processing phase (batches can be processed in parallel)
     * @author lky
     * @date 2015-10-25 12:14:26
     * @version V1.0
     */
    public static class BatchCount extends BaseBatchBolt {
        BatchOutputCollector collector;
        Object id;
        Integer _count = 0;

        @Override
        public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
            this.collector = collector;
            this.id = id;
        }

        @Override
        public void execute(Tuple tuple) {
            // Count every tuple this task receives for the current batch.
            _count++;
            log.info("-------------->" + _count);
        }

        @Override
        public void finishBatch() {
            // Emit the partial count once the whole batch has been seen.
            log.info("--------" + _count + "----------");
            collector.emit(new Values(id, _count));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("id", "count"));
        }
    }

    /**
     * @Description: Committer (aggregation) phase (strictly ordered)
     * @author lky
     * @date 2015-10-25 12:14:54
     * @version V1.0
     */
    public static class UpdateGlobalCount extends BaseTransactionalBolt implements ICommitter {
        BatchOutputCollector collector;
        TransactionAttempt id;
        Integer _size = 0;

        @Override
        public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt id) {
            this.collector = collector;
            this.id = id;
        }

        @Override
        public void execute(Tuple tuple) {
            // Field 1 is the partial count emitted by BatchCount.
            Integer sum = tuple.getInteger(1);
            log.info("sum---------->" + sum);
            if (sum > 0) {
                _size += sum;
            }
        }

        @Override
        public void finishBatch() {
            Value oldValue = DATABASE.get(GLOBAL_COUNT_KEY);
            Value newValue;
            // Update only if nothing has been stored yet or a new transaction has arrived.
            if (null == oldValue || !oldValue.txid.equals(id.getTransactionId())) {
                newValue = new Value();
                newValue.txid = id.getTransactionId();
                if (null == oldValue) {
                    newValue.count = _size;
                } else {
                    newValue.count = _size + oldValue.count;
                    // Note: the emit and the file write happen only on this branch,
                    // so nothing is emitted or written for the very first batch.
                    collector.emit(new Values(id, newValue.count));
                    FileUtil.strToFile(Integer.valueOf(newValue.count).toString(), "sum.txt", true);
                }
                DATABASE.put(GLOBAL_COUNT_KEY, newValue);
            } else {
                // Same txid as the stored value: the batch is a replay, keep the stored value.
                newValue = oldValue;
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("id", "size"));
        }
    }

    @Test
    public void test() {
        MemoryTransactionalSpout spout = new MemoryTransactionalSpout(DATA, new Fields("word"), PARTITION_TAKE_PER_BATCH);
        TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder("global-count", "spout", spout, 3);
        builder.setBolt("partial-count", new BatchCount(), 5).noneGrouping("spout");
        builder.setBolt("sum", new UpdateGlobalCount()).globalGrouping("partial-count");

        Config config = new Config();
        config.setDebug(true);
        config.setMaxSpoutPending(3);
        RunStorm.runStormLocally(builder.buildTopology(), "ss", config, 5);
    }
}
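The txid comparison in UpdateGlobalCount.finishBatch() is what keeps the count correct when a batch is replayed. The following is a minimal sketch of just that check, outside of Storm, so it can be run on its own. The class TxidCounterSketch and its commit method are hypothetical names introduced for illustration only; they are not part of Storm or of the original post. The sketch assumes, as the committer phase does, that batches are committed in strict txid order, which is why storing only the last txid is enough.

```java
import java.math.BigInteger;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for the committer's finishBatch() logic; not a Storm class. */
class TxidCounterSketch {
    /** Stored value: running count plus the id of the transaction that last updated it. */
    static final class Value {
        final int count;
        final BigInteger txid;

        Value(int count, BigInteger txid) {
            this.count = count;
            this.txid = txid;
        }
    }

    static final String KEY = "GLOBAL-COUNT";

    // Stand-in for an external database.
    private final Map<String, Value> database = new HashMap<>();

    /** Applies a batch total once; a replay carrying the same txid leaves the count unchanged. */
    int commit(BigInteger txid, int batchCount) {
        Value old = database.get(KEY);
        if (old != null && old.txid.equals(txid)) {
            // Replayed batch: its total is already included in the stored count.
            return old.count;
        }
        int base = (old == null) ? 0 : old.count;
        Value updated = new Value(base + batchCount, txid);
        database.put(KEY, updated);
        return updated.count;
    }

    public static void main(String[] args) {
        TxidCounterSketch db = new TxidCounterSketch();
        System.out.println(db.commit(BigInteger.ONE, 9));        // first batch
        System.out.println(db.commit(BigInteger.ONE, 9));        // replay of the same txid
        System.out.println(db.commit(BigInteger.valueOf(2), 9)); // next batch
    }
}
```

Committing the same txid twice returns the same total, while a new txid adds its batch count on top of the stored value. Unlike the listing above, this sketch returns the total on every path, including the first batch and replays.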

Reposted from: https://www.cnblogs.com/dmir/p/4908611.html
