/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.store.connector.source;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Callable;
import javax.annotation.Nullable;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.store.connector.source.FileStoreSourceSplit;
import org.apache.flink.table.store.connector.source.FileStoreSourceSplitGenerator;
import org.apache.flink.table.store.connector.source.PendingSplitsCheckpoint;
import org.apache.flink.table.store.table.source.SnapshotEnumerator;
import org.apache.flink.table.store.table.source.TableScan;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ContinuousFileSplitEnumerator
implements SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> {
    private static final Logger LOG = LoggerFactory.getLogger(ContinuousFileSplitEnumerator.class);
    private final SplitEnumeratorContext<FileStoreSourceSplit> context;
    private final Map<Integer, Queue<FileStoreSourceSplit>> bucketSplits;
    private final long discoveryInterval;
    private final Set<Integer> readersAwaitingSplit;
    private final FileStoreSourceSplitGenerator splitGenerator;
    private final SnapshotEnumerator snapshotEnumerator;
    private Long currentSnapshotId;

    public ContinuousFileSplitEnumerator(SplitEnumeratorContext<FileStoreSourceSplit> context, Path location, TableScan scan, Collection<FileStoreSourceSplit> remainSplits, long currentSnapshotId, long discoveryInterval) {
        Preconditions.checkArgument((discoveryInterval > 0L ? 1 : 0) != 0);
        this.context = (SplitEnumeratorContext)Preconditions.checkNotNull(context);
        this.bucketSplits = new HashMap<Integer, Queue<FileStoreSourceSplit>>();
        this.addSplits(remainSplits);
        this.currentSnapshotId = currentSnapshotId;
        this.discoveryInterval = discoveryInterval;
        this.readersAwaitingSplit = new HashSet<Integer>();
        this.splitGenerator = new FileStoreSourceSplitGenerator();
        this.snapshotEnumerator = new SnapshotEnumerator(location, scan, currentSnapshotId);
    }

    private void addSplits(Collection<FileStoreSourceSplit> splits) {
        splits.forEach(this::addSplit);
    }

    private void addSplit(FileStoreSourceSplit split) {
        this.bucketSplits.computeIfAbsent(split.split().bucket(), i -> new LinkedList()).add(split);
    }

    public void start() {
        this.context.callAsync((Callable)this.snapshotEnumerator, this::processDiscoveredSplits, this.discoveryInterval, this.discoveryInterval);
    }

    public void close() throws IOException {
    }

    public void addReader(int subtaskId) {
    }

    public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
        this.readersAwaitingSplit.add(subtaskId);
        this.assignSplits();
    }

    public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
        LOG.error("Received unrecognized event: {}", (Object)sourceEvent);
    }

    public void addSplitsBack(List<FileStoreSourceSplit> splits, int subtaskId) {
        LOG.debug("File Source Enumerator adds splits back: {}", splits);
        this.addSplits(splits);
    }

    public PendingSplitsCheckpoint snapshotState(long checkpointId) {
        ArrayList<FileStoreSourceSplit> splits = new ArrayList<FileStoreSourceSplit>();
        this.bucketSplits.values().forEach(splits::addAll);
        PendingSplitsCheckpoint checkpoint = new PendingSplitsCheckpoint(splits, this.currentSnapshotId == null ? -1L : this.currentSnapshotId);
        LOG.debug("Source Checkpoint is {}", (Object)checkpoint);
        return checkpoint;
    }

    private void processDiscoveredSplits(@Nullable SnapshotEnumerator.EnumeratorResult result, Throwable error) {
        if (error != null) {
            LOG.error("Failed to enumerate files", error);
            return;
        }
        if (result == null) {
            return;
        }
        this.currentSnapshotId = result.snapshotId;
        this.addSplits(this.splitGenerator.createSplits(result.plan));
        this.assignSplits();
    }

    private void assignSplits() {
        this.bucketSplits.forEach((bucket, splits) -> {
            int task;
            if (splits.size() > 0 && this.readersAwaitingSplit.remove(task = bucket % this.context.currentParallelism())) {
                if (!this.context.registeredReaders().containsKey(task)) {
                    return;
                }
                this.context.assignSplit((SourceSplit)splits.poll(), task);
            }
        });
    }
}

