package org.apache.flink.streaming.api.operators;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.PriorityQueue;
import java.util.concurrent.ScheduledFuture;
import org.apache.flink.api.common.functions.DummyMerger;
import org.apache.flink.api.common.functions.HashPartitioner;
import org.apache.flink.api.common.functions.NaturalComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.ByteSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.runtime.state2.FieldBasedPartitioner;
import org.apache.flink.runtime.state2.GroupSet;
import org.apache.flink.runtime.state2.InternalState;
import org.apache.flink.runtime.state2.InternalStateDescriptor;
import org.apache.flink.runtime.state2.InternalStateDescriptorBuilder;
import org.apache.flink.runtime.state2.Scope;
import org.apache.flink.streaming.runtime.tasks.EventTimeService;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.types.Pair;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.class */
public class InternalTimerServiceImpl<K, N> implements InternalTimerService<K, N> {
    private static final byte PROCESSING_TIME_TAG = 112;
    private static final byte EVENT_TIME_TAG = 101;
    private static final int TIMESTAMP_FIELD_INDEX = 1;
    private static final int KEY_FIELD_INDEX = 2;
    private static final int NAMESPACE_FIELD_INDEX = 3;
    private static final byte DUMMY_BYTE = 0;
    private final InternalState timerState;
    private final ProcessingTimeService processingTimeService;
    private final EventTimeService eventTimeService;
    private final Triggerable<K, N> triggerTarget;
    private final Map<Integer, InternalTimer<K, N>> headProcessingTimerPerGroup;
    private final Map<Integer, InternalTimer<K, N>> headEventTimerPerGroup;
    private volatile ScheduledFuture<?> nextProcessingTimeEvent;
    private volatile InternalTimer<K, N> headProcessingTimer;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/api/operators/InternalTimerServiceImpl$TriggerSortedTimers.class */
    public class TriggerSortedTimers implements Iterable<InternalTimer<K, N>> {
        private PriorityQueue<InternalTimerServiceImpl<K, N>.TriggerSortedTimers.GroupIterateNode> groupNodes;
        private volatile InternalTimerServiceImpl<K, N>.TriggerSortedTimers.GroupIterateNode headGroupNode;

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/apache/flink/streaming/api/operators/InternalTimerServiceImpl$TriggerSortedTimers$GroupIterateNode.class */
        public class GroupIterateNode implements Comparable<InternalTimerServiceImpl<K, N>.TriggerSortedTimers.GroupIterateNode> {
            private final List<InternalTimer<K, N>> list;
            private int currentIndex;

            private GroupIterateNode(List<InternalTimer<K, N>> list) {
                Preconditions.checkState((list == null || list.isEmpty()) ? false : true);
                this.list = list;
                this.currentIndex = 0;
            }

            InternalTimer<K, N> getCurrentTimer() {
                return this.list.get(this.currentIndex);
            }

            boolean hasNext() {
                return this.currentIndex < this.list.size() - 1;
            }

            void next() {
                this.currentIndex++;
            }

            @Override // java.lang.Comparable
            public int compareTo(InternalTimerServiceImpl<K, N>.TriggerSortedTimers.GroupIterateNode groupIterateNode) {
                return InternalTimerServiceImpl.this.compareTimers(getCurrentTimer(), groupIterateNode.getCurrentTimer());
            }
        }

        TriggerSortedTimers(Collection<List<InternalTimer<K, N>>> collection) {
            Preconditions.checkNotNull(collection);
            this.groupNodes = new PriorityQueue<>();
            for (List<InternalTimer<K, N>> list : collection) {
                if (!list.isEmpty()) {
                    this.groupNodes.add(new GroupIterateNode(list));
                }
            }
        }

        @Override // java.lang.Iterable
        public Iterator<InternalTimer<K, N>> iterator() {
            return new Iterator<InternalTimer<K, N>>() { // from class: org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.TriggerSortedTimers.1
                @Override // java.util.Iterator
                public boolean hasNext() {
                    return !TriggerSortedTimers.this.groupNodes.isEmpty() || (TriggerSortedTimers.this.headGroupNode != null && TriggerSortedTimers.this.headGroupNode.hasNext());
                }

                @Override // java.util.Iterator
                public InternalTimer<K, N> next() {
                    if (!hasNext()) {
                        throw new NoSuchElementException();
                    }
                    if (TriggerSortedTimers.this.headGroupNode != null && TriggerSortedTimers.this.headGroupNode.hasNext()) {
                        TriggerSortedTimers.this.headGroupNode.next();
                        TriggerSortedTimers.this.groupNodes.add(TriggerSortedTimers.this.headGroupNode);
                    }
                    TriggerSortedTimers.this.headGroupNode = (GroupIterateNode) TriggerSortedTimers.this.groupNodes.poll();
                    return TriggerSortedTimers.this.headGroupNode.getCurrentTimer();
                }
            };
        }
    }

