Storm可同時處理窗口內的所有tuple。窗口可以從時間或數量上來劃分,由如下兩個因素決定:
要確保topo的過期時間大於窗口的大小加上滑動間隔
按照固定的時間間隔或者Tuple數量滑動窗口。
元組被單個窗口處理,一個元組只屬於一個窗口,不會有窗口重疊。
根據我自己的經驗其實一般用滾動就可以了
(時間和數量的排列組合):
storm支持追蹤源數據的時間戳。
Event time 和Process time
默認的時間戳是處理元組時的bolt窗口生成的,
Event time,事件時間,通常這個時間會帶在Tuple中;
Process time,到某一個處理環節的時間。
舉例:A今天早上9點告訴B,說C昨天晚上9點在濱江國際;
這條信息中,可以認為C在濱江國際的Event time是昨天晚上9點,B接收到這條信息的時間,即Process time,是今天早上9點。
windows按照時間劃分時,默認是Process time,也可以指定為Tuple中的Event time。
如果以Event time來劃分窗口:
public BaseWindowedBolt withTimestampField(String fieldName)
從當前最後一條數據算起,往前減去lag,得到一個時間,這個時間就是watermark;
認為watermark之前的數據都已經到了。收到06:01:00的數據時,認為06:00:00的數據都到了。給他們入window。
這樣實際是一個延時處理,等到了06:01:00時,我才開始將06:00:00的數據放入窗口。
如果很不巧,06:00:00的數據在06:01:00之後,lag為60s,不好意思,進不了窗口。此數據不會被處理,並且會在worker的日志中加一行INFO信息。
public class SlidingWindowBolt extends BaseWindowedBolt {
private OutputCollector collector;
@Override
publicvoidprepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
@Override
publicvoidexecute(TupleWindow inputWindow) {
for(Tuple tuple: inputWindow.get()) {
// do the windowing computation
...
}
// emit the results
collector.emit(new Values(computedValue));
}
}
publicstaticvoidmain(String[] args) {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new RandomSentenceSpout(), 1);
builder.setBolt("slidingwindowbolt",
new SlidingWindowBolt().withWindow(new Count(30), new Count(10)),
1).shuffleGrouping("spout");
Config conf = new Config();
conf.setDebug(true);
conf.setNumWorkers(1);
StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
}
Storm的安裝步驟 http://www.linuxidc.com/Linux/2016-08/134184.htm
Kafka-Storm 集成部署 http://www.linuxidc.com/Linux/2016-03/129063.htm
Storm在Ubuntu環境下的單機部署 http://www.linuxidc.com/Linux/2016-03/129060.htm