Flink Source Code Analysis: The Underlying Implementation of WindowOperator

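The previous article walked through the execution flow of Flink's window mechanism. The class that actually stores a window's elements and drives its computation is WindowOperator. This article analyzes its execution logic.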

The apply method

Picking up where the previous article left off, we start from the apply method. Here is its code:


private <R> SingleOutputStreamOperator<R> apply(InternalWindowFunction<Iterable<T>, R, K, W> function, TypeInformation<R> resultType, Function originalFunction) {
    // generate the operator name
    final String opName = generateOperatorName(windowAssigner, trigger, evictor, originalFunction, null);
    // get the key selector of the keyed input stream
    KeySelector<T, K> keySel = input.getKeySelector();

    WindowOperator<K, T, Iterable<T>, R, W> operator;

    if (evictor != null) {
      @SuppressWarnings({"unchecked", "rawtypes"})
      TypeSerializer<StreamRecord<T>> streamRecordSerializer =
          (TypeSerializer<StreamRecord<T>>) new StreamElementSerializer(input.getType().createSerializer(getExecutionEnvironment().getConfig()));

      ListStateDescriptor<StreamRecord<T>> stateDesc =
          new ListStateDescriptor<>("window-contents", streamRecordSerializer);

      operator =
        new EvictingWindowOperator<>(windowAssigner,
          windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
          keySel,
          input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
          stateDesc,
          function,
          trigger,
          evictor,
          allowedLateness,
          lateDataOutputTag);

    } else {
      // state descriptor for the ListState that holds the window contents
      ListStateDescriptor<T> stateDesc = new ListStateDescriptor<>("window-contents",
        input.getType().createSerializer(getExecutionEnvironment().getConfig()));

      // construct the WindowOperator
      operator =
        new WindowOperator<>(windowAssigner,
          windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
          keySel,
          input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
          stateDesc,
          function,
          trigger,
          allowedLateness,
          lateDataOutputTag);
    }

    return input.transform(opName, resultType, operator);
  }

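The method first generates the operator name and gets the key selector. It then checks whether evictor is null and builds the operator accordingly. If an evictor is set, it constructs an EvictingWindowOperator. Otherwise it constructs a WindowOperator.

EvictingWindowOperator is a subclass of WindowOperator that adds logic for evicting (removing) elements. For that reason it buffers whole StreamRecords, value plus timestamp, instead of bare values. The rest of this article focuses on WindowOperator.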
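To show why the two branches use different state descriptors, here is a minimal model in plain Java. It is not Flink API, and ApplyBranchSketch, WindowBuffer, ValueBuffer and RecordBuffer are made-up names. The model assumes the only relevant difference is what gets buffered:

  • the evicting path keeps each element together with its timestamp, as the StreamRecord-based ListStateDescriptor does, so an evictor can inspect timestamps;
  • the plain path keeps only the values.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the evictor branch in WindowedStream.apply(); not Flink API.
final class ApplyBranchSketch {

    // Simplified stand-in for the per-window "window-contents" list state.
    interface WindowBuffer<T> {
        void add(T value, long timestamp);
        List<T> values();
        boolean keepsTimestamps();
    }

    // Non-evicting path: like ListStateDescriptor<T>, only the values are stored.
    static final class ValueBuffer<T> implements WindowBuffer<T> {
        private final List<T> values = new ArrayList<>();
        public void add(T value, long timestamp) { values.add(value); } // timestamp is dropped
        public List<T> values() { return values; }
        public boolean keepsTimestamps() { return false; }
    }

    // Evicting path: like ListStateDescriptor<StreamRecord<T>>, value and timestamp are kept
    // so that an evictor can look at record timestamps.
    static final class RecordBuffer<T> implements WindowBuffer<T> {
        private final List<T> values = new ArrayList<>();
        private final List<Long> timestamps = new ArrayList<>();
        public void add(T value, long timestamp) { values.add(value); timestamps.add(timestamp); }
        public List<T> values() { return values; }
        public boolean keepsTimestamps() { return true; }
        long timestampAt(int i) { return timestamps.get(i); }
    }

    // Mirrors the "if (evictor != null)" check in apply(): choose the buffer by evictor presence.
    static <T> WindowBuffer<T> create(Object evictor) {
        if (evictor != null) {
            return new RecordBuffer<>();
        }
        return new ValueBuffer<>();
    }
}
```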
The inheritance hierarchy of WindowOperator is shown below:

[Figure: WindowOperator inheritance hierarchy]

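Briefly:

  • WindowOperator extends AbstractUdfStreamOperator. This was covered earlier; operators that wrap a user-defined function extend it.
  • WindowOperator also implements the OneInputStreamOperator interface, also covered earlier.
  • AbstractUdfStreamOperator in turn extends AbstractStreamOperator.
  • OneInputStreamOperator extends the StreamOperator interface, and AbstractStreamOperator also implements StreamOperator.
  • WindowOperator additionally implements Triggerable. This is the callback interface through which the timer service invokes onEventTime and onProcessingTime.

The relevant methods are analyzed below.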
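Since the figure is not reproduced, the hierarchy can be sketched with stub types. The names mirror the Flink types mentioned above, but these are hypothetical stand-ins, not the real classes. They are empty, non-generic and nested in a made-up HierarchySketch class. The real WindowOperator also implements Triggerable<K, W>, which is modeled here by a stub.

```java
// Hypothetical, heavily simplified stubs mirroring the hierarchy described above.
// Generic parameters and almost all methods are omitted; these are not the Flink classes.
final class HierarchySketch {
    interface StreamOperator {}

    interface OneInputStreamOperator extends StreamOperator {
        void processElement(Object element) throws Exception;
    }

    // Stand-in for the timer callback interface (Flink's Triggerable<K, W>).
    interface Triggerable {
        void onEventTime(long timestamp) throws Exception;
        void onProcessingTime(long timestamp) throws Exception;
    }

    abstract static class AbstractStreamOperator implements StreamOperator {}

    // Holds the user function; for WindowOperator that is the InternalWindowFunction.
    abstract static class AbstractUdfStreamOperator<F> extends AbstractStreamOperator {
        protected final F userFunction;
        AbstractUdfStreamOperator(F userFunction) { this.userFunction = userFunction; }
    }

    static class WindowOperator extends AbstractUdfStreamOperator<Object>
            implements OneInputStreamOperator, Triggerable {
        WindowOperator(Object windowFunction) { super(windowFunction); } // like super(windowFunction)
        public void processElement(Object element) {}
        public void onEventTime(long timestamp) {}
        public void onProcessingTime(long timestamp) {}
    }

    // The evicting variant only adds eviction logic on top of WindowOperator.
    static class EvictingWindowOperator extends WindowOperator {
        EvictingWindowOperator(Object windowFunction) { super(windowFunction); }
    }
}
```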

The WindowOperator constructor


/**
   * Creates a new {@code WindowOperator} based on the given policies and user functions.
   */
  public WindowOperator(
      WindowAssigner<? super IN, W> windowAssigner,
      TypeSerializer<W> windowSerializer,
      KeySelector<IN, K> keySelector,
      TypeSerializer<K> keySerializer,
      StateDescriptor<? extends AppendingState<IN, ACC>, ?> windowStateDescriptor,
      InternalWindowFunction<ACC, OUT, K, W> windowFunction,
      Trigger<? super IN, ? super W> trigger,
      long allowedLateness,
      OutputTag<IN> lateDataOutputTag) {

    super(windowFunction);

    checkArgument(!(windowAssigner instanceof BaseAlignedWindowAssigner),
      "The " + windowAssigner.getClass().getSimpleName() + " cannot be used with a WindowOperator. " +
        "This assigner is only used with the AccumulatingProcessingTimeWindowOperator and " +
        "the AggregatingProcessingTimeWindowOperator");

    checkArgument(allowedLateness >= 0);

    checkArgument(windowStateDescriptor == null || windowStateDescriptor.isSerializerInitialized(),
        "window state serializer is not properly initialized");

    this.windowAssigner = checkNotNull(windowAssigner);
    this.windowSerializer = checkNotNull(windowSerializer);
    this.keySelector = checkNotNull(keySelector);
    this.keySerializer = checkNotNull(keySerializer);
    this.windowStateDescriptor = windowStateDescriptor;
    this.trigger = checkNotNull(trigger);
    this.allowedLateness = allowedLateness;
    this.lateDataOutputTag = lateDataOutputTag;

    setChainingStrategy(ChainingStrategy.ALWAYS);
  }

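The constructor builds a WindowOperator from the given policies (window assigner, trigger, allowed lateness) and the user function. Most of it is argument validation:

  • the assigner must not be a BaseAlignedWindowAssigner;
  • allowedLateness must be non-negative;
  • the window state serializer must be initialized;
  • the required fields must be non-null.

It then assigns the fields and sets the chaining strategy to ALWAYS.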
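Here is a minimal sketch of the same validate-then-assign pattern in plain Java. Objects.requireNonNull stands in for Flink's checkNotNull, and an explicit IllegalArgumentException stands in for checkArgument. WindowOperatorArgs is a hypothetical class that carries only a few of the real fields.

```java
import java.util.Objects;

// Hypothetical sketch of the constructor's validation pattern; not Flink code.
final class WindowOperatorArgs {
    final Object windowAssigner;
    final Object trigger;
    final long allowedLateness;

    WindowOperatorArgs(Object windowAssigner, Object trigger, long allowedLateness) {
        // mirrors checkArgument(allowedLateness >= 0): reject invalid values before assigning anything
        if (allowedLateness < 0) {
            throw new IllegalArgumentException("allowedLateness must be >= 0, was " + allowedLateness);
        }
        // mirrors checkNotNull(...): fail fast with a NullPointerException
        this.windowAssigner = Objects.requireNonNull(windowAssigner, "windowAssigner");
        this.trigger = Objects.requireNonNull(trigger, "trigger");
        this.allowedLateness = allowedLateness;
    }
}
```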

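WindowOperator has the following key methods:

  • open: the operator's initialization logic
  • processElement: called when a new element enters the window
  • onEventTime: logic run when an event-time timer fires
  • onProcessingTime: logic run when a processing-time timer fires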

The open method
First, let's look at what open initializes.


@Override
  public void open() throws Exception {
    super.open();

    this.numLateRecordsDropped = metrics.counter(LATE_ELEMENTS_DROPPED_METRIC_NAME);
    timestampedCollector = new TimestampedCollector<>(output);

    internalTimerService =
        getInternalTimerService("window-timers", windowSerializer, this);

    triggerContext = new Context(null, null);
    processContext = new WindowContext(null);

    windowAssignerContext = new WindowAssigner.WindowAssignerContext() {
      @Override
      public long getCurrentProcessingTime() {
        return internalTimerService.currentProcessingTime();
      }
    };

    // create (or restore) the state that hold the actual window contents
    // NOTE - the state may be null in the case of the overriding evicting window operator
    // here it is non-null: it is the descriptor created in WindowedStream.apply() above
    if (windowStateDescriptor != null) {
      windowState = (InternalAppendingState<K, W, IN, ACC, ACC>) getOrCreateKeyedState(windowSerializer, windowStateDescriptor);
    }

    // create the typed and helper states for merging windows
    // only merging windows (e.g. session windows) need this extra state
    if (windowAssigner instanceof MergingWindowAssigner) {

      // store a typed reference for the state of merging windows - sanity check
      if (windowState instanceof InternalMergingState) {
        windowMergingState = (InternalMergingState<K, W, IN, ACC, ACC>) windowState;
      }
      // TODO this sanity check should be here, but is prevented by an incorrect test (pending validation)
      // TODO see WindowOperatorTest.testCleanupTimerWithEmptyFoldingStateForSessionWindows()
      // TODO activate the sanity check once resolved
//      else if (windowState != null) {
//        throw new IllegalStateException(
//            "The window uses a merging assigner, but the window state is not mergeable.");
//      }

      @SuppressWarnings("unchecked")
      final Class<Tuple2<W, W>> typedTuple = (Class<Tuple2<W, W>>) (Class<?>) Tuple2.class;

      final TupleSerializer<Tuple2<W, W>> tupleSerializer = new TupleSerializer<>(
          typedTuple,
          new TypeSerializer[] {windowSerializer, windowSerializer});

      final ListStateDescriptor<Tuple2<W, W>> mergingSetsStateDescriptor =
          new ListStateDescriptor<>("merging-window-set", tupleSerializer);

      // get the state that stores the merging sets
      mergingSetsState = (InternalListState<K, VoidNamespace, Tuple2<W, W>>)
          getOrCreateKeyedState(VoidNamespaceSerializer.INSTANCE, mergingSetsStateDescriptor);
      mergingSetsState.setCurrentNamespace(VoidNamespace.INSTANCE);
    }
  }

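open first creates several helpers:

  • the late-record counter (numLateRecordsDropped);
  • the TimestampedCollector;
  • the internal timer service "window-timers", to which the operator passes itself as the Triggerable callback;
  • the trigger, window and assigner contexts.

Its main job, though, is to initialize the window state. For merging windows such as session windows (the windowAssigner is a MergingWindowAssigner), it also initializes the "merging-window-set" state, which records how windows have been merged.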
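The key idea behind windowState is that state is scoped by two things: the current key and a namespace, which here is the window. Below is a hypothetical in-memory model of that scoping. NamespacedListState is a made-up name, and Flink's real state backends are implemented quite differently. The model shows how setCurrentNamespace selects which window's contents add, get and clear operate on.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of namespaced keyed list state; not Flink's implementation.
// Contents are scoped by (current key, current namespace), where the namespace is the window.
final class NamespacedListState<K, N, V> {
    private final Map<K, Map<N, List<V>>> table = new HashMap<>();
    private K currentKey;
    private N currentNamespace;

    // In Flink the keyed backend sets the current key before each record is processed.
    void setCurrentKey(K key) { this.currentKey = key; }

    // Like windowState.setCurrentNamespace(window).
    void setCurrentNamespace(N namespace) { this.currentNamespace = namespace; }

    void add(V value) {
        table.computeIfAbsent(currentKey, k -> new HashMap<>())
             .computeIfAbsent(currentNamespace, n -> new ArrayList<>())
             .add(value);
    }

    // Returns null when nothing is stored, matching the "contents == null" checks in the operator.
    List<V> get() {
        Map<N, List<V>> byNamespace = table.get(currentKey);
        return byNamespace == null ? null : byNamespace.get(currentNamespace);
    }

    // Removes only the contents of the current (key, namespace) pair.
    void clear() {
        Map<N, List<V>> byNamespace = table.get(currentKey);
        if (byNamespace != null) {
            byNamespace.remove(currentNamespace);
            if (byNamespace.isEmpty()) {
                table.remove(currentKey);
            }
        }
    }
}
```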

The processElement method

When an element reaches the window, WindowOperator's processElement method is called.
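The first step of processElement is windowAssigner.assignWindows(...). To show what that call returns, here is a simplified sketch of tumbling and sliding event-time assignment. WindowAssignSketch and its methods are made-up names, offsets are ignored, and windows are half-open [start, end). It is not Flink's code, but it illustrates the same behavior: a sliding assigner returns several windows for one element, which is why processElement loops over elementWindows.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified window assignment; not Flink code. Offsets are ignored.
final class WindowAssignSketch {

    static final class TimeWindow {
        final long start, end;
        TimeWindow(long start, long end) { this.start = start; this.end = end; }
        // last timestamp that still belongs to the half-open window [start, end)
        long maxTimestamp() { return end - 1; }
        @Override public boolean equals(Object o) {
            return o instanceof TimeWindow && ((TimeWindow) o).start == start && ((TimeWindow) o).end == end;
        }
        @Override public int hashCode() { return Long.hashCode(start) * 31 + Long.hashCode(end); }
        @Override public String toString() { return "[" + start + "," + end + ")"; }
    }

    // Start of the window of length `size` containing `timestamp`; floorMod also handles negatives.
    static long windowStart(long timestamp, long size) {
        return timestamp - Math.floorMod(timestamp, size);
    }

    // Tumbling: every element belongs to exactly one window.
    static List<TimeWindow> tumbling(long timestamp, long size) {
        long start = windowStart(timestamp, size);
        List<TimeWindow> windows = new ArrayList<>();
        windows.add(new TimeWindow(start, start + size));
        return windows;
    }

    // Sliding: walk back from the latest window start until windows no longer contain the timestamp.
    static List<TimeWindow> sliding(long timestamp, long size, long slide) {
        List<TimeWindow> windows = new ArrayList<>();
        for (long start = windowStart(timestamp, slide); start > timestamp - size; start -= slide) {
            windows.add(new TimeWindow(start, start + size));
        }
        return windows;
    }
}
```

For example, with size 10000 and slide 5000, an element at timestamp 7000 lands in both [5000,15000) and [0,10000).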


@Override
  public void processElement(StreamRecord<IN> element) throws Exception {
    // the windowAssigner first assigns the element to one or more windows
    final Collection<W> elementWindows = windowAssigner.assignWindows(
      element.getValue(), element.getTimestamp(), windowAssignerContext);

    //if element is handled by none of assigned elementWindows
    // stays true until the element is added to at least one non-late window
    boolean isSkippedElement = true;
    // get the current key
    final K key = this.<K>getKeyedStateBackend().getCurrentKey();
    // check whether this is a merging (session) window assigner
    if (windowAssigner instanceof MergingWindowAssigner) {
      MergingWindowSet<W> mergingWindows = getMergingWindowSet();

      for (W window: elementWindows) {

        // adding the new window might result in a merge, in that case the actualWindow
        // is the merged window and we work with that. If we don't merge then
        // actualWindow == window
        W actualWindow = mergingWindows.addWindow(window, new MergingWindowSet.MergeFunction<W>() {
          @Override
          public void merge(W mergeResult,
              Collection<W> mergedWindows, W stateWindowResult,
              Collection<W> mergedStateWindows) throws Exception {

            if ((windowAssigner.isEventTime() && mergeResult.maxTimestamp() + allowedLateness <= internalTimerService.currentWatermark())) {
              throw new UnsupportedOperationException("The end timestamp of an " +
                  "event-time window cannot become earlier than the current watermark " +
                  "by merging. Current watermark: " + internalTimerService.currentWatermark() +
                  " window: " + mergeResult);
            } else if (!windowAssigner.isEventTime()) {
              long currentProcessingTime = internalTimerService.currentProcessingTime();
              if (mergeResult.maxTimestamp() <= currentProcessingTime) {
                throw new UnsupportedOperationException("The end timestamp of a " +
                  "processing-time window cannot become earlier than the current processing time " +
                  "by merging. Current processing time: " + currentProcessingTime +
                  " window: " + mergeResult);
              }
            }

            triggerContext.key = key;
            triggerContext.window = mergeResult;

            triggerContext.onMerge(mergedWindows);

            for (W m: mergedWindows) {
              triggerContext.window = m;
              triggerContext.clear();
              deleteCleanupTimer(m);
            }

            // merge the merged state windows into the newly resulting state window
            windowMergingState.mergeNamespaces(stateWindowResult, mergedStateWindows);
          }
        });

        // drop if the window is already late
        if (isWindowLate(actualWindow)) {
          mergingWindows.retireWindow(actualWindow);
          continue;
        }
        isSkippedElement = false;

        W stateWindow = mergingWindows.getStateWindow(actualWindow);
        if (stateWindow == null) {
          throw new IllegalStateException("Window " + window + " is not in in-flight window set.");
        }

        windowState.setCurrentNamespace(stateWindow);
        windowState.add(element.getValue());

        triggerContext.key = key;
        triggerContext.window = actualWindow;

        TriggerResult triggerResult = triggerContext.onElement(element);
        // the trigger fired
        if (triggerResult.isFire()) {
          // read the window contents from state
          ACC contents = windowState.get();
          if (contents == null) {
            continue;
          }
          emitWindowContents(actualWindow, contents);
        }

        if (triggerResult.isPurge()) {
          windowState.clear();
        }
        registerCleanupTimer(actualWindow);
      }

      // need to make sure to update the merging state in state
      mergingWindows.persist();
    } else {
      // process each assigned window in turn
      for (W window: elementWindows) {

        // drop if the window is already late
        // a window is late when (event time only) the watermark has reached
        // window.maxTimestamp() + allowedLateness; such windows are not processed
        if (isWindowLate(window)) {
          continue;
        }
        // mark the element as handled
        isSkippedElement = false;
        // set the namespace of the window state to the current window
        windowState.setCurrentNamespace(window);
        // first append the element to windowState
        windowState.add(element.getValue());

        triggerContext.key = key;
        triggerContext.window = window;

        TriggerResult triggerResult = triggerContext.onElement(element);
        // check whether the trigger fires
        if (triggerResult.isFire()) {
          ACC contents = windowState.get();
          if (contents == null) {
            continue;
          }
          // pass the contents to the user-defined window function, i.e. run the window computation
          emitWindowContents(window, contents);
        }
        // on PURGE, clear the window state
        if (triggerResult.isPurge()) {
          windowState.clear();
        }
        // register a timer that will clean up this window's state
        registerCleanupTimer(window);
      }
    }

    // side output input event if
    // element not handled by any window
    // late arriving tag has been set
    // windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp
    // not added to any window and itself late: send to the late-data side output if a tag is set, otherwise drop it
    if (isSkippedElement && isElementLate(element)) {
      if (lateDataOutputTag != null){
        sideOutput(element);
      } else {
        // count the dropped late record
        this.numLateRecordsDropped.inc();
      }
    }
  }

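processElement first lets the windowAssigner assign the element to one or more windows. It then gets the current key, which is the key from keyBy.

Next it branches on whether the assigner is a MergingWindowAssigner (session windows), because merging windows are handled differently from all other windows. Here we focus on the non-merging case, i.e. the else branch, which processes each assigned window as follows:

  • If the window is already late, it is skipped.
  • The namespace of windowState is set to the window, and the element is appended to windowState.
  • The trigger is consulted. If it fires, the window contents are passed to the user-defined function, which runs the window computation.
  • If the trigger result includes a purge, the window contents are cleared.
  • Finally, a cleanup timer is registered that will delete the window's state.

After the loop, the element may not have been handled by any window and may itself be late. In that case it is sent to the side output if lateDataOutputTag is set; otherwise it is dropped and counted in numLateRecordsDropped.

That is the overall logic. The session-window branch is similar but also merges windows and their state. It is not analyzed here; interested readers can explore it on their own.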
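The non-merging branch described above can be condensed into a single-key model. ProcessElementSketch and all its members are made-up names. The model makes these assumptions:

  • tumbling event-time windows, identified by their start;
  • a trigger that behaves like Flink's EventTimeTrigger: fire if the watermark has already reached the window's maxTimestamp, otherwise register a timer there;
  • no purging;
  • plain collections standing in for state, timers and outputs.

The isWindowLate and isElementLate checks follow the conditions in the source. A window is late when its cleanup time is at or below the watermark. An element is late when its timestamp plus allowedLateness is at or below the watermark. Timestamp overflow is ignored.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

// Hypothetical single-key model of the non-merging branch of processElement(); not Flink code.
final class ProcessElementSketch {
    final long size;
    final long allowedLateness;
    final boolean hasLateDataOutputTag;
    long currentWatermark = Long.MIN_VALUE;

    final Map<Long, List<String>> windowState = new HashMap<>(); // window start -> contents
    final TreeSet<Long> eventTimeTimers = new TreeSet<>();        // trigger and cleanup timers
    final List<String> emitted = new ArrayList<>();               // what emitWindowContents would send
    final List<String> lateSideOutput = new ArrayList<>();
    long numLateRecordsDropped;

    ProcessElementSketch(long size, long allowedLateness, boolean hasLateDataOutputTag) {
        this.size = size;
        this.allowedLateness = allowedLateness;
        this.hasLateDataOutputTag = hasLateDataOutputTag;
    }

    long maxTimestamp(long start) { return start + size - 1; }
    long cleanupTime(long start) { return maxTimestamp(start) + allowedLateness; }
    boolean isWindowLate(long start) { return cleanupTime(start) <= currentWatermark; }
    boolean isElementLate(long ts) { return ts + allowedLateness <= currentWatermark; }

    void processElement(String value, long ts) {
        // tumbling assignment: exactly one window per element
        List<Long> elementWindows = Collections.singletonList(ts - Math.floorMod(ts, size));
        boolean isSkippedElement = true;
        for (long window : elementWindows) {
            if (isWindowLate(window)) {
                continue; // watermark already reached end + allowedLateness
            }
            isSkippedElement = false;
            windowState.computeIfAbsent(window, w -> new ArrayList<>()).add(value);
            if (maxTimestamp(window) <= currentWatermark) {
                // trigger returns FIRE: hand the contents to the window function
                emitted.add(window + ":" + windowState.get(window));
            } else {
                // trigger returns CONTINUE and registers a timer at the window end
                eventTimeTimers.add(maxTimestamp(window));
            }
            eventTimeTimers.add(cleanupTime(window)); // like registerCleanupTimer(window)
        }
        if (isSkippedElement && isElementLate(ts)) {
            if (hasLateDataOutputTag) {
                lateSideOutput.add(value);
            } else {
                numLateRecordsDropped++;
            }
        }
    }
}
```

Consider size 5000 and allowedLateness 2000. An element at 1000 is only buffered. Once the watermark is 5500, another element for the same window makes it fire immediately. At watermark 8000 the window is past its cleanup time (6999), so a further element for it goes to the late side output.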

The onEventTime method

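This method is called when an event-time timer registered by the operator fires, i.e. when the watermark reaches the timer's timestamp.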

  @Override
  public void onEventTime(InternalTimer<K, W> timer) throws Exception {
    // restore the key and the window (the timer's namespace) from the timer
    triggerContext.key = timer.getKey();
    triggerContext.window = timer.getNamespace();

    MergingWindowSet<W> mergingWindows;
    // merging (session) windows
    if (windowAssigner instanceof MergingWindowAssigner) {
      mergingWindows = getMergingWindowSet();
      // look up the state window, i.e. the window under which the actual state is kept
      W stateWindow = mergingWindows.getStateWindow(triggerContext.window);
      if (stateWindow == null) {
        // Timer firing for non-existent window, this can only happen if a
        // trigger did not clean up timers. We have already cleared the merging
        // window and therefore the Trigger state, however, so nothing to do.
        return;
      } else {
        // set the namespace of the window state
        windowState.setCurrentNamespace(stateWindow);
      }
    } else {
      // set the namespace of the window state
      windowState.setCurrentNamespace(triggerContext.window);
      mergingWindows = null;
    }

    TriggerResult triggerResult = triggerContext.onEventTime(timer.getTimestamp());
    // check whether the trigger fires
    if (triggerResult.isFire()) {
      // read the window contents and run the window function
      ACC contents = windowState.get();
      if (contents != null) {
        emitWindowContents(triggerContext.window, contents);
      }
    }
    // on PURGE, clear the window contents
    if (triggerResult.isPurge()) {
      windowState.clear();
    }
    // event time and this is the window's cleanup timer: the window is finished, so clear all of its state
    if (windowAssigner.isEventTime() && isCleanupTime(triggerContext.window, timer.getTimestamp())) {
      clearAllState(triggerContext.window, windowState, mergingWindows);
    }
    // persist the merging-window set
    if (mergingWindows != null) {
      // need to make sure to update the merging state in state
      mergingWindows.persist();
    }
  }

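The processing-time path (onProcessingTime) is not analyzed here. It mirrors the event-time logic above, with two differences: the trigger is invoked through onProcessingTime, and the cleanup check applies when the assigner is not an event-time assigner.
WindowOperator's source is fairly long and contains many more details. The above covers only the main logic; the details deserve further study.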
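To see how timers drive onEventTime, here is a single-key model. OnEventTimeSketch and Timer are made-up names. The model makes these assumptions:

  • tumbling windows;
  • an EventTimeTrigger-like rule that fires exactly when the timer equals the window's maxTimestamp;
  • a cleanup timer at maxTimestamp + allowedLateness.

advanceWatermark imitates the timer service: it fires, in order, every timer whose timestamp is at or below the new watermark. Identical timers collapse into one.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

// Hypothetical single-key model of event-time timers driving onEventTime(); not Flink code.
final class OnEventTimeSketch {

    // A timer for one window; ordered by timestamp, then window start.
    static final class Timer implements Comparable<Timer> {
        final long timestamp;
        final long window; // window start
        Timer(long timestamp, long window) { this.timestamp = timestamp; this.window = window; }
        public int compareTo(Timer o) {
            int c = Long.compare(timestamp, o.timestamp);
            return c != 0 ? c : Long.compare(window, o.window);
        }
    }

    final long size;
    final long allowedLateness;
    final Map<Long, List<String>> windowState = new HashMap<>();
    final TreeSet<Timer> timers = new TreeSet<>(); // identical (timestamp, window) timers collapse
    final List<String> emitted = new ArrayList<>();

    OnEventTimeSketch(long size, long allowedLateness) {
        this.size = size;
        this.allowedLateness = allowedLateness;
    }

    // Simplified processElement without late handling: buffer and register both timers.
    void add(String value, long ts) {
        long window = ts - Math.floorMod(ts, size);
        windowState.computeIfAbsent(window, w -> new ArrayList<>()).add(value);
        timers.add(new Timer(window + size - 1, window));                   // trigger timer
        timers.add(new Timer(window + size - 1 + allowedLateness, window)); // cleanup timer
    }

    // Fire every timer with timestamp <= watermark, earliest first.
    void advanceWatermark(long watermark) {
        while (!timers.isEmpty() && timers.first().timestamp <= watermark) {
            onEventTime(timers.pollFirst());
        }
    }

    void onEventTime(Timer timer) {
        long maxTimestamp = timer.window + size - 1;
        // trigger part: FIRE when the timer is the window end
        if (timer.timestamp == maxTimestamp) {
            List<String> contents = windowState.get(timer.window);
            if (contents != null) {
                emitted.add(timer.window + ":" + contents);
            }
        }
        // isCleanupTime part: the window is finished, drop all of its state
        if (timer.timestamp == maxTimestamp + allowedLateness) {
            windowState.remove(timer.window);
        }
    }
}
```

With allowedLateness = 0 the trigger timer and the cleanup timer coincide. A single onEventTime call then first emits the window and then clears it, which matches the order in the source: fire and purge first, then clearAllState.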


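About the author: focuses on real-time computing and publishes occasional articles on the principles and source code of Flink and Spark. WeChat official account: JasonLee实时计算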

Source: this article is from "JasonLee实时计算". Article link: https://www.zyxiao.com/p/298141
