package org.voltdb.exportclient.kafka;

import com.google_voltpatches.common.base.Splitter;
import com.google_voltpatches.common.collect.ImmutableList;
import com.google_voltpatches.common.collect.ImmutableMap;
import com.google_voltpatches.common.collect.UnmodifiableIterator;
import com.google_voltpatches.common.util.concurrent.ListeningExecutorService;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.TimeZone;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons_voltpatches.cli.HelpFormatter;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.serialization.StringSerializer;
import org.hsqldb_voltpatches.ErrorCode;
import org.hsqldb_voltpatches.lib.StringUtil;
import org.voltcore.utils.CoreUtils;
import org.voltdb.AbstractTopology;
import org.voltdb.VoltDB;
import org.voltdb.common.Constants;
import org.voltdb.export.AdvertisedDataSource;
import org.voltdb.export.ExportManagerInterface;
import org.voltdb.exportclient.ExportClientBase;
import org.voltdb.exportclient.ExportClientLogger;
import org.voltdb.exportclient.ExportDecoderBase;
import org.voltdb.exportclient.ExportRow;
import org.voltdb.exportclient.decode.AvroDecoder;
import org.voltdb.exportclient.decode.CSVStringDecoder;
import org.voltdb.serdes.EncodeFormat;
import org.voltdb.utils.CatalogUtil;

/* loaded from: input_file:org/voltdb/exportclient/kafka/KafkaExportClient.class */
public class KafkaExportClient extends ExportClientBase {
    // Placeholder client id. configure() stores this same value under "client.id" when the
    // caller supplies none, and each decoder later swaps it for a per-source id.
    private static final String DEFAULT_CLIENT_ID = "voltdb";

    // Exporter-level property names. configure() reads these and strips them from the
    // properties that are handed to the Kafka producer.
    private static final String TIMEZONE_PN = "timezone";
    private static final String SKIP_INTERNALS_PN = "skipinternals";
    private static final String BINARY_ENCODING_PN = "binaryencoding";
    private static final String BATCH_MODE_PN = "batch.mode";
    private static final String TOPIC_KEY_PN = "topic.key";
    private static final String TOPIC_PREFIX_PN = "topic.prefix";
    private static final String PARTITION_KEY_PN = "partition.key";
    // Fallback source for "bootstrap.servers" when that property is absent or blank.
    private static final String BROKER_LIST_PN = "metadata.broker.list";
    // Stripped from the producer configuration without being read.
    private static final String OLD_SERIALIZER = "serializer.class";
    private static final String OLD_PARTITIONER = "partitioner.class";
    // Not referenced by any code visible in this class.
    private static final String ACKS_TIMEOUT = "acks.retry.timeout";
    // Consulted by configure() only when "acks" does not already enable future polling.
    private static final String LEGACY_ACKS = "request.required.acks";
    // Property selecting the payload encoding; the value "avro" (case-insensitive) selects
    // AVRO, anything else selects CSV.
    public static final String ENCODE_FORMAT = "type";
    // Value configure() always writes to the producer's "max.block.ms" setting.
    private static final String MAX_BLOCK_MS_DEFAULT = "60000";
    // Milliseconds; matches the executor termination wait used by
    // KafkaExportDecoder.sourceNoLongerAdvertised.
    private static final int SHUTDOWN_TIMEOUT_MS = 10000;
    // Initial topic prefix; a topic name is prefix + table name unless topic.key maps the table.
    public static final String DEFAULT_EXPORT_PREFIX = "voltdbexport";

