/*
 * Decompiled with CFR 0.152.
 */
package com.ovopark.iohub.sdk.client.instream;

import com.ovopark.iohub.sdk.client.Client2ControlTransport;
import com.ovopark.iohub.sdk.client.Client2WorkRestClient;
import com.ovopark.iohub.sdk.client.Client2WorkTransport;
import com.ovopark.iohub.sdk.client.ClientNode;
import com.ovopark.iohub.sdk.client.ClientNodeRegister;
import com.ovopark.iohub.sdk.client.IOHubClientConfig;
import com.ovopark.iohub.sdk.client.JobClientActive;
import com.ovopark.iohub.sdk.client.JobLog;
import com.ovopark.iohub.sdk.client.JobLogImpl;
import com.ovopark.iohub.sdk.client.instream.JobInTaskFlow;
import com.ovopark.iohub.sdk.client.instream.JobInTaskFlowProvider;
import com.ovopark.iohub.sdk.client.outstream.RequestParamBody;
import com.ovopark.iohub.sdk.model.AppNode;
import com.ovopark.iohub.sdk.model.ClientNodeTaskRegisterRequest;
import com.ovopark.iohub.sdk.model.ClientNodeTaskRegisterResponse;
import com.ovopark.iohub.sdk.model.JobMeta;
import com.ovopark.iohub.sdk.model.TaskCancelRequest;
import com.ovopark.iohub.sdk.model.TaskCancelResponse;
import com.ovopark.iohub.sdk.model.TaskLockRequest;
import com.ovopark.iohub.sdk.model.TaskLockResponse;
import com.ovopark.iohub.sdk.model.TaskModel;
import com.ovopark.iohub.sdk.model.TransientFunc;
import com.ovopark.iohub.sdk.model.instream.ImportCommitRequest;
import com.ovopark.iohub.sdk.model.instream.ImportCommitResponse;
import com.ovopark.iohub.sdk.model.instream.ImportPushDataRequest;
import com.ovopark.iohub.sdk.model.instream.ImportPushDataResponse;
import com.ovopark.iohub.sdk.model.instream.ImportReplyRequest;
import com.ovopark.iohub.sdk.model.instream.ImportReplyResponse;
import com.ovopark.iohub.sdk.model.proto.ImportPreDefConf;
import com.ovopark.iohub.sdk.model.proto.ImportPreDefConfImpl;
import com.ovopark.iohub.sdk.model.proto.ImportPreDefParam;
import com.ovopark.iohub.sdk.model.proto.LimitLogger;
import com.ovopark.iohub.sdk.model.proto.NoPrivilegeException;
import com.ovopark.iohub.sdk.model.proto.ReplyModel;
import com.ovopark.iohub.sdk.model.proto.ReplyRowModel;
import com.ovopark.iohub.sdk.model.proto.Segment;
import com.ovopark.iohub.sdk.model.proto.SegmentImpl;
import com.ovopark.kernel.shared.JSONAccessor;
import com.ovopark.kernel.shared.Util;
import com.ovopark.kernel.shared.stream.Stream;
import com.ovopark.module.shared.Session;
import com.ovopark.module.shared.spring.rbac.SessionImpl;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.client.ClientHttpRequestFactory;
import org.springframework.http.client.SimpleClientHttpRequestFactory;
import org.springframework.http.converter.StringHttpMessageConverter;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.RestTemplate;

