/*
 * Decompiled with CFR 0.152.
 */
package org.apache.sling.distribution.journal.impl.publisher;

import java.io.Closeable;
import java.util.Arrays;
import java.util.Collections;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import javax.annotation.Nonnull;
import javax.annotation.ParametersAreNonnullByDefault;
import org.apache.commons.io.IOUtils;
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.commons.metrics.MetricsService;
import org.apache.sling.distribution.DistributionRequest;
import org.apache.sling.distribution.DistributionRequestState;
import org.apache.sling.distribution.DistributionRequestType;
import org.apache.sling.distribution.DistributionResponse;
import org.apache.sling.distribution.agent.DistributionAgentState;
import org.apache.sling.distribution.agent.spi.DistributionAgent;
import org.apache.sling.distribution.common.DistributionException;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.impl.discovery.DiscoveryService;
import org.apache.sling.distribution.journal.impl.event.DistributionEvent;
import org.apache.sling.distribution.journal.impl.publisher.AgentState;
import org.apache.sling.distribution.journal.impl.publisher.PackageMessageFactory;
import org.apache.sling.distribution.journal.impl.publisher.PackageQueuedNotifier;
import org.apache.sling.distribution.journal.impl.publisher.PublishMetrics;
import org.apache.sling.distribution.journal.impl.publisher.PublisherConfiguration;
import org.apache.sling.distribution.journal.impl.publisher.SimpleDistributionResponse;
import org.apache.sling.distribution.journal.messages.PackageMessage;
import org.apache.sling.distribution.journal.queue.PubQueueProvider;
import org.apache.sling.distribution.journal.shared.DefaultDistributionLog;
import org.apache.sling.distribution.journal.shared.DistributionLogEventListener;
import org.apache.sling.distribution.journal.shared.Strings;
import org.apache.sling.distribution.journal.shared.Timed;
import org.apache.sling.distribution.journal.shared.Topics;
import org.apache.sling.distribution.log.spi.DistributionLog;
import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
import org.apache.sling.distribution.queue.spi.DistributionQueue;
import org.osgi.framework.BundleContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.component.annotations.ReferenceCardinality;
import org.osgi.service.component.annotations.ReferencePolicyOption;
import org.osgi.service.condition.Condition;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
import org.osgi.service.metatype.annotations.Designate;

