/*
 * Decompiled with CFR 0.152.
 */
package com.ovopark.iohub.sdk.client.instream.test;

import com.ovopark.iohub.sdk.client.Client2ControlTransport;
import com.ovopark.iohub.sdk.client.Client2WorkRestClient;
import com.ovopark.iohub.sdk.client.ClientNodeProvider;
import com.ovopark.iohub.sdk.client.IOHubClientConfig;
import com.ovopark.iohub.sdk.client.JobClientActive;
import com.ovopark.iohub.sdk.client.instream.JobInTaskFlow;
import com.ovopark.iohub.sdk.client.instream.JobInTaskFlowProvider;
import com.ovopark.iohub.sdk.client.instream.test.ImportTaskVo;
import com.ovopark.iohub.sdk.client.outstream.RequestParamBody;
import com.ovopark.iohub.sdk.model.AppNode;
import com.ovopark.iohub.sdk.model.instream.ImportPushDataRequest;
import com.ovopark.iohub.sdk.model.instream.ImportReplyRequest;
import com.ovopark.iohub.sdk.model.proto.DataExistStrategy;
import com.ovopark.iohub.sdk.model.proto.ImportPreDefConf;
import com.ovopark.iohub.sdk.model.proto.LimitLogger;
import com.ovopark.iohub.sdk.model.proto.Segment;
import com.ovopark.iohub.sdk.model.proto.internal.ImportPreDefConfImpl;
import com.ovopark.iohub.sdk.model.proto.internal.ReplyModel;
import com.ovopark.iohub.sdk.model.proto.internal.ReplyRowModel;
import com.ovopark.iohub.sdk.model.proto.internal.SegmentImpl;
import com.ovopark.iohub.sdk.model.test.ReplyRenderResponse;
import com.ovopark.kernel.shared.Config;
import com.ovopark.kernel.shared.JSONAccessor;
import com.ovopark.kernel.shared.Util;
import com.ovopark.kernel.shared.stream.Stream;
import com.ovopark.module.shared.BaseResult;
import com.ovopark.module.shared.Session;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.client.ClientHttpRequestFactory;
import org.springframework.http.client.SimpleClientHttpRequestFactory;
import org.springframework.http.converter.StringHttpMessageConverter;
import org.springframework.web.bind.annotation.RequestHeader;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.RestTemplate;

@JobClientActive
@RestController(value="com.ovopark.iohub.sdk.client.instream.test.ImportLocalTestEndpoint")
@RequestMapping(value={"/feign/iohub-job/processing/only-test/import"})
public class ImportLocalTestEndpoint {
    private static final Logger log = LoggerFactory.getLogger(ImportLocalTestEndpoint.class);
    final String workIp = Config.ConfigPriority.option().getString("iohub.client.io.work.ip", "8.139.4.155");
    final int workPort = Config.ConfigPriority.option().getInt("iohub.client.io.work.port", Integer.valueOf(13950));
    @Autowired
    private Client2ControlTransport client2ControlTransport;
    @Autowired
    private IOHubClientConfig ioHubClientConfig;
    @Autowired
    private JobInTaskFlowProvider jobInTaskFlowProvider;
    @Autowired
    private ClientNodeProvider clientNodeProvider;

