package org.eclipse.hono.application.client.kafka.impl;

import io.opentracing.Tracer;
import io.opentracing.noop.NoopTracerFactory;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
import java.time.Duration;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.Consumer;
import org.eclipse.hono.application.client.DownstreamMessage;
import org.eclipse.hono.application.client.MessageConsumer;
import org.eclipse.hono.application.client.kafka.KafkaApplicationClient;
import org.eclipse.hono.application.client.kafka.KafkaMessageContext;
import org.eclipse.hono.client.kafka.HonoTopic;
import org.eclipse.hono.client.kafka.consumer.HonoKafkaConsumer;
import org.eclipse.hono.client.kafka.consumer.MessagingKafkaConsumerConfigProperties;
import org.eclipse.hono.client.kafka.producer.KafkaProducerFactory;
import org.eclipse.hono.client.kafka.producer.MessagingKafkaProducerConfigProperties;

/* loaded from: input_file:BOOT-INF/lib/hono-client-application-kafka-1.12.3.jar:org/eclipse/hono/application/client/kafka/impl/KafkaApplicationClientImpl.class */
public class KafkaApplicationClientImpl extends KafkaBasedCommandSender implements KafkaApplicationClient {
    private final Vertx vertx;
    private final MessagingKafkaConsumerConfigProperties consumerConfig;
    private final List<MessageConsumer> consumersToCloseOnStop;
    private Supplier<Consumer<String, Buffer>> kafkaConsumerSupplier;

    public KafkaApplicationClientImpl(Vertx vertx, MessagingKafkaConsumerConfigProperties messagingKafkaConsumerConfigProperties, KafkaProducerFactory<String, Buffer> kafkaProducerFactory, MessagingKafkaProducerConfigProperties messagingKafkaProducerConfigProperties) {
        this(vertx, messagingKafkaConsumerConfigProperties, kafkaProducerFactory, messagingKafkaProducerConfigProperties, NoopTracerFactory.create());
    }

    public KafkaApplicationClientImpl(Vertx vertx, MessagingKafkaConsumerConfigProperties messagingKafkaConsumerConfigProperties, KafkaProducerFactory<String, Buffer> kafkaProducerFactory, MessagingKafkaProducerConfigProperties messagingKafkaProducerConfigProperties, Tracer tracer) {
        super(vertx, messagingKafkaConsumerConfigProperties, kafkaProducerFactory, messagingKafkaProducerConfigProperties, tracer);
        this.consumersToCloseOnStop = new LinkedList();
        Objects.requireNonNull(vertx);
        Objects.requireNonNull(messagingKafkaConsumerConfigProperties);
        if (!messagingKafkaConsumerConfigProperties.isConfigured() || !messagingKafkaProducerConfigProperties.isConfigured()) {
            throw new IllegalArgumentException("No Kafka configuration found!");
        }
        this.vertx = vertx;
        this.consumerConfig = messagingKafkaConsumerConfigProperties;
    }

    @Override // org.eclipse.hono.application.client.kafka.impl.KafkaBasedCommandSender, org.eclipse.hono.client.kafka.producer.AbstractKafkaBasedMessageSender, org.eclipse.hono.util.Lifecycle
    public Future<Void> stop() {
        List list = (List) this.consumersToCloseOnStop.stream().map((v0) -> {
            return v0.close();
        }).collect(Collectors.toList());
        list.add(super.stop());
        return CompositeFuture.join(list).mapEmpty();
    }

    @Override // org.eclipse.hono.application.client.kafka.KafkaApplicationClient, org.eclipse.hono.application.client.ApplicationClient
    public Future<MessageConsumer> createTelemetryConsumer(String str, Handler<DownstreamMessage<KafkaMessageContext>> handler, Handler<Throwable> handler2) {
        return createKafkaBasedDownstreamMessageConsumer(str, HonoTopic.Type.TELEMETRY, handler);
    }

    @Override // org.eclipse.hono.application.client.kafka.KafkaApplicationClient, org.eclipse.hono.application.client.ApplicationClient
    public Future<MessageConsumer> createEventConsumer(String str, Handler<DownstreamMessage<KafkaMessageContext>> handler, Handler<Throwable> handler2) {
        return createKafkaBasedDownstreamMessageConsumer(str, HonoTopic.Type.EVENT, handler);
    }

    @Override // org.eclipse.hono.application.client.ApplicationClient
    public Future<MessageConsumer> createCommandResponseConsumer(String str, String str2, Handler<DownstreamMessage<KafkaMessageContext>> handler, Handler<Throwable> handler2) {
        return createKafkaBasedDownstreamMessageConsumer(str, HonoTopic.Type.COMMAND_RESPONSE, handler);
    }

    void setKafkaConsumerFactory(Supplier<Consumer<String, Buffer>> supplier) {
        this.kafkaConsumerSupplier = (Supplier) Objects.requireNonNull(supplier);
    }

    private Future<MessageConsumer> createKafkaBasedDownstreamMessageConsumer(String str, HonoTopic.Type type, Handler<DownstreamMessage<KafkaMessageContext>> handler) {
        Objects.requireNonNull(str);
        Objects.requireNonNull(type);
        Objects.requireNonNull(handler);
        String honoTopic = new HonoTopic(type, str).toString();
        HonoKafkaConsumer honoKafkaConsumer = new HonoKafkaConsumer(this.vertx, (Set<String>) Set.of(honoTopic), (Handler<KafkaConsumerRecord<String, Buffer>>) kafkaConsumerRecord -> {
            handler.handle(new KafkaDownstreamMessage(kafkaConsumerRecord));
        }, this.consumerConfig.getConsumerConfig(type.toString()));
        honoKafkaConsumer.setPollTimeout(Duration.ofMillis(this.consumerConfig.getPollTimeout()));
        Optional ofNullable = Optional.ofNullable(this.kafkaConsumerSupplier);
        Objects.requireNonNull(honoKafkaConsumer);
        ofNullable.ifPresent(honoKafkaConsumer::setKafkaConsumerSupplier);
        Future<U> map = honoKafkaConsumer.start().map(r7 -> {
            return new MessageConsumer() { // from class: org.eclipse.hono.application.client.kafka.impl.KafkaApplicationClientImpl.1
                @Override // org.eclipse.hono.application.client.MessageConsumer
                public Future<Void> close() {
                    return honoKafkaConsumer.stop();
                }
            };
        });
        List<MessageConsumer> list = this.consumersToCloseOnStop;
        Objects.requireNonNull(list);
        return map.onSuccess2((v1) -> {
            r1.add(v1);
        });
    }
}
