package com.ovopark.log.flume.sink;

import com.google.common.base.Optional;
import com.ovopark.log.flume.consts.FlumeConst;
import com.ovopark.log.flume.util.ThreadPoolUtil;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Sink;
import org.apache.flume.conf.BatchSizeSupported;
import org.apache.flume.conf.Configurable;
import org.apache.flume.conf.ConfigurationException;
import org.apache.flume.conf.LogPrivacyUtil;
import org.apache.flume.formatter.output.BucketPath;
import org.apache.flume.instrumentation.kafka.KafkaSinkCounter;
import org.apache.flume.shared.kafka.KafkaSSLUtil;
import org.apache.flume.sink.AbstractSink;
import org.apache.flume.source.avro.AvroFlumeEvent;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/ovopark/log/flume/sink/LogKafkaSink.class */
/**
 * Flume sink that forwards events to Kafka.
 *
 * <p>Events are not sent immediately. {@link #coreProcess(Event, boolean)} keeps the most
 * recently seen event in {@link #LAST_EVENT} and only publishes the previously buffered event
 * once an event with a different {@link FlumeConst#MARKED_INDEX} header arrives. A background
 * task scheduled from {@link #process()} commits the buffered event when the "bell" token
 * cannot be obtained within one second (see {@link #waitToBell()}).
 *
 * <p>NOTE(review): {@link #LAST_EVENT}, the bell queue and the thread pool are {@code static},
 * so they are shared by every instance of this sink in the JVM, and {@code LAST_EVENT} /
 * {@code markedIndex} are accessed from both the caller of {@code coreProcess} and the pool
 * threads without synchronization. Confirm that only one instance is ever configured.
 *
 * <p>{@code readyToCommit()} and {@code doCommit(Event)} are not defined in this class; they
 * are inherited (presumably default methods of {@link KafkaTemplate} - verify).
 */
public class LogKafkaSink extends AbstractSink implements Configurable, BatchSizeSupported, KafkaTemplate {
    private static final Logger logger = LoggerFactory.getLogger(LogKafkaSink.class);

    /** Single-slot holder for the most recently received, not yet published event. */
    private static final Event[] LAST_EVENT = new Event[1];

    /** One-slot queue holding the "bell" token polled by {@link #waitToBell()}. */
    private static final BlockingQueue<Object> EXECUTE_BELL = new ArrayBlockingQueue<>(1);

    /** The token object placed into {@link #EXECUTE_BELL}. */
    private static final Object BELL_ITEM = new Object();

    /** Executor that runs the asynchronous bell task scheduled by {@link #process()}. */
    private static final ThreadPoolExecutor POOL = ThreadPoolUtil.getPool(2, 2, 10);

    /** Value used for the topic's {@code retention.ms} when none (or an invalid one) is configured. */
    private static final String DEFAULT_RETENTION_MS = "600000";

    static {
        // The bell starts out available.
        EXECUTE_BELL.offer(BELL_ITEM);
    }

    private final Properties kafkaProps = new Properties();

    private KafkaProducer<String, byte[]> producer;
    private String retention;
    private String topic;
    private int batchSize;
    private List<Future<RecordMetadata>> kafkaFutures;
    private KafkaSinkCounter counter;
    private boolean useAvroEventFormat;
    private boolean allowTopicOverride;

    /** {@link FlumeConst#MARKED_INDEX} header value of the most recently buffered event. */
    private String markedIndex = "0";
    private String topicHeader = null;

    // Avro serialization state, created lazily on first use by serializeEvent.
    private Optional<SpecificDatumWriter<AvroFlumeEvent>> writer = Optional.absent();
    // Not referenced anywhere else in this class.
    private final Optional<SpecificDatumReader<AvroFlumeEvent>> reader = Optional.absent();
    private Optional<ByteArrayOutputStream> tempOutStream = Optional.absent();
    private BinaryEncoder encoder = null;