    public InternalTimerServiceImpl(InternalState internalState, ProcessingTimeService processingTimeService, EventTimeService eventTimeService, Triggerable<K, N> triggerable) {
        Preconditions.checkNotNull(internalState);
        Preconditions.checkNotNull(processingTimeService);
        Preconditions.checkNotNull(eventTimeService);
        Preconditions.checkNotNull(triggerable);
        this.timerState = internalState;
        this.processingTimeService = processingTimeService;
        this.eventTimeService = eventTimeService;
        this.triggerTarget = triggerable;
        this.headProcessingTimerPerGroup = new HashMap();
        this.headEventTimerPerGroup = new HashMap();
        initialize();
    }

    public static <K, N> InternalStateDescriptor createInternalStateDescriptor(String str, Scope scope, TypeSerializer<K> typeSerializer, TypeSerializer<N> typeSerializer2) {
        Preconditions.checkNotNull(str);
        Preconditions.checkNotNull(typeSerializer);
        Preconditions.checkNotNull(typeSerializer2);
        return new InternalStateDescriptorBuilder(str).addKeyColumn("tag", ByteSerializer.INSTANCE).addKeyColumn("timestamp", LongSerializer.INSTANCE, new NaturalComparator()).addKeyColumn("key", typeSerializer).addKeyColumn("namespace", typeSerializer2).addValueColumn("dummy", ByteSerializer.INSTANCE, new DummyMerger()).setPartitioner(new FieldBasedPartitioner(KEY_FIELD_INDEX, HashPartitioner.INSTANCE)).setScope(scope).getDescriptor();
    }

    private void initialize() {
        GroupSet partitionGroups = this.timerState.getPartitionGroups();
        this.headProcessingTimer = null;
        Iterator it = partitionGroups.iterator();
        while (it.hasNext()) {
            int intValue = ((Integer) it.next()).intValue();
            Iterator prefixIterator = this.timerState.prefixIterator(intValue, Row.of(new Object[]{(byte) 112}));
            if (prefixIterator.hasNext()) {
                InternalTimer<K, N> timerFromPair = getTimerFromPair((Pair) prefixIterator.next());
                this.headProcessingTimerPerGroup.put(Integer.valueOf(intValue), timerFromPair);
                if (compareTimers(timerFromPair, this.headProcessingTimer) < 0) {
                    this.headProcessingTimer = timerFromPair;
                }
            }
        }
        if (this.headProcessingTimer != null) {
            this.nextProcessingTimeEvent = this.processingTimeService.registerTimer(this.headProcessingTimer.getTimestamp(), this);
        }
        Iterator it2 = partitionGroups.iterator();
        while (it2.hasNext()) {
            int intValue2 = ((Integer) it2.next()).intValue();
            Iterator prefixIterator2 = this.timerState.prefixIterator(intValue2, Row.of(new Object[]{(byte) 101}));
            if (prefixIterator2.hasNext()) {
                this.headEventTimerPerGroup.put(Integer.valueOf(intValue2), getTimerFromPair((Pair) prefixIterator2.next()));
            }
        }
        this.eventTimeService.registerCallback(this);
    }

    @Override // org.apache.flink.streaming.api.operators.InternalTimerService
    public long currentProcessingTime() {
        return this.processingTimeService.getCurrentProcessingTime();
    }

    @Override // org.apache.flink.streaming.api.operators.InternalTimerService
    public long currentWatermark() {
        return this.eventTimeService.getCurrentWatermark();
    }

    @Override // org.apache.flink.streaming.api.operators.InternalTimerService
    public void registerProcessingTimeTimer(K k, N n, long j) {
        Preconditions.checkState(j >= 0, "Registered processing-time timer should have non-negative timestamp.");
        this.timerState.put(Row.of(new Object[]{(byte) 112, Long.valueOf(j), k, n}), Row.of(new Object[]{(byte) 0}));
        InternalTimer<K, N> internalTimer = new InternalTimer<>(j, k, n);
        int partition = HashPartitioner.INSTANCE.partition(k, this.timerState.getNumGroups());
        if (compareTimers(internalTimer, this.headProcessingTimerPerGroup.get(Integer.valueOf(partition))) < 0) {
            this.headProcessingTimerPerGroup.put(Integer.valueOf(partition), internalTimer);
            if (compareTimers(internalTimer, this.headProcessingTimer) < 0) {
                if (this.nextProcessingTimeEvent != null) {
                    this.nextProcessingTimeEvent.cancel(false);
                    this.nextProcessingTimeEvent = null;
                }
                this.headProcessingTimer = internalTimer;
                if (this.headProcessingTimer != null) {
                    this.nextProcessingTimeEvent = this.processingTimeService.registerTimer(this.headProcessingTimer.getTimestamp(), this);
                }
            }
        }
    }