@ParametersAreNonnullByDefault
@Component(immediate=true, configurationPid={"org.apache.sling.distribution.journal.impl.publisher.DistributionPublisherFactory"})
@Designate(ocd=PublisherConfiguration.class, factory=true)
public class DistributionPublisher
implements DistributionAgent {
    public static final String FACTORY_PID = "org.apache.sling.distribution.journal.impl.publisher.DistributionPublisherFactory";
    @Nonnull
    private final DefaultDistributionLog distLog;
    private final DistributionPackageBuilder packageBuilder;
    private final PackageMessageFactory factory;
    private final EventAdmin eventAdmin;
    private final PublishMetrics publishMetrics;
    private final PubQueueProvider pubQueueProvider;
    private final String pubAgentName;
    private final String pkgType;
    private final boolean limitEnabled;
    private final long queuedTimeout;
    private final int queueSizeLimit;
    private final int maxQueueSizeDelay;
    private final Consumer<PackageMessage> sender;
    private final DistributionLogEventListener distributionLogEventListener;

    @Activate
    public DistributionPublisher(@Reference MessagingProvider messagingProvider, @Reference(name="packageBuilder") DistributionPackageBuilder packageBuilder, @Reference DiscoveryService discoveryService, @Reference PackageMessageFactory factory, @Reference EventAdmin eventAdmin, @Reference Topics topics, @Reference MetricsService metricsService, @Reference PubQueueProvider pubQueueProvider, @Reference(target="(osgi.condition.id=toggle.FT_SLING-12218)", cardinality=ReferenceCardinality.OPTIONAL, policyOption=ReferencePolicyOption.GREEDY) Condition limitToggle, PublisherConfiguration config, BundleContext context) {
        this.pubAgentName = Strings.requireNotBlank(config.name());
        this.packageBuilder = packageBuilder;
        this.factory = Objects.requireNonNull(factory);
        this.eventAdmin = eventAdmin;
        Objects.requireNonNull(metricsService);
        this.publishMetrics = new PublishMetrics(metricsService, this.pubAgentName);
        this.pubQueueProvider = pubQueueProvider;
        this.publishMetrics.queueSize(() -> pubQueueProvider.getMaxQueueSize(this.pubAgentName));
        this.distLog = new DefaultDistributionLog(this.pubAgentName, this.getClass(), DefaultDistributionLog.LogLevel.INFO);
        this.distributionLogEventListener = new DistributionLogEventListener(context, this.distLog, this.pubAgentName);
        this.limitEnabled = limitToggle != null;
        this.queuedTimeout = config.queuedTimeout();
        this.queueSizeLimit = config.queueSizeLimit();
        this.maxQueueSizeDelay = config.maxQueueSizeDelay();
        this.pkgType = packageBuilder.getType();
        this.sender = messagingProvider.createSender(topics.getPackageTopic());
        this.publishMetrics.subscriberCount(() -> discoveryService.getSubscriberCount(this.pubAgentName));
        this.distLog.info("Started Publisher agent={} with packageBuilder={}, limitEnabled={}, queuedTimeout={}, queueSizeLimit={}, maxQueueSizeDelay={}", this.pubAgentName, this.pkgType, this.limitEnabled, this.queuedTimeout, this.queueSizeLimit, this.maxQueueSizeDelay);
    }

    @Deactivate
    public void deactivate() {
        IOUtils.closeQuietly((Closeable)this.distributionLogEventListener);
        String msg = String.format("Stopped Publisher agent %s with packageBuilder %s, queuedTimeout %s", this.pubAgentName, this.pkgType, this.queuedTimeout);
        this.distLog.info(msg, new Object[0]);
    }

    @Nonnull
    public Iterable<String> getQueueNames() {
        return Collections.unmodifiableCollection(this.pubQueueProvider.getQueueNames(this.pubAgentName));
    }

    public DistributionQueue getQueue(String queueName) {
        try {
            DistributionQueue queue = this.pubQueueProvider.getQueue(this.pubAgentName, queueName);
            if (queue == null) {
                this.publishMetrics.getQueueAccessErrorCount().increment();
            }
            return queue;
        }
        catch (Exception e) {
            this.publishMetrics.getQueueAccessErrorCount().increment();
            throw e;
        }
    }

    @Nonnull
    public DistributionLog getLog() {
        return this.distLog;
    }

    @Nonnull
    public DistributionAgentState getState() {
        return AgentState.getState(this);
    }

    @Nonnull
    public DistributionResponse execute(ResourceResolver resourceResolver, DistributionRequest request) throws DistributionException {
        if (request.getRequestType() == DistributionRequestType.PULL) {
            String msg = "Request requestType=PULL not supported by this agent";
            this.distLog.info(msg, new Object[0]);
            return new SimpleDistributionResponse(DistributionRequestState.DROPPED, msg);
        }
        int queueSize = this.pubQueueProvider.getMaxQueueSize(this.pubAgentName);
        int sleepMs = this.getSleepTime(queueSize);
        this.sleep(sleepMs);
        PackageMessage pkg = this.buildPackage(resourceResolver, request);
        return this.send(pkg, queueSize, sleepMs);
    }

    int getSleepTime(int queueSize) {
        if (!this.limitEnabled || queueSize <= this.queueSizeLimit) {
            return 0;
        }
        if (queueSize >= this.queueSizeLimit * 2) {
            return this.maxQueueSizeDelay;
        }
        return (queueSize - this.queueSizeLimit) * this.maxQueueSizeDelay / this.queueSizeLimit;
    }

    private void sleep(long sleepMs) throws DistributionException {
        if (sleepMs <= 0L) {
            return;
        }
        try {
            Thread.sleep(sleepMs);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new DistributionException("Interrupted");
        }
    }

    private PackageMessage buildPackage(ResourceResolver resourceResolver, DistributionRequest request) throws DistributionException {
        try {
            if (request.getRequestType() != DistributionRequestType.TEST && request.getPaths().length == 0) {
                throw new DistributionException("Empty paths are not allowed");
            }
            return Timed.timed(this.publishMetrics.getBuildPackageDuration(), () -> this.factory.create(this.packageBuilder, resourceResolver, this.pubAgentName, request));
        }
        catch (Exception e) {
            this.publishMetrics.getDroppedRequests().mark();
            String msg = String.format("Failed to create content package for requestType=%s, paths=%s. Error=%s", request.getRequestType(), Arrays.toString(request.getPaths()), e.getMessage());
            this.distLog.error(msg, e);
            throw new DistributionException(msg, (Throwable)e);
        }
    }

    @Nonnull
    private DistributionResponse send(PackageMessage pkg, int queueSize, int delayMS) throws DistributionException {
        try {
            long offset = Timed.timed(this.publishMetrics.getEnqueuePackageDuration(), () -> this.sendAndWait(pkg));
            this.publishMetrics.getExportedPackageSize().update(pkg.getPkgLength());
            this.publishMetrics.getAcceptedRequests().mark();
            String msg = String.format("Request accepted with distribution package %s at offset=%d, queueSize=%d, queueSizeDelay=%d", pkg, offset, queueSize, delayMS);
            this.distLog.info(msg, new Object[0]);
            return new SimpleDistributionResponse(DistributionRequestState.ACCEPTED, msg, () -> ((PackageMessage)pkg).getPkgId());
        }
        catch (Throwable e) {
            this.publishMetrics.getDroppedRequests().mark();
            String msg = String.format("Failed to append distribution package %s to the journal", pkg);
            this.distLog.error(msg, e);
            if (e instanceof Error) {
                throw (Error)e;
            }
            throw new DistributionException(msg, e);
        }
    }

    private long sendAndWait(PackageMessage pkg) {
        if (pkg.getReqType() == PackageMessage.ReqType.TEST) {
            this.sender.accept(pkg);
            return -1L;
        }
        PackageQueuedNotifier queuedNotifier = this.pubQueueProvider.getQueuedNotifier();
        try {
            CompletableFuture<Long> received = queuedNotifier.registerWait(pkg.getPkgId());
            Event createdEvent = DistributionEvent.eventPackageCreated(pkg, this.pubAgentName);
            this.eventAdmin.postEvent(createdEvent);
            this.sender.accept(pkg);
            return received.get(this.queuedTimeout, TimeUnit.MILLISECONDS);
        }
        catch (Exception e) {
            queuedNotifier.unRegisterWait(pkg.getPkgId());
            throw new RuntimeException(e);
        }
    }
}

