/*
 * Decompiled with CFR 0.152.
 */
package com.ovopark.iohub.sdk.client;

import com.ovopark.iohub.sdk.client.ClientNode;
import com.ovopark.iohub.sdk.client.ClientNodeRegister;
import com.ovopark.iohub.sdk.client.HttpWriter;
import com.ovopark.iohub.sdk.client.IOHubClientConfig;
import com.ovopark.iohub.sdk.client.InMemoryOutStore;
import com.ovopark.iohub.sdk.client.JobClient2ControlTransport;
import com.ovopark.iohub.sdk.client.JobClient2WorkRestClient;
import com.ovopark.iohub.sdk.client.JobClient2WorkTransport;
import com.ovopark.iohub.sdk.client.JobClientActive;
import com.ovopark.iohub.sdk.client.JobEndpoint;
import com.ovopark.iohub.sdk.client.JobLog;
import com.ovopark.iohub.sdk.client.JobOutTaskFlow;
import com.ovopark.iohub.sdk.client.JobOutTaskFlowProvider;
import com.ovopark.iohub.sdk.client.JobTaskExecutor;
import com.ovopark.iohub.sdk.client.NFSOutStore;
import com.ovopark.iohub.sdk.client.OutStore;
import com.ovopark.iohub.sdk.client.RequestParamBody;
import com.ovopark.iohub.sdk.model.AppNode;
import com.ovopark.iohub.sdk.model.ClientNodeTaskRegisterRequest;
import com.ovopark.iohub.sdk.model.ClientNodeTaskRegisterResponse;
import com.ovopark.iohub.sdk.model.CreateExportRecordRequest;
import com.ovopark.iohub.sdk.model.CreateExportRecordResponse;
import com.ovopark.iohub.sdk.model.JobMeta;
import com.ovopark.iohub.sdk.model.PreFlowErrorRequest;
import com.ovopark.iohub.sdk.model.PushStartRequest;
import com.ovopark.iohub.sdk.model.PushStartResponse;
import com.ovopark.iohub.sdk.model.proto.NoPrivilegeException;
import com.ovopark.kernel.shared.JSONAccessor;
import com.ovopark.kernel.shared.Util;
import com.ovopark.module.shared.Session;
import com.ovopark.module.shared.spring.rbac.SessionImpl;
import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.client.ClientHttpRequestFactory;
import org.springframework.http.client.SimpleClientHttpRequestFactory;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;