    @RequestMapping(value={"/submit"})
    public BaseResult<Boolean> submit(ImportTaskVo importTaskVo, @RequestHeader(value="ovo-authorization", required=false) String token) {
        String uri = importTaskVo.getUri();
        String args = importTaskVo.getParams();
        String localFile = this.write2LocalFile(0L, importTaskVo);
        SimpleClientHttpRequestFactory factory = new SimpleClientHttpRequestFactory();
        factory.setReadTimeout(45000);
        factory.setConnectTimeout(15000);
        RestTemplate restTemplate = new RestTemplate((ClientHttpRequestFactory)factory);
        restTemplate.getMessageConverters().add(0, new StringHttpMessageConverter(StandardCharsets.UTF_8));
        AppNode appNode = new AppNode();
        appNode.setIp(this.workIp);
        appNode.setPort(this.workPort);
        appNode.setServletPath("iohub-work");
        Client2WorkRestClient client2WorkRestClient = new Client2WorkRestClient(appNode, restTemplate);
        JobInTaskFlow jobInTaskFlow = this.jobInTaskFlowProvider.find(uri);
        if (jobInTaskFlow == null) {
            jobInTaskFlow = this.jobInTaskFlowProvider.find("/iohub-test/import/testAnyImport");
            log.info("found flow impl: " + jobInTaskFlow.getClass().getName());
        }
        jobInTaskFlow.prepared();
        Session si = Session.getOrCreate().get();
        if (si == null && Util.isNotEmpty((CharSequence)token)) {
            si = client2WorkRestClient.parseSessionOnlyForTest(token);
        }
        if (si == null) {
            log.error("cannot parse session , invalid token ???: " + token);
            throw new IllegalArgumentException("cannot parse session , invalid token ???: " + token);
        }
        final RequestParamBody requestParamBody = Util.isEmpty((CharSequence)args) ? null : jobInTaskFlow.requestParamBody(args, si);
        JobInTaskFlow.ImportTaskInfoProvider importTaskInfoProvider = jobInTaskFlow.importTaskInfoProvider(requestParamBody, si);
        final ImportPreDefConfImpl importPreDefConf = new ImportPreDefConfImpl();
        importPreDefConf.setDataExistStrategy(DataExistStrategy.valueOf((String)importTaskVo.getDataExistStrategy()));
        final ReplyModel replyModel = new ReplyModel();
        final HashMap replyModelIndex = new HashMap();
        final AtomicBoolean cancelled = new AtomicBoolean(false);
        final JobInTaskFlow.ImportContext importContext = new JobInTaskFlow.ImportContext(){

            public void cancel() {
                cancelled.set(true);
            }

            public synchronized void replyWithSegment(int segment, int fromRow, int toRow, String desc, JobInTaskFlow.Result result) {
                if (result == JobInTaskFlow.Result.EXISTS) {
                    replyModel.setRepeatCount(replyModel.getRepeatCount() + 1);
                } else if (result == JobInTaskFlow.Result.SUCCESS) {
                    replyModel.setSuccessCount(replyModel.getSuccessCount() + 1);
                } else if (result == JobInTaskFlow.Result.FAIL) {
                    replyModel.setErrorCount(replyModel.getErrorCount() + 1);
                }
                String key = segment + ":" + fromRow + ":" + toRow;
                ReplyRowModel rrm = (ReplyRowModel)replyModelIndex.get(key);
                if (rrm == null) {
                    ReplyRowModel replyRowModel = ReplyRowModel.replyWithSegment((int)segment, (int)fromRow, (int)toRow, (String)desc);
                    replyModel.add(replyRowModel);
                    replyModelIndex.put(key, replyRowModel);
                } else {
                    if (rrm.getDescList().size() > 100) {
                        return;
                    }
                    rrm.getDescList().add(desc);
                }
            }
        };
        JobInTaskFlow.ImportConf<RequestParamBody> importConf = new JobInTaskFlow.ImportConf<RequestParamBody>(){

            public RequestParamBody requestParam() {
                return requestParamBody;
            }

            public ImportPreDefConf importPreDefConf() {
                return importPreDefConf;
            }
        };
        ImportPushDataRequest importPushDataRequest = client2WorkRestClient.parseOnlyForTest(localFile, uri, importTaskVo.getParams());
        if (importPushDataRequest == null) {
            log.error("cannot parse excel , invalid excel ???: " + uri);
            throw new IllegalArgumentException("cannot parse excel , invalid excel ???: " + uri);
        }
        final List segmentList = importPushDataRequest.getSegmentList();
        final int segmentCount = segmentList.size();
        int rowCount = 0;
        for (SegmentImpl segment : segmentList) {
            rowCount += segment.size();
        }
        final int finalRowCount = rowCount;
        final Session finalSi = si;
        final LimitLogger limitLogger = new LimitLogger(){

            public void log(String content) {
                log.info(content);
            }
        };
        JobInTaskFlow.DataRowStream<RequestParamBody> dataRowStream = new JobInTaskFlow.DataRowStream<RequestParamBody>(){
            final AtomicBoolean reset = new AtomicBoolean(true);
            final /* synthetic */ JobInTaskFlow.ImportConf val$importConf;
            {
                this.val$importConf = importConf;
            }

            public JobInTaskFlow.DataRowStream<RequestParamBody> reset() {
                cancelled.set(false);
                return this;
            }

            public void mark(JobInTaskFlow.StreamProcessResult streamProcessResult, String msg) {
                if (streamProcessResult == JobInTaskFlow.StreamProcessResult.SUCCESS) {
                    replyModel.setStatus(1);
                } else if (streamProcessResult == JobInTaskFlow.StreamProcessResult.FAIL) {
                    replyModel.setStatus(-1);
                    replyModel.setFailMsg(msg);
                }
            }

            public LimitLogger limitLogger() {
                return limitLogger;
            }

            public void subscribe(JobInTaskFlow.DataRowListener dataRowListener) {
                if (!this.reset.get()) {
                    throw new IllegalStateException("need reset stream before subscribe it.");
                }
                for (int segmentIndex = 0; segmentIndex < segmentList.size(); ++segmentIndex) {
                    if (cancelled.get()) {
                        Stream.from((Object)("cancel the import flow, segment: " + segmentIndex)).subscribe(s -> log.info(s));
                        return;
                    }
                    Segment segment = (Segment)segmentList.get(segmentIndex);
                    final HashMap headerCellMap = new HashMap();
                    segment.header().scanUp2Down(new Segment.Header.Scan(){

                        public void scan(Segment.Header.Cell cell) {
                            headerCellMap.put(cell.path(), cell);
                        }
                    });
                    JobInTaskFlow.HeaderLookup headerLookup = new JobInTaskFlow.HeaderLookup(){

                        public Segment.Header.Cell lookup(String path) {
                            return (Segment.Header.Cell)headerCellMap.get(path);
                        }
                    };
                    List mapList = segment.rowList();
                    for (int rowIndex = 0; rowIndex < mapList.size(); ++rowIndex) {
                        if (cancelled.get()) {
                            Stream.from((Object)("cancel the import flow, segment: " + segmentIndex)).subscribe(s -> log.info(s));
                            return;
                        }
                        Map data = (Map)mapList.get(rowIndex);
                        boolean allEmpty = true;
                        for (Object v : data.values()) {
                            if (!Util.isNotEmpty((CharSequence)((String)v))) continue;
                            allEmpty = false;
                            break;
                        }
                        if (allEmpty) continue;
                        JobInTaskFlow.DataRowImpl dataRow = new JobInTaskFlow.DataRowImpl();
                        dataRow.setSegment(segmentIndex);
                        dataRow.setRowNum(rowIndex);
                        dataRow.setData(data);
                        dataRow.setHeaderLookup(headerLookup);
                        dataRowListener.onRow((JobInTaskFlow.DataRow)dataRow, importContext);
                    }
                }
                this.reset.set(false);
            }

            public int segmentCount() {
                return segmentCount;
            }

            public long rowCount() {
                return finalRowCount;
            }

            public long rowCount(int segment) {
                return ((SegmentImpl)segmentList.get(segment)).size();
            }

            public Session session() {
                return finalSi;
            }

            public JobInTaskFlow.ImportConf<RequestParamBody> importConf() {
                return this.val$importConf;
            }
        };
        jobInTaskFlow.execute((JobInTaskFlow.DataRowStream)dataRowStream);
        log.info("replyModel size: " + replyModel.getReplyMap().size());
        ImportReplyRequest importReplyRequest = new ImportReplyRequest();
        importReplyRequest.setTaskId(importPushDataRequest.getTaskId());
        importReplyRequest.setApp(null);
        importReplyRequest.setNode(null);
        importReplyRequest.setWorkApp(null);
        importReplyRequest.setWorkNode(null);
        importReplyRequest.setReplyModel(replyModel);
        ReplyRenderResponse importReplyResponse = client2WorkRestClient.renderReplyOnlyForTest(importReplyRequest);
        log.info("reply result: " + JSONAccessor.impl().format((Object)importReplyResponse));
        log.info("reply url: " + importReplyResponse.getUrl());
        return BaseResult.success((Object)true);
    }

