package org.apache.flink.api.common.eventtime;

import java.time.Duration;
import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.clock.Clock;
import org.apache.flink.util.clock.SystemClock;

/**
 * A {@link WatermarkGenerator} wrapper that adds idleness detection to another generator.
 *
 * <p>Every event is forwarded to the wrapped generator and counted as activity. On each periodic
 * emit the wrapper asks its {@link IdlenessTimer} whether the configured idle timeout has elapsed
 * without any event; if so it calls {@link WatermarkOutput#markIdle()} on the output, otherwise
 * it delegates the periodic emit to the wrapped generator.
 *
 * @param <T> type of the events passed to {@link #onEvent(Object, long, WatermarkOutput)}
 */
@Public
public class WatermarksWithIdleness<T> implements WatermarkGenerator<T> {

    /** The generator that produces the actual watermarks. */
    private final WatermarkGenerator<T> watermarks;

    /** Tracks activity and decides when the idle timeout has been exceeded. */
    private final IdlenessTimer idlenessTimer;

    /**
     * Activity tracker driven by {@link #activity()} calls and probed via {@link #checkIfIdle()}.
     *
     * <p>Activity is recorded by bumping a counter; a probe compares the counter with the value
     * seen at the previous probe. The clock is only read when a probe observes no new activity.
     */
    @VisibleForTesting
    static final class IdlenessTimer {

        /** Source of the relative nanosecond time used to measure inactivity. */
        private final Clock clock;

        /** Incremented on every {@link #activity()} call. */
        private long counter;

        /** Value of {@link #counter} observed by the most recent {@link #checkIfIdle()} call. */
        private long lastCounter;

        /**
         * Clock reading taken at the first probe that saw no activity. Only meaningful while
         * {@link #inactivityTimerRunning} is {@code true}.
         */
        private long startOfInactivityNanos;

        /**
         * Whether {@link #startOfInactivityNanos} currently holds a valid start time. Kept as a
         * separate flag because any {@code long}, including {@code 0}, is a legitimate clock
         * reading and therefore cannot double as a "not started" marker.
         */
        private boolean inactivityTimerRunning;

        /** Idle timeout in nanoseconds, saturated at {@link Long#MAX_VALUE}. */
        private final long maxIdleTimeNanos;

        /**
         * Creates a timer that reports idleness once more than {@code duration} has passed
         * without activity.
         *
         * @param clock clock whose {@code relativeTimeNanos()} is used to measure elapsed time
         * @param duration idle timeout; values too large for a {@code long} nanosecond count are
         *     treated as {@link Long#MAX_VALUE} nanoseconds
         */
        IdlenessTimer(Clock clock, Duration duration) {
            this.clock = clock;
            this.maxIdleTimeNanos = toNanosSaturated(duration);
        }

        /** Converts the duration to nanoseconds, clamping to {@link Long#MAX_VALUE} on overflow. */
        private static long toNanosSaturated(Duration duration) {
            try {
                return duration.toNanos();
            } catch (ArithmeticException ignored) {
                // Duration.toNanos() signals long overflow with ArithmeticException; a timeout
                // that large is handled as the largest representable timeout.
                return Long.MAX_VALUE;
            }
        }

        /** Records that an event was seen since the last probe. */
        public void activity() {
            this.counter++;
        }

        /**
         * Probes the timer.
         *
         * @return {@code true} if no activity has been recorded since the inactivity timer was
         *     started and strictly more than the idle timeout has elapsed on the clock since
         *     then; {@code false} if there was activity since the previous probe, or if this
         *     probe is the one that starts the inactivity timer
         */
        public boolean checkIfIdle() {
            if (this.counter != this.lastCounter) {
                // Activity since the previous probe: remember the counter and stop the timer.
                this.lastCounter = this.counter;
                this.inactivityTimerRunning = false;
                this.startOfInactivityNanos = 0L;
                return false;
            }

            if (!this.inactivityTimerRunning) {
                // First probe without activity: start measuring from now.
                this.startOfInactivityNanos = this.clock.relativeTimeNanos();
                this.inactivityTimerRunning = true;
                return false;
            }

            // Compare via subtraction so the check stays correct regardless of the clock's origin.
            long elapsedNanos = this.clock.relativeTimeNanos() - this.startOfInactivityNanos;
            return elapsedNanos > this.maxIdleTimeNanos;
        }
    }

    /**
     * Creates the wrapper using {@link SystemClock#getInstance()} as the time source.
     *
     * @param watermarkGenerator the generator to wrap; must not be {@code null}
     * @param duration the idle timeout; must not be {@code null} and must be greater than zero
     */
    public WatermarksWithIdleness(WatermarkGenerator<T> watermarkGenerator, Duration duration) {
        this(watermarkGenerator, duration, SystemClock.getInstance());
    }

    /**
     * Creates the wrapper with an explicit clock.
     *
     * @param watermarkGenerator the generator to wrap; must not be {@code null}
     * @param duration the idle timeout; must not be {@code null} and must be greater than zero
     * @param clock the clock handed to the {@link IdlenessTimer}
     */
    @VisibleForTesting
    WatermarksWithIdleness(WatermarkGenerator<T> watermarkGenerator, Duration duration, Clock clock) {
        Preconditions.checkNotNull(duration, "idleTimeout");
        Preconditions.checkArgument(
                !(duration.isZero() || duration.isNegative()),
                "idleTimeout must be greater than zero");
        this.watermarks = Preconditions.checkNotNull(watermarkGenerator, "watermarks");
        this.idlenessTimer = new IdlenessTimer(clock, duration);
    }

    /** Forwards the event to the wrapped generator, then records activity on the idleness timer. */
    @Override
    public void onEvent(T t, long j, WatermarkOutput watermarkOutput) {
        this.watermarks.onEvent(t, j, watermarkOutput);
        this.idlenessTimer.activity();
    }

    /**
     * Marks the output idle if the idleness timer reports idleness; otherwise delegates the
     * periodic emit to the wrapped generator.
     */
    @Override
    public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
        if (this.idlenessTimer.checkIfIdle()) {
            watermarkOutput.markIdle();
        } else {
            this.watermarks.onPeriodicEmit(watermarkOutput);
        }
    }
}
