歡迎來到Linux教程網
Linux教程網
Linux教程網
Linux教程網
Linux教程網 >> Linux基礎 >> Linux教程 >> Storm Windowing storm滑動窗口簡介

Storm Windowing storm滑動窗口簡介

日期:2017/2/28 13:44:18   编辑:Linux教程

Storm Windowing

簡介

Storm可同時處理窗口內的所有tuple。窗口可以從時間或數量上來劃分,由如下兩個因素決定:

  • 窗口的長度,可以是時間間隔或Tuple數量;
  • 滑動間隔(sliding Interval),可以是時間間隔或Tuple數量;

要確保topo的過期時間大於窗口的大小加上滑動間隔

Sliding Window:滑動窗口

按照固定的時間間隔或者Tuple數量滑動窗口。

  • 如果滑動間隔和窗口大小一樣則等同於滾窗,
  • 如果滑動間隔大於窗口大小則會丟失數據,
  • 如果滑動間隔小於窗口大小則會窗口重疊。

Tumbling Window:滾動窗口

元組被單個窗口處理,一個元組只屬於一個窗口,不會有窗口重疊。
根據我自己的經驗其實一般用滾動就可以了

構造builder的時候支持以下的配置

(時間和數量的排列組合):

  • withWindow(Count windowLength, Count slidingInterval)
    滑窗 窗口長度:tuple數, 滑動間隔: tuple數
  • withWindow(Count windowLength)
    滑窗 窗口長度:tuple數, 滑動間隔: 每個tuple進來都滑
  • withWindow(Count windowLength, Duration slidingInterval)
    滑窗 窗口長度:tuple數, 滑動間隔: 時間間隔
  • withWindow(Duration windowLength, Duration slidingInterval)
    滑窗 窗口長度:時間間隔, 滑動間隔: 時間間隔
  • withWindow(Duration windowLength)
    滑窗 窗口長度:時間間隔, 滑動間隔: 每個tuple進來都滑
  • withWindow(Duration windowLength, Count slidingInterval)
    滑窗 窗口長度:時間間隔, 滑動間隔: 時間間隔
  • withTumblingWindow(BaseWindowedBolt.Count count)
    滾窗 窗口長度:Tuple數
  • withTumblingWindow(BaseWindowedBolt.Duration duration)
    滾窗 窗口長度:時間間隔

Tuple時間戳和亂序

storm支持追蹤源數據的時間戳。
Event time 和Process time
默認的時間戳是處理元組時的bolt窗口生成的,
Event time,事件時間,通常這個時間會帶在Tuple中;
Process time,到某一個處理環節的時間。
舉例:A今天早上9點告訴B,說C昨天晚上9點在濱江國際;
這條信息中,可以認為C在濱江國際的Event time是昨天晚上9點,B接收到這條信息的時間,即Process time,是今天早上9點。

配置時間戳字段(timestamp field)

windows按照時間劃分時,默認是Process time,也可以指定為Tuple中的Event time。
如果以Event time來劃分窗口:

  1. Tuple落入到哪個窗口,是看tuple裡的Event time。
  2. 窗口向後推進,主要依靠Event time的增長;
public BaseWindowedBolt withTimestampField(String fieldName)

延時(lag)和水位線(watermark)

從當前最後一條數據算起,往前減去lag,得到一個時間,這個時間就是watermark;
認為watermark之前的數據都已經到了。收到06:01:00的數據時,認為06:00:00的數據都到了。給他們入window。
這樣實際是一個延時處理,等到了06:01:00時,我才開始將06:00:00的數據放入窗口。

如果很不巧,06:00:00的數據在06:01:00之後,lag為60s,不好意思,進不了窗口。此數據不會被處理,並且會在worker的日志中加一行INFO信息。

public class SlidingWindowBolt extends BaseWindowedBolt {
    private OutputCollector collector;

    @Override
    publicvoidprepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    publicvoidexecute(TupleWindow inputWindow) {
      for(Tuple tuple: inputWindow.get()) {
        // do the windowing computation
        ...
      }
      // emit the results
      collector.emit(new Values(computedValue));
    }
}

publicstaticvoidmain(String[] args) {
    TopologyBuilder builder = new TopologyBuilder();
     builder.setSpout("spout", new RandomSentenceSpout(), 1);
     builder.setBolt("slidingwindowbolt",
                     new SlidingWindowBolt().withWindow(new Count(30), new Count(10)),
                     1).shuffleGrouping("spout");
    Config conf = new Config();
    conf.setDebug(true);
    conf.setNumWorkers(1);

    StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());

}

Storm的安裝步驟 http://www.linuxidc.com/Linux/2016-08/134184.htm

Kafka-Storm 集成部署 http://www.linuxidc.com/Linux/2016-03/129063.htm

Storm在Ubuntu環境下的單機部署 http://www.linuxidc.com/Linux/2016-03/129060.htm

Copyright © Linux教程網 All Rights Reserved