/*
 * Decompiled with CFR 0.152.
 */
package org.apache.crail.storage.rdma.client;

import com.ibm.disni.RdmaEndpoint;
import com.ibm.disni.RdmaEndpointGroup;
import com.ibm.disni.verbs.IbvCQ;
import com.ibm.disni.verbs.IbvMr;
import com.ibm.disni.verbs.IbvSendWR;
import com.ibm.disni.verbs.IbvSge;
import com.ibm.disni.verbs.IbvWC;
import com.ibm.disni.verbs.RdmaCmId;
import com.ibm.disni.verbs.SVCPollCq;
import com.ibm.disni.verbs.SVCPostSend;
import com.ibm.disni.verbs.SVCRegMr;
import java.io.IOException;
import java.util.LinkedList;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.crail.CrailBuffer;
import org.apache.crail.conf.CrailConstants;
import org.apache.crail.metadata.BlockInfo;
import org.apache.crail.storage.StorageEndpoint;
import org.apache.crail.storage.StorageFuture;
import org.apache.crail.storage.rdma.MrCache;
import org.apache.crail.storage.rdma.RdmaConstants;
import org.apache.crail.storage.rdma.client.RdmaPassiveFuture;
import org.apache.crail.storage.rdma.client.RdmaStoragePassiveGroup;
import org.apache.crail.utils.AtomicIntegerModulo;
import org.apache.crail.utils.CrailUtils;
import org.slf4j.Logger;

/**
 * RDMA endpoint that issues one-sided reads and writes against remote storage blocks.
 *
 * <p>{@link #init()} pre-builds {@code RdmaConstants.STORAGE_RDMA_QUEUESIZE} reusable
 * post-send operations for each direction; {@link #write} and {@link #read} borrow one,
 * patch in addresses and keys, post it and hand it back to the pool. A semaphore tracks
 * free send-queue slots: a write takes two (it posts two work requests), a read takes one.
 * Completions are harvested by whichever caller invokes {@link #pollOnce()} or
 * {@link #pollUntil(AtomicInteger, long)}; a lock ensures only one thread polls at a time.
 */
