/*
 * Decompiled with CFR 0.152.
 */
package com.ovopark.iohub.sdk.client.instream;

import com.ovopark.iohub.sdk.client.Client2ControlTransport;
import com.ovopark.iohub.sdk.client.Client2WorkRestClient;
import com.ovopark.iohub.sdk.client.Client2WorkTransport;
import com.ovopark.iohub.sdk.client.ClientNode;
import com.ovopark.iohub.sdk.client.IOHubClientConfig;
import com.ovopark.iohub.sdk.client.instream.JobInTaskFlow;
import com.ovopark.iohub.sdk.client.instream.ReadJob;
import com.ovopark.iohub.sdk.client.outstream.RequestParamBody;
import com.ovopark.iohub.sdk.model.AppNode;
import com.ovopark.iohub.sdk.model.JobMeta;
import com.ovopark.iohub.sdk.model.instream.ImportPushDataRequest;
import com.ovopark.iohub.sdk.model.instream.ReadJobAssignWorkRequest;
import com.ovopark.iohub.sdk.model.instream.ReadJobAssignWorkResponse;
import com.ovopark.iohub.sdk.model.instream.ReadJobHeartbeatRequest;
import com.ovopark.iohub.sdk.model.instream.ReadJobHeartbeatResponse;
import com.ovopark.iohub.sdk.model.instream.ReadJobParseRequest;
import com.ovopark.iohub.sdk.model.instream.ReadJobParseResponse;
import com.ovopark.iohub.sdk.model.proto.DataExistStrategy;
import com.ovopark.iohub.sdk.model.proto.ImportPreDefConf;
import com.ovopark.iohub.sdk.model.proto.LimitLogger;
import com.ovopark.iohub.sdk.model.proto.Segment;
import com.ovopark.iohub.sdk.model.proto.internal.ImportPreDefConfImpl;
import com.ovopark.iohub.sdk.model.proto.internal.JobHint;
import com.ovopark.iohub.sdk.model.proto.internal.SegmentImpl;
import com.ovopark.kernel.shared.Config;
import com.ovopark.kernel.shared.JSONAccessor;
import com.ovopark.kernel.shared.Util;
import com.ovopark.kernel.shared.concurrent.ListenerFuture;
import com.ovopark.kernel.shared.kv.KVEngine;
import com.ovopark.kernel.shared.stream.Stream;
import com.ovopark.module.shared.Session;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.client.ClientHttpRequestFactory;
import org.springframework.http.client.SimpleClientHttpRequestFactory;
import org.springframework.http.converter.StringHttpMessageConverter;
import org.springframework.web.client.RestTemplate;

/**
 * Client-side {@link ReadJob} implementation.
 *
 * <p>A read asks the control node to assign a worker, hands the file to that worker for
 * parsing, and then polls the worker with heartbeats until the parse finishes, fails or is
 * cancelled. The outcome is delivered through a {@link ReadJob.ReadFuture}.
 */
