/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.util.Collection;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.KStreamAggProcessorSupplier;
import org.apache.kafka.streams.kstream.internals.KTableProcessorSupplier;
import org.apache.kafka.streams.kstream.internals.KTableValueGetter;
import org.apache.kafka.streams.kstream.internals.KTableValueGetterSupplier;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.internals.KeyValueStoreWrapper;

/**
 * {@link KTableProcessorSupplier} whose processors forward every incoming change record
 * downstream unmodified, and whose value getters read from the state store named
 * {@code storeName}.
 *
 * @param <KIn> key type of the records passed through
 * @param <VIn> value type carried inside each {@link Change}
 */
public class KTablePassThrough<KIn, VIn> implements KTableProcessorSupplier<KIn, VIn, KIn, VIn> {
    // The raw element type is kept on purpose: it is part of the constructor signature callers use.
    private final Collection<KStreamAggProcessorSupplier> parents;
    private final String storeName;

    /**
     * @param parents   upstream suppliers that are told to send old values by
     *                  {@link #enableSendingOldValues(boolean)}
     * @param storeName name of the state store handed to value getters and reported by
     *                  {@link #view()}
     */
    KTablePassThrough(final Collection<KStreamAggProcessorSupplier> parents, final String storeName) {
        this.parents = parents;
        this.storeName = storeName;
    }

    /**
     * Creates a new pass-through processor.
     *
     * @return a processor that forwards each record it receives as-is
     */
    @Override
    public Processor<KIn, Change<VIn>, KIn, Change<VIn>> get() {
        return new KTablePassThroughProcessor();
    }

    /**
     * Asks every parent supplier to send old values.
     *
     * @param forceMaterialization not consulted by this implementation
     *                             (NOTE(review): presumably because the parents are always
     *                             materialized - confirm against the callers)
     * @return always {@code true}
     */
    @Override
    public boolean enableSendingOldValues(final boolean forceMaterialization) {
        for (final KStreamAggProcessorSupplier parent : parents) {
            parent.enableSendingOldValues();
        }
        return true;
    }

    /**
     * Exposes this table's values through a getter supplier tied to {@code storeName}.
     *
     * @return a supplier whose getters read from the store named {@code storeName} and whose
     *         {@code storeNames()} contains exactly that one name
     */
    @Override
    public KTableValueGetterSupplier<KIn, VIn> view() {
        return new KTableValueGetterSupplier<KIn, VIn>() {

            @Override
            public KTableValueGetter<KIn, VIn> get() {
                return new KTablePassThroughValueGetter();
            }

            @Override
            public String[] storeNames() {
                return new String[] {storeName};
            }
        };
    }

    /** Processor that hands every record straight to the context it was initialised with. */
    private class KTablePassThroughProcessor implements Processor<KIn, Change<VIn>, KIn, Change<VIn>> {
        // Assigned in init(); process() dereferences it, so init() has to run first.
        private ProcessorContext<KIn, Change<VIn>> context;

        @Override
        public void init(final ProcessorContext<KIn, Change<VIn>> context) {
            this.context = context;
        }

        @Override
        public void process(final Record<KIn, Change<VIn>> record) {
            // No transformation: the same record instance is forwarded downstream.
            context.forward(record);
        }
    }

    /** Value getter that delegates every lookup to a {@link KeyValueStoreWrapper} for {@code storeName}. */
    private class KTablePassThroughValueGetter implements KTableValueGetter<KIn, VIn> {
        // Assigned in init(); all other methods dereference it, so init() has to run first.
        private KeyValueStoreWrapper<KIn, VIn> store;

        /**
         * Creates the store wrapper for {@code storeName} from the given context.
         *
         * @param context processor context passed to the {@link KeyValueStoreWrapper} constructor
         */
        @Override
        public void init(final ProcessorContext<?, ?> context) {
            // Diamond instead of a raw type, so the assignment is checked against <KIn, VIn>.
            store = new KeyValueStoreWrapper<>(context, storeName);
        }

        /**
         * @param key key to look up
         * @return whatever the wrapped store returns for {@code key}
         */
        @Override
        public ValueAndTimestamp<VIn> get(final KIn key) {
            return store.get(key);
        }

        /**
         * @param key           key to look up
         * @param asOfTimestamp timestamp bound passed through unchanged to the wrapped store
         * @return whatever the wrapped store returns for {@code key} and {@code asOfTimestamp}
         */
        @Override
        public ValueAndTimestamp<VIn> get(final KIn key, final long asOfTimestamp) {
            return store.get(key, asOfTimestamp);
        }

        /**
         * @return the result of {@code isVersionedStore()} on the wrapped store
         */
        @Override
        public boolean isVersioned() {
            return store.isVersionedStore();
        }
    }
}

