/*
 * Decompiled with CFR 0.152.
 */
package com.ovopark.messagehub.sdk.job;

import com.ovopark.kernel.shared.CachedExecutors;
import com.ovopark.kernel.shared.JSONAccessor;
import com.ovopark.kernel.shared.Util;
import com.ovopark.kernel.shared.kv.CacheService;
import com.ovopark.kernel.shared.vclient.ClientNode;
import com.ovopark.messagehub.sdk.internal.MessageHubJobApi;
import com.ovopark.messagehub.sdk.job.DelayTaskExecutor;
import com.ovopark.messagehub.sdk.job.JobClientActive;
import com.ovopark.messagehub.sdk.job.JobLogImpl;
import com.ovopark.messagehub.sdk.model.internal.job.TaskLockRequest;
import com.ovopark.messagehub.sdk.model.internal.job.TaskLockResponse;
import com.ovopark.messagehub.sdk.model.internal.job.TaskSubmitRequest;
import com.ovopark.messagehub.sdk.model.internal.job.TaskSubmitResponse;
import com.ovopark.module.shared.BaseResult;
import java.time.LocalDateTime;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;

/**
 * HTTP endpoint through which a task is submitted to this client for execution.
 *
 * <p>A submitted task is accepted only if no task with the same cron task id is
 * currently marked as executing on this instance and the remote
 * {@link MessageHubJobApi#lockClient} call reports success. Accepted tasks are
 * handed to a shared thread pool and executed by {@link DelayTaskExecutor}.
 */
@JobClientActive
@RestController(value="com.ovopark.messagehub.sdk.job.JobEndpoint")
@RequestMapping(value={"/feign/messagehub-job/processing"})
public class JobEndpoint {
    private static final Logger log = LoggerFactory.getLogger(JobEndpoint.class);

    @Autowired
    private DelayTaskExecutor delayTaskExecutor;

    @Autowired
    private MessageHubJobApi messageHubJobApi;

    /**
     * Pool that runs accepted tasks. Saturation surfaces as a
     * {@link RejectedExecutionException} (abort policy), which {@code submit} reports
     * back to the caller as a rejected submission.
     */
    private static final ExecutorService jobExecutor = newJobExecutor();

    /**
     * Cache keyed by task key ({@code task_<cronTaskId>_<cronTaskHistoryId>}).
     * NOTE(review): entries are written with a 600 second argument, presumably the
     * expiry; whether a throwing supplier leaves an entry behind depends on
     * {@code CacheService} - confirm.
     */
    private static final CacheService<String, TaskSubmitRequest> C = new CacheService.MapCacheService(true, CachedExecutors.impl("messagehub-job-io-c", 1, 1));

    /** Cron task ids that have been handed to {@link #jobExecutor} and have not finished yet. */
    private final Set<Long> executingCronTaskIdSet = ConcurrentHashMap.newKeySet();

    /**
     * Builds the task pool.
     *
     * <p>Core size is {@code max(min(2 * cpus, 64), MESSAGEHUB_JOB_IO)}, maximum size is
     * {@code max(max(2 * cpus, 1024), MESSAGEHUB_JOB_IO)}, where {@code MESSAGEHUB_JOB_IO}
     * is a system property defaulting to {@code 0}. Threads above the core size are
     * retired after 600 idle seconds and the work queue holds a single task.
     *
     * @throws NumberFormatException if {@code MESSAGEHUB_JOB_IO} is set but is not an integer
     */
    private static ExecutorService newJobExecutor() {
        int cpuBased = Runtime.getRuntime().availableProcessors() * 2;
        int configured = Integer.parseInt(System.getProperty("MESSAGEHUB_JOB_IO", "0"));
        int corePoolSize = Math.max(Math.min(cpuBased, 64), configured);
        int maximumPoolSize = Math.max(Math.max(cpuBased, 1024), configured);
        return new ThreadPoolExecutor(
                corePoolSize,
                maximumPoolSize,
                600L,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(1),
                Util.newThreadFactory("messagehub-job-io"),
                new ThreadPoolExecutor.AbortPolicy());
    }

