/*
 * Decompiled with CFR 0.152.
 */
package com.ovopark.jobhub.sdk.client.kafka;

import com.ovopark.jobhub.sdk.client.Client2ControlTransport;
import com.ovopark.jobhub.sdk.client.DelayTaskExecutor;
import com.ovopark.jobhub.sdk.client.JobClientActive;
import com.ovopark.jobhub.sdk.client.JobContext;
import com.ovopark.jobhub.sdk.client.JobEndpoint;
import com.ovopark.jobhub.sdk.client.JobLog;
import com.ovopark.jobhub.sdk.client.JobLogImpl;
import com.ovopark.jobhub.sdk.client.JobService;
import com.ovopark.jobhub.sdk.model.internal.TaskSubmitRequest;
import com.ovopark.kernel.shared.JSONAccessor;
import com.ovopark.kernel.shared.ShutdownManager;
import com.ovopark.kernel.shared.Util;
import java.time.LocalDateTime;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.stereotype.Component;

@JobClientActive
@Component
public class JobListenerRunnerViaKafkaProvider
implements JobService.JobListenerKafkaRunnerProvider,
CommandLineRunner {
    private static final Logger log = LoggerFactory.getLogger(JobListenerRunnerViaKafkaProvider.class);
    @Autowired
    private DelayTaskExecutor delayTaskExecutor;
    @Autowired
    private Client2ControlTransport client2ControlTransport;
    @Autowired
    private JobService jobService;
    @Autowired
    ConsumerFactory<String, String> consumerFactory;
    private final Map<String, ConcurrentMessageListenerContainer<String, String>> containers = new ConcurrentHashMap<String, ConcurrentMessageListenerContainer<String, String>>();

    public String name() {
        return "kafka";
    }

    public boolean start(final String topic, String beanUrl, final JobService.JobListenerRunnerKafkaConfig jobListenerRunnerKafkaConfig) {
        final String containerId = beanUrl;
        ConcurrentMessageListenerContainer container = (ConcurrentMessageListenerContainer)Util.lock((Comparable)((Object)containerId), (Callable)new Callable<ConcurrentMessageListenerContainer<String, String>>(){

            @Override
            public ConcurrentMessageListenerContainer<String, String> call() throws Exception {
                ConcurrentMessageListenerContainer<String, String> listenerContainer = JobListenerRunnerViaKafkaProvider.this.containers.get(containerId);
                if (listenerContainer != null) {
                    return listenerContainer;
                }
                String groupId = Util.isEmpty((CharSequence)jobListenerRunnerKafkaConfig.getConsumerGroup()) ? containerId : jobListenerRunnerKafkaConfig.getConsumerGroup();
                ContainerProperties containerProps = new ContainerProperties(new String[]{topic});
                containerProps.setGroupId(groupId);
                if (Util.isNotEmpty((CharSequence)jobListenerRunnerKafkaConfig.getOffsetReset())) {
                    containerProps.getKafkaConsumerProperties().put("auto.offset.reset", jobListenerRunnerKafkaConfig.getOffsetReset());
                }
                containerProps.setMessageListener(record -> {
                    try {
                        String value = (String)record.value();
                        TaskSubmitRequest taskSubmitRequest = (TaskSubmitRequest)JSONAccessor.impl().read(value, TaskSubmitRequest.class);
                        String taskKey = "task4kafka_" + taskSubmitRequest.getCronTaskId() + "_" + taskSubmitRequest.getCronTaskHistoryId();
                        MDC.put((String)"traceId", (String)taskKey);
                        MDC.put((String)"requestId", (String)taskKey);
                        JobLogImpl jobLog = new JobLogImpl(JobListenerRunnerViaKafkaProvider.this.client2ControlTransport, taskKey, taskSubmitRequest.getCronTaskHistoryId());
                        try {
                            JobListenerRunnerViaKafkaProvider.this.delayTaskExecutor.execute((JobContext)new JobEndpoint.JobContextImpl(taskSubmitRequest, JobListenerRunnerViaKafkaProvider.this.jobService), (JobLog)jobLog);
                        }
                        catch (Exception e) {
                            log.error(e.getMessage(), (Throwable)e);
                            jobLog.flush();
                            Util.errorStackList((Throwable)e).forEach(arg_0 -> ((JobLog)jobLog).log(arg_0));
                            jobLog.flush();
                        }
                        finally {
                            try {
                                Util.logLink((String)(taskKey + " > completed: " + Util.formatTime((LocalDateTime)LocalDateTime.now(), (String[])new String[0]))).log(arg_0 -> ((Logger)log).info(arg_0)).log(arg_0 -> ((JobLog)jobLog).log(arg_0));
                                jobLog.flush();
                                jobLog.close();
                            }
                            catch (Exception e) {
                                log.error(e.getMessage(), (Throwable)e);
                            }
                        }
                    }
                    catch (Exception e) {
                        log.error("Error processing record from topic: {}", (Object)topic, (Object)e);
                    }
                    finally {
                        MDC.remove((String)"traceId");
                        MDC.remove((String)"requestId");
                    }
                });
                ConcurrentMessageListenerContainer container = new ConcurrentMessageListenerContainer(JobListenerRunnerViaKafkaProvider.this.consumerFactory, containerProps);
                container.setConcurrency(Math.max(jobListenerRunnerKafkaConfig.getConcurrency(), 1));
                container.setApplicationEventPublisher(event -> {});
                container.start();
                JobListenerRunnerViaKafkaProvider.this.containers.put(containerId, (ConcurrentMessageListenerContainer<String, String>)container);
                log.info("Started dynamic consumer for topic: " + topic + ", group: " + groupId + ", concurrency: " + container.getConcurrency());
                return container;
            }
        });
        return container != null;
    }

    public void close() {
        for (ConcurrentMessageListenerContainer<String, String> container : this.containers.values()) {
            try {
                container.stop();
            }
            catch (Exception e) {
                log.error(e.getMessage(), (Throwable)e);
            }
        }
    }

    public void run(String ... args) throws Exception {
        ShutdownManager.getOrCreate().register(JobListenerRunnerViaKafkaProvider.class.getName(), new Util.CatchRunnable(){

            public void run() throws Exception {
                JobListenerRunnerViaKafkaProvider.this.close();
            }
        });
        log.info("register shutdown hook, jobhub kafka listener");
    }
}

