/*
 * Decompiled with CFR 0.152.
 */
package com.ovopark.jobhub.sdk.client.kafka;

import com.ovopark.jobhub.sdk.client.Client2ControlTransport;
import com.ovopark.jobhub.sdk.client.DelayTaskExecutor;
import com.ovopark.jobhub.sdk.client.JobClientActive;
import com.ovopark.jobhub.sdk.client.JobContext;
import com.ovopark.jobhub.sdk.client.JobEndpoint;
import com.ovopark.jobhub.sdk.client.JobInnerContext;
import com.ovopark.jobhub.sdk.client.JobInnerContextMgr;
import com.ovopark.jobhub.sdk.client.JobLog;
import com.ovopark.jobhub.sdk.client.JobLogImpl;
import com.ovopark.jobhub.sdk.client.JobService;
import com.ovopark.jobhub.sdk.model.internal.TaskLockRequest;
import com.ovopark.jobhub.sdk.model.internal.TaskLockResponse;
import com.ovopark.jobhub.sdk.model.internal.TaskSubmitRequest;
import com.ovopark.kernel.shared.JSONAccessor;
import com.ovopark.kernel.shared.ShutdownManager;
import com.ovopark.kernel.shared.Util;
import com.ovopark.kernel.shared.vclient.ClientNode;
import java.time.LocalDateTime;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.CommandLineRunner;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.AcknowledgingMessageListener;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@JobClientActive
@Component
public class JobListenerRunnerViaKafkaProvider
implements JobService.JobListenerKafkaRunnerProvider,
CommandLineRunner {
    private static final Logger log = LoggerFactory.getLogger(JobListenerRunnerViaKafkaProvider.class);
    @Autowired
    private DelayTaskExecutor delayTaskExecutor;
    @Autowired
    private Client2ControlTransport client2ControlTransport;
    @Autowired
    private JobService jobService;
    @Qualifier(value="com.ovopark.jobhub.sdk.client.kafka.KafkaConfig.consumerFactory")
    @Autowired
    ConsumerFactory<String, String> consumerFactory;
    private final Map<String, ConcurrentMessageListenerContainer<String, String>> containers = new ConcurrentHashMap<String, ConcurrentMessageListenerContainer<String, String>>();
    @Autowired
    JobInnerContextMgr jobInnerContextMgr;

    public String name() {
        return "kafka";
    }

    public boolean start(final String topic, String beanUrl, final JobService.JobListenerRunnerKafkaConfig jobListenerRunnerKafkaConfig) {
        final String containerId = beanUrl;
        ConcurrentMessageListenerContainer container = (ConcurrentMessageListenerContainer)Util.lock((Comparable)((Object)containerId), (Callable)new Callable<ConcurrentMessageListenerContainer<String, String>>(){

            @Override
            public ConcurrentMessageListenerContainer<String, String> call() throws Exception {
                int pollIntervalTimeSec;
                ConcurrentMessageListenerContainer<String, String> listenerContainer = JobListenerRunnerViaKafkaProvider.this.containers.get(containerId);
                if (listenerContainer != null) {
                    return listenerContainer;
                }
                String groupId = Util.isEmpty((CharSequence)jobListenerRunnerKafkaConfig.getConsumerGroup()) ? containerId : jobListenerRunnerKafkaConfig.getConsumerGroup();
                ContainerProperties containerProps = new ContainerProperties(new String[]{topic});
                containerProps.setGroupId(groupId);
                if (Util.isNotEmpty((CharSequence)jobListenerRunnerKafkaConfig.getOffsetReset())) {
                    containerProps.getKafkaConsumerProperties().put("auto.offset.reset", jobListenerRunnerKafkaConfig.getOffsetReset());
                }
                final boolean autoCommit = jobListenerRunnerKafkaConfig.isAutCommit();
                containerProps.getKafkaConsumerProperties().put("enable.auto.commit", (Object)autoCommit);
                if (!autoCommit) {
                    containerProps.setAckMode(ContainerProperties.AckMode.MANUAL);
                }
                if ((pollIntervalTimeSec = jobListenerRunnerKafkaConfig.getPollIntervalTimeSec()) > 0) {
                    containerProps.getKafkaConsumerProperties().put("max.poll.interval.ms", (Object)(pollIntervalTimeSec * 1000));
                }
                containerProps.getKafkaConsumerProperties().put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
                containerProps.setMessageListener((Object)new AcknowledgingMessageListener<String, String>(){

                    /*
                     * WARNING - Removed try catching itself - possible behaviour change.
                     */
                    public void onMessage(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
                        String value = (String)record.value();
                        TaskSubmitRequest taskSubmitRequest = (TaskSubmitRequest)JSONAccessor.impl().read(value, TaskSubmitRequest.class);
                        long cronTaskId = taskSubmitRequest.getCronTaskId();
                        long cronTaskHistoryId = taskSubmitRequest.getCronTaskHistoryId();
                        String taskKey = "task4kafka_" + JobListenerRunnerViaKafkaProvider.this.jobInnerContextMgr.key(cronTaskId, cronTaskHistoryId);
                        MDC.put((String)"traceId", (String)taskKey);
                        MDC.put((String)"requestId", (String)taskKey);
                        JobLogImpl jobLog = new JobLogImpl(JobListenerRunnerViaKafkaProvider.this.client2ControlTransport, taskKey, cronTaskHistoryId);
                        try {
                            block31: {
                                TaskLockRequest taskLockRequest;
                                block30: {
                                    taskLockRequest = new TaskLockRequest();
                                    taskLockRequest.setCronTaskId(cronTaskId);
                                    taskLockRequest.setCronTaskHistoryId(cronTaskHistoryId);
                                    taskLockRequest.setNode(ClientNode.UUID_STR);
                                    TaskLockResponse taskLockResponse = JobListenerRunnerViaKafkaProvider.this.client2ControlTransport.lockClient(taskLockRequest);
                                    if (taskLockResponse == null) break block30;
                                    if (taskLockResponse.isSuccess()) break block31;
                                }
                                Util.logLink((String)("cannot get lock: " + JSONAccessor.impl().format((Object)taskLockRequest))).log(arg_0 -> ((Logger)log).info(arg_0)).log(arg_0 -> ((JobLog)jobLog).log(arg_0));
                                return;
                            }
                            JobEndpoint.JobContextImpl jobContext = new JobEndpoint.JobContextImpl(taskSubmitRequest, JobListenerRunnerViaKafkaProvider.this.jobService);
                            jobContext.setCronTaskId(cronTaskId);
                            jobContext.setCronTaskHistoryId(cronTaskHistoryId);
                            jobContext.setIoThread(Thread.currentThread());
                            JobListenerRunnerViaKafkaProvider.this.jobInnerContextMgr.add(cronTaskId, cronTaskHistoryId, (JobInnerContext)jobContext);
                            try {
                                JobListenerRunnerViaKafkaProvider.this.delayTaskExecutor.execute((JobContext)jobContext, (JobLog)jobLog);
                            }
                            catch (Exception e) {
                                log.error(e.getMessage(), (Throwable)e);
                                jobLog.flush();
                                Util.errorStackList((Throwable)e).forEach(arg_0 -> ((JobLog)jobLog).log(arg_0));
                                jobLog.flush();
                            }
                            finally {
                                JobListenerRunnerViaKafkaProvider.this.jobInnerContextMgr.remove(cronTaskId, cronTaskHistoryId);
                                try {
                                    Util.logLink((String)(taskKey + " > completed: " + Util.formatTime((LocalDateTime)LocalDateTime.now(), (String[])new String[0]))).log(arg_0 -> ((Logger)log).info(arg_0)).log(arg_0 -> ((JobLog)jobLog).log(arg_0));
                                    jobLog.flush();
                                }
                                catch (Exception e) {
                                    log.error(e.getMessage(), (Throwable)e);
                                }
                            }
                        }
                        catch (Exception e) {
                            log.error("Error processing record from topic: {}", (Object)topic, (Object)e);
                            jobLog.flush();
                            Util.errorStackList((Throwable)e).forEach(arg_0 -> ((JobLog)jobLog).log(arg_0));
                            jobLog.flush();
                        }
                        finally {
                            try {
                                jobLog.flush(true);
                            }
                            catch (Exception e) {
                                log.error(e.getMessage(), (Throwable)e);
                            }
                            if (!autoCommit) {
                                acknowledgment.acknowledge();
                            }
                            MDC.remove((String)"traceId");
                            MDC.remove((String)"requestId");
                        }
                    }
                });
                ConcurrentMessageListenerContainer container = new ConcurrentMessageListenerContainer(JobListenerRunnerViaKafkaProvider.this.consumerFactory, containerProps);
                container.setConcurrency(Math.max(jobListenerRunnerKafkaConfig.getConcurrency(), 1));
                container.setApplicationEventPublisher(event -> {});
                container.start();
                JobListenerRunnerViaKafkaProvider.this.containers.put(containerId, (ConcurrentMessageListenerContainer<String, String>)container);
                log.info("Started dynamic consumer for topic: " + topic + ", group: " + groupId + ", concurrency: " + container.getConcurrency());
                return container;
            }
        });
        return container != null;
    }

    public void close() {
        for (ConcurrentMessageListenerContainer<String, String> container : this.containers.values()) {
            try {
                container.stop();
            }
            catch (Exception e) {
                log.error(e.getMessage(), (Throwable)e);
            }
        }
    }

    public void run(String ... args) throws Exception {
        ShutdownManager.getOrCreate().register(JobListenerRunnerViaKafkaProvider.class.getName(), new Util.CatchRunnable(){

            public void run() throws Exception {
                JobListenerRunnerViaKafkaProvider.this.close();
            }
        });
        log.info("register shutdown hook, jobhub kafka listener");
    }
}