    // Producer settings assembled by configure(); every decoder takes its own copy.
    Properties m_producerConfig;
    // Lower-cased table name -> explicit topic name, parsed from topic.key.
    Map<String, String> m_tableTopics;
    // Lower-cased table name -> column name, parsed from partition.key.
    // Populated by configure() but not read by any code visible in this class.
    Map<String, String> m_tablePartitionColumns;
    // Splits list-valued properties into entries.
    // NOTE(review): presumably CatalogUtil.SIGNATURE_DELIMITER is "," - confirm.
    private static final Splitter COMMA_SPLITTER = Splitter.on(CatalogUtil.SIGNATURE_DELIMITER).omitEmptyStrings().trimResults();
    // Splits one "table.value" entry into its two parts.
    private static final Splitter PERIOD_SPLITTER = Splitter.on(".").omitEmptyStrings().trimResults();
    private static final ExportClientLogger LOG = new ExportClientLogger();
    // Payload encoding; overwritten by configure().
    private EncodeFormat m_encodeFormat = EncodeFormat.CSV;
    String m_topicPrefix = DEFAULT_EXPORT_PREFIX;
    // Passed to the CSV decoder builder as its skipInternalFields setting.
    boolean m_skipInternals = false;
    // Time zone handed to the CSV decoder; configure() resets it to VoltDB.GMT_TIMEZONE
    // before applying any "timezone" property.
    TimeZone m_timeZone = VoltDB.REAL_DEFAULT_TIMEZONE;
    // Binary column encoding handed to the CSV decoder.
    ExportDecoderBase.BinaryEncoding m_binaryEncoding = ExportDecoderBase.BinaryEncoding.HEX;
    // When true, onBlockCompletion flushes the producer and checks every pending send.
    boolean m_pollFutures = false;
    // Not read by any code visible in this class.
    // NOTE(review): initialised from an HSQLDB error-code constant, which looks like a
    // decompiler constant-folding artifact - confirm the intended timeout value.
    int m_acksTimeout = ErrorCode.X_42000;

    /* loaded from: input_file:org/voltdb/exportclient/kafka/KafkaExportClient$KafkaExportDecoder.class */
    class KafkaExportDecoder extends ExportDecoderBase {
        // Destination topic; resolved on the first onBlockStart call and then reused.
        String m_topic;
        // This decoder's private copy of the client-level producer configuration.
        Properties m_decoderProducerConfig;
        // Created in checkOnFirstRow(); closed by pause() and sourceNoLongerAdvertised().
        KafkaProducer<String, Object> m_producer;
        // Table name -> Avro decoder; only allocated when the encode format is AVRO.
        Map<String, AvroDecoder> m_tableAvroDecoderMap;
        // Row-to-CSV decoder; only built when the encode format is not AVRO.
        CSVStringDecoder m_csvDecoder;
        // Futures of the sends issued for the current block; cleared in onBlockCompletion.
        final List<Future<RecordMetadata>> m_futures;
        // Raised by the send callback on any failure; reset in onBlockCompletion.
        private final AtomicBoolean m_failure;
        // Dedicated executor in BASIC export mode, otherwise null.
        final ListeningExecutorService m_es;
        // True once a producer has been created; pause() sets it back to false.
        private volatile boolean m_primed;
        // True between pause() and resume(); checkOnFirstRow() refuses to proceed while set.
        private volatile boolean m_paused;

        /**
         * Creates a decoder for one advertised export source.
         *
         * <p>Depending on the client's encode format, either the Avro decoder cache or the CSV
         * decoder is prepared (the other stays {@code null}). A dedicated single-thread executor
         * is created only when the export manager runs in BASIC mode. The decoder works on its
         * own copy of the client's producer configuration so that per-source changes (such as
         * the client id) do not leak into other decoders.
         *
         * @param advertisedDataSource the export source this decoder serves
         */
        public KafkaExportDecoder(AdvertisedDataSource advertisedDataSource) {
            super(advertisedDataSource);
            m_topic = null;
            m_futures = new ArrayList<>();
            m_failure = new AtomicBoolean(false);
            m_primed = false;
            m_paused = false;

            if (KafkaExportClient.this.m_encodeFormat != EncodeFormat.AVRO) {
                CSVStringDecoder.Builder csvBuilder = CSVStringDecoder.builder();
                csvBuilder.dateFormatter(Constants.ODBC_DATE_FORMAT_STRING)
                        .timeZone(KafkaExportClient.this.m_timeZone)
                        .binaryEncoding(KafkaExportClient.this.m_binaryEncoding)
                        .skipInternalFields(KafkaExportClient.this.m_skipInternals);
                m_csvDecoder = csvBuilder.build();
            } else {
                m_tableAvroDecoderMap = new ConcurrentHashMap<>();
            }

            // NOTE(review): the second argument (524288) is presumably the thread stack size - confirm.
            m_es = VoltDB.getExportManager().getExportMode() == ExportManagerInterface.ExportMode.BASIC
                    ? CoreUtils.getListeningSingleThreadExecutor(
                            "Kafka Export decoder for partition " + advertisedDataSource.tableName + " - " + advertisedDataSource.partitionId,
                            524288)
                    : null;

            m_decoderProducerConfig = new Properties();
            m_decoderProducerConfig.putAll(KafkaExportClient.this.m_producerConfig);
        }

