/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.notifications.cachelistener;

import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.CompletableSource;
import io.reactivex.rxjava3.core.Flowable;
import java.lang.invoke.MethodHandles;
import java.util.Queue;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicReferenceArray;
import org.infinispan.commons.util.concurrent.AggregateCompletionStage;
import org.infinispan.commons.util.concurrent.CompletableFutures;
import org.infinispan.commons.util.concurrent.CompletionStages;
import org.infinispan.container.entries.CacheEntry;
import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.container.impl.InternalEntryFactory;
import org.infinispan.distribution.ch.KeyPartitioner;
import org.infinispan.notifications.cachelistener.BaseQueueingSegmentListener;
import org.infinispan.notifications.cachelistener.EventWrapper;
import org.infinispan.notifications.cachelistener.event.CacheEntryEvent;
import org.infinispan.notifications.cachelistener.event.Event;
import org.infinispan.notifications.impl.ListenerInvocation;
import org.infinispan.util.KeyValuePair;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;

/**
 * Segment listener that keeps one queue of pending {@code (event, invocation)} pairs per segment.
 *
 * <p>Events that the superclass' {@code addEvent} does not accept are parked in the queue of their
 * segment until {@link #segmentComplete(int)} drains that queue. A {@code null} slot in
 * {@link #queues} marks a segment whose queue has already been detached for draining.
 *
 * <p>NOTE(review): the exact meaning of {@code addEvent}, {@code completed}, {@code notifiedKeys}
 * and {@code REMOVED} is defined by {@link BaseQueueingSegmentListener}, which is not visible
 * here — confirm against that class.
 *
 * @param <K> cache key type
 * @param <V> cache value type
 */
class DistributedQueueingSegmentListener<K, V>
        extends BaseQueueingSegmentListener<K, V, CacheEntryEvent<K, V>> {
    private static final Log log = LogFactory.getLog(MethodHandles.lookup().lookupClass());

    /** One queue of pending pairs per segment; a slot is set to {@code null} once its segment is drained. */
    private final AtomicReferenceArray<Queue<KeyValuePair<CacheEntryEvent<K, V>, ListenerInvocation<Event<K, V>>>>> queues;
    protected final InternalEntryFactory entryFactory;

    /**
     * Creates the listener with an empty queue for every segment.
     *
     * @param entryFactory   factory used to build a cache entry out of each incoming event
     * @param numSegments    number of segments, i.e. the number of queues allocated
     * @param keyPartitioner passed through to the superclass
     */
    public DistributedQueueingSegmentListener(InternalEntryFactory entryFactory, int numSegments, KeyPartitioner keyPartitioner) {
        super(numSegments, keyPartitioner);
        this.entryFactory = entryFactory;
        this.queues = new AtomicReferenceArray<>(numSegments);
        for (int i = 0; i < this.queues.length(); ++i) {
            this.queues.set(i, new ConcurrentLinkedQueue<>());
        }
    }

    /**
     * Offers an event to this listener.
     *
     * @param wrapped    wrapper providing the key, the event and the data used to compute the segment
     * @param invocation invocation to run with the event when the segment's queue is drained
     * @return {@code false} when the listener is already completed, when the segment's queue was
     *         already detached, or when the queue was detached concurrently and the pair could be
     *         taken back out of it; {@code true} otherwise. Presumably {@code false} tells the
     *         caller to deliver the event itself — verify against the caller.
     */
    @Override
    public boolean handleEvent(EventWrapper<K, V, CacheEntryEvent<K, V>> wrapped, ListenerInvocation<Event<K, V>> invocation) {
        if (this.completed.get()) {
            return false;
        }
        K key = wrapped.getKey();
        CacheEntryEvent<K, V> event = wrapped.getEvent();
        int segment = this.segmentFromEventWrapper(wrapped);
        InternalCacheEntry<K, V> cacheEntry = this.entryFactory.create(event.getKey(), event.getValue(), event.getMetadata());
        boolean enqueued = true;
        // An entry without a value is reported to addEvent as the REMOVED marker.
        if (!this.addEvent(key, segment, cacheEntry.getValue() != null ? cacheEntry : REMOVED)) {
            Queue<KeyValuePair<CacheEntryEvent<K, V>, ListenerInvocation<Event<K, V>>>> queue = this.queues.get(segment);
            if (queue != null) {
                KeyValuePair<CacheEntryEvent<K, V>, ListenerInvocation<Event<K, V>>> eventPair = new KeyValuePair<>(event, invocation);
                queue.add(eventPair);
                // Re-check the slot: completeSegment may have detached the queue between the read
                // above and the add. If so, try to take the pair back; a successful remove means
                // the drain (which polls elements out of the queue) did not get to it.
                if (this.queues.get(segment) == null && queue.remove(eventPair)) {
                    enqueued = false;
                }
            } else {
                // The queue for this segment was already detached, nothing will drain it anymore.
                enqueued = false;
            }
        }
        return enqueued;
    }

    /**
     * Marks the listener as completed so that {@link #handleEvent} rejects further events.
     * With assertions enabled, also checks that every {@code notifiedKeys} slot and every queue
     * slot is {@code null} at this point.
     *
     * @return an already completed stage
     */
    @Override
    public CompletionStage<Void> transferComplete() {
        this.completed.set(true);
        for (int i = 0; i < this.notifiedKeys.length(); ++i) {
            assert this.notifiedKeys.get(i) == null;
            assert this.queues.get(i) == null;
        }
        return CompletableFutures.completedNull();
    }

    /**
     * Returns the superclass' flowable for the segment, followed by the draining of the
     * segment's queue.
     *
     * @param segment the segment being completed
     * @return the superclass flowable concatenated with a deferred completable that drains the queue
     */
    @Override
    Flowable<CacheEntry<K, V>> segmentComplete(int segment) {
        // defer: the queue is only detached when the concatenated completable is subscribed.
        return super.segmentComplete(segment)
                .concatWith(Completable.defer(() -> Completable.fromCompletionStage(this.completeSegment(segment))));
    }

    /**
     * Detaches the queue of the given segment and runs every queued invocation with its event.
     *
     * @param segment the segment whose queue is drained
     * @return a stage aggregating all invocations, or an already completed stage when the queue
     *         was already detached or is empty
     */
    private CompletionStage<Void> completeSegment(int segment) {
        Queue<KeyValuePair<CacheEntryEvent<K, V>, ListenerInvocation<Event<K, V>>>> queue = this.queues.getAndSet(segment, null);
        if (queue == null || queue.isEmpty()) {
            return CompletableFutures.completedNull();
        }
        AggregateCompletionStage<Void> aggregateCompletionStage = CompletionStages.aggregateCompletionStage();
        // Drain with poll() rather than iterating: handleEvent interprets a successful
        // queue.remove(pair) as "the drain did not handle this pair". Iterating without removing
        // would let both sides handle the same pair.
        KeyValuePair<CacheEntryEvent<K, V>, ListenerInvocation<Event<K, V>>> pending;
        while ((pending = queue.poll()) != null) {
            aggregateCompletionStage.dependsOn(pending.getValue().invoke(pending.getKey()));
        }
        return aggregateCompletionStage.freeze();
    }

    @Override
    protected Log getLog() {
        return log;
    }
}

