package org.apache.flink.table.runtime.operators.window;

import java.time.ZoneId;
import java.util.Collection;
import java.util.Objects;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.flink.api.common.state.MergingState;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MeterView;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.state.internal.InternalMergingState;
import org.apache.flink.runtime.state.internal.InternalValueState;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.util.RowDataUtil;
import org.apache.flink.table.runtime.dataview.PerWindowStateDataViewStore;
import org.apache.flink.table.runtime.generated.NamespaceAggsHandleFunctionBase;
import org.apache.flink.table.runtime.operators.aggregate.RecordCounter;
import org.apache.flink.table.runtime.operators.window.Window;
import org.apache.flink.table.runtime.operators.window.assigners.MergingWindowAssigner;
import org.apache.flink.table.runtime.operators.window.assigners.PanedWindowAssigner;
import org.apache.flink.table.runtime.operators.window.assigners.WindowAssigner;
import org.apache.flink.table.runtime.operators.window.internal.GeneralWindowProcessFunction;
import org.apache.flink.table.runtime.operators.window.internal.InternalWindowProcessFunction;
import org.apache.flink.table.runtime.operators.window.internal.MergingWindowProcessFunction;
import org.apache.flink.table.runtime.operators.window.internal.PanedWindowProcessFunction;
import org.apache.flink.table.runtime.operators.window.triggers.Trigger;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.runtime.util.TimeWindowUtil;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/window/WindowOperator.class */
public abstract class WindowOperator<K, W extends Window> extends AbstractStreamOperator<RowData> implements OneInputStreamOperator<RowData, RowData>, Triggerable<K, W> {
    private static final long serialVersionUID = 1;
    // Metric names used when the counter, meter and gauge are registered in open().
    private static final String LATE_ELEMENTS_DROPPED_METRIC_NAME = "numLateRecordsDropped";
    private static final String LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME = "lateRecordsDroppedRate";
    private static final String WATERMARK_LATENCY_METRIC_NAME = "watermarkLatency";
    // Decides event-time vs. processing-time handling and which window process function open() creates.
    private final WindowAssigner<W> windowAssigner;
    // Receives the element/timer callbacks issued through TriggerContext.
    private final Trigger<W> trigger;
    // Namespace serializer used for the timer service and for all per-window keyed state.
    private final TypeSerializer<W> windowSerializer;
    // Stored by the constructors; not read anywhere else in this class.
    private final LogicalType[] inputFieldTypes;
    // Row layout of the "window-aggs" state created in open().
    private final LogicalType[] accumulatorTypes;
    // aggResultTypes followed by windowPropertyTypes form the row layout of the "previous-aggs" state.
    private final LogicalType[] aggResultTypes;
    private final LogicalType[] windowPropertyTypes;
    // When true, open() additionally creates previousState.
    protected final boolean produceUpdates;
    // Passed to the TimeWindowUtil conversions and exposed through both context classes.
    protected final ZoneId shiftTimeZone;
    // Index of the long field read as the element timestamp when the assigner is event-time.
    private final int rowtimeIndex;
    // Checked to be >= 0 by the constructors; added to a window's max timestamp in cleanupTime for event time.
    private final long allowedLateness;
    // Built via RecordCounter.of(...) in the constructors; not used in this class (presumably for subclasses).
    protected final RecordCounter recordCounter;
    // Only assigned by the constructor that takes an aggregator; otherwise it must be set before open() uses it
    // (presumably inside compileGeneratedCode() -- confirm against subclasses).
    protected NamespaceAggsHandleFunctionBase<W> windowAggregator;
    // Everything below is transient and initialised in open().
    protected transient InternalWindowProcessFunction<K, W> windowFunction;
    protected transient TimestampedCollector<RowData> collector;
    private transient InternalTimerService<W> internalTimerService;
    // Accumulator row per (key, window).
    private transient InternalValueState<K, W, RowData> windowState;
    // Stays null unless produceUpdates is true.
    protected transient InternalValueState<K, W, RowData> previousState;
    private transient WindowOperator<K, W>.TriggerContext triggerContext;
    private transient Counter numLateRecordsDropped;
    private transient Meter lateRecordsDroppedRate;
    private transient Gauge<Long> watermarkLatency;