        /**
         * Marks the decoder as paused and closes the current producer, if any, with a zero
         * timeout. The decoder is flagged as un-primed so that a new producer is created by
         * {@code checkOnFirstRow()} after {@link #resume()}.
         */
        @Override // org.voltdb.exportclient.ExportDecoderBase
        public synchronized void pause() {
            m_paused = true;
            final KafkaProducer<String, Object> current = m_producer;
            if (current != null) {
                current.close(Duration.ZERO);
            }
            m_primed = false;
        }

        /**
         * Clears the paused flag. No producer is created here; that happens in the next
         * {@code checkOnFirstRow()} call.
         */
        @Override // org.voltdb.exportclient.ExportDecoderBase
        public synchronized void resume() {
            m_paused = false;
        }

        /**
         * Ensures a Kafka producer exists before rows are processed.
         *
         * @throws ExportDecoderBase.RestartBlockException if the decoder is paused, or if the
         *         producer cannot be instantiated from the decoder's configuration
         */
        final synchronized void checkOnFirstRow() throws ExportDecoderBase.RestartBlockException {
            if (m_paused) {
                throw new ExportDecoderBase.RestartBlockException("Exporter has been paused", false);
            }
            if (m_primed) {
                return;
            }

            final String failure = "Unable to instantiate a Kafka producer";
            try {
                setClientId();
                m_producer = new KafkaProducer<>(m_decoderProducerConfig);
            } catch (KafkaException e) {
                // ConfigException is a KafkaException subtype, so this one handler covers both
                // configuration errors and other producer start-up failures.
                KafkaExportClient.LOG.error(failure, e, new Object[0]);
                throw new ExportDecoderBase.RestartBlockException(failure, e, true);
            }
            m_primed = true;
        }

        /**
         * Replaces the placeholder client id with one derived from this source's table name
         * and partition id. A client id that differs from the placeholder is left untouched.
         */
        private void setClientId() {
            final String clientIdKey = "client.id";
            if (!KafkaExportClient.DEFAULT_CLIENT_ID.equals(m_decoderProducerConfig.getProperty(clientIdKey))) {
                return;
            }
            // NOTE(review): HelpFormatter.DEFAULT_OPT_PREFIX is presumably "-", used here only
            // as a separator - confirm.
            final String perSourceId =
                    "producer-" + this.m_source.tableName + HelpFormatter.DEFAULT_OPT_PREFIX + this.m_source.partitionId;
            m_decoderProducerConfig.setProperty(clientIdKey, perSourceId);
        }

        /**
         * Resolves the destination topic for a table: the explicit mapping for the lower-cased
         * table name when one exists, otherwise the topic prefix followed by the table name as
         * given. The result is interned.
         *
         * @param str the table name
         */
        private void populateTopic(String str) {
            final Map<String, String> explicitTopics = KafkaExportClient.this.m_tableTopics;
            final String lookupKey = str.toLowerCase();
            if (explicitTopics != null && explicitTopics.containsKey(lookupKey)) {
                m_topic = explicitTopics.get(lookupKey).intern();
                return;
            }
            m_topic = (KafkaExportClient.this.m_topicPrefix + str).intern();
        }

        /**
         * Returns the executor created for this decoder.
         *
         * @return the dedicated executor, or {@code null} when none was created (the export
         *         manager was not in BASIC mode at construction time)
         */
        @Override // org.voltdb.exportclient.ExportDecoderBase
        public ListeningExecutorService getExecutor() {
            return m_es;
        }

