/*
 * Decompiled with CFR 0.152.
 */
package org.apache.catalina.tribes.transport.nio;

import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.DatagramChannel;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.nio.channels.WritableByteChannel;
import java.sql.Timestamp;
import org.apache.catalina.tribes.ChannelMessage;
import org.apache.catalina.tribes.RemoteProcessException;
import org.apache.catalina.tribes.UniqueId;
import org.apache.catalina.tribes.io.BufferPool;
import org.apache.catalina.tribes.io.ChannelData;
import org.apache.catalina.tribes.io.ListenCallback;
import org.apache.catalina.tribes.io.ObjectReader;
import org.apache.catalina.tribes.transport.AbstractRxTask;
import org.apache.catalina.tribes.transport.Constants;
import org.apache.catalina.tribes.transport.nio.NioReceiver;
import org.apache.catalina.tribes.util.Logs;
import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;

public class NioReplicationTask
extends AbstractRxTask {
    private static final Log log = LogFactory.getLog(NioReplicationTask.class);
    private ByteBuffer buffer = null;
    private SelectionKey key;
    private int rxBufSize;
    private NioReceiver receiver;

    public NioReplicationTask(ListenCallback callback, NioReceiver receiver) {
        super(callback);
        this.receiver = receiver;
    }

    @Override
    public synchronized void run() {
        if (this.buffer == null) {
            int size = this.getRxBufSize();
            if (this.key.channel() instanceof DatagramChannel) {
                size = 65535;
            }
            this.buffer = (this.getOptions() & 4) == 4 ? ByteBuffer.allocateDirect(size) : ByteBuffer.allocate(size);
        } else {
            this.buffer.clear();
        }
        if (this.key == null) {
            return;
        }
        if (log.isTraceEnabled()) {
            log.trace("Servicing key:" + this.key);
        }
        try {
            ObjectReader reader = (ObjectReader)this.key.attachment();
            if (reader == null) {
                if (log.isTraceEnabled()) {
                    log.trace("No object reader, cancelling:" + this.key);
                }
                this.cancelKey(this.key);
            } else {
                if (log.isTraceEnabled()) {
                    log.trace("Draining channel:" + this.key);
                }
                this.drainChannel(this.key, reader);
            }
        }
        catch (Exception e) {
            if (!(e instanceof CancelledKeyException)) {
                if (e instanceof IOException) {
                    if (log.isDebugEnabled()) {
                        log.debug("IOException in replication worker, unable to drain channel. Probable cause: Keep alive socket closed[" + e.getMessage() + "].", e);
                    } else {
                        log.warn("IOException in replication worker, unable to drain channel. Probable cause: Keep alive socket closed[" + e.getMessage() + "].");
                    }
                } else if (log.isErrorEnabled()) {
                    log.error("Exception caught in TcpReplicationThread.drainChannel.", e);
                }
            }
            this.cancelKey(this.key);
        }
        this.key = null;
        this.getTaskPool().returnWorker(this);
    }

    public synchronized void serviceChannel(SelectionKey key) {
        ObjectReader reader;
        if (log.isTraceEnabled()) {
            log.trace("About to service key:" + key);
        }
        if ((reader = (ObjectReader)key.attachment()) != null) {
            reader.setLastAccess(System.currentTimeMillis());
        }
        this.key = key;
        key.interestOps(key.interestOps() & 0xFFFFFFFE);
        key.interestOps(key.interestOps() & 0xFFFFFFFB);
    }

    protected void drainChannel(SelectionKey key, ObjectReader reader) throws Exception {
        reader.setLastAccess(System.currentTimeMillis());
        reader.access();
        ReadableByteChannel channel = (ReadableByteChannel)((Object)key.channel());
        int count = -1;
        this.buffer.clear();
        SocketAddress saddr = null;
        if (channel instanceof SocketChannel) {
            while ((count = channel.read(this.buffer)) > 0) {
                this.buffer.flip();
                if (this.buffer.hasArray()) {
                    reader.append(this.buffer.array(), 0, count, false);
                } else {
                    reader.append(this.buffer, count, false);
                }
                this.buffer.clear();
                if (!reader.hasPackage()) {
                    continue;
                }
                break;
            }
        } else if (channel instanceof DatagramChannel) {
            DatagramChannel dchannel = (DatagramChannel)channel;
            saddr = dchannel.receive(this.buffer);
            this.buffer.flip();
            if (this.buffer.hasArray()) {
                reader.append(this.buffer.array(), 0, this.buffer.limit() - this.buffer.position(), false);
            } else {
                reader.append(this.buffer, this.buffer.limit() - this.buffer.position(), false);
            }
            this.buffer.clear();
            count = reader.hasPackage() ? 1 : -1;
        }
        int pkgcnt = reader.count();
        if (count < 0 && pkgcnt == 0) {
            this.remoteEof(key);
            return;
        }
        ChannelMessage[] msgs = pkgcnt == 0 ? ChannelData.EMPTY_DATA_ARRAY : reader.execute();
        this.registerForRead(key, reader);
        int i = 0;
        while (i < msgs.length) {
            block24: {
                if (ChannelData.sendAckAsync(msgs[i].getOptions())) {
                    this.sendAck(key, (WritableByteChannel)((Object)channel), Constants.ACK_COMMAND, saddr);
                }
                try {
                    if (Logs.MESSAGES.isTraceEnabled()) {
                        try {
                            Logs.MESSAGES.trace("NioReplicationThread - Received msg:" + new UniqueId(msgs[i].getUniqueId()) + " at " + new Timestamp(System.currentTimeMillis()));
                        }
                        catch (Throwable throwable) {}
                    }
                    this.getCallback().messageDataReceived(msgs[i]);
                    if (ChannelData.sendAckSync(msgs[i].getOptions())) {
                        this.sendAck(key, (WritableByteChannel)((Object)channel), Constants.ACK_COMMAND, saddr);
                    }
                }
                catch (RemoteProcessException e) {
                    if (log.isDebugEnabled()) {
                        log.error("Processing of cluster message failed.", e);
                    }
                    if (ChannelData.sendAckSync(msgs[i].getOptions())) {
                        this.sendAck(key, (WritableByteChannel)((Object)channel), Constants.FAIL_ACK_COMMAND, saddr);
                    }
                }
                catch (Exception e) {
                    log.error("Processing of cluster message failed.", e);
                    if (!ChannelData.sendAckSync(msgs[i].getOptions())) break block24;
                    this.sendAck(key, (WritableByteChannel)((Object)channel), Constants.FAIL_ACK_COMMAND, saddr);
                }
            }
            if (this.getUseBufferPool()) {
                BufferPool.getBufferPool().returnBuffer(msgs[i].getMessage());
                msgs[i].setMessage(null);
            }
            ++i;
        }
        if (count < 0) {
            this.remoteEof(key);
            return;
        }
    }

    private void remoteEof(SelectionKey key) {
        if (log.isDebugEnabled()) {
            log.debug("Channel closed on the remote end, disconnecting");
        }
        this.cancelKey(key);
    }

    protected void registerForRead(final SelectionKey key, ObjectReader reader) {
        if (log.isTraceEnabled()) {
            log.trace("Adding key for read event:" + key);
        }
        reader.finish();
        Runnable r = new Runnable(){

            @Override
            public void run() {
                try {
                    if (key.isValid()) {
                        key.selector().wakeup();
                        int resumeOps = key.interestOps() | 1;
                        key.interestOps(resumeOps);
                        if (log.isTraceEnabled()) {
                            log.trace("Registering key for read:" + key);
                        }
                    }
                }
                catch (CancelledKeyException cancelledKeyException) {
                    NioReceiver.cancelledKey(key);
                    if (log.isTraceEnabled()) {
                        log.trace("CKX Cancelling key:" + key);
                    }
                }
                catch (Exception x) {
                    log.error("Error registering key for read:" + key, x);
                }
            }
        };
        this.receiver.addEvent(r);
    }

    private void cancelKey(final SelectionKey key) {
        ObjectReader reader;
        if (log.isTraceEnabled()) {
            log.trace("Adding key for cancel event:" + key);
        }
        if ((reader = (ObjectReader)key.attachment()) != null) {
            reader.setCancelled(true);
            reader.finish();
        }
        Runnable cx = new Runnable(){

            @Override
            public void run() {
                if (log.isTraceEnabled()) {
                    log.trace("Cancelling key:" + key);
                }
                NioReceiver.cancelledKey(key);
            }
        };
        this.receiver.addEvent(cx);
    }

    /*
     * Unable to fully structure code
     */
    protected void sendAck(SelectionKey key, WritableByteChannel channel, byte[] command, SocketAddress udpaddr) {
        try {
            block5: {
                buf = ByteBuffer.wrap(command);
                total = 0;
                if (!(channel instanceof DatagramChannel)) ** GOTO lbl11
                dchannel = (DatagramChannel)channel;
                while (total < command.length) {
                    total += dchannel.send(buf, udpaddr);
                }
                break block5;
lbl-1000:
                // 1 sources

                {
                    total += channel.write(buf);
lbl11:
                    // 2 sources

                    ** while (total < command.length)
                }
            }
            if (NioReplicationTask.log.isTraceEnabled()) {
                NioReplicationTask.log.trace("ACK sent to " + (channel instanceof SocketChannel != false ? ((SocketChannel)channel).socket().getInetAddress() : ((DatagramChannel)channel).socket().getInetAddress()));
            }
        }
        catch (IOException x) {
            NioReplicationTask.log.warn("Unable to send ACK back through channel, channel disconnected?: " + x.getMessage());
        }
    }

    public void setRxBufSize(int rxBufSize) {
        this.rxBufSize = rxBufSize;
    }

    public int getRxBufSize() {
        return this.rxBufSize;
    }
}

