package org.apache.flink.streaming.runtime.tasks;

import java.time.Duration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.operators.ProcessingTimeService;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.NeverCompleteFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link TimerService} whose notion of time is the wall clock
 * ({@link System#currentTimeMillis()}) and whose callbacks are executed by a
 * {@link ScheduledThreadPoolExecutor} created with a core pool size of one.
 *
 * <p>The service is always in exactly one of three states, held in {@link #status}:
 *
 * <ul>
 *   <li>{@code STATUS_ALIVE} - timers can be registered and fire normally;
 *   <li>{@code STATUS_QUIESCED} - entered through {@link #quiesce()}; the executor has been asked to
 *       shut down in an orderly fashion, and registrations rejected by it are answered with a
 *       {@link NeverCompleteFuture} instead of an exception;
 *   <li>{@code STATUS_SHUTDOWN} - entered through {@link #shutdownService()}; registrations rejected
 *       by the executor fail with an {@link IllegalStateException}.
 * </ul>
 */
@Internal
public class SystemProcessingTimeService implements TimerService {
    private static final Logger LOG = LoggerFactory.getLogger(SystemProcessingTimeService.class);

    /** The service accepts and fires timers. */
    private static final int STATUS_ALIVE = 0;

    /** {@link #quiesce()} was called; callbacks are no longer invoked. */
    private static final int STATUS_QUIESCED = 1;

    /** {@link #shutdownService()} was called. */
    private static final int STATUS_SHUTDOWN = 2;

    /** The executor that schedules and runs all timer tasks. */
    private final ScheduledThreadPoolExecutor timerService;

    /** Receives every exception thrown by a timer callback. */
    private final ExceptionHandler exceptionHandler;

    /** Current life-cycle state; one of the {@code STATUS_*} constants. */
    private final AtomicInteger status;

    /** Completed from the executor's {@code terminated()} hook. */
    private final CompletableFuture<Void> quiesceCompletedFuture;

    /**
     * Callback that is handed the exceptions thrown by timer callbacks.
     *
     * <p>NOTE(review): the decompiler reported that this type's access was widened from
     * package-private; the visibility found in this file is kept so existing callers still compile.
     */
    public interface ExceptionHandler {
        /**
         * Handles an exception thrown while a timer callback was running.
         *
         * @param exc the exception thrown by the callback
         */
        void handleException(Exception exc);
    }

    /**
     * The {@link Runnable} submitted to the executor for every timer. It invokes the user callback
     * with the timestamp the timer was registered for and, for repeated timers, advances that
     * timestamp by the period after every run.
     *
     * <p>NOTE(review): the decompiler reported that this type's access was widened from private;
     * the visibility found in this file is kept.
     */
    public static final class ScheduledTask implements Runnable {
        private final AtomicInteger serviceStatus;
        private final ExceptionHandler exceptionHandler;
        private final ProcessingTimeService.ProcessingTimeCallback callback;
        private long nextTimestamp;
        private final long period;

        /**
         * @param serviceStatus the owning service's status, re-read on every run
         * @param exceptionHandler receiver of exceptions thrown by {@code callback}
         * @param callback the user callback to invoke
         * @param timestamp the timestamp passed to the first invocation of {@code callback}
         * @param period amount added to the timestamp after each run; {@code 0} for one-shot timers
         */
        ScheduledTask(
                AtomicInteger serviceStatus,
                ExceptionHandler exceptionHandler,
                ProcessingTimeService.ProcessingTimeCallback callback,
                long timestamp,
                long period) {
            this.serviceStatus = serviceStatus;
            this.exceptionHandler = exceptionHandler;
            this.callback = callback;
            this.nextTimestamp = timestamp;
            this.period = period;
        }

        /**
         * Invokes the callback unless the service has left the alive state. Exceptions thrown by
         * the callback are forwarded to the exception handler rather than propagated to the
         * executor.
         */
        @Override
        public void run() {
            // Once the service is quiesced or shut down no further callbacks may be invoked.
            if (serviceStatus.get() != STATUS_ALIVE) {
                return;
            }
            try {
                callback.onProcessingTime(nextTimestamp);
            } catch (Exception e) {
                exceptionHandler.handleException(e);
            }
            // Advance even after a failed callback so the next run reports the next slot.
            nextTimestamp += period;
        }
    }

    /**
     * Executor that completes {@link #quiesceCompletedFuture} once it has terminated. It is an
     * inner (non-static) class because it needs access to that future of the enclosing service.
     */
    private class ScheduledTaskExecutor extends ScheduledThreadPoolExecutor {
        public ScheduledTaskExecutor(int corePoolSize) {
            super(corePoolSize);
        }

        public ScheduledTaskExecutor(int corePoolSize, ThreadFactory threadFactory) {
            super(corePoolSize, threadFactory);
        }

        @Override
        protected void terminated() {
            super.terminated();
            SystemProcessingTimeService.this.quiesceCompletedFuture.complete(null);
        }
    }

    /**
     * Creates a service whose executor uses the default thread factory.
     *
     * @param exceptionHandler receiver of exceptions thrown by timer callbacks; must not be null
     */
    @VisibleForTesting
    SystemProcessingTimeService(ExceptionHandler exceptionHandler) {
        this(exceptionHandler, null);
    }

    /**
     * Creates a service in the alive state.
     *
     * <p>NOTE(review): the decompiler reported that this constructor's access was widened from
     * package-private; the visibility found in this file is kept.
     *
     * @param exceptionHandler receiver of exceptions thrown by timer callbacks; must not be null
     * @param threadFactory factory for the executor's thread, or {@code null} to use the
     *     executor's default
     */
    public SystemProcessingTimeService(ExceptionHandler exceptionHandler, ThreadFactory threadFactory) {
        this.exceptionHandler = (ExceptionHandler) Preconditions.checkNotNull(exceptionHandler);
        this.status = new AtomicInteger(STATUS_ALIVE);
        this.quiesceCompletedFuture = new CompletableFuture<>();
        this.timerService =
                threadFactory == null
                        ? new ScheduledTaskExecutor(1)
                        : new ScheduledTaskExecutor(1, threadFactory);

        // Cancelled timers are removed from the work queue right away.
        this.timerService.setRemoveOnCancelPolicy(true);
        // After shutdown() neither periodic nor delayed tasks that are still queued are executed.
        this.timerService.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
        this.timerService.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
    }

    /** Returns the current wall-clock time in milliseconds. */
    @Override
    public long getCurrentProcessingTime() {
        return System.currentTimeMillis();
    }

    /**
     * Registers a one-shot timer for the given timestamp.
     *
     * @param timestamp the time the timer is registered for; also the value handed to the callback
     * @param callback the callback to invoke when the timer fires
     * @return a future for the scheduled timer, or a {@link NeverCompleteFuture} if the executor
     *     rejected the timer because the service is quiesced
     * @throws IllegalStateException if the executor rejected the timer because the service is shut
     *     down
     */
    @Override
    public ScheduledFuture<?> registerTimer(
            long timestamp, ProcessingTimeService.ProcessingTimeCallback callback) {
        final long delay =
                ProcessingTimeServiceUtil.getProcessingTimeDelay(
                        timestamp, getCurrentProcessingTime());

        // Schedule optimistically and only look at the status if the executor rejects the task.
        try {
            return timerService.schedule(
                    wrapOnTimerCallback(callback, timestamp), delay, TimeUnit.MILLISECONDS);
        } catch (RejectedExecutionException e) {
            return handleRejection(e, delay);
        }
    }

    /**
     * Registers a repeated timer that fires at a fixed rate.
     *
     * @param callback the callback to invoke on every firing
     * @param initialDelay delay in milliseconds before the first firing
     * @param period time in milliseconds between the starts of successive firings
     * @return a future for the scheduled timer; see {@link #registerTimer} for the rejected cases
     */
    @Override
    public ScheduledFuture<?> scheduleAtFixedRate(
            ProcessingTimeService.ProcessingTimeCallback callback, long initialDelay, long period) {
        return scheduleRepeatedly(callback, initialDelay, period, false);
    }

    /**
     * Registers a repeated timer that waits a fixed delay between the end of one firing and the
     * start of the next.
     *
     * @param callback the callback to invoke on every firing
     * @param initialDelay delay in milliseconds before the first firing
     * @param period delay in milliseconds between successive firings
     * @return a future for the scheduled timer; see {@link #registerTimer} for the rejected cases
     */
    @Override
    public ScheduledFuture<?> scheduleWithFixedDelay(
            ProcessingTimeService.ProcessingTimeCallback callback, long initialDelay, long period) {
        return scheduleRepeatedly(callback, initialDelay, period, true);
    }

    /**
     * Shared implementation of the two repeated-scheduling methods.
     *
     * @param fixedDelay {@code true} to schedule with a fixed delay, {@code false} for a fixed rate
     */
    private ScheduledFuture<?> scheduleRepeatedly(
            ProcessingTimeService.ProcessingTimeCallback callback,
            long initialDelay,
            long period,
            boolean fixedDelay) {
        final long firstTimestamp = getCurrentProcessingTime() + initialDelay;
        final Runnable task = wrapOnTimerCallback(callback, firstTimestamp, period);

        // Schedule optimistically and only look at the status if the executor rejects the task.
        try {
            return fixedDelay
                    ? timerService.scheduleWithFixedDelay(
                            task, initialDelay, period, TimeUnit.MILLISECONDS)
                    : timerService.scheduleAtFixedRate(
                            task, initialDelay, period, TimeUnit.MILLISECONDS);
        } catch (RejectedExecutionException e) {
            return handleRejection(e, initialDelay);
        }
    }

    /**
     * Translates a rejection by the executor into the result mandated by the current status.
     *
     * @param rejection the exception thrown by the executor
     * @param delay the delay reported by the placeholder future returned in the quiesced state
     * @return a {@link NeverCompleteFuture} if the service is quiesced
     * @throws IllegalStateException if the service is shut down
     * @throws RejectedExecutionException the original rejection, if the service is still alive
     */
    private ScheduledFuture<?> handleRejection(RejectedExecutionException rejection, long delay) {
        final int currentStatus = status.get();
        if (currentStatus == STATUS_QUIESCED) {
            return new NeverCompleteFuture(delay);
        }
        if (currentStatus == STATUS_SHUTDOWN) {
            // Keep the executor's rejection as the cause so the stack trace is not lost.
            throw new IllegalStateException("Timer service is shut down", rejection);
        }
        // Rejected for some other reason: propagate unchanged.
        throw rejection;
    }

    /** Returns whether the service is still in the alive state. */
    @VisibleForTesting
    boolean isAlive() {
        return status.get() == STATUS_ALIVE;
    }

    /**
     * Returns whether {@link #shutdownService()} has taken effect. This reflects the status flag
     * only; it does not query the executor for termination.
     */
    @Override
    public boolean isTerminated() {
        return status.get() == STATUS_SHUTDOWN;
    }

    /**
     * Moves an alive service to the quiesced state and asks the executor to shut down in an
     * orderly fashion. Calling this on a service that is not alive changes nothing.
     *
     * @return a future that is completed once the executor has terminated
     */
    @Override
    public CompletableFuture<Void> quiesce() {
        if (status.compareAndSet(STATUS_ALIVE, STATUS_QUIESCED)) {
            timerService.shutdown();
        }
        return quiesceCompletedFuture;
    }

    /**
     * Moves the service to the shut-down state (from either alive or quiesced) and calls
     * {@code shutdownNow()} on the executor. Subsequent calls are no-ops.
     */
    @Override
    public void shutdownService() {
        if (status.compareAndSet(STATUS_ALIVE, STATUS_SHUTDOWN)
                || status.compareAndSet(STATUS_QUIESCED, STATUS_SHUTDOWN)) {
            timerService.shutdownNow();
        }
    }

    /**
     * Shuts the service down and waits for the executor to terminate.
     *
     * @param time maximum time to wait
     * @param timeUnit unit of {@code time}
     * @return {@code true} if the executor terminated within the timeout
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    @VisibleForTesting
    boolean shutdownAndAwaitPending(long time, TimeUnit timeUnit) throws InterruptedException {
        shutdownService();
        return timerService.awaitTermination(time, timeUnit);
    }

    /**
     * Shuts the service down and waits for the executor to terminate, retrying the wait when the
     * calling thread is interrupted until either the executor has terminated or the timeout has
     * elapsed. If an interrupt was intercepted, the thread's interrupt flag is restored before
     * returning.
     *
     * @param timeoutMs maximum total time to wait, in milliseconds
     * @return {@code true} if the executor terminated within the timeout
     */
    @Override
    public boolean shutdownServiceUninterruptible(long timeoutMs) {
        final Deadline deadline = Deadline.fromNow(Duration.ofMillis(timeoutMs));

        boolean shutdownComplete = false;
        boolean receivedInterrupt = false;

        do {
            try {
                shutdownComplete =
                        shutdownAndAwaitPending(
                                deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                receivedInterrupt = true;
                LOG.trace("Intercepted attempt to interrupt timer service shutdown.", e);
            }
        } while (deadline.hasTimeLeft() && !shutdownComplete);

        if (receivedInterrupt) {
            // Hand the interrupt back to the caller now that the wait is over.
            Thread.currentThread().interrupt();
        }
        return shutdownComplete;
    }

    /** Safety net: stops the executor when the service is garbage collected. */
    @Override
    protected void finalize() throws Throwable {
        try {
            super.finalize();
        } finally {
            // Must run even if super.finalize() throws, otherwise the timer thread leaks.
            timerService.shutdownNow();
        }
    }

    /** Returns the number of tasks currently held in the executor's work queue. */
    @VisibleForTesting
    int getNumTasksScheduled() {
        final BlockingQueue<Runnable> queue = timerService.getQueue();
        return queue == null ? 0 : queue.size();
    }

    /** Wraps a callback into a one-shot task (period {@code 0}). */
    private Runnable wrapOnTimerCallback(
            ProcessingTimeService.ProcessingTimeCallback callback, long timestamp) {
        return wrapOnTimerCallback(callback, timestamp, 0L);
    }

    /** Wraps a callback into a task whose timestamp advances by {@code period} after each run. */
    private Runnable wrapOnTimerCallback(
            ProcessingTimeService.ProcessingTimeCallback callback, long nextTimestamp, long period) {
        return new ScheduledTask(status, exceptionHandler, callback, nextTimestamp, period);
    }
}