@JobClientActive
@RestController(value="com.ovopark.iohub.sdk.client.instream.ImportEndpoint")
@RequestMapping(value={"/feign/iohub-job/processing/import"})
public class ImportEndpoint
implements InitializingBean {
    private static final Logger log = LoggerFactory.getLogger(ImportEndpoint.class);
    @Autowired
    private Client2ControlTransport client2ControlTransport;
    private static final ExecutorService jobExecutor = new ThreadPoolExecutor(Math.max(Math.min(Runtime.getRuntime().availableProcessors() * 2, 64), Integer.parseInt(System.getProperty("IOHUB_IMPORT_IO", "0"))), Math.max(Math.max(Runtime.getRuntime().availableProcessors() * 2, 1024), Integer.parseInt(System.getProperty("IOHUB_IMPORT_IO", "0"))), 600L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(1), Util.newThreadFactory((String)"iohub-import-io"), new ThreadPoolExecutor.AbortPolicy());
    private final Set<Long> executingJobTaskIdSet = ConcurrentHashMap.newKeySet();
    final Map<String, SubmitTask> submitTaskMap = new ConcurrentHashMap<String, SubmitTask>();
    @Autowired
    private IOHubClientConfig ioHubClientConfig;
    @Autowired
    private JobInTaskFlowProvider jobInTaskFlowProvider;
    @Autowired
    private ClientNodeRegister.ClientNodeProvider clientNodeProvider;
    private static JobTaskManager instance;
    private final SimpleJobTaskManager simpleJobTaskManager = new SimpleJobTaskManager();

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    @RequestMapping(value={"/submit"})
    @ResponseBody
    public ImportPushDataResponse submit(@RequestBody ImportPushDataRequest importPushDataRequest) {
        String taskKey = "task_" + importPushDataRequest.getTaskId();
        MDC.put((String)"traceId", (String)taskKey);
        MDC.put((String)"requestId", (String)taskKey);
        log.info("receive a new task, we can do it???: " + JSONAccessor.impl().format((Object)importPushDataRequest));
        ImportPushDataResponse importPushDataResponse = new ImportPushDataResponse();
        importPushDataResponse.setSuccess(false);
        final JobLogImpl client2ControlLog = new JobLogImpl(this.client2ControlTransport, 300, taskKey);
        try {
            Util.lock((Comparable)((Object)taskKey), () -> {
                if (this.executingJobTaskIdSet.contains(importPushDataRequest.getTaskId())) {
                    importPushDataResponse.setSuccess(false);
                    importPushDataResponse.setDesc("Rejected,the previous task is executing");
                    throw new IllegalStateException("the previous task is executing, reject the task: " + JSONAccessor.impl().format((Object)importPushDataRequest));
                }
                TaskLockRequest taskLockRequest = new TaskLockRequest();
                taskLockRequest.setTaskId(importPushDataRequest.getTaskId());
                taskLockRequest.setNode(ClientNode.UUID_STR);
                TaskLockResponse taskLockResponse = this.client2ControlTransport.lockClient(taskLockRequest);
                if (taskLockResponse == null || !taskLockResponse.isSuccess()) {
                    importPushDataResponse.setSuccess(false);
                    throw new IllegalStateException("cannot get lock: " + JSONAccessor.impl().format((Object)taskLockRequest));
                }
                this.executingJobTaskIdSet.add(importPushDataRequest.getTaskId());
                long taskId = importPushDataRequest.getTaskId();
                String uri = importPushDataRequest.getJobMeta().getUri();
                String args = importPushDataRequest.getArgs();
                ImportPreDefParam preDefParam = importPushDataRequest.getPreDefParam();
                String session = importPushDataRequest.getSession();
                JobMeta jobMeta = importPushDataRequest.getJobMeta();
                AppNode worker = importPushDataRequest.getWorker();
                JobTaskManager jobTaskManager = new JobTaskManager(){

                    @Override
                    public void cancel(long taskId) {
                        SubmitTask submitTask = ImportEndpoint.this.submitTaskMap.get("task_" + taskId);
                        submitTask.future.cancel(true);
                        Stream.from((Object)"cancel the task").subscribe(s -> {
                            log.info(s);
                            client2ControlLog.log((String)s);
                        });
                    }
                };
                ClientNode clientNode = this.clientNodeProvider.clientNode();
                SessionImpl si = Util.isEmpty((CharSequence)session) ? null : (SessionImpl)JSONAccessor.impl().read(session, SessionImpl.class);
                SimpleClientHttpRequestFactory factory = new SimpleClientHttpRequestFactory();
                factory.setReadTimeout(45000);
                factory.setConnectTimeout(15000);
                RestTemplate restTemplate = new RestTemplate((ClientHttpRequestFactory)factory);
                restTemplate.getMessageConverters().add(0, new StringHttpMessageConverter(StandardCharsets.UTF_8));
                Client2WorkRestClient client2WorkTransport = new Client2WorkRestClient(worker, restTemplate);
                Stream.from((Object)"accepted , to import data??? : ").subscribe(s -> {
                    log.info(s);
                    client2ControlLog.log((String)s);
                });
                JobInTaskFlow<RequestParamBody> jobInTaskFlow = this.jobInTaskFlowProvider.find(uri);
                boolean taskSubmit = false;
                try {
                    RequestParamBody requestParamBody;
                    jobInTaskFlow.prepared();
                    Stream.from((Object)"prepare bean : ").subscribe(s -> {
                        log.info(s);
                        client2ControlLog.log((String)s);
                    });
                    try {
                        log.info(" to parse args: " + args);
                        requestParamBody = Util.isEmpty((CharSequence)args) ? null : jobInTaskFlow.requestParamBody(args, (Session)si);
                    }
                    catch (NoPrivilegeException e) {
                        importPushDataResponse.setSuccess(false);
                        importPushDataResponse.setDesc("no privilege");
                        throw new IllegalStateException("no privilege, reject the task: " + JSONAccessor.impl().format((Object)(args + ", session: " + JSONAccessor.impl().format((Object)si))));
                    }
                    catch (Exception e) {
                        importPushDataResponse.setSuccess(false);
                        importPushDataResponse.setDesc(e.getMessage());
                        log.error(e.getMessage(), (Throwable)e);
                        throw new IllegalStateException("error, reject the task: " + e.getMessage() + JSONAccessor.impl().format((Object)(args + ", session: " + JSONAccessor.impl().format((Object)si))));
                    }
                    JobInTaskFlow.ImportTaskInfoProvider importTaskInfoProvider = jobInTaskFlow.importTaskInfoProvider(requestParamBody, (Session)si);
                    String name = importTaskInfoProvider == null ? null : importTaskInfoProvider.name();
                    Stream.from((Object)("update import record  , import record id: 1, pre name: " + name)).subscribe(s -> {
                        log.info(s);
                        client2ControlLog.log((String)s);
                    });
                    ClientNodeTaskRegisterRequest clientNodeTaskRegisterRequest = new ClientNodeTaskRegisterRequest();
                    clientNodeTaskRegisterRequest.setTaskId(taskId);
                    clientNodeTaskRegisterRequest.setApp(clientNode.app());
                    clientNodeTaskRegisterRequest.setNode(clientNode.node());
                    clientNodeTaskRegisterRequest.setWorkApp(worker.getApp());
                    clientNodeTaskRegisterRequest.setWorkNode(worker.getNode());
                    SubmitTask submitTask = new SubmitTask();
                    submitTask.setTaskModel(new TaskModel());
                    submitTask.getTaskModel().setImportClientStat(new TaskModel.ImportClientStat());
                    TaskModel taskModel = submitTask.getTaskModel();
                    TaskModel.ImportClientStat importClientStat = taskModel.getImportClientStat();
                    RuntimeStatImpl runtimeStat = new RuntimeStatImpl();
                    try {
                        Future<?> future = jobExecutor.submit(Util.catchRunnable(() -> {
                            JobLogImpl jobLog2Control = new JobLogImpl(this.client2ControlTransport, 300, taskKey);
                            taskModel.setIoThreadName(Thread.currentThread().getName());
                            taskModel.setTransientFunc(() -> {
                                int sumRowCount = importClientStat.getSumRowCount();
                                int rowCount = runtimeStat.rowCount();
                                importClientStat.setRowCountAdded(rowCount - sumRowCount);
                                importClientStat.setSumRowCount(rowCount);
                                String currentSegment = runtimeStat.currentSegment();
                                importClientStat.setCurrentSegment(currentSegment);
                            });
                            clientNodeTaskRegisterRequest.setIoThreadName(Thread.currentThread().getName());
                            ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1, Util.newThreadFactory((String)"tmp"), new ThreadPoolExecutor.CallerRunsPolicy());
                            scheduledThreadPoolExecutor.setKeepAliveTime(60L, TimeUnit.SECONDS);
                            scheduledThreadPoolExecutor.allowCoreThreadTimeOut(true);
                            try {
                                this.doTask(() -> {
                                    Stream.from((Object)"to import all data ").subscribe(s -> {
                                        log.info(s);
                                        jobLog2Control.log((String)s);
                                    });
                                    jobLog2Control.flush();
                                    ReplyModel replyModel = new ReplyModel();
                                    JobInTaskFlow.DataRowStream<RequestParamBody> dataRowStream = this.dataRowStream0(importPushDataRequest, requestParamBody, jobLog2Control, (Session)si, submitTask, replyModel, runtimeStat);
                                    try {
                                        jobInTaskFlow.execute(dataRowStream);
                                        Stream.from((Object)"import all data , then push reply to work").subscribe(s -> {
                                            log.info(s);
                                            jobLog2Control.log((String)s);
                                        });
                                        jobLog2Control.flush();
                                    }
                                    catch (Throwable e) {
                                        dataRowStream.mark(JobInTaskFlow.StreamProcessResult.FAIL, e.getMessage());
                                        log.error("job in task flow error: " + e.getMessage(), e);
                                        jobLog2Control.log("job in task flow error: " + e.getMessage());
                                        throw Util.convert2RuntimeException((Throwable)e);
                                    }
                                    finally {
                                        if (submitTask.killed) {
                                            throw new CancellationException("task is cancelled");
                                        }
                                        ImportReplyRequest importReplyRequest = new ImportReplyRequest();
                                        importReplyRequest.setTaskId(taskId);
                                        importReplyRequest.setApp(clientNode.app());
                                        importReplyRequest.setNode(clientNode.node());
                                        importReplyRequest.setWorkApp(worker.getApp());
                                        importReplyRequest.setWorkNode(worker.getNode());
                                        importReplyRequest.setReplyModel(replyModel);
                                        ImportReplyResponse importReplyResponse = client2WorkTransport.importReply(importReplyRequest);
                                        Stream.from((Object)("reply result: " + JSONAccessor.impl().format((Object)importReplyResponse))).subscribe(s -> {
                                            log.info(s);
                                            jobLog2Control.log((String)s);
                                        });
                                        if (importReplyResponse == null || !importReplyResponse.isSuccess()) {
                                            Stream.from((Object)"push reply fail???").subscribe(s -> {
                                                log.info(s);
                                                jobLog2Control.log((String)s);
                                            });
                                        }
                                        ImportCommitRequest importCommitRequest = new ImportCommitRequest();
                                        importCommitRequest.setTaskId(taskId);
                                        importCommitRequest.setApp(clientNode.app());
                                        importCommitRequest.setNode(clientNode.node());
                                        importCommitRequest.setWorkApp(worker.getApp());
                                        importCommitRequest.setWorkNode(worker.getNode());
                                        ImportCommitResponse importCommitResponse = client2WorkTransport.importCommit(importCommitRequest);
                                        Stream.from((Object)("commit result: " + JSONAccessor.impl().format((Object)importCommitResponse))).subscribe(s -> {
                                            log.info(s);
                                            jobLog2Control.log((String)s);
                                        });
                                        if (importCommitResponse == null || !importCommitResponse.isSuccess()) {
                                            Stream.from((Object)"push commit fail???").subscribe(s -> {
                                                log.info(s);
                                                jobLog2Control.log((String)s);
                                            });
                                        }
                                        Stream.from((Object)"task completed, and all reply pushed to worker, wait worker generate xlsx???").subscribe(s -> {
                                            log.info(s);
                                            jobLog2Control.log((String)s);
                                        });
                                    }
                                    return null;
                                }, client2WorkTransport, clientNodeTaskRegisterRequest, scheduledThreadPoolExecutor, jobTaskManager, submitTask);
                            }
                            catch (Throwable e) {
                                jobLog2Control.log("error: " + e.getMessage());
                                throw Util.convert2RuntimeException((Throwable)e);
                            }
                            finally {
                                this.submitTaskMap.remove(taskKey);
                                this.executingJobTaskIdSet.remove(taskLockRequest.getTaskId());
                                try {
                                    jobLog2Control.log(taskKey + " > completed: " + Util.formatTime((LocalDateTime)LocalDateTime.now(), (String[])new String[0]));
                                    jobLog2Control.flush();
                                    jobLog2Control.close();
                                }
                                catch (Exception e) {
                                    log.error(e.getMessage(), (Throwable)e);
                                }
                                try {
                                    jobInTaskFlow.close();
                                }
                                catch (Exception e) {
                                    log.error(e.getMessage(), (Throwable)e);
                                }
                            }
                        }));
                        taskSubmit = true;
                        submitTask.setTaskId(importPushDataRequest.getTaskId());
                        submitTask.setFuture(future);
                        taskModel.setJobTaskId(importPushDataRequest.getTaskId());
                        taskModel.setWorkNode(importPushDataRequest.getWorker().getNode());
                        taskModel.setClientNode(ClientNode.UUID_STR);
                        taskModel.setAcceptTime(System.currentTimeMillis());
                        taskModel.setAcceptTimeStr(Util.formatTime((LocalDateTime)LocalDateTime.now(), (String[])new String[0]));
                        taskModel.setExport(false);
                        taskModel.setUri(uri);
                        taskModel.setClientApp(clientNode.app());
                        taskModel.setSession(session);
                        taskModel.setUserId(si == null ? null : si.getUserId());
                        taskModel.setUserName(si == null ? null : si.getUserName());
                        taskModel.setUserGroupId(si == null ? null : si.getGroupId());
                        this.submitTaskMap.put(taskKey, submitTask);
                        log.info("OK, the task is scheduled: " + taskKey);
                        importPushDataResponse.setSuccess(true);
                    }
                    catch (RejectedExecutionException e) {
                        this.executingJobTaskIdSet.remove(taskLockRequest.getTaskId());
                        importPushDataResponse.setSuccess(false);
                        importPushDataResponse.setDesc("Rejected, resource is unavailable ");
                        throw Util.convert2RuntimeException((Throwable)e);
                    }
                }
                catch (Exception e) {
                    client2ControlLog.log("error : " + e.getMessage());
                    throw Util.convert2RuntimeException((Throwable)e);
                }
                finally {
                    if (!taskSubmit) {
                        jobInTaskFlow.close();
                    }
                }
                return importPushDataRequest;
            }, (long)10L, (TimeUnit)TimeUnit.SECONDS);
            return importPushDataResponse;
        }
        catch (Exception e) {
            log.error(e.getMessage(), (Throwable)e);
            importPushDataResponse.setSuccess(false);
            importPushDataResponse.setDesc(e.getMessage());
            return importPushDataResponse;
        }
        finally {
            try {
                client2ControlLog.close();
            }
            catch (Exception e) {
                log.error(e.getMessage(), (Throwable)e);
            }
            finally {
                MDC.remove((String)"traceId");
                MDC.remove((String)"requestId");
            }
        }
    }

    private JobInTaskFlow.DataRowStream<RequestParamBody> dataRowStream0(ImportPushDataRequest importPushDataRequest, final RequestParamBody requestParamBody, final JobLog jobLog2Control, final Session session, SubmitTask submitTask, final ReplyModel replyModel, final JobInTaskFlow.RuntimeStat runtimeStat) {
        final TaskModel.ImportClientStat importClientStat = submitTask.getTaskModel().getImportClientStat();
        final ImportPreDefConfImpl importPreDefConf = new ImportPreDefConfImpl();
        importPreDefConf.setDataExistStrategy(importPushDataRequest.getPreDefParam().getDataExistStrategy());
        final HashMap replyModelIndex = new HashMap();
        final AtomicBoolean cancelled = new AtomicBoolean(false);
        final JobInTaskFlow.ImportContext importContext = new JobInTaskFlow.ImportContext(){

            @Override
            public void cancel() {
                cancelled.set(true);
            }

            @Override
            public synchronized void replyWithSegment(int segment, int fromRow, int toRow, String desc, JobInTaskFlow.Result result) {
                if (result == JobInTaskFlow.Result.EXISTS) {
                    replyModel.setRepeatCount(replyModel.getRepeatCount() + 1);
                } else if (result == JobInTaskFlow.Result.SUCCESS) {
                    replyModel.setSuccessCount(replyModel.getSuccessCount() + 1);
                } else if (result == JobInTaskFlow.Result.FAIL) {
                    replyModel.setErrorCount(replyModel.getErrorCount() + 1);
                }
                String key = segment + ":" + fromRow + ":" + toRow;
                ReplyRowModel rrm = (ReplyRowModel)replyModelIndex.get(key);
                if (rrm == null) {
                    ReplyRowModel replyRowModel = ReplyRowModel.replyWithSegment((int)segment, (int)fromRow, (int)toRow, (String)desc);
                    replyModel.add(replyRowModel);
                    replyModelIndex.put(key, replyRowModel);
                } else {
                    if (rrm.getDescList().size() > 100) {
                        return;
                    }
                    rrm.getDescList().add(desc);
                }
            }
        };
        final JobInTaskFlow.ImportConf<RequestParamBody> importConf = new JobInTaskFlow.ImportConf<RequestParamBody>(){

            @Override
            public RequestParamBody requestParam() {
                return requestParamBody;
            }

            @Override
            public ImportPreDefConf importPreDefConf() {
                return importPreDefConf;
            }
        };
        final LimitLogger limitLogger = new LimitLogger(){
            final AtomicLong limit = new AtomicLong();

            public void log(String content) {
                log.info(content);
                if (this.limit.incrementAndGet() > 10000L) {
                    log.warn("limit logger,cannot push log to control, exceed max log count: 10000");
                    return;
                }
                jobLog2Control.log(content);
            }
        };
        final List segmentList = importPushDataRequest.getSegmentList();
        final int segmentCount = segmentList.size();
        int rowCount = 0;
        for (SegmentImpl segment : segmentList) {
            rowCount += segment.size();
        }
        final int finalRowCount = rowCount;
        importClientStat.setMaxRowCount(finalRowCount);
        importClientStat.setSegmentCount(segmentList.size());
        return new JobInTaskFlow.DataRowStream<RequestParamBody>(){
            final AtomicBoolean reset = new AtomicBoolean(true);

            @Override
            public JobInTaskFlow.DataRowStream<RequestParamBody> reset() {
                this.reset.set(true);
                importClientStat.setResetStream(true);
                return this;
            }

            @Override
            public void mark(JobInTaskFlow.StreamProcessResult streamProcessResult, String msg) {
                if (streamProcessResult == JobInTaskFlow.StreamProcessResult.SUCCESS) {
                    replyModel.setStatus(1);
                } else if (streamProcessResult == JobInTaskFlow.StreamProcessResult.FAIL) {
                    replyModel.setStatus(-1);
                    replyModel.setFailMsg(msg);
                }
            }

            @Override
            public void subscribe(JobInTaskFlow.DataRowListener dataRowListener) {
                if (!this.reset.get()) {
                    throw new IllegalStateException("need reset stream before subscribe it.");
                }
                for (int segmentIndex = 0; segmentIndex < segmentList.size(); ++segmentIndex) {
                    if (runtimeStat.cancelled()) {
                        throw new CancellationException("task is cancelled");
                    }
                    if (cancelled.get()) {
                        Stream.from((Object)("cancel the import flow, segment: " + segmentIndex)).subscribe(s -> {
                            log.info(s);
                            jobLog2Control.log((String)s);
                        });
                        this.reset.set(false);
                        return;
                    }
                    Segment segment = (Segment)segmentList.get(segmentIndex);
                    final HashMap headerCellMap = new HashMap();
                    segment.header().scanUp2Down(new Segment.Header.Scan(){

                        public void scan(Segment.Header.Cell cell) {
                            headerCellMap.put(cell.path(), cell);
                        }
                    });
                    dataRowListener.onHeader(segmentIndex, segment.header(), importContext);
                    if (cancelled.get()) {
                        Stream.from((Object)"cannot pass header validation, break importing flow").subscribe(s -> {
                            log.info(s);
                            jobLog2Control.log((String)s);
                        });
                        return;
                    }
                    JobInTaskFlow.HeaderLookup headerLookup = new JobInTaskFlow.HeaderLookup(){

                        @Override
                        public Segment.Header.Cell lookup(String path) {
                            return (Segment.Header.Cell)headerCellMap.get(path);
                        }
                    };
                    List mapList = segment.rowList();
                    int size = mapList.size();
                    for (int rowIndex = 0; rowIndex < size; ++rowIndex) {
                        if (runtimeStat.cancelled()) {
                            throw new CancellationException("task is cancelled");
                        }
                        runtimeStat.rowCountAdd(1);
                        runtimeStat.currentSegment(segmentIndex + "#" + segment.name() + ": " + rowIndex + "/" + size);
                        if (cancelled.get()) {
                            Stream.from((Object)("cancel the import flow, segment: " + segmentIndex)).subscribe(s -> {
                                log.info(s);
                                jobLog2Control.log((String)s);
                            });
                            this.reset.set(false);
                            return;
                        }
                        Map data = (Map)mapList.get(rowIndex);
                        boolean allEmpty = true;
                        for (Object v : data.values()) {
                            if (!Util.isNotEmpty((CharSequence)((String)v))) continue;
                            allEmpty = false;
                            break;
                        }
                        if (allEmpty) continue;
                        JobInTaskFlow.DataRowImpl dataRow = new JobInTaskFlow.DataRowImpl();
                        dataRow.setSegment(segmentIndex);
                        dataRow.setRowNum(rowIndex);
                        dataRow.setData(data);
                        dataRow.setHeaderLookup(headerLookup);
                        dataRowListener.onRow(dataRow, importContext);
                    }
                }
                this.reset.set(false);
            }

            @Override
            public int segmentCount() {
                return segmentCount;
            }

            @Override
            public long rowCount() {
                return finalRowCount;
            }

            @Override
            public long rowCount(int segment) {
                return ((SegmentImpl)segmentList.get(segment)).size();
            }

            @Override
            public Session session() {
                return session;
            }

            @Override
            public JobInTaskFlow.ImportConf<RequestParamBody> importConf() {
                return importConf;
            }

            @Override
            public LimitLogger limitLogger() {
                return limitLogger;
            }
        };
    }

    @RequestMapping(value={"/cancel"})
    @ResponseBody
    public TaskCancelResponse cancel(@RequestBody TaskCancelRequest taskCancelRequest) {
        TaskCancelResponse taskCancelResponse = new TaskCancelResponse();
        long taskId = taskCancelRequest.getTaskId();
        if (!this.executingJobTaskIdSet.contains(taskId)) {
            taskCancelResponse.setFound(false);
            taskCancelResponse.setSuccess(false);
            return taskCancelResponse;
        }
        SubmitTask submitTask = this.submitTaskMap.get("task_" + taskId);
        if (submitTask == null) {
            taskCancelResponse.setFound(false);
            taskCancelResponse.setSuccess(false);
            return taskCancelResponse;
        }
        submitTask.setKilled(true);
        taskCancelResponse.setFound(true);
        Future future = submitTask.future;
        if (future == null) {
            taskCancelResponse.setSuccess(false);
            taskCancelResponse.setDesc("cannot find io thread");
            return taskCancelResponse;
        }
        future.cancel(true);
        taskCancelResponse.setSuccess(future.isCancelled());
        taskCancelResponse.setDesc("done cancel");
        return taskCancelResponse;
    }

    public void afterPropertiesSet() throws Exception {
        instance = this.simpleJobTaskManager;
    }

    private <T> T doTask(Callable<T> callable, final Client2WorkTransport client2WorkTransport, final ClientNodeTaskRegisterRequest clientNodeTaskRegisterRequest, ScheduledExecutorService taskHeartbeat, final JobTaskManager jobTaskManager, final SubmitTask submitTask) {
        ScheduledFuture<?> scheduledFuture = taskHeartbeat.scheduleWithFixedDelay(Util.catchRunnable((Util.CatchRunnable)new Util.CatchRunnable(){
            int failCount = 0;

            public void run() throws Exception {
                long taskId = submitTask.taskId;
                TaskModel taskModel = submitTask.getTaskModel();
                try {
                    TransientFunc transientFunc = taskModel.getTransientFunc();
                    transientFunc.run();
                }
                catch (Exception e) {
                    log.error(e.getMessage(), (Throwable)e);
                }
                TaskModel.ImportClientStat importClientStat = taskModel.getImportClientStat();
                clientNodeTaskRegisterRequest.setImportClientStat(importClientStat);
                ClientNodeTaskRegisterResponse clientNodeTaskRegisterResponse = client2WorkTransport.importHeartbeat(clientNodeTaskRegisterRequest);
                log.info("heartbeat from work(" + clientNodeTaskRegisterRequest.getWorkApp() + ":" + clientNodeTaskRegisterRequest.getWorkNode() + ") : " + JSONAccessor.impl().format((Object)clientNodeTaskRegisterResponse));
                if (clientNodeTaskRegisterResponse == null || !clientNodeTaskRegisterResponse.isSuccess()) {
                    ++this.failCount;
                    if (this.failCount > 3) {
                        jobTaskManager.cancel(taskId);
                    }
                    return;
                }
                this.failCount = 0;
            }
        }), 0L, 5L, TimeUnit.SECONDS);
        try {
            T t = callable.call();
            return t;
        }
        catch (Exception e) {
            throw Util.convert2RuntimeException((Throwable)e);
        }
        finally {
            scheduledFuture.cancel(true);
            log.info("cancel heartbeat thread");
        }
    }

    public static interface JobTaskManager {
        public void cancel(long var1);

        default public List<TaskModel> taskList() {
            throw new UnsupportedOperationException();
        }

        public static JobTaskManager getOrCreate() {
            return instance;
        }
    }

    private class SimpleJobTaskManager
    implements JobTaskManager {
        private SimpleJobTaskManager() {
        }

        @Override
        public void cancel(long taskId) {
            SubmitTask submitTask = ImportEndpoint.this.submitTaskMap.get("task_" + taskId);
            if (submitTask == null) {
                return;
            }
            if (submitTask.future == null) {
                return;
            }
            submitTask.future.cancel(true);
            log.info("cancel the task");
        }

        @Override
        public List<TaskModel> taskList() {
            ArrayList<TaskModel> taskModelList = new ArrayList<TaskModel>(ImportEndpoint.this.submitTaskMap.size());
            for (SubmitTask submitTask : ImportEndpoint.this.submitTaskMap.values()) {
                TaskModel taskModel = submitTask.getTaskModel();
                try {
                    TransientFunc transientFunc = taskModel.getTransientFunc();
                    transientFunc.run();
                }
                catch (Exception e) {
                    log.error(e.getMessage(), (Throwable)e);
                }
                taskModelList.add(taskModel);
            }
            return taskModelList;
        }
    }

    public static class SubmitTask {
        private long taskId;
        private Future<?> future;
        private TaskModel taskModel;
        private boolean killed;

        public long getTaskId() {
            return this.taskId;
        }

        public Future<?> getFuture() {
            return this.future;
        }

        public TaskModel getTaskModel() {
            return this.taskModel;
        }

        public boolean isKilled() {
            return this.killed;
        }

        public void setTaskId(long taskId) {
            this.taskId = taskId;
        }

        public void setFuture(Future<?> future) {
            this.future = future;
        }

        public void setTaskModel(TaskModel taskModel) {
            this.taskModel = taskModel;
        }

        public void setKilled(boolean killed) {
            this.killed = killed;
        }

        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof SubmitTask)) {
                return false;
            }
            SubmitTask other = (SubmitTask)o;
            if (!other.canEqual(this)) {
                return false;
            }
            if (this.getTaskId() != other.getTaskId()) {
                return false;
            }
            if (this.isKilled() != other.isKilled()) {
                return false;
            }
            Future<?> this$future = this.getFuture();
            Future<?> other$future = other.getFuture();
            if (this$future == null ? other$future != null : !this$future.equals(other$future)) {
                return false;
            }
            TaskModel this$taskModel = this.getTaskModel();
            TaskModel other$taskModel = other.getTaskModel();
            return !(this$taskModel == null ? other$taskModel != null : !this$taskModel.equals(other$taskModel));
        }

        protected boolean canEqual(Object other) {
            return other instanceof SubmitTask;
        }

        public int hashCode() {
            int PRIME = 59;
            int result = 1;
            long $taskId = this.getTaskId();
            result = result * 59 + (int)($taskId >>> 32 ^ $taskId);
            result = result * 59 + (this.isKilled() ? 79 : 97);
            Future<?> $future = this.getFuture();
            result = result * 59 + ($future == null ? 43 : $future.hashCode());
            TaskModel $taskModel = this.getTaskModel();
            result = result * 59 + ($taskModel == null ? 43 : $taskModel.hashCode());
            return result;
        }

        public String toString() {
            return "ImportEndpoint.SubmitTask(taskId=" + this.getTaskId() + ", future=" + this.getFuture() + ", taskModel=" + this.getTaskModel() + ", killed=" + this.isKilled() + ")";
        }
    }

    static class RuntimeStatImpl
    implements JobInTaskFlow.RuntimeStat {
        private final AtomicInteger rowCount = new AtomicInteger(0);
        String segment;
        private volatile boolean cancelled;

        RuntimeStatImpl() {
        }

        @Override
        public void rowCountAdd(int added) {
            this.rowCount.addAndGet(added);
        }

        @Override
        public int rowCount() {
            return this.rowCount.get();
        }

        @Override
        public void currentSegment(String segment) {
            this.segment = segment;
        }

        @Override
        public String currentSegment() {
            return this.segment;
        }

        @Override
        public void cancel() {
            this.cancelled = true;
        }

        @Override
        public boolean cancelled() {
            return this.cancelled;
        }
    }
}

