package com.ibm.narpc;

import com.ibm.narpc.NaRPCMessage;
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.LinkedBlockingQueue;
import org.slf4j.Logger;

/* loaded from: input_file:com/ibm/narpc/NaRPCDispatcher.class */
public class NaRPCDispatcher<R extends NaRPCMessage, T extends NaRPCMessage> implements Runnable {
    private static Logger LOG = NaRPCUtils.getLogger();
    private NaRPCGroup group;
    private LinkedBlockingQueue<NaRPCServerChannel> incomingChannels;
    private NaRPCService<R, T> service;
    private Selector selector;
    private R request;
    private int id;
    private boolean isAlive;

    public NaRPCDispatcher() {
        this.isAlive = true;
    }

    public NaRPCDispatcher(NaRPCGroup naRPCGroup, NaRPCService<R, T> naRPCService, int i) throws IOException {
        this.group = naRPCGroup;
        this.service = naRPCService;
        this.id = i;
        this.selector = Selector.open();
        this.incomingChannels = new LinkedBlockingQueue<>();
        this.request = naRPCService.createRequest();
        this.isAlive = true;
    }

    public void addChannel(NaRPCServerChannel naRPCServerChannel) throws IOException {
        this.service.addEndpoint(naRPCServerChannel);
        this.incomingChannels.add(naRPCServerChannel);
        this.selector.wakeup();
    }

    public void close() {
        this.isAlive = false;
    }

    @Override // java.lang.Runnable
    public void run() {
        while (this.isAlive) {
            try {
                if (this.selector.select(1000L) > 0) {
                    Iterator<SelectionKey> it = this.selector.selectedKeys().iterator();
                    while (it.hasNext()) {
                        SelectionKey next = it.next();
                        if (next.isValid()) {
                            if (next.isReadable()) {
                                NaRPCServerChannel naRPCServerChannel = (NaRPCServerChannel) next.attachment();
                                long fetch = naRPCServerChannel.fetch(this.request);
                                if (fetch > 0) {
                                    naRPCServerChannel.transmit(fetch, this.service.processRequest(this.request));
                                } else {
                                    if (fetch >= 0) {
                                        throw new Exception("ticket number invalid");
                                    }
                                    LOG.info("closing channel " + naRPCServerChannel.address());
                                    this.service.removeEndpoint(naRPCServerChannel);
                                    next.cancel();
                                    naRPCServerChannel.close();
                                }
                            }
                            it.remove();
                        }
                    }
                }
                processIncomingChannels();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        LOG.info("closing the select call");
    }

    public void processIncomingChannels() throws IOException {
        NaRPCServerChannel poll = this.incomingChannels.poll();
        while (true) {
            NaRPCServerChannel naRPCServerChannel = poll;
            if (naRPCServerChannel == null) {
                return;
            }
            SocketChannel socketChannel = naRPCServerChannel.getSocketChannel();
            socketChannel.configureBlocking(false);
            socketChannel.socket().setTcpNoDelay(this.group.isNodelay());
            socketChannel.socket().setReuseAddress(true);
            socketChannel.register(this.selector, 1, naRPCServerChannel);
            LOG.info("adding new channel to selector, from " + socketChannel.getRemoteAddress());
            poll = this.incomingChannels.poll();
        }
    }
}
