package com.ovopark.log.flume.sink;

import com.google.common.base.Throwables;
import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.flume.Channel;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Sink;
import org.apache.flume.Transaction;
import org.apache.flume.instrumentation.kafka.KafkaSinkCounter;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/ovopark/log/flume/sink/KafkaTemplate.class */
/**
 * Batching and commit logic for a Flume sink that publishes events to Kafka,
 * provided as default methods on top of a handful of accessors that the
 * implementing sink supplies.
 *
 * <p>{@link #readyToCommit()} drains up to {@link #getBatchSize()} events from
 * the channel inside one Flume transaction and hands each to
 * {@link #coreProcess(Event, boolean)}; {@link #doCommit(Event)} then flushes
 * the producer, waits for the outstanding send futures and commits the
 * transaction.
 *
 * <p>NOTE(review): {@link #COMMIT_BEANS_ARRAY} and {@link #EVENT_CACHE} are
 * static, so they are shared by every implementation in the JVM, and
 * {@code EVENT_CACHE} is a plain {@link LinkedList} with no synchronization.
 * Confirm that only one sink instance / thread uses this interface.
 */
public interface KafkaTemplate {
    // NOTE(review): the logger is bound to LogKafkaSink rather than to this
    // interface; kept as-is because log routing may depend on that name.
    public static final Logger logger = LoggerFactory.getLogger(LogKafkaSink.class);

    /** Single-slot hand-off queue between {@link #readyToCommit()} and {@link #doCommit(Event)}. */
    public static final BlockingQueue<CommitBean> COMMIT_BEANS_ARRAY = new ArrayBlockingQueue<>(1);

    /**
     * Look-ahead buffer used by {@link #readyToCommit()}. It may contain
     * {@code null} entries, because the result of {@code channel.take()} is
     * stored unconditionally.
     */
    public static final LinkedList<Event> EVENT_CACHE = new LinkedList<>();

    /**
     * @return the mutable list of pending Kafka send futures;
     *         {@link #doCommit(Event)} waits on every entry and then clears it
     */
    List<Future<RecordMetadata>> getKafkaFutures();

    /** @return the upper bound of the drain loop in {@link #readyToCommit()} */
    long getBatchSize();

    /** @return the producer that {@link #doCommit(Event)} flushes before committing */
    KafkaProducer<String, byte[]> getProducer();

    /** @return the counter that receives the drain / rollback / timing metrics */
    KafkaSinkCounter getKafkaSinkCounter();

    /**
     * Handles one event drained from the channel.
     *
     * @param event the event to process; never {@code null} when called from
     *              {@link #readyToCommit()}
     * @param z     {@code true} when the next entry in the look-ahead cache is
     *              {@code null} (or absent), i.e. no further event was visible
     *              behind this one
     * @throws EventDeliveryException if the event cannot be processed
     */
    void coreProcess(Event event, boolean z) throws EventDeliveryException;

    /** @return the channel that {@link #readyToCommit()} takes events from */
    Channel getChannel();

    /**
     * Turns an event into a Kafka send future.
     *
     * @param event the event to send
     * @param l     a timestamp; {@link #doCommit(Event)} passes the current
     *              wall-clock time in milliseconds
     * @return the future that {@link #doCommit(Event)} adds to {@link #getKafkaFutures()}
     * @throws IOException declared by implementations
     */
    Future<RecordMetadata> toFuture(Event event, Long l) throws IOException;