    /**
     * Context object handed to the {@link Trigger}. It carries the window that the next trigger
     * callback or timer operation refers to and forwards everything else to the enclosing
     * operator (timer service, metrics, keyed state).
     *
     * <p>The enclosing operator sets {@code window} (and {@code mergedWindows} for merges)
     * before invoking any of the callbacks below.
     *
     * <p>Note: the decompiler reports that this class was originally declared {@code private}.
     */
    public class TriggerContext implements Trigger.OnMergeContext {
        /** Window that trigger callbacks, timers and partitioned state are scoped to. */
        private W window;
        /** Source windows of the merge in progress; read by {@link #mergePartitionedState}. */
        private Collection<W> mergedWindows;

        private TriggerContext() {
        }

        /** Opens the trigger with this context. */
        public void open() throws Exception {
            trigger.open(this);
        }

        /** Forwards an element to the trigger; the result is the trigger's fire decision. */
        boolean onElement(RowData rowData, long j) throws Exception {
            return trigger.onElement(rowData, j, window);
        }

        /** Forwards a processing-time timer to the trigger for the current window. */
        boolean onProcessingTime(long j) throws Exception {
            return trigger.onProcessingTime(j, window);
        }

        /** Forwards an event-time timer to the trigger for the current window. */
        boolean onEventTime(long j) throws Exception {
            return trigger.onEventTime(j, window);
        }

        /** Notifies the trigger that windows were merged into the current window. */
        void onMerge() throws Exception {
            trigger.onMerge(window, this);
        }

        @Override
        public long getCurrentProcessingTime() {
            return internalTimerService.currentProcessingTime();
        }

        @Override
        public long getCurrentWatermark() {
            return internalTimerService.currentWatermark();
        }

        @Override
        public MetricGroup getMetricGroup() {
            // Must stay qualified: an unqualified call would resolve to this very method.
            return WindowOperator.this.getMetricGroup();
        }

        @Override
        public void registerProcessingTimeTimer(long j) {
            internalTimerService.registerProcessingTimeTimer(window, j);
        }

        @Override
        public void registerEventTimeTimer(long j) {
            internalTimerService.registerEventTimeTimer(window, j);
        }

        @Override
        public void deleteProcessingTimeTimer(long j) {
            internalTimerService.deleteProcessingTimeTimer(window, j);
        }

        @Override
        public void deleteEventTimeTimer(long j) {
            internalTimerService.deleteEventTimeTimer(window, j);
        }

        @Override
        public ZoneId getShiftTimeZone() {
            return shiftTimeZone;
        }

        /** Asks the trigger to clear whatever it holds for the current window. */
        public void clear() throws Exception {
            trigger.clear(window);
        }

        /**
         * Looks up state scoped to the current window.
         *
         * @throws RuntimeException wrapping any exception raised by the state lookup
         */
        @Override
        public <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor) {
            try {
                // Must stay qualified: the inner class declares a method of the same name.
                return WindowOperator.this.getPartitionedState(window, windowSerializer, stateDescriptor);
            } catch (Exception e) {
                throw new RuntimeException("Could not retrieve state", e);
            }
        }