@JobClientActive
@Component
public class SimpleJobTaskExecutor
implements JobTaskExecutor {
    private static final Logger log = LoggerFactory.getLogger(SimpleJobTaskExecutor.class);
    @Autowired
    private IOHubClientConfig ioHubClientConfig;
    @Autowired
    private JobOutTaskFlowProvider jobOutTaskFlowProvider;
    @Autowired
    private ClientNodeRegister.ClientNodeProvider clientNodeProvider;
    @Autowired
    private JobClient2ControlTransport jobClient2ControlTransport;

    @Override
    public void execute(final long taskId, String uri, String args, String session, boolean retry, Integer exportTaskId, JobMeta jobMeta, final AppNode worker, final JobLog jobLog, JobEndpoint.JobTaskManager jobTaskManager) {
        final ClientNode clientNode = this.clientNodeProvider.clientNode();
        SessionImpl si = Util.isEmpty((CharSequence)session) ? null : (SessionImpl)JSONAccessor.impl().read(session, SessionImpl.class);
        SimpleClientHttpRequestFactory factory = new SimpleClientHttpRequestFactory();
        factory.setReadTimeout(45000);
        factory.setConnectTimeout(15000);
        RestTemplate restTemplate = new RestTemplate((ClientHttpRequestFactory)factory);
        final JobClient2WorkRestClient jobClient2WorkTransport = new JobClient2WorkRestClient(worker, restTemplate);
        PushStartRequest pushStartRequest = new PushStartRequest();
        pushStartRequest.setTaskId(taskId);
        pushStartRequest.setApp(clientNode.app());
        pushStartRequest.setClientNode(clientNode.node());
        pushStartRequest.setWorkApp(worker.getApp());
        pushStartRequest.setWorkNode(worker.getNode());
        pushStartRequest.setExportTaskId(exportTaskId);
        log.info("to request resource : ");
        jobLog.log("to request resource : ");
        PushStartResponse pushStartResponse = jobClient2WorkTransport.start(pushStartRequest);
        if (pushStartResponse == null || !pushStartResponse.isSuccess()) {
            jobLog.log("cannot get enough resource , break the export task.");
            log.info("cannot get enough resource , break the export task.");
            PreFlowErrorRequest preFlowErrorRequest = new PreFlowErrorRequest();
            preFlowErrorRequest.setNode(ClientNode.UUID_STR);
            preFlowErrorRequest.setTaskId(taskId);
            preFlowErrorRequest.setUserId(si == null ? -1 : si.getUserId());
            preFlowErrorRequest.setError("resource limit, try later");
            try {
                log.info("resource limit , send wbs ");
                this.jobClient2ControlTransport.preFlowError(preFlowErrorRequest);
                log.info("resource limit , send wbs  , OK  ");
            }
            catch (Exception ex) {
                log.error(ex.getMessage(), (Throwable)ex);
            }
            return;
        }
        log.info("get request resource : ");
        jobLog.log("get request resource : ");
        final JobOutTaskFlow<RequestParamBody> jobOutTaskFlow = this.jobOutTaskFlowProvider.find(uri);
        try {
            jobOutTaskFlow.prepared();
            log.info("prepare bean : ");
            jobLog.log("prepare bean : ");
            RequestParamBody requestParamBody = null;
            if (retry) {
                requestParamBody = (RequestParamBody)JSONAccessor.impl().read(args, jobOutTaskFlow.paramType());
            } else {
                try {
                    requestParamBody = jobOutTaskFlow.requestParamBody(args, (Session)si);
                }
                catch (NoPrivilegeException e) {
                    PreFlowErrorRequest preFlowErrorRequest = new PreFlowErrorRequest();
                    preFlowErrorRequest.setNode(ClientNode.UUID_STR);
                    preFlowErrorRequest.setTaskId(taskId);
                    preFlowErrorRequest.setUserId(si == null ? -1 : si.getUserId());
                    preFlowErrorRequest.setError("no privilege");
                    try {
                        log.info("no privilege , send wbs ");
                        this.jobClient2ControlTransport.preFlowError(preFlowErrorRequest);
                        log.info("no privilege , send wbs  , OK  ");
                    }
                    catch (Exception ex) {
                        log.error(ex.getMessage(), (Throwable)ex);
                    }
                    jobOutTaskFlow.close();
                    jobLog.close();
                    return;
                }
                CreateExportRecordRequest createExportRecordRequest = new CreateExportRecordRequest();
                createExportRecordRequest.setRequestParamBody((Object)requestParamBody);
                createExportRecordRequest.setTaskId(taskId);
                createExportRecordRequest.setApp(clientNode.app());
                createExportRecordRequest.setNode(clientNode.node());
                createExportRecordRequest.setWorkApp(worker.getApp());
                createExportRecordRequest.setWorkNode(worker.getNode());
                createExportRecordRequest.setUserId(si == null ? null : si.getUserId());
                createExportRecordRequest.setGroupId(si == null ? null : si.getGroupId());
                createExportRecordRequest.setJobMeta(jobMeta);
                CreateExportRecordResponse exportRecord = jobClient2WorkTransport.createExportRecord(createExportRecordRequest);
                jobLog.log("createExportRecord: " + JSONAccessor.impl().format((Object)exportRecord));
                if (exportRecord == null || !exportRecord.isSuccess() || exportRecord.getExportTaskId() == null) {
                    jobLog.log("cannot get exportRecord id , break the export task.");
                    log.info("cannot get exportRecord id , break the export task.");
                    PreFlowErrorRequest preFlowErrorRequest = new PreFlowErrorRequest();
                    preFlowErrorRequest.setNode(ClientNode.UUID_STR);
                    preFlowErrorRequest.setTaskId(taskId);
                    preFlowErrorRequest.setUserId(si == null ? -1 : si.getUserId());
                    preFlowErrorRequest.setError("server error");
                    try {
                        log.info("server error , cannot create export record , send wbs ");
                        this.jobClient2ControlTransport.preFlowError(preFlowErrorRequest);
                        log.info("server error , cannot create export record , send wbs  , OK  ");
                    }
                    catch (Exception ex) {
                        log.error(ex.getMessage(), (Throwable)ex);
                    }
                    return;
                }
                log.info("created export record  , export record id: " + exportRecord.getExportTaskId());
                jobLog.log("created export record , export record id: " + exportRecord.getExportTaskId());
            }
            final OutStore outStore = jobMeta.isNfs() ? new NFSOutStore(this.ioHubClientConfig.nfsPath() + "/client-" + ClientNode.UUID_STR, 1) : new InMemoryOutStore(Math.min(jobMeta.getSegmentCount(), 10));
            log.info("create out store , nfs ? : " + jobMeta.isNfs());
            jobLog.log("create out store , nfs ? : " + jobMeta.isNfs());
            ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1, Util.newThreadFactory((String)"tmp"), new ThreadPoolExecutor.CallerRunsPolicy());
            scheduledThreadPoolExecutor.allowCoreThreadTimeOut(true);
            scheduledThreadPoolExecutor.setKeepAliveTime(60L, TimeUnit.SECONDS);
            ClientNodeTaskRegisterRequest clientNodeTaskRegisterRequest = new ClientNodeTaskRegisterRequest();
            clientNodeTaskRegisterRequest.setTaskId(taskId);
            clientNodeTaskRegisterRequest.setApp(clientNode.app());
            clientNodeTaskRegisterRequest.setNode(clientNode.node());
            clientNodeTaskRegisterRequest.setWorkApp(worker.getApp());
            clientNodeTaskRegisterRequest.setWorkNode(worker.getNode());
            final RequestParamBody finalRequestParamBody = requestParamBody;
            this.doTask(new Callable<Void>(){

                @Override
                public Void call() throws Exception {
                    log.info("to get all data ");
                    jobLog.log("to get all data ");
                    jobLog.flush();
                    try {
                        jobOutTaskFlow.execute(finalRequestParamBody, outStore);
                        outStore.commit();
                    }
                    catch (Exception e) {
                        throw Util.convert2RuntimeException((Throwable)e);
                    }
                    finally {
                        outStore.close();
                    }
                    log.info("got all data , then push meta to work");
                    jobLog.log("got all data , then push meta to work");
                    jobLog.flush();
                    HttpWriter httpWriter = new HttpWriter(jobClient2WorkTransport);
                    httpWriter.writeAndCommit(outStore.sdList(), outStore, taskId, clientNode, worker);
                    log.info("task completed, and all data pushed to worker, wait worker generate xlsx???");
                    jobLog.log("task completed, and all data pushed to worker, wait worker generate xlsx???");
                    return null;
                }
            }, jobClient2WorkTransport, clientNodeTaskRegisterRequest, scheduledThreadPoolExecutor, taskId, jobMeta, jobTaskManager);
        }
        catch (Exception e) {
            jobLog.log("error : " + e.getMessage());
            throw Util.convert2RuntimeException((Throwable)e);
        }
        finally {
            jobOutTaskFlow.close();
            jobLog.close();
        }
    }

    private <T> T doTask(Callable<T> callable, final JobClient2WorkTransport jobClient2WorkTransport, final ClientNodeTaskRegisterRequest clientNodeTaskRegisterRequest, ScheduledExecutorService taskHeartbeat, final long taskId, JobMeta jobMeta, final JobEndpoint.JobTaskManager jobTaskManager) {
        ScheduledFuture<?> scheduledFuture = taskHeartbeat.scheduleWithFixedDelay(Util.catchRunnable((Util.CatchRunnable)new Util.CatchRunnable(){
            int failCount = 0;

            public void run() throws Exception {
                ClientNodeTaskRegisterResponse clientNodeTaskRegisterResponse = jobClient2WorkTransport.heartbeat(clientNodeTaskRegisterRequest);
                log.info("heartbeat from work(" + clientNodeTaskRegisterRequest.getWorkApp() + ":" + clientNodeTaskRegisterRequest.getWorkNode() + ") : " + JSONAccessor.impl().format((Object)clientNodeTaskRegisterResponse));
                if (clientNodeTaskRegisterResponse == null || !clientNodeTaskRegisterResponse.isSuccess()) {
                    ++this.failCount;
                    if (this.failCount > 3) {
                        jobTaskManager.cancel(taskId);
                    }
                    return;
                }
                this.failCount = 0;
            }
        }), 0L, 5L, TimeUnit.SECONDS);
        try {
            T t = callable.call();
            return t;
        }
        catch (Exception e) {
            throw Util.convert2RuntimeException((Throwable)e);
        }
        finally {
            scheduledFuture.cancel(true);
            log.info("cancel heartbeat thread");
        }
    }
}