        /**
         * Finishes a block of rows.
         *
         * <p>When future polling is enabled, or a send callback reported a failure during the
         * block, the producer is flushed and every pending send is checked. Any failed,
         * interrupted or unfinished send causes the block to be restarted. The pending futures
         * and the failure flag are always reset, whatever the outcome.
         *
         * @param exportRow the last row of the block (not used here)
         * @throws ExportDecoderBase.RestartBlockException if a send failed, was interrupted, or
         *         did not complete in time
         */
        @Override // org.voltdb.exportclient.ExportDecoderBase
        public void onBlockCompletion(ExportRow exportRow) throws ExportDecoderBase.RestartBlockException {
            try {
                if (KafkaExportClient.this.m_pollFutures || m_failure.get()) {
                    m_producer.flush();
                    // Iterate over a snapshot so the check is unaffected by later changes to the list.
                    for (Future<RecordMetadata> pending : ImmutableList.copyOf(m_futures)) {
                        pending.get(1L, TimeUnit.MILLISECONDS);
                    }
                }
            } catch (ExecutionException e) {
                KafkaExportClient.LOG.warn("Send operation failed to complete", e.getCause(), new Object[0]);
                throw new ExportDecoderBase.RestartBlockException("Send operation failed to complete", e.getCause(), true);
            } catch (InterruptedException e) {
                KafkaExportClient.LOG.warn("Iterrupted send operation", e, new Object[0]);
                // Catching InterruptedException clears the interrupt status; restore it so the
                // owning thread can still observe the interruption after the block is restarted.
                Thread.currentThread().interrupt();
                throw new ExportDecoderBase.RestartBlockException("Iterrupted send operation", e, true);
            } catch (TimeoutException e) {
                throw new ExportDecoderBase.RestartBlockException("Send operation timed out", e, true);
            } finally {
                m_futures.clear();
                m_failure.set(false);
            }
        }

        /**
         * Prepares for a new block: makes sure a producer exists and, the first time through,
         * resolves the destination topic from the row's table name.
         *
         * @param exportRow the first row of the block
         * @throws ExportDecoderBase.RestartBlockException if the decoder is paused or the
         *         producer cannot be created
         */
        @Override // org.voltdb.exportclient.ExportDecoderBase
        public void onBlockStart(ExportRow exportRow) throws ExportDecoderBase.RestartBlockException {
            checkOnFirstRow();
            if (m_topic != null) {
                return;
            }
            populateTopic(exportRow.tableName);
        }

        /**
         * Encodes one export row and hands it to the Kafka producer.
         *
         * <p>The record key is the row's partition value, or the partition id when the row has
         * no partition value. The payload is produced by the per-table Avro decoder or by the
         * CSV decoder, depending on the client's encode format. The send is asynchronous: its
         * future is remembered for {@code onBlockCompletion}, and the callback raises the
         * failure flag if the send fails.
         *
         * @param exportRow the row to export
         * @return always {@code true} when the send was queued
         * @throws ExportDecoderBase.RestartBlockException if the decoder is paused, the producer
         *         cannot be created, or the producer rejects the record
         */
        @Override // org.voltdb.exportclient.ExportDecoderBase
        public boolean processRow(ExportRow exportRow) throws ExportDecoderBase.RestartBlockException {
            checkOnFirstRow();

            final String recordKey;
            if (exportRow.partitionValue != null) {
                recordKey = exportRow.partitionValue.toString();
            } else {
                recordKey = String.valueOf(exportRow.partitionId);
            }

            final Object payload;
            if (KafkaExportClient.this.m_encodeFormat == EncodeFormat.AVRO) {
                // One Avro decoder per table, created on first use.
                final AvroDecoder avroDecoder = m_tableAvroDecoderMap.computeIfAbsent(
                        exportRow.tableName, table -> new AvroDecoder.Builder().build());
                payload = avroDecoder.decode2(exportRow.generation, exportRow.tableName, exportRow.types,
                        exportRow.names, (GenericRecord) null, exportRow.values);
            } else {
                payload = m_csvDecoder.decode2(exportRow.generation, exportRow.tableName, exportRow.types,
                        exportRow.names, (String) null, exportRow.values);
            }
            final ProducerRecord<String, Object> outgoing = new ProducerRecord<String, Object>(m_topic, recordKey, payload);

            try {
                final Future<RecordMetadata> pending = m_producer.send(outgoing, (metadata, failure) -> {
                    if (failure == null) {
                        return;
                    }
                    KafkaExportClient.LOG.warn("Failed to send data. Verify if the kafka server matches bootstrap.servers %s", failure, m_decoderProducerConfig.getProperty("bootstrap.servers"));
                    m_failure.compareAndSet(false, true);
                });
                m_futures.add(pending);
                return true;
            } catch (KafkaException e) {
                KafkaExportClient.LOG.warn("Unable to send %s", e, outgoing);
                throw new ExportDecoderBase.RestartBlockException("Unable to send message", e, true);
            } catch (IllegalStateException e) {
                throw new ExportDecoderBase.RestartBlockException("IllegalStateException, possibly because kafka producer was closed", e, false);
            }
        }

