/*
 * Decompiled with CFR 0.152.
 */
package com.ovopark.iohub.sdk.client.outstream;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.ovopark.iohub.sdk.client.Client2ControlTransport;
import com.ovopark.iohub.sdk.client.Client2WorkRestClient;
import com.ovopark.iohub.sdk.client.Client2WorkTransport;
import com.ovopark.iohub.sdk.client.ClientNode;
import com.ovopark.iohub.sdk.client.ClientNodeRegister;
import com.ovopark.iohub.sdk.client.IOHubClientConfig;
import com.ovopark.iohub.sdk.client.JobClientActive;
import com.ovopark.iohub.sdk.client.JobLogImpl;
import com.ovopark.iohub.sdk.client.outstream.HttpWriter;
import com.ovopark.iohub.sdk.client.outstream.JobOutTaskFlow;
import com.ovopark.iohub.sdk.client.outstream.JobOutTaskFlowProvider;
import com.ovopark.iohub.sdk.client.outstream.RequestParamBody;
import com.ovopark.iohub.sdk.model.AppNode;
import com.ovopark.iohub.sdk.model.ClientNodeTaskRegisterRequest;
import com.ovopark.iohub.sdk.model.ClientNodeTaskRegisterResponse;
import com.ovopark.iohub.sdk.model.CreateExportRecordRequest;
import com.ovopark.iohub.sdk.model.CreateExportRecordResponse;
import com.ovopark.iohub.sdk.model.JobHint;
import com.ovopark.iohub.sdk.model.JobMeta;
import com.ovopark.iohub.sdk.model.TaskCancelRequest;
import com.ovopark.iohub.sdk.model.TaskCancelResponse;
import com.ovopark.iohub.sdk.model.TaskLockRequest;
import com.ovopark.iohub.sdk.model.TaskLockResponse;
import com.ovopark.iohub.sdk.model.TaskModel;
import com.ovopark.iohub.sdk.model.TaskSubmitRequest;
import com.ovopark.iohub.sdk.model.TaskSubmitResponse;
import com.ovopark.iohub.sdk.model.TransientFunc;
import com.ovopark.iohub.sdk.model.outstream.ExportPushStartRequest;
import com.ovopark.iohub.sdk.model.outstream.ExportPushStartResponse;
import com.ovopark.iohub.sdk.model.proto.InMemoryOutStore;
import com.ovopark.iohub.sdk.model.proto.LimitLogger;
import com.ovopark.iohub.sdk.model.proto.NFSOutStore;
import com.ovopark.iohub.sdk.model.proto.NoPrivilegeException;
import com.ovopark.iohub.sdk.model.proto.OutStore;
import com.ovopark.iohub.sdk.model.proto.RowTransLogConfig;
import com.ovopark.kernel.shared.JSONAccessor;
import com.ovopark.kernel.shared.Util;
import com.ovopark.module.shared.Session;
import com.ovopark.module.shared.spring.rbac.SessionImpl;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.client.ClientHttpRequestFactory;
import org.springframework.http.client.SimpleClientHttpRequestFactory;
import org.springframework.http.converter.StringHttpMessageConverter;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.RestTemplate;