public class RdmaStoragePassiveEndpoint
extends RdmaEndpoint
implements StorageEndpoint {
    private static final Logger LOG = CrailUtils.getLogger();

    // Raw opcode values as they appear in the decompiled source.
    // NOTE(review): these presumably correspond to IBV_WR_RDMA_WRITE (0) and
    // IBV_WR_RDMA_READ (4) in the verbs API - confirm against the DiSNI constants.
    private static final int OPCODE_FIRST_WRITE_WR = 0;
    private static final int OPCODE_READ_WR = 4;

    // Work-completion status values checked in dispatchCqEvent.
    // NOTE(review): presumably IBV_WC_SUCCESS (0) and IBV_WC_WR_FLUSH_ERR (5) - confirm.
    private static final int WC_STATUS_OK = 0;
    private static final int WC_STATUS_IGNORED = 5;

    /** Send-queue slots consumed by one write (it posts two work requests). */
    private static final int WRITE_SEND_SLOTS = 2;
    /** Send-queue slots consumed by one read. */
    private static final int READ_SEND_SLOTS = 1;
    /** Number of poll iterations between two timeout checks in {@link #_pollUntil}. */
    private static final long TIMEOUT_CHECK_INTERVAL = 16384L;
    private static final double NANOS_PER_MILLI = 1000000.0;

    private final LinkedBlockingQueue<SVCPostSend> writeOps = new LinkedBlockingQueue<>();
    private final LinkedBlockingQueue<SVCPostSend> readOps = new LinkedBlockingQueue<>();
    private final AtomicIntegerModulo opcount = new AtomicIntegerModulo();
    private final ReentrantLock lock = new ReentrantLock();
    private IbvWC[] wcList;
    private SVCPollCq poll;
    private final Semaphore sendQueueAvailable = new Semaphore(RdmaConstants.STORAGE_RDMA_QUEUESIZE);
    private final ConcurrentHashMap<Long, RdmaPassiveFuture> futureMap = new ConcurrentHashMap<>();
    private final MrCache mrCache;
    // Resolved lazily on the first read/write, from the endpoint's protection domain.
    private MrCache.DeviceMrCache deviceCache;

    /**
     * Creates the endpoint and remembers the group's memory-region cache.
     *
     * @param group endpoint group this endpoint belongs to; supplies the MR cache
     * @param id connection identifier passed through to the superclass
     * @param serverSide passed through to the superclass
     * @throws IOException if the superclass constructor fails
     */
    public RdmaStoragePassiveEndpoint(RdmaStoragePassiveGroup group, RdmaCmId id, boolean serverSide) throws IOException {
        super((RdmaEndpointGroup)group, id, serverSide);
        this.mrCache = group.getMrCache();
        this.deviceCache = null;
    }

    /**
     * Runs the superclass initialisation, fills both operation pools and prepares the
     * reusable completion-queue poll call.
     *
     * @throws IOException if the superclass or any verbs call fails
     */
    protected synchronized void init() throws IOException {
        super.init();
        for (int i = 0; i < RdmaConstants.STORAGE_RDMA_QUEUESIZE; ++i) {
            this.writeOps.add(this.initWriteOp());
            this.readOps.add(this.initReadOp());
        }
        IbvCQ cq = this.getCqProvider().getCQ();
        this.wcList = new IbvWC[this.getCqProvider().getCqSize()];
        for (int i = 0; i < this.wcList.length; ++i) {
            this.wcList[i] = new IbvWC();
        }
        this.poll = cq.poll(this.wcList, this.wcList.length);
    }

    /**
     * Builds a reusable two-request operation for {@link #write}: an unsignaled request
     * with opcode 0 followed by a signaled request with opcode 4 whose single
     * scatter/gather entry has length 1.
     */
    private SVCPostSend initWriteOp() throws IOException {
        LinkedList<IbvSendWR> wrList_send = new LinkedList<IbvSendWR>();

        IbvSendWR writeWR = new IbvSendWR();
        writeWR.setWr_id((long)this.opcount.getAndIncrement());
        writeWR.setOpcode(OPCODE_FIRST_WRITE_WR);
        // No flags: this request produces no completion of its own.
        writeWR.setSend_flags(0);
        LinkedList<IbvSge> sgeListWrite = new LinkedList<IbvSge>();
        sgeListWrite.add(new IbvSge());
        writeWR.setSg_list(sgeListWrite);
        wrList_send.add(writeWR);

        IbvSendWR readWR = new IbvSendWR();
        readWR.setWr_id((long)this.opcount.getAndIncrement());
        readWR.setOpcode(OPCODE_READ_WR);
        readWR.setSend_flags(IbvSendWR.IBV_SEND_SIGNALED);
        LinkedList<IbvSge> sgeListRead = new LinkedList<IbvSge>();
        IbvSge sgeSendRead = new IbvSge();
        // Fixed length of 1; write() never changes it.
        sgeSendRead.setLength(1);
        sgeListRead.add(sgeSendRead);
        readWR.setSg_list(sgeListRead);
        wrList_send.add(readWR);

        return this.postSend(wrList_send);
    }

    /** Builds a reusable single-request, signaled operation with opcode 4 for {@link #read}. */
    private SVCPostSend initReadOp() throws IOException {
        LinkedList<IbvSendWR> wrList_send = new LinkedList<IbvSendWR>();
        LinkedList<IbvSge> sgeList = new LinkedList<IbvSge>();
        sgeList.add(new IbvSge());
        IbvSendWR sendWR = new IbvSendWR();
        sendWR.setSg_list(sgeList);
        wrList_send.add(sendWR);
        sendWR.setWr_id((long)this.opcount.getAndIncrement());
        sendWR.setOpcode(OPCODE_READ_WR);
        sendWR.setSend_flags(IbvSendWR.IBV_SEND_SIGNALED);
        return this.postSend(wrList_send);
    }

    /**
     * Posts the remaining bytes of {@code buffer} to the remote block.
     *
     * <p>Two work requests are posted: the data transfer itself (unsignaled) and a
     * signaled 1-byte request against the same remote address. The returned future is
     * keyed on the second request's id.
     * NOTE(review): presumably the trailing read is there so that its completion implies
     * the preceding write has reached the target - confirm.
     *
     * <p>If anything fails before the operation has been posted, the pooled operation,
     * the acquired send-queue slots and the registered future are all given back, so a
     * failed call does not shrink the endpoint's capacity.
     *
     * @param buffer local data; bytes between position and limit are sent
     * @param remoteMr remote block descriptor providing address and key
     * @param remoteOffset non-negative offset into the remote block
     * @return future signalled from {@link #pollOnce()} / {@link #pollUntil} on completion
     * @throws IOException on invalid arguments or if a verbs call fails
     * @throws InterruptedException if interrupted while waiting for a pooled operation
     */
    public StorageFuture write(CrailBuffer buffer, BlockInfo remoteMr, long remoteOffset) throws IOException, InterruptedException {
        if ((long)buffer.remaining() > CrailConstants.BLOCK_SIZE) {
            throw new IOException("write size too large " + buffer.remaining());
        }
        if (buffer.remaining() <= 0) {
            throw new IOException("write size too small, len " + buffer.remaining());
        }
        this.checkOffsetsAndRemote(buffer, remoteMr, remoteOffset);
        IbvMr localMr = this.lookupLocalMr(buffer);
        long bufferAddress = buffer.address();

        SVCPostSend writeOp = this.writeOps.take();
        int heldSlots = 0;
        RdmaPassiveFuture future = null;
        boolean posted = false;
        try {
            SVCPostSend.SendWRMod sendWriteWR = writeOp.getWrMod(0);
            sendWriteWR.setWr_id((long)this.opcount.getAndIncrement());
            sendWriteWR.getRdmaMod().setRemote_addr(remoteMr.getAddr() + remoteOffset);
            sendWriteWR.getRdmaMod().setRkey(remoteMr.getLkey());
            SVCPostSend.SgeMod sgeSendWrite = sendWriteWR.getSgeMod(0);
            sgeSendWrite.setAddr(bufferAddress + (long)buffer.position());
            sgeSendWrite.setLength(buffer.remaining());
            sgeSendWrite.setLkey(localMr.getLkey());

            SVCPostSend.SendWRMod sendReadWR = writeOp.getWrMod(1);
            sendReadWR.setWr_id((long)this.opcount.getAndIncrement());
            sendReadWR.getRdmaMod().setRemote_addr(remoteMr.getAddr() + remoteOffset);
            sendReadWR.getRdmaMod().setRkey(remoteMr.getLkey());
            SVCPostSend.SgeMod sgeSendRead = sendReadWR.getSgeMod(0);
            sgeSendRead.setAddr(bufferAddress + (long)buffer.position());
            sgeSendRead.setLkey(localMr.getLkey());

            // One slot per work request; count them so a failure can give back exactly
            // what was taken.
            while (heldSlots < WRITE_SEND_SLOTS) {
                this.acquireSendSlot();
                ++heldSlots;
            }

            future = new RdmaPassiveFuture(this, sendReadWR.getWr_id(), sgeSendWrite.getLength(), true);
            this.futureMap.put(future.getWrid(), future);
            writeOp.execute();
            posted = true;
            return future;
        }
        finally {
            if (!posted) {
                this.abandonRequest(future, heldSlots);
            }
            // Always recycle the operation; every field it carries is rewritten on reuse.
            this.writeOps.add(writeOp);
        }
    }

    /**
     * Posts a request that fills the remaining space of {@code buffer} from the remote block.
     *
     * <p>If anything fails before the operation has been posted, the pooled operation,
     * the acquired send-queue slot and the registered future are given back.
     *
     * @param buffer local destination; bytes between position and limit are filled
     * @param remoteMr remote block descriptor providing address and key
     * @param remoteOffset non-negative offset into the remote block
     * @return future signalled from {@link #pollOnce()} / {@link #pollUntil} on completion
     * @throws IOException on invalid arguments or if a verbs call fails
     * @throws InterruptedException if interrupted while waiting for a pooled operation
     */
    public StorageFuture read(CrailBuffer buffer, BlockInfo remoteMr, long remoteOffset) throws IOException, InterruptedException {
        if ((long)buffer.remaining() > CrailConstants.BLOCK_SIZE) {
            throw new IOException("read size too large");
        }
        if (buffer.remaining() <= 0) {
            throw new IOException("read size too small, len " + buffer.remaining());
        }
        this.checkOffsetsAndRemote(buffer, remoteMr, remoteOffset);
        IbvMr localMr = this.lookupLocalMr(buffer);
        long bufferAddress = buffer.address();

        SVCPostSend readOp = this.readOps.take();
        int heldSlots = 0;
        RdmaPassiveFuture future = null;
        boolean posted = false;
        try {
            SVCPostSend.SendWRMod sendWR = readOp.getWrMod(0);
            sendWR.setWr_id((long)this.opcount.getAndIncrement());
            SVCPostSend.SgeMod sgeSend = sendWR.getSgeMod(0);
            sgeSend.setAddr(bufferAddress + (long)buffer.position());
            sgeSend.setLength(buffer.remaining());
            sgeSend.setLkey(localMr.getLkey());
            sendWR.getRdmaMod().setRemote_addr(remoteMr.getAddr() + remoteOffset);
            sendWR.getRdmaMod().setRkey(remoteMr.getLkey());

            while (heldSlots < READ_SEND_SLOTS) {
                this.acquireSendSlot();
                ++heldSlots;
            }

            future = new RdmaPassiveFuture(this, sendWR.getWr_id(), sgeSend.getLength(), false);
            this.futureMap.put(future.getWrid(), future);
            readOp.execute();
            posted = true;
            return future;
        }
        finally {
            if (!posted) {
                this.abandonRequest(future, heldSlots);
            }
            this.readOps.add(readOp);
        }
    }

    /**
     * Argument checks shared by {@link #read} and {@link #write}, in the order the callers
     * rely on: local offset, remote offset, remote address, remote key.
     */
    private void checkOffsetsAndRemote(CrailBuffer buffer, BlockInfo remoteMr, long remoteOffset) throws IOException {
        if (buffer.position() < 0) {
            throw new IOException("local offset too small " + buffer.position());
        }
        if (remoteOffset < 0L) {
            throw new IOException("remote offset too small " + remoteOffset);
        }
        if (remoteMr.getAddr() == 0L) {
            throw new IOException("remote addr is 0 " + remoteMr.getAddr());
        }
        if (remoteMr.getLkey() == 0) {
            throw new IOException("remote key is 0 " + remoteMr.getLkey());
        }
    }

    /**
     * Returns the memory region covering {@code buffer}'s region, registering and caching
     * it on a cache miss. Also resolves {@link #deviceCache} on first use.
     *
     * <p>Both checked exceptions of the public callers are declared so this helper can
     * replace the formerly inlined code regardless of what the registration calls declare.
     */
    private IbvMr lookupLocalMr(CrailBuffer buffer) throws IOException, InterruptedException {
        if (this.deviceCache == null) {
            this.deviceCache = this.mrCache.getDeviceCache(this.getPd());
        }
        IbvMr localMr = this.deviceCache.get(buffer.getRegion());
        if (localMr == null) {
            localMr = ((SVCRegMr)((SVCRegMr)this.registerMemory(buffer.getRegion().getByteBuffer()).execute()).free()).getMr();
            this.deviceCache.put(localMr);
        }
        return localMr;
    }

    /** Takes one send-queue slot, polling for completions while none is free. */
    private void acquireSendSlot() throws IOException {
        while (!this.sendQueueAvailable.tryAcquire()) {
            this.pollOnce();
        }
    }

    /**
     * Undoes the bookkeeping of a request that was never posted.
     *
     * @param future future already registered in {@link #futureMap}, or {@code null}
     * @param heldSlots number of send-queue slots acquired so far (may be 0)
     */
    private void abandonRequest(RdmaPassiveFuture future, int heldSlots) {
        if (future != null) {
            // Two-argument remove: only drop the mapping if it still points at this future.
            this.futureMap.remove(future.getWrid(), future);
        }
        this.sendQueueAvailable.release(heldSlots);
    }

    /**
     * Polls the completion queue once if no other thread is currently polling.
     *
     * @return number of completions processed, or 0 if the poll lock was busy
     * @throws IOException if polling or dispatching a completion fails
     */
    public int pollOnce() throws IOException {
        if (!this.lock.tryLock()) {
            return 0;
        }
        try {
            return this._pollOnce();
        }
        finally {
            this.lock.unlock();
        }
    }

    /**
     * Waits for {@code future} to leave the value 0, polling the completion queue itself
     * if it manages to obtain the poll lock.
     *
     * <p>The method spins on {@code tryLock()} until either the lock is obtained or the
     * future's value is positive. Polling only happens while the value is still 0.
     *
     * @param future status word of the awaited operation; 0 means still pending
     * @param timeout upper bound in milliseconds, checked only every
     *        {@code TIMEOUT_CHECK_INTERVAL} poll iterations
     * @throws IOException if polling or dispatching a completion fails
     */
    public void pollUntil(AtomicInteger future, long timeout) throws IOException {
        boolean locked;
        do {
            locked = this.lock.tryLock();
        } while (future.get() <= 0 && !locked);
        try {
            if (future.get() == 0) {
                this._pollUntil(future, timeout);
            }
        }
        finally {
            if (locked) {
                this.lock.unlock();
            }
        }
    }

    /** Executes one poll call and dispatches every returned completion. */
    private int _pollOnce() throws IOException {
        int res = ((SVCPollCq)this.poll.execute()).getPolls();
        for (int i = 0; i < res; ++i) {
            this.dispatchCqEvent(this.wcList[i]);
        }
        return res;
    }

    /**
     * Polls repeatedly until {@code future} is non-zero or the timeout has elapsed.
     *
     * @return always 1
     */
    private int _pollUntil(AtomicInteger future, long timeout) throws IOException {
        long iterations = 0L;
        long startTime = System.nanoTime();
        while (future.get() == 0) {
            this._pollOnce();
            // Reading the clock on every iteration is avoided; check it periodically.
            if (iterations == TIMEOUT_CHECK_INTERVAL) {
                iterations = 0L;
                if ((double)(System.nanoTime() - startTime) / NANOS_PER_MILLI > (double)timeout) {
                    break;
                }
            }
            ++iterations;
        }
        return 1;
    }

    /**
     * Handles one work completion.
     *
     * <p>Status 5 is ignored silently; any other non-zero status is only logged. For a
     * successful completion the matching future is removed from {@link #futureMap},
     * signalled, and its send-queue slots are released.
     * NOTE(review): on a logged failure the future is neither signalled nor are its slots
     * released - confirm whether that is intended.
     *
     * @throws IOException if a successful completion has no registered future
     */
    private void dispatchCqEvent(IbvWC wc) throws IOException {
        if (wc.getStatus() == WC_STATUS_IGNORED) {
            return;
        }
        if (wc.getStatus() != WC_STATUS_OK) {
            LOG.info("faulty request, status " + wc.getStatus());
            return;
        }
        RdmaPassiveFuture future = this.futureMap.remove(wc.getWr_id());
        if (future == null) {
            throw new IOException("cannot find future object for wrid " + wc.getWr_id() + ", status " + wc.getStatus() + ", opcount " + this.opcount + ", ep " + this.getEndpointId() + ", wc.qpnum " + wc.getQp_num() + ", this.qp.num " + this.qp.getQp_num() + ", connstate " + this.getConnState() + ", futureMap.size " + this.futureMap.size());
        }
        future.signal(wc.getStatus());
        if (future.isWrite()) {
            this.sendQueueAvailable.release(WRITE_SEND_SLOTS);
        } else {
            this.sendQueueAvailable.release(READ_SEND_SLOTS);
        }
    }

    /**
     * Drains the completion queue until a poll returns nothing, then closes the endpoint.
     * The superclass {@code close()} runs even if draining fails, so a bad completion
     * cannot prevent the endpoint from being closed.
     *
     * @throws IOException if draining or the superclass close fails
     * @throws InterruptedException declared for the superclass close
     */
    public void close() throws IOException, InterruptedException {
        try {
            this.lock.lock();
            try {
                // The lock is already held, so poll directly.
                while (this._pollOnce() > 0) {
                }
            }
            finally {
                this.lock.unlock();
            }
        }
        finally {
            super.close();
        }
    }

    /** Returns the number of send-queue slots currently available. */
    public int getFreeSlots() {
        return this.sendQueueAvailable.availablePermits();
    }

    /** Returns the string form of the destination address reported by the superclass. */
    public String getAddress() throws IOException {
        return super.getDstAddr().toString();
    }

    /** Returns the connection identifier held by the superclass. */
    public RdmaCmId getContext() {
        return super.getIdPriv();
    }

    /** Always {@code false} for this endpoint type. */
    public boolean isLocal() {
        return false;
    }
}