        /**
         * Releases this decoder's resources once its source is no longer advertised.
         *
         * <p>The producer, if any, is closed with a zero timeout on a best-effort basis (close
         * failures are only logged at debug level). The executor, if any, is shut down and given
         * {@code SHUTDOWN_TIMEOUT_MS} milliseconds to terminate before a forced shutdown.
         *
         * @param advertisedDataSource the source being withdrawn (not used here; messages refer
         *        to the source this decoder was created for)
         */
        @Override // org.voltdb.exportclient.ExportDecoderBase
        public void sourceNoLongerAdvertised(AdvertisedDataSource advertisedDataSource) {
            if (m_producer != null) {
                try {
                    m_producer.close(Duration.ofMillis(0L));
                } catch (Exception e) {
                    KafkaExportClient.LOG.debug("Unexpected error trying to close KafkaProducer", e, new Object[0]);
                }
            }
            if (m_es == null) {
                return;
            }
            m_es.shutdown();
            try {
                if (!m_es.awaitTermination(KafkaExportClient.SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
                    forceExecutorShutdown();
                }
            } catch (InterruptedException e) {
                KafkaExportClient.LOG.warn("Interrupted while awaiting executor shutdown on source:" + this.m_source, new Object[0]);
                forceExecutorShutdown();
                // Catching InterruptedException clears the interrupt status; restore it last so
                // the caller can still see the interruption after clean-up has finished.
                Thread.currentThread().interrupt();
            }
        }

        /**
         * Forces the decoder's executor to stop via {@code shutdownNow()}, logging a warning
         * first and an error if the forced shutdown itself throws. Does nothing when this
         * decoder has no executor.
         */
        private void forceExecutorShutdown() {
            if (m_es != null) {
                KafkaExportClient.LOG.warn("Forcing executor shutdown on source: " + this.m_source, new Object[0]);
                try {
                    m_es.shutdownNow();
                } catch (Exception e) {
                    KafkaExportClient.LOG.error("Failed to force executor shutdown on source: " + this.m_source, e, new Object[0]);
                }
            }
        }
    }