    /**
     * Drains one batch from the channel and commits it.
     *
     * <p>On any failure the transaction is rolled back <em>and closed</em>
     * (Flume requires both; a transaction that is rolled back but never closed
     * is handed out again by the channel and its next {@code begin()} fails).
     * If the failure was an {@link InterruptedException}, the thread's
     * interrupt flag is restored before the wrapping exception is thrown.
     *
     * @return {@link Sink.Status#BACKOFF} if the first polled entry was
     *         {@code null}, otherwise {@link Sink.Status#READY}
     * @throws EventDeliveryException wrapping whatever went wrong while
     *                                draining, sending or committing
     */
    default Sink.Status readyToCommit() throws EventDeliveryException {
        Sink.Status status = Sink.Status.READY;
        Channel channel = getChannel();
        Transaction transaction = null;
        try {
            transaction = channel.getTransaction();
            transaction.begin();
            long batchStartNanos = System.nanoTime();
            long batchSize = getBatchSize();

            // The counter starts at -1 so that the first pass only primes the
            // look-ahead cache; every later pass takes one more entry, pops the
            // oldest one and peeks at the next to learn whether it is the last.
            // NOTE(review): the priming pass adds an entry without removing
            // one, and nothing in this file drains the static cache, so it
            // appears to grow by one entry per call unless it is cleared
            // elsewhere — confirm.
            long processed = -1;
            while (processed < batchSize) {
                EVENT_CACHE.offer(channel.take());
                if (processed != -1) {
                    Event current = EVENT_CACHE.poll();
                    Event upcoming = EVENT_CACHE.peek();
                    if (current == null) {
                        if (processed == 0) {
                            status = Sink.Status.BACKOFF;
                            getKafkaSinkCounter().incrementBatchEmptyCount();
                        } else {
                            getKafkaSinkCounter().incrementBatchUnderflowCount();
                        }
                    }
                    getKafkaSinkCounter().incrementEventDrainAttemptCount();
                    boolean lastVisible = upcoming == null;
                    if (current != null) {
                        logger.info("[sink] 准备核心处理了");
                        coreProcess(current, lastVisible);
                    }
                    if (lastVisible) {
                        break;
                    }
                }
                processed++;
            }

            CommitBean commitBean = new CommitBean();
            commitBean.batchStartTime = batchStartNanos;
            commitBean.transaction = transaction;
            commitBean.processedEvents = processed;
            logger.info("[sink] 交出了commitBean");
            if (!COMMIT_BEANS_ARRAY.offer(commitBean)) {
                // The slot is already occupied, so doCommit below will pick up
                // the older bean instead of this one. Flow is unchanged; the
                // condition is merely no longer silent.
                logger.warn("[sink] commit queue is full; commitBean was not enqueued");
            }
            doCommit(null);
            return status;
        } catch (Exception e) {
            logger.error("Failed to publish events", e);
            getKafkaSinkCounter().incrementEventWriteOrChannelFail(e);
            try {
                if (transaction != null) {
                    try {
                        getKafkaFutures().clear();
                        transaction.rollback();
                        getKafkaSinkCounter().incrementRollbackCount();
                    } catch (Exception e2) {
                        logger.error("Transaction rollback failed", e2);
                        throw Throwables.propagate(e2);
                    } finally {
                        // Always end the transaction boundary. A close failure
                        // is logged rather than thrown so that it cannot mask
                        // the rollback failure or the original cause.
                        try {
                            transaction.close();
                        } catch (RuntimeException closeFailure) {
                            logger.error("Transaction close failed", closeFailure);
                        }
                    }
                }
            } finally {
                // Restore the flag only after cleanup so the rollback itself
                // does not run on an already-interrupted thread.
                if (e instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                }
            }
            throw new EventDeliveryException("Failed to publish events", e);
        }
    }

    /**
     * Takes the pending {@link CommitBean} from {@link #COMMIT_BEANS_ARRAY}
     * (blocking until one is available), flushes the producer, waits for all
     * pending send futures and commits and closes the bean's transaction.
     *
     * <p>The pending-future list is cleared whether or not the commit
     * succeeds. On failure the transaction is deliberately left open: Flume
     * does not allow {@code close()} before {@code commit()} or
     * {@code rollback()}, so the caller must roll back and then close
     * ({@link #readyToCommit()} does this).
     *
     * @param event optional extra event to send before committing; ignored
     *              when {@code null}
     * @throws ExecutionException   if a pending Kafka send failed
     * @throws InterruptedException if interrupted while waiting for the bean
     *                              or for a send future
     * @throws IOException          if {@link #toFuture(Event, Long)} fails
     */
    default void doCommit(Event event) throws ExecutionException, InterruptedException, IOException {
        logger.info("[sink] 准备commit了");
        if (null != event) {
            // NOTE(review): the message below says the event is empty and the
            // commit failed, yet it is logged on the non-null path right before
            // the event is sent. Text kept unchanged pending confirmation.
            logger.info("[sink] event为空 commit失败");
            getKafkaFutures().add(toFuture(event, Long.valueOf(System.currentTimeMillis())));
        }
        CommitBean pending = COMMIT_BEANS_ARRAY.take();
        if (pending.transaction == null) {
            logger.info("[sink] transaction为空 commit失败: {}", pending);
            return;
        }
        try {
            getProducer().flush();
            if (pending.processedEvents > 0) {
                for (Future<RecordMetadata> sendResult : getKafkaFutures()) {
                    sendResult.get();
                }
                long elapsedMillis = (System.nanoTime() - pending.batchStartTime) / 1_000_000;
                getKafkaSinkCounter().addToKafkaEventSendTimer(elapsedMillis);
                getKafkaSinkCounter().addToEventDrainSuccessCount(getKafkaFutures().size());
            }
            pending.transaction.commit();
            logger.info("[sink] commit成功");
        } finally {
            getKafkaFutures().clear();
        }
        // Reached only after a successful commit.
        pending.transaction.close();
    }
}