@JobClientActive
@RestController(value="com.ovopark.iohub.sdk.client.outstream.ExportEndpoint")
@RequestMapping(value={"/feign/iohub-job/processing"})
public class ExportEndpoint
implements InitializingBean {
    // Class-wide SLF4J logger.
    private static final Logger log = LoggerFactory.getLogger(ExportEndpoint.class);
    // Transport towards the control node; used here to lock a task (lockClient) and as the sink of JobLogImpl.
    @Autowired
    private Client2ControlTransport jobClient2ControlTransport;
    // Shared pool that runs the export tasks accepted by submit().
    //  - core size: max(min(cpus * 2, 64), IOHUB_EXPORT_IO), the system property defaulting to 0
    //  - max size : max(max(cpus * 2, 1024), IOHUB_EXPORT_IO)
    //  - idle threads above the core size are kept for 600 seconds
    //  - the work queue holds a single element and the AbortPolicy handler makes a saturated pool
    //    throw RejectedExecutionException, which submit() catches to reject the task
    private static final ExecutorService jobExecutor = new ThreadPoolExecutor(Math.max(Math.min(Runtime.getRuntime().availableProcessors() * 2, 64), Integer.parseInt(System.getProperty("IOHUB_EXPORT_IO", "0"))), Math.max(Math.max(Runtime.getRuntime().availableProcessors() * 2, 1024), Integer.parseInt(System.getProperty("IOHUB_EXPORT_IO", "0"))), 600L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(1), Util.newThreadFactory((String)"iohub-export-io"), new ThreadPoolExecutor.AbortPolicy());
    // Ids of the tasks this node currently considers "executing"; consulted by submit() and cancel().
    private final Set<Long> executingJobTaskIdSet = ConcurrentHashMap.newKeySet();
    // Scheduled tasks keyed by "task_" + taskId; entries are added by submit() and removed when the task body ends.
    final Map<String, SubmitTask> submitTaskMap = new ConcurrentHashMap<String, SubmitTask>();
    // Number of tasks for which the control-side lock was obtained (incremented in submit()).
    final AtomicInteger receivedTaskCount = new AtomicInteger(0);
    // Client configuration; this class reads nfsPath() from it when building an NFSOutStore.
    @Autowired
    private IOHubClientConfig ioHubClientConfig;
    // Resolves the JobOutTaskFlow that handles a given task uri.
    @Autowired
    private JobOutTaskFlowProvider jobOutTaskFlowProvider;
    // Supplies the ClientNode (app / node) describing this process.
    @Autowired
    private ClientNodeRegister.ClientNodeProvider clientNodeProvider;
    // Static handle returned by JobTaskManager.getOrCreate(); assigned in afterPropertiesSet(), null before that.
    private static JobTaskManager instance;
    // Task manager backed by this endpoint's submitTaskMap and receivedTaskCount.
    private final SimpleJobTaskManager simpleJobTaskManager = new SimpleJobTaskManager();

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    /**
     * Receives an export task and, when every precondition holds, schedules it on the shared
     * {@code jobExecutor}.
     *
     * <p>The work is performed inside {@code Util.lock} keyed by {@code "task_" + taskId}
     * (called with a 10 second argument; NOTE(review): presumably the lock wait timeout, confirm
     * against {@code Util.lock}). The steps are:
     * <ol>
     *   <li>reject the request when the task id is already registered as executing;</li>
     *   <li>obtain the task lock from the control node;</li>
     *   <li>register the task id and ask the worker for resources ({@code start});</li>
     *   <li>build the request parameters (re-read from {@code args} on retry, otherwise built by the
     *       flow and followed by the creation of an export record on the worker);</li>
     *   <li>create the out store (NFS or in-memory) and submit the task body to the executor.</li>
     * </ol>
     *
     * <p>Failures never propagate to the caller: they are logged and reported through a response with
     * {@code submitted == false} and, where available, a description. Whenever the task was registered
     * but not handed to the executor, its id is removed from the executing set again so that a later
     * re-submission of the same task is not rejected as "still executing".
     *
     * @param taskSubmitRequest the task description sent by the scheduler
     * @return the response; {@code submitted} is {@code true} only when the task was scheduled
     */
    @RequestMapping(value={"/submit"})
    @ResponseBody
    public TaskSubmitResponse submit(@RequestBody TaskSubmitRequest taskSubmitRequest) {
        String taskKey = "task_" + taskSubmitRequest.getTaskId();
        // Tag every log line written by this request thread with the task key.
        MDC.put((String)"traceId", (String)taskKey);
        MDC.put((String)"requestId", (String)taskKey);
        log.info("receive a new task, we can do it???: " + JSONAccessor.impl().format((Object)taskSubmitRequest));
        TaskSubmitResponse taskSubmitResponse = new TaskSubmitResponse();
        taskSubmitResponse.setSubmitted(false);
        final JobLogImpl client2ControlLog = new JobLogImpl(this.jobClient2ControlTransport, 300, taskKey);
        try {
            Util.lock((Comparable)((Object)taskKey), () -> {
                if (this.executingJobTaskIdSet.contains(taskSubmitRequest.getTaskId())) {
                    taskSubmitResponse.setSubmitted(false);
                    taskSubmitResponse.setDesc("Rejected,the previous task is executing");
                    throw new IllegalStateException("the previous task is executing, reject the task: " + JSONAccessor.impl().format((Object)taskSubmitRequest));
                }
                TaskLockRequest taskLockRequest = new TaskLockRequest();
                taskLockRequest.setTaskId(taskSubmitRequest.getTaskId());
                taskLockRequest.setNode(ClientNode.UUID_STR);
                TaskLockResponse taskLockResponse = this.jobClient2ControlTransport.lockClient(taskLockRequest);
                if (taskLockResponse == null || !taskLockResponse.isSuccess()) {
                    taskSubmitResponse.setSubmitted(false);
                    throw new IllegalStateException("cannot get lock: " + JSONAccessor.impl().format((Object)taskLockRequest));
                }
                this.receivedTaskCount.incrementAndGet();
                long taskId = taskSubmitRequest.getTaskId();
                this.executingJobTaskIdSet.add(taskId);
                // Becomes true once the task body is owned by the executor; from then on the task body
                // itself is responsible for un-registering the id and closing the flow.
                boolean taskSubmit = false;
                try {
                    String uri = taskSubmitRequest.getUri();
                    String args = taskSubmitRequest.getArgs();
                    String session = taskSubmitRequest.getSession();
                    boolean retry = taskSubmitRequest.isRetry();
                    Integer exportTaskId = taskSubmitRequest.getExportTaskId();
                    JobMeta jobMeta = taskSubmitRequest.getJobMeta();
                    AppNode worker = taskSubmitRequest.getWorker();
                    JobHint jobHint = taskSubmitRequest.getJobHint();
                    // Per-task manager handed to the heartbeat so it can cancel this task.
                    JobTaskManager jobTaskManager = new JobTaskManager(){

                        @Override
                        public void cancel(long taskId) {
                            SubmitTask submitTask = ExportEndpoint.this.submitTaskMap.get("task_" + taskId);
                            // The task may not be published yet or may already have finished.
                            if (submitTask == null || submitTask.future == null) {
                                log.warn("cannot cancel the task, it is not running: task_" + taskId);
                                return;
                            }
                            submitTask.future.cancel(true);
                            log.info("cancel the task");
                            client2ControlLog.log("cancel the task");
                        }
                    };
                    ClientNode clientNode = this.clientNodeProvider.clientNode();
                    SessionImpl si = Util.isEmpty((CharSequence)session) ? null : (SessionImpl)JSONAccessor.impl().read(session, SessionImpl.class);
                    SimpleClientHttpRequestFactory factory = new SimpleClientHttpRequestFactory();
                    factory.setReadTimeout(45000);
                    factory.setConnectTimeout(15000);
                    RestTemplate restTemplate = new RestTemplate((ClientHttpRequestFactory)factory);
                    restTemplate.getMessageConverters().add(0, new StringHttpMessageConverter(StandardCharsets.UTF_8));
                    Client2WorkRestClient client2WorkTransport = new Client2WorkRestClient(worker, restTemplate);
                    ExportPushStartRequest exportPushStartRequest = new ExportPushStartRequest();
                    exportPushStartRequest.setTaskId(taskId);
                    exportPushStartRequest.setApp(clientNode.app());
                    exportPushStartRequest.setClientNode(clientNode.node());
                    exportPushStartRequest.setWorkApp(worker.getApp());
                    exportPushStartRequest.setWorkNode(worker.getNode());
                    exportPushStartRequest.setExportTaskId(exportTaskId);
                    exportPushStartRequest.setJobHint(jobHint);
                    exportPushStartRequest.setSession(session);
                    exportPushStartRequest.setUserId(si == null ? null : si.getUserId());
                    exportPushStartRequest.setUserName(si == null ? null : si.getUserName());
                    exportPushStartRequest.setUserGroupId(si == null ? null : si.getGroupId());
                    exportPushStartRequest.setArgs(args);
                    log.info("to request resource : ");
                    client2ControlLog.log("to request resource : ");
                    ExportPushStartResponse exportPushStartResponse = client2WorkTransport.start(exportPushStartRequest);
                    if (exportPushStartResponse == null || !exportPushStartResponse.isSuccess()) {
                        client2ControlLog.log("cannot get enough resource , break the export task.");
                        log.info("cannot get enough resource , break the export task.");
                        taskSubmitResponse.setSubmitted(false);
                        taskSubmitResponse.setDesc("resource limit");
                        throw new IllegalStateException("resource limit, reject the task: " + JSONAccessor.impl().format((Object)taskSubmitRequest));
                    }
                    log.info("get request resource : ");
                    client2ControlLog.log("get request resource : ");
                    JobOutTaskFlow<RequestParamBody> jobOutTaskFlow = this.jobOutTaskFlowProvider.find(uri);
                    try {
                        OutStore outStore;
                        jobOutTaskFlow.prepared();
                        log.info("prepare bean : ");
                        client2ControlLog.log("prepare bean : ");
                        RequestParamBody requestParamBody = null;
                        if (retry) {
                            // On retry the args are deserialized directly into the flow's parameter type.
                            requestParamBody = (RequestParamBody)JSONAccessor.impl().read(args, jobOutTaskFlow.paramType());
                        } else {
                            try {
                                requestParamBody = jobOutTaskFlow.requestParamBody(args, (Session)si);
                            }
                            catch (NoPrivilegeException e) {
                                taskSubmitResponse.setSubmitted(false);
                                taskSubmitResponse.setDesc("no privilege");
                                throw new IllegalStateException("no privilege, reject the task: " + JSONAccessor.impl().format((Object)taskSubmitRequest), e);
                            }
                            catch (Exception e) {
                                taskSubmitResponse.setSubmitted(false);
                                taskSubmitResponse.setDesc(e.getMessage());
                                log.error(e.getMessage(), (Throwable)e);
                                throw new IllegalStateException("error, reject the task: " + e.getMessage() + JSONAccessor.impl().format((Object)taskSubmitRequest), e);
                            }
                            JobOutTaskFlow.ExportTaskInfoProvider exportTaskInfoProvider = jobOutTaskFlow.exportTaskInfoProvider(requestParamBody, (Session)si);
                            CreateExportRecordRequest createExportRecordRequest = new CreateExportRecordRequest();
                            createExportRecordRequest.setRequestParamBody((Object)requestParamBody);
                            createExportRecordRequest.setExportTaskName(exportTaskInfoProvider == null ? null : exportTaskInfoProvider.name());
                            createExportRecordRequest.setTaskId(taskId);
                            createExportRecordRequest.setApp(clientNode.app());
                            createExportRecordRequest.setNode(clientNode.node());
                            createExportRecordRequest.setWorkApp(worker.getApp());
                            createExportRecordRequest.setWorkNode(worker.getNode());
                            createExportRecordRequest.setUserId(si == null ? null : si.getUserId());
                            createExportRecordRequest.setGroupId(si == null ? null : si.getGroupId());
                            createExportRecordRequest.setJobMeta(jobMeta);
                            CreateExportRecordResponse exportRecord = client2WorkTransport.createExportRecord(createExportRecordRequest);
                            client2ControlLog.log("createExportRecord: " + JSONAccessor.impl().format((Object)exportRecord));
                            if (exportRecord == null || !exportRecord.isSuccess() || exportRecord.getExportTaskId() == null) {
                                client2ControlLog.log("cannot get exportRecord id , break the export task.");
                                log.info("cannot get exportRecord id , break the export task.");
                                taskSubmitResponse.setSubmitted(false);
                                taskSubmitResponse.setDesc("cannot create exportRecord");
                                throw new IllegalStateException("cannot create exportRecord: " + JSONAccessor.impl().format((Object)taskSubmitRequest));
                            }
                            log.info("created export record  , export record id: " + exportRecord.getExportTaskId());
                            client2ControlLog.log("created export record , export record id: " + exportRecord.getExportTaskId());
                            taskSubmitResponse.setExportTaskId(exportRecord.getExportTaskId());
                        }
                        // The job meta decides the store type unless the hint names one explicitly.
                        boolean nfs = jobMeta.isNfs();
                        if (jobHint != null && Util.isNotEmpty((CharSequence)jobHint.getOutStore())) {
                            nfs = "nfs".equalsIgnoreCase(jobHint.getOutStore()) || "nas".equalsIgnoreCase(jobHint.getOutStore());
                        }
                        if (nfs) {
                            RowTransLogConfig rowTransLogConfig = new RowTransLogConfig();
                            // Region size: hint value first, then job meta, then 1000.
                            int regionRowCount = jobMeta.getRegionRowCount() > 0 ? jobMeta.getRegionRowCount() : 1000;
                            if (jobHint != null && jobHint.getRegionRowCount() > 0) {
                                regionRowCount = jobHint.getRegionRowCount();
                            }
                            rowTransLogConfig.setRegionRowCount(regionRowCount);
                            outStore = new NFSOutStore(this.ioHubClientConfig.nfsPath() + "/client-" + ClientNode.UUID_STR, 1, rowTransLogConfig);
                        } else {
                            outStore = new InMemoryOutStore(Math.min(jobMeta.getSegmentCount(), 10));
                        }
                        log.info("create out store , nfs ? : " + nfs);
                        client2ControlLog.log("create out store , nfs ? : " + nfs);
                        ClientNodeTaskRegisterRequest clientNodeTaskRegisterRequest = new ClientNodeTaskRegisterRequest();
                        clientNodeTaskRegisterRequest.setTaskId(taskId);
                        clientNodeTaskRegisterRequest.setApp(clientNode.app());
                        clientNodeTaskRegisterRequest.setNode(clientNode.node());
                        clientNodeTaskRegisterRequest.setWorkApp(worker.getApp());
                        clientNodeTaskRegisterRequest.setWorkNode(worker.getNode());
                        clientNodeTaskRegisterRequest.setVersion(3);
                        clientNodeTaskRegisterRequest.setMinVersion(23);
                        RequestParamBody finalRequestParamBody = requestParamBody;
                        SubmitTask submitTask = new SubmitTask();
                        submitTask.setTaskModel(new TaskModel());
                        TaskModel taskModel = submitTask.getTaskModel();
                        taskModel.setExportClientStat(new TaskModel.ExportClientStat());
                        TaskModel.ExportClientStat exportClientStat = taskModel.getExportClientStat();
                        submitTask.setRuntimeStat(outStore.runtimeStat());
                        submitTask.setTaskId(taskSubmitRequest.getTaskId());
                        taskModel.setJobTaskId(taskSubmitRequest.getTaskId());
                        taskModel.setExportTaskId(taskSubmitRequest.getExportTaskId());
                        taskModel.setWorkNode(taskSubmitRequest.getWorker().getNode());
                        taskModel.setClientNode(ClientNode.UUID_STR);
                        taskModel.setExport(true);
                        taskModel.setUri(uri);
                        taskModel.setClientApp(clientNode.app());
                        taskModel.setSession(session);
                        taskModel.setUserId(si == null ? null : si.getUserId());
                        taskModel.setUserName(si == null ? null : si.getUserName());
                        taskModel.setUserGroupId(si == null ? null : si.getGroupId());
                        exportClientStat.setArgs(args);
                        exportClientStat.setConvertArgs(JSONAccessor.impl().format((Object)finalRequestParamBody));
                        try {
                            Future<?> future = jobExecutor.submit(Util.catchRunnable(() -> this.lambda$submit$2(taskKey, taskModel, outStore, exportClientStat, clientNodeTaskRegisterRequest, jobOutTaskFlow, finalRequestParamBody, submitTask, client2WorkTransport, taskId, clientNode, worker, jobTaskManager, taskLockRequest)));
                            taskSubmit = true;
                            submitTask.setFuture(future);
                            taskModel.setAcceptTime(System.currentTimeMillis());
                            taskModel.setAcceptTimeStr(Util.formatTime((LocalDateTime)LocalDateTime.now(), (String[])new String[0]));
                            this.submitTaskMap.put(taskKey, submitTask);
                            log.info("OK, the task is scheduled: " + taskKey);
                            taskSubmitResponse.setSubmitted(true);
                        }
                        catch (RejectedExecutionException e) {
                            // The pool is saturated; the id is un-registered by the outer finally.
                            taskSubmitResponse.setSubmitted(false);
                            taskSubmitResponse.setDesc("Rejected, resource is unavailable ");
                            throw Util.convert2RuntimeException((Throwable)e);
                        }
                    }
                    catch (Exception e) {
                        client2ControlLog.log("error : " + e.getMessage());
                        throw Util.convert2RuntimeException((Throwable)e);
                    }
                    finally {
                        // A scheduled task closes the flow itself when its body ends.
                        if (!taskSubmit && jobOutTaskFlow != null) {
                            jobOutTaskFlow.close();
                        }
                        client2ControlLog.close();
                    }
                }
                finally {
                    // Nothing will ever un-register a task that never reached the executor, so do it
                    // here; otherwise every later submission of this id would be rejected.
                    if (!taskSubmit) {
                        this.executingJobTaskIdSet.remove(taskId);
                    }
                }
                return null;
            }, (long)10L, (TimeUnit)TimeUnit.SECONDS);
            return taskSubmitResponse;
        }
        catch (Exception e) {
            // Rejections are reported through the response, not as an HTTP error.
            log.error(e.getMessage(), (Throwable)e);
            return taskSubmitResponse;
        }
        finally {
            try {
                client2ControlLog.close();
            }
            catch (Exception e) {
                log.error(e.getMessage(), (Throwable)e);
            }
            finally {
                MDC.remove((String)"traceId");
                MDC.remove((String)"requestId");
            }
        }
    }

    /**
     * Cancels an export task that is running on this node.
     *
     * <p>The task is reported as not found when its id is not registered as executing or when no
     * {@code SubmitTask} is stored under {@code "task_" + taskId}. Otherwise the task is flagged as
     * killed, its runtime stat is cancelled and the executor future, if present, is cancelled with
     * interruption.
     *
     * @param taskCancelRequest carries the id of the task to cancel
     * @return the outcome: {@code found} tells whether the task is known here, {@code success}
     *         mirrors {@code Future.isCancelled()} after the cancel attempt
     */
    @RequestMapping(value={"/cancel"})
    @ResponseBody
    public TaskCancelResponse cancel(@RequestBody TaskCancelRequest taskCancelRequest) {
        TaskCancelResponse response = new TaskCancelResponse();
        response.setApp(this.clientNodeProvider.clientNode().app());
        response.setNode(ClientNode.UUID_STR);
        long taskId = taskCancelRequest.getTaskId();

        // Only look the task up when its id is registered as executing.
        SubmitTask target = this.executingJobTaskIdSet.contains(taskId)
                ? this.submitTaskMap.get("task_" + taskId)
                : null;
        if (target == null) {
            response.setFound(false);
            response.setSuccess(false);
            response.setDesc("cannot find task");
            return response;
        }

        target.setKilled(true);
        target.runtimeStat.cancel();
        response.setFound(true);

        Future<?> ioFuture = target.future;
        if (ioFuture != null) {
            ioFuture.cancel(true);
            response.setSuccess(ioFuture.isCancelled());
            response.setDesc("done cancel");
        } else {
            response.setSuccess(false);
            response.setDesc("cannot find io thread");
        }
        return response;
    }

    /**
     * Initialization callback declared by {@code InitializingBean}: stores this bean's
     * {@code SimpleJobTaskManager} in the static {@code instance} field, which is what
     * {@code JobTaskManager.getOrCreate()} returns.
     */
    public void afterPropertiesSet() throws Exception {
        JobTaskManager manager = this.simpleJobTaskManager;
        instance = manager;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Runs {@code callable} on the calling thread while a heartbeat is sent to the worker through
     * {@code taskHeartbeat}.
     *
     * <p>The heartbeat is scheduled immediately and then with a fixed delay of 5 seconds. Each beat
     * refreshes the task statistics through the task model's transient function (errors there are
     * only logged), sends them with {@code exportHeartbeat} and counts consecutive unsuccessful
     * answers; once more than 3 have been seen, {@code jobTaskManager.cancel} is invoked for the
     * task. When the callable ends, normally or not, one last beat is run on the calling thread and
     * the scheduled heartbeat is cancelled.
     *
     * @param callable the task body
     * @param client2WorkTransport transport used to send the heartbeat
     * @param clientNodeTaskRegisterRequest heartbeat request, updated with the current stat on each beat
     * @param taskHeartbeat scheduler that runs the periodic heartbeat
     * @param jobTaskManager used to cancel the task after repeated heartbeat failures
     * @param submitTask the task being executed
     * @return the value returned by {@code callable}
     * @throws RuntimeException the callable's exception, converted by {@code Util.convert2RuntimeException}
     */
    private <T> T doTask(Callable<T> callable, final Client2WorkTransport client2WorkTransport, final ClientNodeTaskRegisterRequest clientNodeTaskRegisterRequest, ScheduledExecutorService taskHeartbeat, final JobTaskManager jobTaskManager, final SubmitTask submitTask) {
        Runnable heartbeat = Util.catchRunnable((Util.CatchRunnable)new Util.CatchRunnable(){
            // Unsuccessful heartbeats seen in a row; reset by every successful one.
            int consecutiveFailures = 0;

            public void run() throws Exception {
                long id = submitTask.taskId;
                TaskModel model = submitTask.taskModel;
                try {
                    // Refresh the statistics before they are reported.
                    model.getTransientFunc().run();
                }
                catch (Exception e) {
                    log.error(e.getMessage(), (Throwable)e);
                }
                clientNodeTaskRegisterRequest.setExportClientStat(model.getExportClientStat());
                ClientNodeTaskRegisterResponse answer = client2WorkTransport.exportHeartbeat(clientNodeTaskRegisterRequest);
                log.info("heartbeat from work(" + clientNodeTaskRegisterRequest.getWorkApp() + ":" + clientNodeTaskRegisterRequest.getWorkNode() + ") : " + JSONAccessor.impl().format((Object)answer));
                if (answer != null && answer.isSuccess()) {
                    this.consecutiveFailures = 0;
                    return;
                }
                this.consecutiveFailures++;
                if (this.consecutiveFailures > 3) {
                    jobTaskManager.cancel(id);
                }
            }
        });
        ScheduledFuture<?> periodicBeat = taskHeartbeat.scheduleWithFixedDelay(heartbeat, 0L, 5L, TimeUnit.SECONDS);
        try {
            return callable.call();
        }
        catch (Exception e) {
            throw Util.convert2RuntimeException((Throwable)e);
        }
        finally {
            try {
                // Final beat so the worker receives the last statistics.
                heartbeat.run();
            }
            finally {
                periodicBeat.cancel(true);
                log.info("cancel heartbeat thread");
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Body of an export task; submit() hands it to the shared executor.
     *
     * <p>It installs the statistics refresher on the task model, runs the export flow under
     * {@link #doTask} (which keeps a heartbeat going), pushes the collected data to the worker and
     * finally releases everything that belongs to the task. Any failure of the task itself is logged
     * and reported to the control log but not rethrown.
     *
     * @param taskKey "task_" + task id; key of the task in submitTaskMap and name of the control log
     * @param taskModel model of the task, receives the IO thread name and the transient function
     * @param outStore store the flow writes its rows to
     * @param exportClientStat statistics object refreshed from the store's runtime stat
     * @param clientNodeTaskRegisterRequest heartbeat request, receives the IO thread name
     * @param jobOutTaskFlow flow that produces the data; closed at the end
     * @param finalRequestParamBody parameters passed to the flow
     * @param submitTask bookkeeping object of the task (killed flag, task model)
     * @param client2WorkTransport transport towards the worker
     * @param taskId id of the task
     * @param clientNode this client node
     * @param worker the worker node the data is pushed to
     * @param jobTaskManager used by the heartbeat to cancel the task
     * @param taskLockRequest lock request of the task; its task id is removed from the executing set
     */
    private /* synthetic */ void lambda$submit$2(String taskKey, TaskModel taskModel, OutStore outStore, TaskModel.ExportClientStat exportClientStat, ClientNodeTaskRegisterRequest clientNodeTaskRegisterRequest, JobOutTaskFlow jobOutTaskFlow, RequestParamBody finalRequestParamBody, SubmitTask submitTask, Client2WorkTransport client2WorkTransport, long taskId, ClientNode clientNode, AppNode worker, JobTaskManager jobTaskManager, TaskLockRequest taskLockRequest) throws Exception {
        final JobLogImpl jobLog2Control = new JobLogImpl(this.jobClient2ControlTransport, 300, taskKey);
        taskModel.setIoThreadName(Thread.currentThread().getName());
        // Refreshes exportClientStat from the store's runtime stat; invoked by the heartbeat in
        // doTask() and by SimpleJobTaskManager.taskList().
        taskModel.setTransientFunc(() -> {
            OutStore.RuntimeStat runtimeStat = outStore.runtimeStat();
            int sumRowCount = exportClientStat.getSumRowCount();
            int rowCount = runtimeStat.rowCount();
            // Rows added since the previous refresh, then the new running total.
            exportClientStat.setRowCountAdded(rowCount - sumRowCount);
            exportClientStat.setSumRowCount(rowCount);
            String currentSegment = runtimeStat.currentSegment();
            exportClientStat.setCurrentSegment(currentSegment);
            exportClientStat.setSegmentCount(runtimeStat.segmentCount());
            long lastIoTimeMs = runtimeStat.lastIoTimeMs();
            exportClientStat.setLastIoTimeStr(Util.formatTime((LocalDateTime)Util.dateTime((long)lastIoTimeMs), (String[])new String[0]));
            long byteSize = runtimeStat.byteSize();
            exportClientStat.setByteSize(byteSize);
        });
        clientNodeTaskRegisterRequest.setIoThreadName(Thread.currentThread().getName());
        // Single-thread scheduler dedicated to this task; doTask() runs the heartbeat on it.
        ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1, Util.newThreadFactory((String)"tmp"), new ThreadPoolExecutor.CallerRunsPolicy());
        scheduledThreadPoolExecutor.setKeepAliveTime(60L, TimeUnit.SECONDS);
        scheduledThreadPoolExecutor.allowCoreThreadTimeOut(true);
        try {
            this.doTask(() -> {
                log.info("to get all data ");
                jobLog2Control.log("to get all data ");
                jobLog2Control.flush();
                // Store messages always go to the local log; only the first 10000 are also forwarded
                // to the control log, later ones produce a local warning instead.
                ((LimitLogger.LimitLoggerSetter)outStore).setLimitLogger(new LimitLogger(){
                    final AtomicLong limit = new AtomicLong();

                    public void log(String content) {
                        log.info(content);
                        if (this.limit.incrementAndGet() > 10000L) {
                            log.warn("limit logger,cannot push log to control, exceed max log count: 10000");
                            return;
                        }
                        jobLog2Control.log(content);
                    }
                });
                try {
                    // Produce the data, then commit it to the store.
                    jobOutTaskFlow.execute(finalRequestParamBody, outStore);
                    outStore.commit();
                }
                catch (Throwable e) {
                    log.error("job out task flow error: " + e.getMessage(), e);
                    jobLog2Control.log("job out task flow error: " + e.getMessage());
                    throw Util.convert2RuntimeException((Throwable)e);
                }
                finally {
                    // The store is closed whether or not the flow succeeded.
                    outStore.close();
                }
                log.info("got all data , then push meta to work");
                jobLog2Control.log("got all data , then push meta to work");
                OutStore.Stat stat = outStore.stat();
                if (Util.isNotEmpty((Collection)stat.getSegmentStatList())) {
                    // Log the row count of every segment as ",s<index>: <rows>".
                    StringBuilder stringBuilder = new StringBuilder();
                    for (int i = 0; i < stat.getSegmentStatList().size(); ++i) {
                        OutStore.SegmentStat segmentStat = (OutStore.SegmentStat)stat.getSegmentStatList().get(i);
                        int rowCount = segmentStat.getRowCount();
                        stringBuilder.append(",s" + i + ": " + rowCount);
                    }
                    log.info(stringBuilder.toString());
                    jobLog2Control.log(stringBuilder.toString());
                }
                jobLog2Control.flush();
                // A task killed through the cancel endpoint must not push its data to the worker.
                if (submitTask.killed) {
                    throw new CancellationException("task is cancelled");
                }
                HttpWriter httpWriter = new HttpWriter(client2WorkTransport);
                httpWriter.writeAndCommit(outStore.sdList(), outStore, taskId, clientNode, worker);
                log.info("task completed, and all data pushed to worker, wait worker generate xlsx???");
                jobLog2Control.log("task completed, and all data pushed to worker, wait worker generate xlsx???");
                return null;
            }, client2WorkTransport, clientNodeTaskRegisterRequest, scheduledThreadPoolExecutor, jobTaskManager, submitTask);
        }
        catch (Throwable e) {
            // Task failures end here: they are logged and reported, not propagated.
            log.error(e.getMessage(), e);
            jobLog2Control.log("error: " + e.getMessage());
        }
        finally {
            try {
                scheduledThreadPoolExecutor.shutdown();
            }
            catch (Exception e) {
                log.error(e.getMessage(), (Throwable)e);
            }
            // Un-register the task so that the same task id can be submitted again.
            this.submitTaskMap.remove(taskKey);
            this.executingJobTaskIdSet.remove(taskLockRequest.getTaskId());
            try {
                // Write the final state of the task model, then flush and close the control log.
                log.info(JSONAccessor.impl().format((Object)submitTask.getTaskModel()));
                jobLog2Control.log(JSONAccessor.impl().format((Object)submitTask.getTaskModel()));
                jobLog2Control.log(taskKey + " > completed: " + Util.formatTime((LocalDateTime)LocalDateTime.now(), (String[])new String[0]));
                jobLog2Control.flush();
                jobLog2Control.close();
            }
            catch (Exception e) {
                log.error(e.getMessage(), (Throwable)e);
            }
            try {
                jobOutTaskFlow.close();
            }
            catch (Exception e) {
                log.error(e.getMessage(), (Throwable)e);
            }
        }
    }

    /**
     * Minimal management view over the export tasks of this node.
     *
     * <p>Only {@link #cancel(long)} has to be implemented; the other operations have defaults.
     */
    public interface JobTaskManager {
        /**
         * Cancels the task with the given id.
         *
         * @param taskId id of the task to cancel
         */
        void cancel(long taskId);

        /**
         * Lists the models of the known tasks.
         *
         * @return the task models
         * @throws UnsupportedOperationException by default
         */
        default List<TaskModel> taskList() {
            throw new UnsupportedOperationException();
        }

        /**
         * Returns the manager stored in the enclosing class's static {@code instance} field. Despite
         * the name nothing is created here: the field is assigned in
         * {@code ExportEndpoint.afterPropertiesSet()} and is {@code null} before that.
         *
         * @return the shared manager, possibly {@code null}
         */
        static JobTaskManager getOrCreate() {
            return instance;
        }

        /**
         * Number of tasks received so far.
         *
         * @return {@code 0} by default
         */
        default int receivedTaskCount() {
            return 0;
        }
    }

    /**
     * {@link JobTaskManager} backed by the enclosing endpoint's {@code submitTaskMap} and
     * {@code receivedTaskCount}.
     */
    private class SimpleJobTaskManager
    implements JobTaskManager {
        private SimpleJobTaskManager() {
        }

        /**
         * Cancels, with interruption, the executor future of the given task. Does nothing when the
         * task is unknown or has no future.
         *
         * @param taskId id of the task to cancel
         */
        @Override
        public void cancel(long taskId) {
            SubmitTask target = ExportEndpoint.this.submitTaskMap.get("task_" + taskId);
            if (target != null && target.future != null) {
                target.future.cancel(true);
                log.info("cancel the task");
            }
        }

        /**
         * Returns the models of all tasks currently stored in {@code submitTaskMap}. The transient
         * function of each model is run first; a failure there is logged and the model is still
         * returned.
         *
         * @return the task models, one per stored task
         */
        @Override
        public List<TaskModel> taskList() {
            List<TaskModel> models = new ArrayList<>(ExportEndpoint.this.submitTaskMap.size());
            ExportEndpoint.this.submitTaskMap.values().forEach(task -> {
                TaskModel model = task.getTaskModel();
                try {
                    model.getTransientFunc().run();
                }
                catch (Exception e) {
                    log.error(e.getMessage(), (Throwable)e);
                }
                models.add(model);
            });
            return models;
        }

        /**
         * @return the current value of the endpoint's {@code receivedTaskCount}
         */
        @Override
        public int receivedTaskCount() {
            return ExportEndpoint.this.receivedTaskCount.get();
        }
    }

    /**
     * Mutable holder describing one submitted task: its id, the {@link Future}
     * of its execution, its {@link TaskModel}, a killed flag and a runtime
     * statistics object.
     *
     * <p>{@code runtimeStat} is declared {@code transient}, is annotated with
     * {@code @JsonIgnore}, and does not take part in {@link #equals(Object)} or
     * {@link #hashCode()}; it is included in {@link #toString()}.
     */
    public static class SubmitTask {
        private long taskId;
        private Future<?> future;
        private TaskModel taskModel;
        private boolean killed;
        @JsonIgnore
        private transient OutStore.RuntimeStat runtimeStat;

        /** @return the task id */
        public long getTaskId() {
            return taskId;
        }

        /** @return the future attached to this task, possibly {@code null} */
        public Future<?> getFuture() {
            return future;
        }

        /** @return the task model, possibly {@code null} */
        public TaskModel getTaskModel() {
            return taskModel;
        }

        /** @return the killed flag */
        public boolean isKilled() {
            return killed;
        }

        /** @return the runtime statistics object, possibly {@code null} */
        public OutStore.RuntimeStat getRuntimeStat() {
            return runtimeStat;
        }

        /** @param taskId the task id to store */
        public void setTaskId(long taskId) {
            this.taskId = taskId;
        }

        /** @param future the future to attach */
        public void setFuture(Future<?> future) {
            this.future = future;
        }

        /** @param taskModel the task model to store */
        public void setTaskModel(TaskModel taskModel) {
            this.taskModel = taskModel;
        }

        /** @param killed the new value of the killed flag */
        public void setKilled(boolean killed) {
            this.killed = killed;
        }

        /** @param runtimeStat the runtime statistics object to store */
        @JsonIgnore
        public void setRuntimeStat(OutStore.RuntimeStat runtimeStat) {
            this.runtimeStat = runtimeStat;
        }

        /**
         * Compares task id, killed flag, future and task model, in that order.
         * {@code runtimeStat} is not compared.
         */
        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof SubmitTask)) {
                return false;
            }
            SubmitTask that = (SubmitTask)o;
            if (!that.canEqual(this)) {
                return false;
            }
            boolean samePrimitives = this.getTaskId() == that.getTaskId()
                    && this.isKilled() == that.isKilled();
            if (!samePrimitives) {
                return false;
            }
            Future<?> ownFuture = this.getFuture();
            Future<?> thatFuture = that.getFuture();
            boolean sameFuture = ownFuture == null ? thatFuture == null : ownFuture.equals(thatFuture);
            if (!sameFuture) {
                return false;
            }
            TaskModel ownModel = this.getTaskModel();
            TaskModel thatModel = that.getTaskModel();
            return ownModel == null ? thatModel == null : ownModel.equals(thatModel);
        }

        /**
         * Hook that lets a subclass refuse equality with instances of this class.
         *
         * @param other the object that is asking to be compared
         * @return {@code true} when {@code other} is a {@code SubmitTask}
         */
        protected boolean canEqual(Object other) {
            return other instanceof SubmitTask;
        }

        /**
         * Hash over the same four properties used by {@link #equals(Object)},
         * combined with multiplier 59; a {@code null} reference contributes 43.
         */
        @Override
        public int hashCode() {
            final int prime = 59;
            int hash = prime + Long.hashCode(this.getTaskId());
            hash = hash * prime + (this.isKilled() ? 79 : 97);
            Future<?> ownFuture = this.getFuture();
            hash = hash * prime + (ownFuture == null ? 43 : ownFuture.hashCode());
            TaskModel ownModel = this.getTaskModel();
            hash = hash * prime + (ownModel == null ? 43 : ownModel.hashCode());
            return hash;
        }

        /** Renders all five properties, including {@code runtimeStat}. */
        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("ExportEndpoint.SubmitTask(taskId=");
            text.append(this.getTaskId());
            text.append(", future=").append(this.getFuture());
            text.append(", taskModel=").append(this.getTaskModel());
            text.append(", killed=").append(this.isKilled());
            text.append(", runtimeStat=").append(this.getRuntimeStat());
            return text.append(")").toString();
        }
    }
}

