参考:
示例代码:
package com.lky.topology;import java.math.BigInteger;import java.util.ArrayList;import java.util.HashMap;import java.util.List;import java.util.Map;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.junit.Test;import com.lky.util.FileUtil;import com.lky.util.RunStorm;import backtype.storm.Config;import backtype.storm.coordination.BatchOutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.testing.MemoryTransactionalSpout;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.topology.base.BaseBatchBolt;import backtype.storm.topology.base.BaseTransactionalBolt;import backtype.storm.transactional.ICommitter;import backtype.storm.transactional.TransactionAttempt;import backtype.storm.transactional.TransactionalTopologyBuilder;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Tuple;import backtype.storm.tuple.Values;@SuppressWarnings({ "deprecation", "serial", "rawtypes" })/** * @Title: TransactionalGlobalCount.java * @Package com.lky.topology * @Description: 事务topology(模拟实时统计消息数量) * @author lky * @date 2015年10月25日 上午11:23:12 * @version V1.0 */public class TransactionalGlobalCount { private static Log log=LogFactory.getLog(TransactionalGlobalCount.class); public static final int PARTITION_TAKE_PER_BATCH = 3; public static final Map >> DATA = new HashMap >>() { { put(0, new ArrayList