    /**
     * Tries to accept a task for execution on this client.
     *
     * <p>The attempt runs inside {@code Util.lock} keyed by {@code "task:" + cronTaskId}
     * and inside {@code C.putIfAbsentAndGet} keyed by the task key. Any failure along
     * the way is logged and reported as {@code submitted == false}; it is never
     * propagated to the HTTP caller as an error result.
     *
     * @param taskSubmitRequest the task to run; its cron task id and history id form the task key
     * @return a success result wrapping the response; {@code submitted} is {@code true}
     *         only when the task was handed to the thread pool
     */
    @RequestMapping(value={"/submit"})
    @ResponseBody
    public BaseResult<TaskSubmitResponse> submit(final @RequestBody TaskSubmitRequest taskSubmitRequest) {
        final String taskKey = "task_" + taskSubmitRequest.getCronTaskId() + "_" + taskSubmitRequest.getCronTaskHistoryId();
        MDC.put("traceId", taskKey);
        MDC.put("requestId", taskKey);
        // Everything after MDC.put sits inside this try so the MDC entries are always
        // cleared from the request thread, even if logging the request itself fails.
        try {
            log.info("receive a new task, we can do it???: {}", JSONAccessor.impl().format(taskSubmitRequest));
            final TaskSubmitResponse taskSubmitResponse = new TaskSubmitResponse();
            taskSubmitResponse.setSubmitted(false);
            try {
                Util.lock("task:" + taskSubmitRequest.getCronTaskId(), new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        C.putIfAbsentAndGet(taskKey, new Supplier<TaskSubmitRequest>() {
                            @Override
                            public TaskSubmitRequest get() {
                                return JobEndpoint.this.lockAndSchedule(taskKey, taskSubmitRequest, taskSubmitResponse);
                            }
                        }, 600L, TimeUnit.SECONDS);
                        return null;
                    }
                }, 10L, TimeUnit.SECONDS);
            } catch (Exception e) {
                // Rejections are signalled by exceptions from lockAndSchedule; the response
                // already carries submitted=false, so logging is all that is left to do.
                log.error(e.getMessage(), e);
            }
            return BaseResult.success(taskSubmitResponse);
        } finally {
            MDC.remove("traceId");
            MDC.remove("requestId");
        }
    }

    /**
     * Checks that the cron task is not already executing, obtains the remote client
     * lock and hands the task to {@link #jobExecutor}.
     *
     * @param taskKey            key used for the job log and log messages
     * @param taskSubmitRequest  the task to run
     * @param taskSubmitResponse response to update with the outcome
     * @return {@code taskSubmitRequest}, once the task has been handed to the pool
     * @throws IllegalStateException if the cron task is already executing here or the
     *                               remote lock could not be obtained
     * @throws RuntimeException      whatever {@code Util.convert2RuntimeException} produces
     *                               when the pool rejects the task
     */
    private TaskSubmitRequest lockAndSchedule(final String taskKey, final TaskSubmitRequest taskSubmitRequest, TaskSubmitResponse taskSubmitResponse) {
        final Long cronTaskId = taskSubmitRequest.getCronTaskId();
        if (executingCronTaskIdSet.contains(cronTaskId)) {
            taskSubmitResponse.setSubmitted(false);
            taskSubmitResponse.setDesc("Rejected,the previous task is executing");
            throw new IllegalStateException("the previous task is executing, reject the task: " + JSONAccessor.impl().format(taskSubmitRequest));
        }

        TaskLockRequest taskLockRequest = new TaskLockRequest();
        taskLockRequest.setCronTaskId(cronTaskId);
        taskLockRequest.setCronTaskHistoryId(taskSubmitRequest.getCronTaskHistoryId());
        taskLockRequest.setNode(ClientNode.UUID_STR);
        BaseResult<?> lockResult = messageHubJobApi.lockClient(taskLockRequest);
        // A missing error flag is treated like an error instead of failing with a
        // NullPointerException on unboxing.
        if (lockResult == null || !Boolean.FALSE.equals(lockResult.getIsError())) {
            throw new IllegalStateException("cannot get lock: " + JSONAccessor.impl().format(taskLockRequest));
        }
        TaskLockResponse taskLockResponse = (TaskLockResponse) lockResult.getData();
        if (taskLockResponse == null || !taskLockResponse.isSuccess()) {
            throw new IllegalStateException("cannot get lock: " + JSONAccessor.impl().format(taskLockRequest));
        }

        executingCronTaskIdSet.add(cronTaskId);
        boolean scheduled = false;
        try {
            jobExecutor.execute(Util.catchRunnable(() -> {
                JobLogImpl jobLog = null;
                try {
                    // Created inside the try so a failing constructor still clears the
                    // executing marker below; otherwise the cron task would be rejected
                    // on every later submission.
                    jobLog = new JobLogImpl(messageHubJobApi, taskKey, taskSubmitRequest.getCronTaskHistoryId());
                    delayTaskExecutor.execute(taskSubmitRequest.getUri(), taskSubmitRequest.getArgs(), jobLog);
                } finally {
                    executingCronTaskIdSet.remove(cronTaskId);
                    if (jobLog != null) {
                        try {
                            jobLog.log(taskKey + " > completed: " + Util.formatTime(LocalDateTime.now(), new String[0]));
                            jobLog.flush();
                        } catch (Exception e) {
                            // Writing the completion line is best effort.
                            log.error(e.getMessage(), e);
                        }
                    }
                }
            }));
            scheduled = true;
            log.info("OK, the task is scheduled: {}", taskKey);
            taskSubmitResponse.setSubmitted(true);
        } catch (RejectedExecutionException e) {
            taskSubmitResponse.setSubmitted(false);
            taskSubmitResponse.setDesc("Rejected, resource is unavailable ");
            throw Util.convert2RuntimeException(e);
        } finally {
            // Roll the marker back on ANY failure to hand the task over, not only on
            // pool rejection; once scheduled, the worker owns the cleanup.
            if (!scheduled) {
                executingCronTaskIdSet.remove(cronTaskId);
            }
        }
        return taskSubmitRequest;
    }
}