    private String write2LocalFile(long jobTaskId, ImportTaskVo importTaskVo) {
        String basePath = this.ioHubClientConfig.nfsPath();
        String osName = System.getProperty("os.name").toLowerCase();
        if (osName.contains("win")) {
            log.info("This is a Windows operating system.");
            if (basePath.startsWith("/")) {
                basePath = "d:/iohub";
            }
        } else {
            log.info("This is not a Windows operating system.");
            log.warn("you must config: 'iohub.client.io.nfsPath=/xxx' in environment");
        }
        log.warn(basePath + " , directory exists????");
        String jobTaskFilePath = basePath + "/test-client/" + Util.formatTime((LocalDateTime)LocalDateTime.now(), (String[])new String[]{"yyyyMMddHHmmss"}) + "/" + jobTaskId;
        File file = new File(jobTaskFilePath);
        if (!file.exists()) {
            file.mkdirs();
        }
        String filePath = jobTaskFilePath + "/import-" + Util.uniqueFirstPart() + ".xlsx";
        log.info("to write data to disk: " + filePath);
        try {
            importTaskVo.getFile().transferTo(new File(filePath));
        }
        catch (IOException e) {
            throw Util.convert2RuntimeException((Throwable)e);
        }
        log.info("write data to disk (" + filePath + "), cost time ");
        return filePath;
    }
}

