/*
 * Decompiled with CFR 0.152.
 */
package io.parallec.core.actor;

import akka.actor.ActorRef;
import akka.actor.UntypedActor;
import io.parallec.core.actor.message.ContinueToSendToBatchSenderAsstManager;
import io.parallec.core.actor.message.RequestToBatchSenderAsstManager;
import io.parallec.core.actor.message.ResponseCountToBatchSenderAsstManager;
import io.parallec.core.actor.message.type.OperationWorkerMsgType;
import io.parallec.core.config.ParallecGlobalConfig;
import io.parallec.core.util.PcDateUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.ExecutionContext;
import scala.concurrent.duration.Duration;

public class AssistantExecutionManager
extends UntypedActor {
    private static Logger logger = LoggerFactory.getLogger(AssistantExecutionManager.class);
    private long asstManagerRetryIntervalMillis;
    protected int responseCount = 0;
    protected int requestTotalCount = 0;
    protected long startTime = System.currentTimeMillis();
    protected long endTime = -1L;
    protected ActorRef originalManager = null;
    protected List<ActorRef> workers = new ArrayList<ActorRef>();
    protected int maxConcurrencyAdjusted = ParallecGlobalConfig.concurrencyDefault;
    protected int processedWorkerCount = 0;
    protected String taskId = null;
    protected String taskIdTrim = null;

    public void sendMessageUntilStopCount(int stopCount) {
        for (int i = this.processedWorkerCount; i < this.workers.size(); ++i) {
            ActorRef worker = this.workers.get(i);
            try {
                Thread.sleep(1L);
            }
            catch (InterruptedException e) {
                logger.error("sleep exception " + e + " details: ", (Throwable)e);
            }
            worker.tell((Object)OperationWorkerMsgType.PROCESS_REQUEST, this.originalManager);
            ++this.processedWorkerCount;
            if (this.processedWorkerCount > stopCount) {
                return;
            }
            logger.debug("REQ_SENT: {} / {} taskId {}", new Object[]{this.processedWorkerCount, this.requestTotalCount, this.taskIdTrim});
        }
    }

    public void waitAndRetry() {
        ContinueToSendToBatchSenderAsstManager continueToSendToBatchSenderAsstManager = new ContinueToSendToBatchSenderAsstManager(this.processedWorkerCount);
        logger.debug("NOW WAIT Another " + this.asstManagerRetryIntervalMillis + " MS. at " + PcDateUtils.getNowDateTimeStrStandard());
        this.getContext().system().scheduler().scheduleOnce(Duration.create((long)this.asstManagerRetryIntervalMillis, (TimeUnit)TimeUnit.MILLISECONDS), this.getSelf(), (Object)continueToSendToBatchSenderAsstManager, (ExecutionContext)this.getContext().system().dispatcher(), this.getSelf());
    }

    public void onReceive(Object message) {
        if (message instanceof RequestToBatchSenderAsstManager) {
            RequestToBatchSenderAsstManager request = (RequestToBatchSenderAsstManager)message;
            this.originalManager = this.getSender();
            this.taskId = request.getTaskId();
            this.asstManagerRetryIntervalMillis = request.getAsstManagerRetryIntervalMillis();
            this.taskIdTrim = this.taskId.length() <= 12 ? this.taskId : this.taskId.substring(this.taskId.length() - 12, this.taskId.length());
            this.workers = request.getWorkers();
            this.maxConcurrencyAdjusted = request.getMaxConcurrency();
            this.requestTotalCount = this.workers.size();
            this.sendMessageUntilStopCount(this.maxConcurrencyAdjusted);
            if (this.processedWorkerCount < this.requestTotalCount) {
                this.waitAndRetry();
                return;
            }
            logger.info("Now finished sending all needed messages. Done job of ASST Manager at " + PcDateUtils.getNowDateTimeStrStandard());
            return;
        }
        if (message instanceof ContinueToSendToBatchSenderAsstManager) {
            int notProcessedNodeCount = this.requestTotalCount - this.processedWorkerCount;
            if (notProcessedNodeCount <= 0) {
                logger.info("!Finished sending all msg in ASST MANAGER at " + PcDateUtils.getNowDateTimeStrStandard() + " STOP doing wait and retry.");
                return;
            }
            int extraSendCount = this.maxConcurrencyAdjusted - (this.processedWorkerCount - this.responseCount);
            if (extraSendCount > 0) {
                logger.info("HAVE ROOM to send extra of : " + extraSendCount + " MSG. now Send at " + PcDateUtils.getNowDateTimeStrStandard());
                this.sendMessageUntilStopCount(this.processedWorkerCount + extraSendCount);
                this.waitAndRetry();
            } else {
                logger.info("NO ROOM to send extra. Windowns is full. extraSendCount is negative: " + extraSendCount + " reschedule now at " + PcDateUtils.getNowDateTimeStrStandard());
                this.waitAndRetry();
            }
        } else if (message instanceof ResponseCountToBatchSenderAsstManager) {
            this.responseCount = ((ResponseCountToBatchSenderAsstManager)message).getResponseCount();
            logger.debug("RECV IN batchSenderAsstManager FROM ExecutionManager responseCount: " + this.responseCount);
        }
    }
}