    /**
     * Waits up to one second for the bell token. If no token is obtained and an event is
     * currently buffered, that event is committed via {@link #goCommit(Event)}.
     *
     * <p>Failures are logged instead of being silently discarded, and an interrupt is
     * restored on the current thread.
     */
    private void waitToBell() {
        try {
            Object bell = EXECUTE_BELL.poll(1L, TimeUnit.SECONDS);
            // Snapshot the slot once so the null check and the commit see the same event.
            Event pending = LAST_EVENT[0];
            if (bell == null && pending != null) {
                goCommit(pending);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.warn("Interrupted while waiting for the commit bell", e);
        } catch (RuntimeException e) {
            logger.error("Failed to commit the buffered event", e);
        }
    }

    /** Offers the bell token back to the queue; a no-op when the queue is already full. */
    private void putTheBell() {
        EXECUTE_BELL.offer(BELL_ITEM);
    }

    /** @return the list collecting the futures of records handed to the producer */
    @Override
    public List<Future<RecordMetadata>> getKafkaFutures() {
        return this.kafkaFutures;
    }

    /** @return the configured {@code flumeBatchSize} */
    @Override
    public long getBatchSize() {
        return this.batchSize;
    }

    /** @return the Kafka producer, or {@code null} before {@link #start()} has created it */
    @Override
    public KafkaProducer<String, byte[]> getProducer() {
        return this.producer;
    }

    /** @return the counter created in {@link #configure(Context)} */
    @Override
    public KafkaSinkCounter getKafkaSinkCounter() {
        return this.counter;
    }

    /**
     * Sends one event to Kafka.
     *
     * <p>The target topic is taken from the event header named by {@code topicHeader} when
     * topic override is enabled; if that header is absent the configured topic is used after
     * {@link BucketPath#escapeString} expansion. With override disabled the configured topic
     * is used verbatim. The record key is the {@code "key"} header (may be {@code null}).
     *
     * @param event           the event to publish
     * @param timestampMillis epoch-millisecond timestamp handed to the {@code SinkCallback};
     *                        {@code null} means "now"
     * @return the future returned by the producer for the sent record
     * @throws IOException if Avro serialization of the event fails
     */
    @Override
    public Future<RecordMetadata> toFuture(Event event, Long timestampMillis) throws IOException {
        byte[] body = event.getBody();
        Map<String, String> headers = event.getHeaders();

        String topicName;
        if (this.allowTopicOverride) {
            topicName = headers.get(this.topicHeader);
            if (topicName == null) {
                topicName = BucketPath.escapeString(this.topic, event.getHeaders());
                logger.debug("{} was set to true but header {} was null. Producing to {} topic instead.",
                        "allowTopicOverride", this.topicHeader, topicName);
            }
        } else {
            topicName = this.topic;
        }

        String key = headers.get("key");
        if (logger.isTraceEnabled()) {
            if (LogPrivacyUtil.allowLogRawData()) {
                logger.trace("{Event} " + topicName + " : " + key + " : " + new String(body, StandardCharsets.UTF_8));
            } else {
                logger.trace("{Event} " + topicName + " : " + key);
            }
        }

        long callbackTime = timestampMillis == null ? System.currentTimeMillis() : timestampMillis;
        ProducerRecord<String, byte[]> record =
                new ProducerRecord<>(topicName, key, serializeEvent(event, this.useAvroEventFormat));
        return this.producer.send(record, new SinkCallback(callbackTime));
    }

    /**
     * Buffers {@code event} and, when its {@link FlumeConst#MARKED_INDEX} header differs from
     * that of the previously buffered event, publishes the previously buffered event (with the
     * marker header removed) and records the resulting future.
     *
     * <p>If nothing has been buffered yet there is no previous event to publish, so the
     * incoming event is simply buffered. Previously this case dereferenced {@code null} and
     * surfaced as a spurious "Could not send event" failure.
     *
     * @param event the incoming event
     * @param z     not used by this implementation
     * @throws EventDeliveryException if publishing the previously buffered event fails
     */
    @Override
    public void coreProcess(Event event, boolean z) throws EventDeliveryException {
        Map<String, String> headers = event.getHeaders();
        long startTime = System.currentTimeMillis();
        try {
            String eventIndex = headers.get(FlumeConst.MARKED_INDEX);
            Event previous = LAST_EVENT[0];
            LAST_EVENT[0] = event;
            if (Objects.equals(eventIndex, this.markedIndex)) {
                // Same marker as the buffered event: the newer event just replaces it.
                return;
            }
            this.markedIndex = eventIndex;
            if (previous == null) {
                // First event ever seen: nothing buffered to flush yet.
                return;
            }
            previous.getHeaders().remove(FlumeConst.MARKED_INDEX);
            this.kafkaFutures.add(toFuture(previous, startTime));
        } catch (NumberFormatException e) {
            throw new EventDeliveryException("Non integer partition id specified", e);
        } catch (Exception e) {
            throw new EventDeliveryException("Could not send event", e);
        }
    }

    /**
     * Runs the inherited {@code readyToCommit()} step and then schedules an asynchronous
     * bell task ({@link #waitToBell()} followed by {@link #putTheBell()}) on {@link #POOL}.
     *
     * @return the status reported by {@code readyToCommit()}
     * @throws EventDeliveryException propagated from {@code readyToCommit()}
     */
    @Override
    public Sink.Status process() throws EventDeliveryException {
        Sink.Status status = readyToCommit();
        CompletableFuture.runAsync(() -> {
            waitToBell();
            putTheBell();
        }, POOL);
        return status;
    }

    /**
     * Commits {@code event} through the inherited {@code doCommit(Event)}, logging (rather
     * than propagating) any failure and restoring the interrupt flag when interrupted.
     */
    private void goCommit(Event event) {
        try {
            doCommit(event);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.error("Interrupted while committing the buffered event", e);
        } catch (IOException | ExecutionException e) {
            logger.error("Failed to commit the buffered event", e);
        }
    }

    /**
     * Attempts to create the configured topic, then creates the producer and starts the
     * counter and the sink lifecycle. A failed topic creation is logged and does not prevent
     * the sink from starting.
     */
    @Override
    public synchronized void start() {
        createTopic();
        this.producer = new KafkaProducer<>(this.kafkaProps);
        this.counter.start();
        super.start();
    }

    /**
     * Requests creation of the configured topic (1 partition, replication factor 1) with
     * {@code retention.ms} set to the configured retention, and waits for the result.
     * The admin client is always closed.
     */
    private void createTopic() {
        NewTopic newTopic = new NewTopic(this.topic, 1, (short) 1)
                .configs(Collections.singletonMap("retention.ms", this.retention));
        try (AdminClient adminClient = AdminClient.create(this.kafkaProps)) {
            adminClient.createTopics(Collections.singletonList(newTopic)).all().get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.warn("Interrupted while creating topic {}", this.topic, e);
        } catch (ExecutionException e) {
            // Best effort: the request may fail for reasons such as the topic already existing.
            logger.warn("Could not create topic {}", this.topic, e);
        }
    }

    /**
     * Closes the producer (if {@link #start()} got as far as creating it), stops the counter
     * and the sink lifecycle.
     */
    @Override
    public synchronized void stop() {
        if (this.producer != null) {
            this.producer.close();
        }
        this.counter.stop();
        logger.info("Kafka Sink {} stopped. Metrics: {}", getName(), this.counter);
        super.stop();
    }

    /**
     * Tells whether {@code str} parses as a Java {@code int}.
     *
     * @param str the text to check; {@code null} yields {@code false}
     * @return {@code true} if {@link Integer#parseInt(String)} accepts {@code str}
     */
    public static boolean isNum(String str) {
        try {
            Integer.parseInt(str);
            return true;
        } catch (NumberFormatException ignored) {
            // Not an int (this includes null input).
            return false;
        }
    }

    /** Tells whether {@code value} parses as a Java {@code long}; {@code null} yields {@code false}. */
    private static boolean isLong(String value) {
        try {
            Long.parseLong(value);
            return true;
        } catch (NumberFormatException ignored) {
            return false;
        }
    }

    /**
     * Reads the sink configuration: topic, retention, batch size, event format, topic
     * override settings and the Kafka producer properties.
     *
     * <p>The retention is validated as a {@code long}, since it is passed on as the topic's
     * {@code retention.ms}; values beyond the {@code int} range are therefore accepted
     * instead of being replaced by the default.
     *
     * @param context the Flume configuration context
     * @throws ConfigurationException if no bootstrap servers are configured
     */
    @Override
    public void configure(Context context) {
        translateOldProps(context);

        String configuredTopic = context.getString("kafka.topic");
        if (configuredTopic == null || configuredTopic.isEmpty()) {
            configuredTopic = "default-flume-topic";
            logger.warn("Topic was not specified. Using {} as the topic.", configuredTopic);
        } else {
            logger.info("Using the static topic {}. This may be overridden by event headers", configuredTopic);
        }
        this.topic = configuredTopic;

        String configuredRetention = context.getString("retention");
        if (isLong(configuredRetention)) {
            this.retention = configuredRetention;
        } else {
            if (configuredRetention != null) {
                logger.warn("Invalid retention '{}'. Using {} ms instead.", configuredRetention, DEFAULT_RETENTION_MS);
            }
            this.retention = DEFAULT_RETENTION_MS;
        }

        this.batchSize = context.getInteger("flumeBatchSize", 100);
        logger.debug("Using batch size: {}", this.batchSize);

        this.useAvroEventFormat = context.getBoolean("useFlumeEventFormat", false);
        this.allowTopicOverride = context.getBoolean("allowTopicOverride", true);
        this.topicHeader = context.getString("topicHeader", "topic");
        logger.debug("useFlumeEventFormat set to: {}", this.useAvroEventFormat);

        this.kafkaFutures = new LinkedList<>();

        String bootstrapServers = context.getString("kafka.bootstrap.servers");
        if (bootstrapServers == null || bootstrapServers.isEmpty()) {
            throw new ConfigurationException("Bootstrap Servers must be specified");
        }
        setProducerProps(context, bootstrapServers);
        if (logger.isDebugEnabled() && LogPrivacyUtil.allowLogPrintConfig()) {
            logger.debug("Kafka producer properties: {}", this.kafkaProps);
        }

        if (this.counter == null) {
            this.counter = new KafkaSinkCounter(getName());
        }
    }

    /**
     * Copies deprecated configuration keys ({@code topic}, {@code brokerList},
     * {@code batchSize}, {@code requiredAcks}) to their current names when the current name
     * is not set, and warns about deprecated serializer keys.
     *
     * @throws ConfigurationException if neither {@code kafka.bootstrap.servers} nor
     *                                {@code brokerList} is provided
     */
    private void translateOldProps(Context context) {
        if (!context.containsKey("kafka.topic")) {
            context.put("kafka.topic", context.getString("topic"));
            logger.warn("{} is deprecated. Please use the parameter {}", "topic", "kafka.topic");
        }

        if (!context.containsKey("kafka.bootstrap.servers")) {
            String brokerList = context.getString("brokerList");
            if (brokerList == null || brokerList.isEmpty()) {
                throw new ConfigurationException("Bootstrap Servers must be specified");
            }
            context.put("kafka.bootstrap.servers", brokerList);
            logger.warn("{} is deprecated. Please use the parameter {}", "brokerList", "kafka.bootstrap.servers");
        }

        if (!context.containsKey("flumeBatchSize")) {
            String oldBatchSize = context.getString("batchSize");
            if (oldBatchSize != null && !oldBatchSize.isEmpty()) {
                context.put("flumeBatchSize", oldBatchSize);
                logger.warn("{} is deprecated. Please use the parameter {}", "batchSize", "flumeBatchSize");
            }
        }

        if (!context.containsKey("kafka.producer.acks")) {
            String requiredAcks = context.getString("requiredAcks");
            if (requiredAcks != null && !requiredAcks.isEmpty()) {
                context.put("kafka.producer.acks", requiredAcks);
                logger.warn("{} is deprecated. Please use the parameter {}", "requiredAcks", "kafka.producer.acks");
            }
        }

        if (context.containsKey("key.serializer.class")) {
            logger.warn("{} is deprecated. Flume now uses the latest Kafka producer which implements a different interface for serializers. Please use the parameter {}", "key.serializer.class", "kafka.producer.key.serializer");
        }
        if (context.containsKey("serializer.class")) {
            logger.warn("{} is deprecated. Flume now uses the latest Kafka producer which implements a different interface for serializers. Please use the parameter {}", "serializer.class", "kafka.producer.value.serializer");
        }
    }

    /**
     * Rebuilds {@link #kafkaProps}: defaults first, then every {@code kafka.producer.*}
     * sub-property from the context, then the bootstrap servers and global SSL parameters.
     */
    private void setProducerProps(Context context, String bootstrapServers) {
        this.kafkaProps.clear();
        this.kafkaProps.put("acks", "1");
        this.kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        this.kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        this.kafkaProps.putAll(context.getSubProperties("kafka.producer."));
        this.kafkaProps.put("bootstrap.servers", bootstrapServers);
        KafkaSSLUtil.addGlobalSSLParameters(this.kafkaProps);
    }

    /** @return the live (mutable) Kafka properties built by {@link #configure(Context)} */
    protected Properties getKafkaProps() {
        return this.kafkaProps;
    }

    /**
     * Produces the record payload for {@code event}.
     *
     * @param event         the event to serialize
     * @param useAvroFormat when {@code true} the headers and body are Avro-encoded as an
     *                      {@link AvroFlumeEvent}; otherwise the raw body is returned
     * @return the bytes to send to Kafka
     * @throws IOException if Avro encoding fails
     */
    private byte[] serializeEvent(Event event, boolean useAvroFormat) throws IOException {
        if (!useAvroFormat) {
            return event.getBody();
        }
        if (!this.tempOutStream.isPresent()) {
            this.tempOutStream = Optional.of(new ByteArrayOutputStream());
        }
        if (!this.writer.isPresent()) {
            this.writer = Optional.of(new SpecificDatumWriter<AvroFlumeEvent>(AvroFlumeEvent.class));
        }
        ByteArrayOutputStream out = this.tempOutStream.get();
        // The stream is reused across events, so clear what the previous event left behind.
        out.reset();
        AvroFlumeEvent avroEvent =
                new AvroFlumeEvent(toCharSeqMap(event.getHeaders()), ByteBuffer.wrap(event.getBody()));
        this.encoder = EncoderFactory.get().directBinaryEncoder(out, this.encoder);
        this.writer.get().write(avroEvent, this.encoder);
        this.encoder.flush();
        return out.toByteArray();
    }

    /** Copies a {@code String} map into a new {@code CharSequence} map with the same entries. */
    private static Map<CharSequence, CharSequence> toCharSeqMap(Map<String, String> map) {
        return new HashMap<CharSequence, CharSequence>(map);
    }
}