    /**
     * Builds the Kafka producer configuration from the export target's properties.
     *
     * <p>All supplied properties are copied, exporter-only settings (encoding type, time zone,
     * internal-column skipping, binary encoding, topic and partition-key mappings, topic prefix
     * and legacy serializer/partitioner keys) are interpreted and removed, and defaults are
     * filled in for {@code client.id}, {@code acks}, {@code retries}, {@code buffer.memory},
     * {@code batch.size} and the serializers. {@code max.block.ms} is always overwritten with
     * the built-in default.
     *
     * <p>Each validation reports its own error: a problem with one property is never re-labelled
     * as a "Repetitions found" error for {@code topic.key} or {@code partition.key}.
     *
     * @param properties the export target configuration
     * @throws IllegalArgumentException if a property is malformed, out of range, repeats a table
     *         name, names a serializer class that cannot be loaded, or if no broker list is given
     * @throws Exception declared by the overridden method
     */
    @Override // org.voltdb.exportclient.ExportClientBase
    public void configure(Properties properties) throws Exception {
        final String format = properties.getProperty(ENCODE_FORMAT, "").trim().toLowerCase();
        m_encodeFormat = "avro".equals(format) ? EncodeFormat.AVRO : EncodeFormat.CSV;

        m_producerConfig = new Properties();
        m_producerConfig.putAll(properties);
        m_producerConfig.remove("__EXPORT_TO_TYPE__");
        m_producerConfig.remove(BATCH_MODE_PN);
        m_producerConfig.remove(OLD_SERIALIZER);
        m_producerConfig.remove(OLD_PARTITIONER);
        m_producerConfig.remove(ENCODE_FORMAT);

        m_timeZone = VoltDB.GMT_TIMEZONE;
        final String timeZoneId = properties.getProperty(TIMEZONE_PN, "").trim();
        if (!timeZoneId.isEmpty()) {
            m_timeZone = TimeZone.getTimeZone(timeZoneId);
        }
        m_producerConfig.remove(TIMEZONE_PN);

        final String skipInternals = properties.getProperty(SKIP_INTERNALS_PN, "").trim();
        if (!skipInternals.isEmpty()) {
            m_skipInternals = Boolean.parseBoolean(skipInternals);
        }
        m_producerConfig.remove(SKIP_INTERNALS_PN);

        final String binaryEncoding = properties.getProperty(BINARY_ENCODING_PN, "").trim().toUpperCase();
        if (!binaryEncoding.isEmpty()) {
            m_binaryEncoding = ExportDecoderBase.BinaryEncoding.valueOf(binaryEncoding);
        }
        m_producerConfig.remove(BINARY_ENCODING_PN);

        m_tableTopics = parseTableKeyedProperty(TOPIC_KEY_PN, properties.getProperty(TOPIC_KEY_PN, ""));
        m_producerConfig.remove(TOPIC_KEY_PN);

        final String topicPrefix = properties.getProperty(TOPIC_PREFIX_PN);
        if (topicPrefix != null) {
            m_topicPrefix = topicPrefix.trim();
        }
        m_producerConfig.remove(TOPIC_PREFIX_PN);

        m_tablePartitionColumns = parseTableKeyedProperty(PARTITION_KEY_PN, properties.getProperty(PARTITION_KEY_PN, ""));
        m_producerConfig.remove(PARTITION_KEY_PN);

        if (properties.getProperty("client.id", "").trim().isEmpty()) {
            m_producerConfig.setProperty("client.id", DEFAULT_CLIENT_ID);
        }
        if (properties.getProperty("acks", "").trim().isEmpty()) {
            m_producerConfig.setProperty("acks", "-1");
        }

        // Pending sends are polled unless "acks" equals the constant below.
        // NOTE(review): AbstractTopology.PLACEMENT_GROUP_DEFAULT is presumably "0" (no acks) - confirm.
        m_pollFutures = !AbstractTopology.PLACEMENT_GROUP_DEFAULT.equals(m_producerConfig.get("acks"));
        if (!m_pollFutures) {
            // Boolean.parseBoolean parses the configured value itself; Boolean.getBoolean would
            // instead look up a JVM system property whose name is that value.
            m_pollFutures = Boolean.parseBoolean(properties.getProperty(LEGACY_ACKS, "false").trim());
        }

        configureNumericSettings(properties);
        configureSerializers(properties);
        configureBootstrapServers(properties);

        // Always overrides any caller-supplied max.block.ms.
        m_producerConfig.setProperty("max.block.ms", MAX_BLOCK_MS_DEFAULT);
        LOG.info("Configuring Kafka export client: %s", m_producerConfig);
    }

    /**
     * Parses a list of {@code table.value} entries into a map keyed by lower-cased table name.
     *
     * @param propertyName name of the property, used in error messages
     * @param rawValue the property's raw value; empty yields an empty map
     * @return an immutable map from lower-cased table name to the associated value
     * @throws IllegalArgumentException if an entry does not have exactly two dot-separated
     *         parts, or if a table name occurs more than once
     */
    private static Map<String, String> parseTableKeyedProperty(String propertyName, String rawValue) {
        final ImmutableMap.Builder<String, String> mapping = ImmutableMap.builder();
        for (String entry : COMMA_SPLITTER.split(rawValue)) {
            final List<String> parts = PERIOD_SPLITTER.splitToList(entry);
            if (parts.size() != 2) {
                throw new IllegalArgumentException("Malformed value \"" + rawValue + "\" for property " + propertyName);
            }
            mapping.put(parts.get(0).toLowerCase(), parts.get(1));
        }
        try {
            // build() rejects duplicate keys; only that failure is reported as a repetition.
            return mapping.build();
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException("Repetitions found in \"" + rawValue + "\" for property " + propertyName, e);
        }
    }