class ReadJobImpl
implements ReadJob {
    private static final Logger log = LoggerFactory.getLogger(ReadJobImpl.class);
    // Forwarded as the "test" flag of the worker-assignment request.
    final boolean renderJobTest;
    // Sent in both the worker-assignment request and the parse request.
    private final String uri;
    // Stored by the constructor; not read anywhere in the visible part of this class.
    private final boolean nfs;
    // Job meta and hint used for the parse request when read(...) is called without a ReadDef.
    final JobMeta jobMeta;
    final JobHint jobHint;
    // Stored by the constructor; not read anywhere in the visible part of this class.
    private final IOHubClientConfig ioHubClientConfig;
    // Transport used to ask for a worker assignment.
    private final Client2ControlTransport client2ControlTransport;
    // Supplies the client app/node identifiers put into every request.
    final ClientNode clientNode;
    final Long taskId;
    // Scheduler that runs the delayed heartbeat polls (ReadFutureTask).
    // Pool size: max(min(2 * cpus, 8), IOHUB_READ_JOB_HEARTBEAT_IO), the option defaulting to 0.
    private static final ScheduledExecutorService jobHeartbeatExecutor = new ScheduledThreadPoolExecutor(Math.max(Math.min(Runtime.getRuntime().availableProcessors() * 2, 8), Config.ConfigPriority.option().getInt("IOHUB_READ_JOB_HEARTBEAT_IO", Integer.valueOf(0))), Util.newThreadFactory((String)"iohub_read_job_heartbeat_io"), new ThreadPoolExecutor.AbortPolicy());
    // Pool sized from IOHUB_READ_JOB_FUTURE_IO (core: max(min(2 * cpus, 64), option),
    // max: max(max(2 * cpus, 64), option)), 600 s keep-alive, queue capacity Integer.MAX_VALUE.
    // Not referenced anywhere else in the visible part of this class.
    private static final ExecutorService jobFutureExecutor = new ThreadPoolExecutor(Math.max(Math.min(Runtime.getRuntime().availableProcessors() * 2, 64), Config.ConfigPriority.option().getInt("IOHUB_READ_JOB_FUTURE_IO", Integer.valueOf(0))), Math.max(Math.max(Runtime.getRuntime().availableProcessors() * 2, 64), Config.ConfigPriority.option().getInt("IOHUB_READ_JOB_FUTURE_IO", Integer.valueOf(0))), 600L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(Integer.MAX_VALUE), Util.newThreadFactory((String)"iohub_read_job_future_io"), new ThreadPoolExecutor.AbortPolicy());
    // TTL store of pending reads: read(...) registers the ReadFutureTask under the task id
    // (as a String) for 900 seconds; the static initializer subscribes to its expirations.
    static final KVEngine.TtlFunc<String> readFutureTtl = KVEngine.newTtl((String)ReadJob.class.getName());
    // Shared HTTP client; created and configured in the static initializer.
    static final RestTemplate restTemplate;

    /**
     * Creates a read job for one task. Every argument is stored as-is; nothing is validated
     * or contacted here.
     *
     * @param taskId                  id of the task, sent with every request
     * @param uri                     uri sent in the assignment and parse requests
     * @param nfs                     stored only; not used by the visible code
     * @param jobMeta                 job meta used when a read supplies no {@code ReadDef}
     * @param jobHint                 job hint used when a read supplies no {@code ReadDef}
     * @param ioHubClientConfig       stored only; not used by the visible code
     * @param client2ControlTransport transport used to request a worker assignment
     * @param clientNode              source of the client app/node identifiers
     * @param renderJobTest           forwarded as the "test" flag of the assignment request
     */
    public ReadJobImpl(Long taskId, String uri, boolean nfs, JobMeta jobMeta, JobHint jobHint, IOHubClientConfig ioHubClientConfig, Client2ControlTransport client2ControlTransport, ClientNode clientNode, boolean renderJobTest) {
        // What is being read.
        this.taskId = taskId;
        this.uri = uri;
        this.nfs = nfs;
        this.renderJobTest = renderJobTest;

        // Default parse settings.
        this.jobMeta = jobMeta;
        this.jobHint = jobHint;

        // Collaborators.
        this.clientNode = clientNode;
        this.client2ControlTransport = client2ControlTransport;
        this.ioHubClientConfig = ioHubClientConfig;
    }

    /**
     * Alias of {@link #read(String, ReadJob.ReadDef)}; both arguments are passed through
     * unchanged.
     */
    public ReadJob.ReadFuture readExcel(String file, ReadJob.ReadDef readDef) {
        final ReadJob.ReadFuture pendingRead = read(file, readDef);
        return pendingRead;
    }

    /**
     * Starts an asynchronous read of {@code file}.
     *
     * <p>Steps: ask the control node for a worker, send the file to that worker for parsing,
     * then schedule the first heartbeat poll (5 seconds later) and register the poll task in
     * the TTL store for 900 seconds.
     *
     * @param file    the file handed to the worker's parse call
     * @param readDef layout of the file; when {@code null} the job's own meta and hint are used
     * @return a future that is completed by the heartbeat task or by TTL expiry
     * @throws IllegalArgumentException if the control node does not assign a worker
     * @throws IllegalStateException    if the worker does not accept the parse request
     */
    public ReadJob.ReadFuture read(String file, ReadJob.ReadDef readDef) {
        ReadJobAssignWorkRequest assignRequest = new ReadJobAssignWorkRequest();
        assignRequest.setTaskId(this.taskId.longValue());
        assignRequest.setClientApp(this.clientNode.app());
        assignRequest.setClientNode(this.clientNode.node());
        assignRequest.setUri(this.uri);
        assignRequest.setTest(this.renderJobTest);
        ReadJobAssignWorkResponse assignResponse = this.client2ControlTransport.assignWorkReadJob(assignRequest);
        // Format once; the same text goes to the log and, on failure, into the exception.
        String assignJson = String.valueOf(JSONAccessor.impl().format(assignResponse));
        log.info("readJobAssignWorkResponse: {}", assignJson);
        if (assignResponse == null || !assignResponse.isSuccess() || assignResponse.getWorker() == null) {
            throw new IllegalArgumentException("cannot assign work for read job: " + assignJson);
        }

        AppNode worker = assignResponse.getWorker();
        Client2WorkRestClient client2WorkTransport = new Client2WorkRestClient(worker, restTemplate);
        ReadJobParseRequest parseRequest = new ReadJobParseRequest();
        parseRequest.setUri(this.uri);
        if (readDef != null) {
            // A caller-supplied layout replaces the job's meta and gets a fresh, empty hint.
            parseRequest.setJobMeta(ReadJobImpl.jobMeta0(readDef));
            parseRequest.setJobHint(new JobHint());
        } else {
            parseRequest.setJobMeta(this.jobMeta);
            parseRequest.setJobHint(this.jobHint);
        }
        parseRequest.setClientNode(this.clientNode.node());
        parseRequest.setClientApp(this.clientNode.app());
        parseRequest.setWorkNode(worker.getNode());
        parseRequest.setTaskId(this.taskId.longValue());
        ReadJobParseResponse parseResponse = client2WorkTransport.parseReadJob(file, parseRequest);
        String parseJson = String.valueOf(JSONAccessor.impl().format(parseResponse));
        log.info("response readJobParseResponse: {}", parseJson);
        if (parseResponse == null || !parseResponse.isSuccess()) {
            // Include the worker's answer so the failure can be diagnosed from the exception alone.
            throw new IllegalStateException("cannot push file to worker: " + parseJson);
        }

        ReadFutureImpl readFuture = new ReadFutureImpl(file, readDef);
        ReadFutureTask readFutureTask = new ReadFutureTask(this.taskId, worker, this.clientNode, readFuture);
        jobHeartbeatExecutor.schedule(Util.catchRunnable(readFutureTask), 5L, TimeUnit.SECONDS);
        // NOTE(review): the entry is keyed by task id only; if an entry for the same task is
        // still present this call presumably keeps the old one, leaving the new future
        // without a timeout -- confirm against KVEngine.TtlFunc.
        readFutureTtl.putIfAbsentAndGet(String.valueOf(this.taskId), k -> readFutureTask, 900L, TimeUnit.SECONDS);
        return readFuture;
    }

    /**
     * Copies the layout settings of a {@code ReadDef} (header index range, first data row,
     * sheet indexes, per-header max column indexes) into a new {@link JobMeta}.
     *
     * @param readDef the layout to copy; must not be {@code null}
     * @return a freshly created job meta carrying the same settings
     */
    private static JobMeta jobMeta0(ReadJob.ReadDef readDef) {
        final JobMeta meta = new JobMeta();
        // Header rows.
        meta.setHeaderFromIndex(readDef.headerFromIndex());
        meta.setHeaderToIndex(readDef.headerToIndex());
        // Data rows.
        meta.setDataStartRowIndex(readDef.dataStartRowIndex());
        // Sheets and their column limits.
        meta.setSheetIndexList(readDef.sheetIndexList());
        meta.setHeaderMaxColumnIndexList(readDef.headerMaxColumnIndexList());
        return meta;
    }

    /**
     * Wraps the segments returned by the worker in a {@code DataRowStream}.
     *
     * <p>{@code subscribe} replays every row of every segment, in order, to the listener and
     * skips rows that carry no content. The import context handed to the listener only
     * supports {@code cancel()}, which stops the replay before the next row;
     * {@code replyWithSegment} is a no-op.
     *
     * @param file                  file name exposed through the stream's request parameters
     * @param readDef               layout used for the read; may be {@code null} when the read
     *                              was started without one, in which case the request
     *                              parameters carry a {@code null} read definition
     * @param importPushDataRequest parse result whose segment list backs the stream
     * @return a stream over the parsed rows
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private static JobInTaskFlow.DataRowStream<RequestParamBody> dataStream0(String file, ReadJob.ReadDef readDef, ImportPushDataRequest importPushDataRequest) {
        final RequestParamBodyImpl requestParamBody = new RequestParamBodyImpl();
        requestParamBody.setFile(file);
        // read(...) accepts a null ReadDef, so it can be null here too; jobMeta0 would throw.
        requestParamBody.setReadDef(readDef == null ? null : (ReadJob.ReadDef) ReadJobImpl.jobMeta0(readDef));
        final ImportPreDefConfImpl importPreDefConf = new ImportPreDefConfImpl();
        importPreDefConf.setDataExistStrategy(DataExistStrategy.NONE);

        // Set by the listener through ImportContext.cancel(); checked before every segment and row.
        final AtomicBoolean cancelled = new AtomicBoolean(false);
        final JobInTaskFlow.ImportContext importContext = new JobInTaskFlow.ImportContext() {

            public void cancel() {
                cancelled.set(true);
            }

            public synchronized void replyWithSegment(int segment, int fromRow, int toRow, String desc, JobInTaskFlow.Result result) {
                // A read job has nobody to report segment results to.
            }
        };
        final JobInTaskFlow.ImportConf<RequestParamBody> importConf = new JobInTaskFlow.ImportConf<RequestParamBody>() {

            public RequestParamBody requestParam() {
                return requestParamBody;
            }

            public ImportPreDefConf importPreDefConf() {
                return importPreDefConf;
            }
        };

        // Element types are not visible from here, so the list is read through explicit casts.
        final List<?> segmentList = importPushDataRequest.getSegmentList();
        final int segmentCount = segmentList.size();
        int rowCount = 0;
        for (Object segment : segmentList) {
            rowCount += ((SegmentImpl) segment).size();
        }
        final int finalRowCount = rowCount;
        final LimitLogger limitLogger = new LimitLogger() {

            public void log(String content) {
                ReadJobImpl.log.info(content);
            }
        };

        return new JobInTaskFlow.DataRowStream<RequestParamBody>() {
            // True while the stream may be subscribed: initially, and again after reset().
            final AtomicBoolean subscribable = new AtomicBoolean(true);

            public JobInTaskFlow.DataRowStream<RequestParamBody> reset() {
                cancelled.set(false);
                // Re-arm the stream; without this a subscribe after reset() always failed.
                this.subscribable.set(true);
                return this;
            }

            public void mark(JobInTaskFlow.StreamProcessResult streamProcessResult, String msg) {
                // Nothing to record for a read job.
            }

            public LimitLogger limitLogger() {
                return limitLogger;
            }

            public void subscribe(JobInTaskFlow.DataRowListener dataRowListener) {
                if (!this.subscribable.get()) {
                    throw new IllegalStateException("need reset stream before subscribe it.");
                }
                for (int segmentIndex = 0; segmentIndex < segmentList.size(); ++segmentIndex) {
                    if (cancelled.get()) {
                        this.logCancelled(segmentIndex);
                        return;
                    }
                    Segment segment = (Segment) segmentList.get(segmentIndex);
                    // Index the header cells of this segment by path for the row's lookup.
                    final Map<String, Segment.Header.Cell> headerCellMap = new HashMap<String, Segment.Header.Cell>();
                    segment.header().scanUp2Down(new Segment.Header.Scan() {

                        public void scan(Segment.Header.Cell cell) {
                            headerCellMap.put(cell.path(), cell);
                        }
                    });
                    JobInTaskFlow.HeaderLookup headerLookup = new JobInTaskFlow.HeaderLookup() {

                        public Segment.Header.Cell lookup(String path) {
                            return headerCellMap.get(path);
                        }
                    };
                    List rows = segment.rowList();
                    for (int rowIndex = 0; rowIndex < rows.size(); ++rowIndex) {
                        if (cancelled.get()) {
                            this.logCancelled(segmentIndex);
                            return;
                        }
                        Map data = (Map) rows.get(rowIndex);
                        if (!ReadJobImpl.rowHasContent(data)) {
                            continue;
                        }
                        JobInTaskFlow.DataRowImpl dataRow = new JobInTaskFlow.DataRowImpl();
                        dataRow.setSegment(segmentIndex);
                        dataRow.setRowNum(rowIndex);
                        dataRow.setData(data);
                        dataRow.setHeaderLookup(headerLookup);
                        dataRowListener.onRow((JobInTaskFlow.DataRow) dataRow, importContext);
                    }
                }
                // Only a replay that ran to the end consumes the stream.
                this.subscribable.set(false);
            }

            /** Logs that the replay stopped at the given segment because it was cancelled. */
            private void logCancelled(int segmentIndex) {
                Stream.from("cancel the import flow, segment: " + segmentIndex).subscribe(s -> ReadJobImpl.log.info(String.valueOf(s)));
            }

            public int segmentCount() {
                return segmentCount;
            }

            public long rowCount() {
                return finalRowCount;
            }

            public long rowCount(int segment) {
                return ((SegmentImpl) segmentList.get(segment)).size();
            }

            public Session session() {
                throw new UnsupportedOperationException("read job does not contains session");
            }

            public JobInTaskFlow.ImportConf<RequestParamBody> importConf() {
                return importConf;
            }
        };
    }

    /**
     * Tells whether a row has at least one non-empty value. Text values are checked with
     * {@code Util.isNotEmpty}; any other non-null value counts as content instead of
     * failing a cast to {@code String}.
     */
    private static boolean rowHasContent(Map<?, ?> row) {
        for (Object value : row.values()) {
            if (value instanceof CharSequence) {
                if (Util.isNotEmpty((CharSequence) value)) {
                    return true;
                }
            } else if (value != null) {
                return true;
            }
        }
        return false;
    }

    static {
        // When a pending read outlives its TTL entry, fail its future with a TimeoutException.
        readFutureTtl.subscribeTtl(t -> true, (getResult, l, l1) -> {
            Object key = getResult.key();
            log.info("onExpired: {}", key);
            // read(...) stores the ReadFutureTask, not the future itself; casting the value
            // straight to ReadFutureImpl threw ClassCastException and left the future pending.
            Object expired = getResult.value();
            ReadFutureImpl pendingFuture = expired instanceof ReadFutureTask
                    ? ((ReadFutureTask) expired).renderFuture
                    : (ReadFutureImpl) expired;
            if (pendingFuture == null) {
                return;
            }
            pendingFuture.setDataRowStreamAndNotify(null, new TimeoutException("seconds: " + TimeUnit.MILLISECONDS.toSeconds(l1)));
        });

        // Shared HTTP client: 15 s connect timeout, 45 s read timeout, UTF-8 for string bodies.
        SimpleClientHttpRequestFactory factory = new SimpleClientHttpRequestFactory();
        factory.setReadTimeout(45000);
        factory.setConnectTimeout(15000);
        restTemplate = new RestTemplate((ClientHttpRequestFactory)factory);
        // Put first so it takes precedence over the default string converter.
        restTemplate.getMessageConverters().add(0, new StringHttpMessageConverter(StandardCharsets.UTF_8));
    }

    /**
     * {@link ReadJob.ReadFuture} backed by a listener future. It also remembers the file and
     * read definition of the read so the heartbeat task can build the row stream later.
     */
    static class ReadFutureImpl
    implements ReadJob.ReadFuture {
        final String file;
        final ReadJob.ReadDef readDef;
        private final ListenerFuture.ListenerFutureSetter<JobInTaskFlow.DataRowStream<?>> futureSetter = ListenerFuture.newFuture();

        public ReadFutureImpl(String file, ReadJob.ReadDef readDef) {
            this.readDef = readDef;
            this.file = file;
        }

        // ---- completion side, used by the heartbeat task and the TTL expiry callback ----

        /** Passes the given stream and throwable to the underlying future setter. */
        void setDataRowStreamAndNotify(JobInTaskFlow.DataRowStream<?> dataRowStream, Throwable t) {
            futureSetter.setValueAndNotify(dataRowStream, t);
        }

        /** Asks the underlying future setter to cancel and notify. */
        void cancelAndNotify() {
            futureSetter.cancelAndNotify();
        }

        // ---- consumer side ----

        public void setReadListener(ReadJob.ReadListener readListener) {
            futureSetter.addListener((stream, error) -> readListener.onRead(stream, error));
        }

        public JobInTaskFlow.DataRowStream<?> get() throws InterruptedException, ExecutionException {
            final JobInTaskFlow.DataRowStream<?> stream = (JobInTaskFlow.DataRowStream<?>) futureSetter.get();
            return stream;
        }

        public JobInTaskFlow.DataRowStream<?> get(long timeout, TimeUnit timeUnit) throws InterruptedException, TimeoutException, ExecutionException {
            final JobInTaskFlow.DataRowStream<?> stream = (JobInTaskFlow.DataRowStream<?>) futureSetter.get(timeout, timeUnit);
            return stream;
        }

        public void cancel() {
            futureSetter.cancel(true);
        }

        public boolean cancelled() {
            return futureSetter.isCancelled();
        }
    }

    static class ReadFutureTask
    implements Util.CatchRunnable {
        final Long taskId;
        final AppNode worker;
        final ClientNode clientNode;
        final ReadFutureImpl renderFuture;
        final long startMs = System.currentTimeMillis();
        final Client2WorkTransport client2WorkTransport;
        int failCount;

        public ReadFutureTask(Long taskId, AppNode worker, ClientNode clientNode, ReadFutureImpl renderFuture) {
            this.taskId = taskId;
            this.worker = worker;
            this.clientNode = clientNode;
            this.renderFuture = renderFuture;
            SimpleClientHttpRequestFactory factory = new SimpleClientHttpRequestFactory();
            factory.setReadTimeout(45000);
            factory.setConnectTimeout(15000);
            RestTemplate restTemplate = new RestTemplate((ClientHttpRequestFactory)factory);
            restTemplate.getMessageConverters().add(0, new StringHttpMessageConverter(StandardCharsets.UTF_8));
            this.client2WorkTransport = new Client2WorkRestClient(worker, restTemplate);
        }

        public void run() throws Exception {
            if (this.renderFuture.cancelled()) {
                log.info("cancel job render by client: " + this.taskId);
                return;
            }
            ReadJobHeartbeatRequest readJobHeartbeatRequest = new ReadJobHeartbeatRequest();
            readJobHeartbeatRequest.setTaskId(this.taskId.longValue());
            readJobHeartbeatRequest.setClientApp(this.clientNode.app());
            readJobHeartbeatRequest.setClientNode(this.clientNode.node());
            readJobHeartbeatRequest.setWorkNode(this.worker.getNode());
            ReadJobHeartbeatResponse renderJobHeartbeatResponse = this.client2WorkTransport.heartbeatReadJob(readJobHeartbeatRequest);
            if (renderJobHeartbeatResponse == null || !renderJobHeartbeatResponse.isSuccess()) {
                log.info("response job render heartbeat: " + JSONAccessor.impl().format((Object)renderJobHeartbeatResponse));
            }
            if (renderJobHeartbeatResponse == null || !renderJobHeartbeatResponse.isSuccess()) {
                if (this.failCount++ > 5) {
                    readFutureTtl.delete((Comparable)((Object)String.valueOf(this.taskId)));
                    this.renderFuture.setDataRowStreamAndNotify(null, new RuntimeException("render server error: " + (renderJobHeartbeatResponse == null ? "" : renderJobHeartbeatResponse.getDesc())));
                    return;
                }
                jobHeartbeatExecutor.schedule(Util.catchRunnable((Util.CatchRunnable)this), 5L, TimeUnit.SECONDS);
                return;
            }
            if (renderJobHeartbeatResponse.isReading()) {
                jobHeartbeatExecutor.schedule(Util.catchRunnable((Util.CatchRunnable)this), 5L, TimeUnit.SECONDS);
                return;
            }
            if (renderJobHeartbeatResponse.isReadCancelled()) {
                log.info("render job ( " + this.taskId + " ) was cancelled by work: " + JSONAccessor.impl().format((Object)renderJobHeartbeatResponse));
                readFutureTtl.delete((Comparable)((Object)String.valueOf(this.taskId)));
                this.renderFuture.cancelAndNotify();
                return;
            }
            if (renderJobHeartbeatResponse.isReadError()) {
                readFutureTtl.delete((Comparable)((Object)String.valueOf(this.taskId)));
                this.renderFuture.setDataRowStreamAndNotify(null, new RuntimeException(renderJobHeartbeatResponse.getReadErrorDesc()));
                return;
            }
            if (renderJobHeartbeatResponse.isReadCompleted()) {
                readFutureTtl.delete((Comparable)((Object)String.valueOf(this.taskId)));
                ImportPushDataRequest importPushDataRequest = renderJobHeartbeatResponse.getImportPushDataRequest();
                if (importPushDataRequest == null) {
                    this.renderFuture.setDataRowStreamAndNotify(null, new RuntimeException("render server error"));
                    return;
                }
                this.renderFuture.setDataRowStreamAndNotify(ReadJobImpl.dataStream0(this.renderFuture.file, this.renderFuture.readDef, renderJobHeartbeatResponse.getImportPushDataRequest()), null);
                return;
            }
            throw new RuntimeException("unreachable code , error???");
        }
    }

    /**
     * Request parameters exposed by the read stream: the file and the read definition.
     * Two instances are equal when both properties are equal.
     */
    private static class RequestParamBodyImpl
    implements RequestParamBody {
        private String file;
        private ReadJob.ReadDef readDef;

        public void setFile(String file) {
            this.file = file;
        }

        public String getFile() {
            return file;
        }

        public void setReadDef(ReadJob.ReadDef readDef) {
            this.readDef = readDef;
        }

        public ReadJob.ReadDef getReadDef() {
            return readDef;
        }

        /** Hook that lets the other side of an {@code equals} call veto the comparison. */
        protected boolean canEqual(Object other) {
            return other instanceof RequestParamBodyImpl;
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof RequestParamBodyImpl)) {
                return false;
            }
            final RequestParamBodyImpl that = (RequestParamBodyImpl) o;
            if (!that.canEqual(this)) {
                return false;
            }
            return sameValue(getFile(), that.getFile())
                    && sameValue(getReadDef(), that.getReadDef());
        }

        /** Null-tolerant equality: two nulls match, otherwise {@code mine.equals(theirs)} decides. */
        private static boolean sameValue(Object mine, Object theirs) {
            if (mine == null) {
                return theirs == null;
            }
            return mine.equals(theirs);
        }

        public int hashCode() {
            final int prime = 59;
            final String fileValue = getFile();
            int hash = prime + (fileValue == null ? 43 : fileValue.hashCode());
            final ReadJob.ReadDef readDefValue = getReadDef();
            hash = hash * prime + (readDefValue == null ? 43 : readDefValue.hashCode());
            return hash;
        }

        public String toString() {
            final StringBuilder text = new StringBuilder("ReadJobImpl.RequestParamBodyImpl(file=");
            text.append(getFile());
            text.append(", readDef=").append(getReadDef());
            text.append(")");
            return text.toString();
        }
    }
}