    @Override // org.apache.flink.streaming.api.operators.InternalTimerService
    public void deleteProcessingTimeTimer(K k, N n, long j) {
        this.timerState.remove(Row.of(new Object[]{(byte) 112, Long.valueOf(j), k, n}));
        InternalTimer internalTimer = new InternalTimer(j, k, n);
        int partition = HashPartitioner.INSTANCE.partition(k, this.timerState.getNumGroups());
        InternalTimer<K, N> internalTimer2 = this.headProcessingTimerPerGroup.get(Integer.valueOf(partition));
        if (internalTimer.equals(internalTimer2)) {
            Iterator tailIterator = this.timerState.tailIterator(partition, Row.of(new Object[]{(byte) 112}), Long.valueOf(internalTimer2.getTimestamp()));
            this.headProcessingTimerPerGroup.put(Integer.valueOf(partition), tailIterator.hasNext() ? getTimerFromPair((Pair) tailIterator.next()) : null);
            if (internalTimer.equals(this.headProcessingTimer)) {
                if (this.nextProcessingTimeEvent != null) {
                    this.nextProcessingTimeEvent.cancel(false);
                    this.nextProcessingTimeEvent = null;
                }
                this.headProcessingTimer = null;
                for (InternalTimer<K, N> internalTimer3 : this.headProcessingTimerPerGroup.values()) {
                    if (compareTimers(internalTimer3, this.headProcessingTimer) < 0) {
                        this.headProcessingTimer = internalTimer3;
                    }
                }
                if (this.headProcessingTimer != null) {
                    this.nextProcessingTimeEvent = this.processingTimeService.registerTimer(this.headProcessingTimer.getTimestamp(), this);
                }
            }
        }
    }

    @Override // org.apache.flink.streaming.api.operators.InternalTimerService
    public void registerEventTimeTimer(K k, N n, long j) {
        Preconditions.checkState(j >= 0, "Registered event-time timer should have non-negative timestamp.");
        this.timerState.put(Row.of(new Object[]{(byte) 101, Long.valueOf(j), k, n}), Row.of(new Object[]{(byte) 0}));
        InternalTimer<K, N> internalTimer = new InternalTimer<>(j, k, n);
        int partition = HashPartitioner.INSTANCE.partition(k, this.timerState.getNumGroups());
        if (compareTimers(internalTimer, this.headEventTimerPerGroup.get(Integer.valueOf(partition))) < 0) {
            this.headEventTimerPerGroup.put(Integer.valueOf(partition), internalTimer);
        }
    }

    @Override // org.apache.flink.streaming.api.operators.InternalTimerService
    public void deleteEventTimeTimer(K k, N n, long j) {
        this.timerState.remove(Row.of(new Object[]{(byte) 101, Long.valueOf(j), k, n}));
        InternalTimer internalTimer = new InternalTimer(j, k, n);
        int partition = HashPartitioner.INSTANCE.partition(k, this.timerState.getNumGroups());
        InternalTimer<K, N> internalTimer2 = this.headEventTimerPerGroup.get(Integer.valueOf(partition));
        if (internalTimer.equals(internalTimer2)) {
            InternalTimer<K, N> internalTimer3 = null;
            Iterator tailIterator = this.timerState.tailIterator(partition, Row.of(new Object[]{(byte) 101}), Long.valueOf(internalTimer2.getTimestamp()));
            if (tailIterator.hasNext()) {
                internalTimer3 = getTimerFromPair((Pair) tailIterator.next());
            }
            this.headEventTimerPerGroup.put(Integer.valueOf(partition), internalTimer3);
        }
    }