    /** Validates retries, buffer.memory and batch.size (applying defaults) and stores them in the producer configuration. */
    private void configureNumericSettings(Properties properties) {
        final String retries = properties.getProperty("retries", "4");
        try {
            if (Integer.parseInt(retries) < 0) {
                throw new IllegalArgumentException("\"retries\" must be >= 0");
            }
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("\"retries\" must be an integer", e);
        }
        m_producerConfig.setProperty("retries", retries);

        final String bufferMemory = properties.getProperty("buffer.memory", "2097152");
        try {
            if (Long.parseLong(bufferMemory) <= 0) {
                throw new IllegalArgumentException("\"buffer.memory\" must be > 0");
            }
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("\"buffer.memory\" must be a long", e);
        }
        m_producerConfig.setProperty("buffer.memory", bufferMemory);

        final String batchSize = properties.getProperty("batch.size", "1024");
        try {
            if (Integer.parseInt(batchSize) <= 0) {
                // Zero is rejected too, so the message states the bound that is actually enforced.
                throw new IllegalArgumentException("\"batch.size\" must be > 0");
            }
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("\"batch.size\" must be an integer", e);
        }
        m_producerConfig.setProperty("batch.size", batchSize);
    }

    /** Chooses the key and value serializers, checking that any caller-supplied class can be loaded. */
    private void configureSerializers(Properties properties) {
        final String keySerializer = properties.getProperty("key.serializer", "").trim();
        if (keySerializer.isEmpty()) {
            m_producerConfig.setProperty("key.serializer", StringSerializer.class.getName());
        } else {
            requireLoadableSerializer(keySerializer);
        }

        if (m_encodeFormat == EncodeFormat.AVRO) {
            // Avro output always uses the Avro serializer and needs a schema registry.
            m_producerConfig.setProperty("value.serializer", KafkaAvroSerializer.class.getName());
            final String registryUrl = properties.getProperty("schema.registry.url", "").trim();
            if (registryUrl.isEmpty()) {
                throw new IllegalArgumentException("Property \"schema.registry.url\" cannot be empty.");
            }
            m_producerConfig.setProperty("schema.registry.url", registryUrl);
            return;
        }

        final String valueSerializer = properties.getProperty("value.serializer", "").trim();
        if (valueSerializer.isEmpty()) {
            m_producerConfig.setProperty("value.serializer", StringSerializer.class.getName());
        } else {
            requireLoadableSerializer(valueSerializer);
        }
    }

    /** Fails with an IllegalArgumentException when the named serializer class cannot be loaded. */
    private static void requireLoadableSerializer(String className) {
        try {
            Class.forName(className);
        } catch (ClassNotFoundException | ExceptionInInitializerError | UnknownError e) {
            throw new IllegalArgumentException("Unable to load serializer class " + className, e);
        }
    }

    /** Falls back to the legacy broker-list property when bootstrap.servers is absent or blank. */
    private void configureBootstrapServers(Properties properties) {
        if (!properties.getProperty("bootstrap.servers", "").trim().isEmpty()) {
            return;
        }
        final String legacyBrokers = properties.getProperty(BROKER_LIST_PN, "").trim();
        if (legacyBrokers.isEmpty()) {
            throw new IllegalArgumentException("Required property bootstrap.servers is undefined");
        }
        m_producerConfig.remove(BROKER_LIST_PN);
        m_producerConfig.setProperty("bootstrap.servers", legacyBrokers);
    }

    /**
     * Creates the Kafka decoder for one advertised export source.
     *
     * @param advertisedDataSource the source to decode
     * @return a new {@code KafkaExportDecoder} bound to this client
     */
    @Override // org.voltdb.exportclient.ExportClientBase
    public ExportDecoderBase constructExportDecoder(AdvertisedDataSource advertisedDataSource) {
        final KafkaExportDecoder decoder = new KafkaExportDecoder(advertisedDataSource);
        return decoder;
    }
}