        /**
         * Merges the state of the merged windows into the current window's namespace. Does
         * nothing when no merged windows are set.
         *
         * @throws RuntimeException wrapping any failure, including the
         *     {@link IllegalArgumentException} raised for a non-mergeable state
         */
        @Override
        public <S extends MergingState<?, ?>> void mergePartitionedState(StateDescriptor<S, ?> stateDescriptor) {
            if (mergedWindows == null || mergedWindows.isEmpty()) {
                return;
            }
            try {
                State rawState = WindowOperator.this.getOrCreateKeyedState(windowSerializer, stateDescriptor);
                if (rawState instanceof InternalMergingState) {
                    @SuppressWarnings("unchecked")
                    InternalMergingState<K, W, ?, ?, ?> mergingState = (InternalMergingState<K, W, ?, ?, ?>) rawState;
                    mergingState.mergeNamespaces(window, mergedWindows);
                } else {
                    throw new IllegalArgumentException("The given state descriptor does not refer to a mergeable state (MergingState)");
                }
            } catch (Exception e) {
                throw new RuntimeException("Error while merging state.", e);
            }
        }
    }

    /**
     * Context handed to the {@link InternalWindowProcessFunction}. Every method delegates to
     * the enclosing operator: keyed state, current key, timer service, per-window accumulator
     * state and the trigger context.
     */
    private class WindowContext implements InternalWindowProcessFunction.Context<K, W> {
        private WindowContext() {
        }

        /**
         * Returns state for the given descriptor from the enclosing operator.
         *
         * @throws NullPointerException if {@code stateDescriptor} is null
         */
        @Override
        public <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor) throws Exception {
            Objects.requireNonNull(stateDescriptor, "The state properties must not be null");
            // Must stay qualified: this class declares a method of the same name.
            return WindowOperator.this.getPartitionedState(stateDescriptor);
        }

        @Override
        public K currentKey() {
            // Must stay qualified: an unqualified call would resolve to this very method.
            return WindowOperator.this.currentKey();
        }

        @Override
        public long currentProcessingTime() {
            return internalTimerService.currentProcessingTime();
        }

        @Override
        public long currentWatermark() {
            return internalTimerService.currentWatermark();
        }

        @Override
        public ZoneId getShiftTimeZone() {
            return shiftTimeZone;
        }

        /** Reads the accumulator row stored for {@code w}; may be null if nothing was stored. */
        @Override
        public RowData getWindowAccumulators(W w) throws Exception {
            windowState.setCurrentNamespace(w);
            return windowState.value();
        }

        /** Stores {@code rowData} as the accumulator row of {@code w}. */
        @Override
        public void setWindowAccumulators(W w, RowData rowData) throws Exception {
            windowState.setCurrentNamespace(w);
            windowState.update(rowData);
        }

        /** Clears the accumulator state of {@code w} and lets the aggregator clean up for it. */
        @Override
        public void clearWindowState(W w) throws Exception {
            windowState.setCurrentNamespace(w);
            windowState.clear();
            windowAggregator.cleanup(w);
        }

        /** Clears the previous-result state of {@code w}; a no-op when that state was never created. */
        @Override
        public void clearPreviousState(W w) throws Exception {
            if (previousState != null) {
                previousState.setCurrentNamespace(w);
                previousState.clear();
            }
        }

        /** Points the trigger context at {@code w} and clears the trigger for it. */
        @Override
        public void clearTrigger(W w) throws Exception {
            triggerContext.window = w;
            triggerContext.clear();
        }

        /**
         * Deletes the cleanup timer of {@code w}, mirroring how the operator registers it.
         * The timer is deleted for whatever window the trigger context currently points at;
         * this method does not change that window.
         */
        @Override
        public void deleteCleanupTimer(W w) throws Exception {
            long cleanupTimer = TimeWindowUtil.toEpochMillsForTimer(WindowOperator.this.cleanupTime(w), shiftTimeZone);
            // Long.MAX_VALUE is the "end of time" sentinel: no timer is registered for it, so
            // there is nothing to delete. (The decompiled source spelled this value as an
            // unrelated checkpoint constant.)
            if (cleanupTimer == Long.MAX_VALUE) {
                return;
            }
            if (windowAssigner.isEventTime()) {
                triggerContext.deleteEventTimeTimer(cleanupTimer);
            } else {
                triggerContext.deleteProcessingTimeTimer(cleanupTimer);
            }
        }

        /** Records the merge target and sources on the trigger context, then notifies the trigger. */
        @Override
        public void onMerge(W w, Collection<W> collection) throws Exception {
            triggerContext.window = w;
            triggerContext.mergedWindows = collection;
            triggerContext.onMerge();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public WindowOperator(NamespaceAggsHandleFunctionBase<W> namespaceAggsHandleFunctionBase, WindowAssigner<W> windowAssigner, Trigger<W> trigger, TypeSerializer<W> typeSerializer, LogicalType[] logicalTypeArr, LogicalType[] logicalTypeArr2, LogicalType[] logicalTypeArr3, LogicalType[] logicalTypeArr4, int i, boolean z, long j, ZoneId zoneId, int i2) {
        Preconditions.checkArgument(j >= 0);
        this.windowAggregator = (NamespaceAggsHandleFunctionBase) Preconditions.checkNotNull(namespaceAggsHandleFunctionBase);
        this.windowAssigner = (WindowAssigner) Preconditions.checkNotNull(windowAssigner);
        this.trigger = (Trigger) Preconditions.checkNotNull(trigger);
        this.windowSerializer = (TypeSerializer) Preconditions.checkNotNull(typeSerializer);
        this.inputFieldTypes = (LogicalType[]) Preconditions.checkNotNull(logicalTypeArr);
        this.accumulatorTypes = (LogicalType[]) Preconditions.checkNotNull(logicalTypeArr2);
        this.aggResultTypes = (LogicalType[]) Preconditions.checkNotNull(logicalTypeArr3);
        this.windowPropertyTypes = (LogicalType[]) Preconditions.checkNotNull(logicalTypeArr4);
        this.allowedLateness = j;
        this.produceUpdates = z;
        Preconditions.checkArgument(!windowAssigner.isEventTime() || i >= 0);
        this.rowtimeIndex = i;
        this.shiftTimeZone = zoneId;
        this.recordCounter = RecordCounter.of(i2);
        setChainingStrategy(ChainingStrategy.ALWAYS);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public WindowOperator(WindowAssigner<W> windowAssigner, Trigger<W> trigger, TypeSerializer<W> typeSerializer, LogicalType[] logicalTypeArr, LogicalType[] logicalTypeArr2, LogicalType[] logicalTypeArr3, LogicalType[] logicalTypeArr4, int i, boolean z, long j, ZoneId zoneId, int i2) {
        Preconditions.checkArgument(j >= 0);
        this.windowAssigner = (WindowAssigner) Preconditions.checkNotNull(windowAssigner);
        this.trigger = (Trigger) Preconditions.checkNotNull(trigger);
        this.windowSerializer = (TypeSerializer) Preconditions.checkNotNull(typeSerializer);
        this.inputFieldTypes = (LogicalType[]) Preconditions.checkNotNull(logicalTypeArr);
        this.accumulatorTypes = (LogicalType[]) Preconditions.checkNotNull(logicalTypeArr2);
        this.aggResultTypes = (LogicalType[]) Preconditions.checkNotNull(logicalTypeArr3);
        this.windowPropertyTypes = (LogicalType[]) Preconditions.checkNotNull(logicalTypeArr4);
        this.allowedLateness = j;
        this.produceUpdates = z;
        Preconditions.checkArgument(!windowAssigner.isEventTime() || i >= 0);
        this.rowtimeIndex = i;
        this.shiftTimeZone = zoneId;
        this.recordCounter = RecordCounter.of(i2);
        setChainingStrategy(ChainingStrategy.ALWAYS);
    }

    /**
     * Hook invoked by {@link #open()} after the keyed window state has been created and before
     * {@code windowAggregator} is opened.
     *
     * <p>NOTE(review): presumably implementations instantiate generated code here (e.g. assign
     * {@code windowAggregator} when the constructor without an aggregator was used) -- confirm
     * against the subclasses.
     */
    protected abstract void compileGeneratedCode();

    /**
     * Initialises all transient runtime structures, in this order: output collector, timer
     * service, trigger context, keyed window state, generated code, aggregator, window process
     * function and finally the metrics.
     */
    @Override
    @SuppressWarnings("unchecked")
    public void open() throws Exception {
        super.open();

        collector = new TimestampedCollector<>(output);
        collector.eraseTimestamp();

        internalTimerService = getInternalTimerService("window-timers", windowSerializer, this);

        triggerContext = new TriggerContext();
        triggerContext.open();

        // Accumulator row per window.
        ValueStateDescriptor<RowData> windowStateDescriptor =
                new ValueStateDescriptor<>("window-aggs", new RowDataSerializer(accumulatorTypes));
        windowState = (InternalValueState<K, W, RowData>) getOrCreateKeyedState(windowSerializer, windowStateDescriptor);

        if (produceUpdates) {
            // Row layout: aggregate results followed by the window properties.
            LogicalType[] previousRowTypes = ArrayUtils.addAll(aggResultTypes, windowPropertyTypes);
            ValueStateDescriptor<RowData> previousStateDescriptor =
                    new ValueStateDescriptor<>("previous-aggs", new RowDataSerializer(previousRowTypes));
            previousState = (InternalValueState<K, W, RowData>) getOrCreateKeyedState(windowSerializer, previousStateDescriptor);
        }

        compileGeneratedCode();

        WindowContext windowContext = new WindowContext();
        windowAggregator.open(new PerWindowStateDataViewStore(getKeyedStateBackend(), windowSerializer, getRuntimeContext()));

        // Pick the process function that matches the kind of assigner.
        if (windowAssigner instanceof MergingWindowAssigner) {
            windowFunction = new MergingWindowProcessFunction<>(
                    (MergingWindowAssigner<W>) windowAssigner, windowAggregator, windowSerializer, allowedLateness);
        } else if (windowAssigner instanceof PanedWindowAssigner) {
            windowFunction = new PanedWindowProcessFunction<>(
                    (PanedWindowAssigner<W>) windowAssigner, windowAggregator, allowedLateness);
        } else {
            windowFunction = new GeneralWindowProcessFunction<>(windowAssigner, windowAggregator, allowedLateness);
        }
        windowFunction.open(windowContext);

        numLateRecordsDropped = metrics.counter(LATE_ELEMENTS_DROPPED_METRIC_NAME);
        lateRecordsDroppedRate = metrics.meter(LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME, new MeterView(numLateRecordsDropped));
        watermarkLatency = metrics.gauge(WATERMARK_LATENCY_METRIC_NAME, () -> {
            long watermark = internalTimerService.currentWatermark();
            // A negative watermark is reported as zero latency.
            return watermark < 0 ? 0L : internalTimerService.currentProcessingTime() - watermark;
        });
    }

    /**
     * Closes the operator: runs the superclass close, drops the collector and trigger context
     * references and closes the aggregator if one was set.
     */
    @Override
    public void close() throws Exception {
        super.close();
        collector = null;
        triggerContext = null;
        if (windowAggregator == null) {
            return;
        }
        windowAggregator.close();
    }

    /**
     * Processes one input row: folds it into the accumulators of every state window it is
     * assigned to, then runs the trigger for every actual window and registers that window's
     * cleanup timer. A row that is assigned to no window at all is counted as dropped.
     */
    @Override
    public void processElement(StreamRecord<RowData> streamRecord) throws Exception {
        final RowData inputRow = streamRecord.getValue();

        // Event time comes from the row itself, processing time from the timer service.
        final long rawTimestamp;
        if (windowAssigner.isEventTime()) {
            rawTimestamp = inputRow.getLong(rowtimeIndex);
        } else {
            rawTimestamp = internalTimerService.currentProcessingTime();
        }
        final long timestamp = TimeWindowUtil.toUtcTimestampMills(rawTimestamp, shiftTimeZone);

        boolean elementDropped = true;

        // Step 1: update the accumulators of the windows that hold state for this row.
        for (W stateWindow : windowFunction.assignStateNamespace(inputRow, timestamp)) {
            elementDropped = false;
            windowState.setCurrentNamespace(stateWindow);
            RowData accumulators = windowState.value();
            if (accumulators == null) {
                accumulators = windowAggregator.createAccumulators();
            }
            windowAggregator.setAccumulators(stateWindow, accumulators);
            if (RowDataUtil.isAccumulateMsg(inputRow)) {
                windowAggregator.accumulate(inputRow);
            } else {
                windowAggregator.retract(inputRow);
            }
            windowState.update(windowAggregator.getAccumulators());
        }

        // Step 2: let the trigger decide, per actual window, whether to emit.
        for (W actualWindow : windowFunction.assignActualWindows(inputRow, timestamp)) {
            elementDropped = false;
            triggerContext.window = actualWindow;
            if (triggerContext.onElement(inputRow, timestamp)) {
                emitWindowResult(actualWindow);
            }
            registerCleanupTimer(actualWindow);
        }

        if (elementDropped) {
            // The meter is built on top of the numLateRecordsDropped counter (see open()).
            lateRecordsDroppedRate.markEvent();
        }
    }

    /**
     * Event-time timer callback: restores the timer's key and window, emits the window result
     * if the trigger fires, and cleans the window if needed when the assigner is event-time.
     */
    @Override
    public void onEventTime(InternalTimer<K, W> internalTimer) throws Exception {
        setCurrentKey(internalTimer.getKey());
        triggerContext.window = internalTimer.getNamespace();

        final boolean fire = triggerContext.onEventTime(internalTimer.getTimestamp());
        if (fire) {
            emitWindowResult(triggerContext.window);
        }

        // Cleanup is driven by event-time timers only for event-time assigners.
        if (windowAssigner.isEventTime()) {
            windowFunction.cleanWindowIfNeeded(triggerContext.window, internalTimer.getTimestamp());
        }
    }

    /**
     * Processing-time timer callback: restores the timer's key and window, emits the window
     * result if the trigger fires, and cleans the window if needed when the assigner is not
     * event-time.
     */
    @Override
    public void onProcessingTime(InternalTimer<K, W> internalTimer) throws Exception {
        setCurrentKey(internalTimer.getKey());
        triggerContext.window = internalTimer.getNamespace();

        final boolean fire = triggerContext.onProcessingTime(internalTimer.getTimestamp());
        if (fire) {
            emitWindowResult(triggerContext.window);
        }

        // Cleanup is driven by processing-time timers only for processing-time assigners.
        if (!windowAssigner.isEventTime()) {
            windowFunction.cleanWindowIfNeeded(triggerContext.window, internalTimer.getTimestamp());
        }
    }

    /**
     * Emits the result for the given window. Called by this operator whenever a trigger
     * callback (on element, on event time, on processing time) returns {@code true}.
     *
     * @param w the window whose result should be emitted
     * @throws Exception if emitting fails
     */
    protected abstract void emitWindowResult(W w) throws Exception;

    /**
     * Registers the timer at which {@code w} becomes eligible for cleanup. The timer goes to
     * the event-time or processing-time service depending on the assigner, and is registered
     * through the trigger context, i.e. for the window that context currently points at.
     *
     * @param w the window whose cleanup time is computed
     */
    private void registerCleanupTimer(W w) {
        long cleanupTimer = TimeWindowUtil.toEpochMillsForTimer(cleanupTime(w), shiftTimeZone);
        // Long.MAX_VALUE is the "end of time" sentinel produced by cleanupTime on overflow;
        // no timer is set for it. (The decompiled source spelled this value as an unrelated
        // checkpoint constant.)
        if (cleanupTimer == Long.MAX_VALUE) {
            return;
        }
        if (windowAssigner.isEventTime()) {
            triggerContext.registerEventTimeTimer(cleanupTimer);
        } else {
            triggerContext.registerProcessingTimeTimer(cleanupTimer);
        }
    }

    /**
     * Computes the time at which the given window may be cleaned up.
     *
     * <p>For processing time this is the window's max timestamp, floored at 0. For event time
     * it is the max timestamp plus the allowed lateness, floored at 0; if that value ends up
     * below the max timestamp (the addition overflowed), {@link Long#MAX_VALUE} is returned as
     * an "end of time" sentinel.
     *
     * <p>Note: the decompiler reports that this method was originally {@code private}.
     *
     * @param w the window to compute the cleanup time for
     * @return the cleanup time, or {@link Long#MAX_VALUE} on overflow
     */
    public long cleanupTime(W w) {
        if (!windowAssigner.isEventTime()) {
            return Math.max(0L, w.maxTimestamp());
        }
        long cleanupTime = Math.max(0L, w.maxTimestamp() + allowedLateness);
        // allowedLateness is non-negative, so a smaller value can only come from overflow.
        return cleanupTime >= w.maxTimestamp() ? cleanupTime : Long.MAX_VALUE;
    }

    /**
     * Returns the operator's current key, cast to the key type {@code K}.
     *
     * <p>Note: the decompiler reports that this method was originally {@code private}.
     */
    @SuppressWarnings("unchecked")
    public K currentKey() {
        final Object key = getCurrentKey();
        return (K) key;
    }

    /** Returns the dropped-records counter registered in {@link #open()}; null before that. */
    protected Counter getNumLateRecordsDropped() {
        return numLateRecordsDropped;
    }

    /** Returns the watermark-latency gauge registered in {@link #open()}; null before that. */
    protected Gauge<Long> getWatermarkLatency() {
        return watermarkLatency;
    }
}