    @Override // org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback
    public void onProcessingTime(long j) throws Exception {
        if (this.headProcessingTimer == null || this.headProcessingTimer.getTimestamp() > j) {
            return;
        }
        InternalTimer<K, N> internalTimer = null;
        ArrayList arrayList = new ArrayList();
        Iterator it = this.timerState.getPartitionGroups().iterator();
        while (it.hasNext()) {
            int intValue = ((Integer) it.next()).intValue();
            InternalTimer<K, N> internalTimer2 = this.headProcessingTimerPerGroup.get(Integer.valueOf(intValue));
            if (internalTimer2 != null && internalTimer2.getTimestamp() <= j) {
                ArrayList arrayList2 = new ArrayList();
                InternalTimer<K, N> internalTimer3 = null;
                Iterator tailIterator = this.timerState.tailIterator(intValue, Row.of(new Object[]{(byte) 112}), Long.valueOf(internalTimer2.getTimestamp()));
                while (true) {
                    if (!tailIterator.hasNext()) {
                        break;
                    }
                    InternalTimer<K, N> timerFromPair = getTimerFromPair((Pair) tailIterator.next());
                    if (timerFromPair.getTimestamp() > this.processingTimeService.getCurrentProcessingTime()) {
                        internalTimer3 = timerFromPair;
                        break;
                    } else {
                        arrayList2.add(timerFromPair);
                        tailIterator.remove();
                    }
                }
                if (!arrayList2.isEmpty()) {
                    arrayList.add(arrayList2);
                }
                this.headProcessingTimerPerGroup.put(Integer.valueOf(intValue), internalTimer3);
                if (compareTimers(internalTimer3, internalTimer) < 0) {
                    internalTimer = internalTimer3;
                }
            } else if (compareTimers(internalTimer2, internalTimer) < 0) {
                internalTimer = internalTimer2;
            }
        }
        this.headProcessingTimer = null;
        this.nextProcessingTimeEvent = null;
        Iterator<InternalTimer<K, N>> it2 = new TriggerSortedTimers(arrayList).iterator();
        while (it2.hasNext()) {
            this.triggerTarget.onProcessingTime(it2.next());
        }
        if (internalTimer != null) {
            if (this.headProcessingTimer == null || this.headProcessingTimer.getTimestamp() > internalTimer.getTimestamp()) {
                this.headProcessingTimer = internalTimer;
                if (this.nextProcessingTimeEvent != null) {
                    this.nextProcessingTimeEvent.cancel(false);
                }
                this.nextProcessingTimeEvent = this.processingTimeService.registerTimer(this.headProcessingTimer.getTimestamp(), this);
            }
        }
    }

    @Override // org.apache.flink.streaming.runtime.tasks.EventTimeCallback
    public void onEventTime(long j) throws Exception {
        ArrayList arrayList = new ArrayList();
        Iterator it = this.timerState.getPartitionGroups().iterator();
        while (it.hasNext()) {
            int intValue = ((Integer) it.next()).intValue();
            InternalTimer<K, N> internalTimer = this.headEventTimerPerGroup.get(Integer.valueOf(intValue));
            if (internalTimer != null && internalTimer.getTimestamp() <= j) {
                ArrayList arrayList2 = new ArrayList();
                InternalTimer<K, N> internalTimer2 = null;
                Iterator tailIterator = this.timerState.tailIterator(intValue, Row.of(new Object[]{(byte) 101}), Long.valueOf(internalTimer.getTimestamp()));
                while (true) {
                    if (!tailIterator.hasNext()) {
                        break;
                    }
                    InternalTimer<K, N> timerFromPair = getTimerFromPair((Pair) tailIterator.next());
                    if (timerFromPair.getTimestamp() > this.eventTimeService.getCurrentWatermark()) {
                        internalTimer2 = timerFromPair;
                        break;
                    } else {
                        arrayList2.add(timerFromPair);
                        tailIterator.remove();
                    }
                }
                if (!arrayList2.isEmpty()) {
                    arrayList.add(arrayList2);
                }
                this.headEventTimerPerGroup.put(Integer.valueOf(intValue), internalTimer2);
            }
        }
        Iterator<InternalTimer<K, N>> it2 = new TriggerSortedTimers(arrayList).iterator();
        while (it2.hasNext()) {
            this.triggerTarget.onEventTime(it2.next());
        }
    }

    @Override // org.apache.flink.streaming.api.operators.InternalTimerService
    public int getNumProcessingTimeTimers() {
        return getNumTimers((byte) 112);
    }

    @Override // org.apache.flink.streaming.api.operators.InternalTimerService
    public int getNumEventTimeTimers() {
        return getNumTimers((byte) 101);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public int compareTimers(InternalTimer<K, N> internalTimer, InternalTimer<K, N> internalTimer2) {
        if (internalTimer == null) {
            return internalTimer2 == null ? 0 : 1;
        }
        if (internalTimer2 == null) {
            return -1;
        }
        return Long.compare(internalTimer.getTimestamp(), internalTimer2.getTimestamp());
    }

    private InternalTimer<K, N> getTimerFromPair(Pair<Row, Row> pair) {
        return new InternalTimer<>(((Long) ((Row) pair.getKey()).getField(1)).longValue(), ((Row) pair.getKey()).getField(KEY_FIELD_INDEX), ((Row) pair.getKey()).getField(NAMESPACE_FIELD_INDEX));
    }

    private int getNumTimers(byte b) {
        int i = 0;
        Iterator it = this.timerState.getPartitionGroups().iterator();
        while (it.hasNext()) {
            Iterator prefixIterator = this.timerState.prefixIterator(((Integer) it.next()).intValue(), Row.of(new Object[]{Byte.valueOf(b)}));
            while (prefixIterator.hasNext()) {
                prefixIterator.next();
                i++;
            }
        }
        return i;
    }
}
