package org.voltdb.exportclient;

import au.com.bytecode.opencsv_voltpatches.CSVWriter;
import com.google_voltpatches.common.net.HostAndPort;
import com.google_voltpatches.common.util.concurrent.ListeningExecutorService;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.TimeZone;
import java.util.concurrent.TimeUnit;
import org.voltcore.logging.Level;
import org.voltcore.logging.VoltLogger;
import org.voltcore.utils.CoreUtils;
import org.voltcore.utils.Pair;
import org.voltdb.VoltDB;
import org.voltdb.common.Constants;
import org.voltdb.export.AdvertisedDataSource;
import org.voltdb.export.ExportManager;
import org.voltdb.export.ExportManagerInterface;
import org.voltdb.exportclient.ExportDecoderBase;
import org.voltdb.exportclient.decode.CSVStringDecoder;
import org.voltdb.exportclient.loopback.LoopbackExportClient;
import org.voltdb.utils.CatalogUtil;

/* loaded from: input_file:org/voltdb/exportclient/SocketExporter.class */
public class SocketExporter extends ExportClientBase {
    private static final VoltLogger m_logger = new VoltLogger("ExportClient");
    public static final String SYNC_BLOCK_PROP = "syncblocks";
    public static final String SYNC_BLOCK_MSG = "__SYNC_BLOCK__";
    private static final byte[] SYNC_BLOCK_BYTES;
    String host;
    boolean m_skipInternals = false;
    TimeZone m_timeZone = VoltDB.REAL_DEFAULT_TIMEZONE;
    ExportDecoderBase.BinaryEncoding m_binaryEncoding = ExportDecoderBase.BinaryEncoding.HEX;
    private String[] serverArray;
    private boolean m_syncBlocks;

    /* loaded from: input_file:org/voltdb/exportclient/SocketExporter$SocketExportDecoder.class */
    class SocketExportDecoder extends ExportDecoderBase {
        private final ListeningExecutorService m_es;
        long transactions;
        long totalDecodeTime;
        long timerStart;
        final CSVStringDecoder m_decoder;
        final Map<HostAndPort, Pair<OutputStream, InputStream>> haplist;

        @Override // org.voltdb.exportclient.ExportDecoderBase
        public ListeningExecutorService getExecutor() {
            return this.m_es;
        }

        SocketExportDecoder(AdvertisedDataSource advertisedDataSource) {
            super(advertisedDataSource);
            this.transactions = 0L;
            this.totalDecodeTime = 0L;
            this.timerStart = 0L;
            this.haplist = new HashMap();
            CSVStringDecoder.Builder builder = CSVStringDecoder.builder();
            builder.dateFormatter(Constants.ODBC_DATE_FORMAT_STRING).timeZone(SocketExporter.this.m_timeZone).binaryEncoding(SocketExporter.this.m_binaryEncoding).skipInternalFields(SocketExporter.this.m_skipInternals);
            this.m_decoder = builder.build();
            if (VoltDB.getExportManager().getExportMode() == ExportManagerInterface.ExportMode.BASIC) {
                this.m_es = CoreUtils.getListeningSingleThreadExecutor("Socket Export decoder for partition " + advertisedDataSource.partitionId, 524288);
            } else {
                this.m_es = null;
            }
        }

        void connect() throws IOException {
            SocketExporter.m_logger.info("Connecting to Socket export endpoint...");
            for (String str : SocketExporter.this.serverArray) {
                int i = 5001;
                HostAndPort fromString = HostAndPort.fromString(str);
                if (fromString.hasPort()) {
                    i = fromString.getPort();
                }
                this.haplist.put(fromString, SocketExporter.connectToOneServer(fromString.getHost(), i));
            }
        }

        @Override // org.voltdb.exportclient.ExportDecoderBase
        public void sourceNoLongerAdvertised(AdvertisedDataSource advertisedDataSource) {
            OutputStream first;
            Throwable th;
            try {
                for (Pair<OutputStream, InputStream> pair : this.haplist.values()) {
                    if (SocketExporter.m_logger.isDebugEnabled()) {
                        SocketExporter.m_logger.debug("Flushing " + pair.getFirst() + " for source " + advertisedDataSource);
                    }
                    try {
                        first = pair.getFirst();
                        th = null;
                    } catch (IOException e) {
                        SocketExporter.m_logger.error("Failed to close streams for " + advertisedDataSource, e);
                    }
                    try {
                        try {
                            InputStream second = pair.getSecond();
                            Throwable th2 = null;
                            try {
                                try {
                                    pair.getFirst().flush();
                                    if (second != null) {
                                        if (0 != 0) {
                                            try {
                                                second.close();
                                            } catch (Throwable th3) {
                                                th2.addSuppressed(th3);
                                            }
                                        } else {
                                            second.close();
                                        }
                                    }
                                    if (first != null) {
                                        if (0 != 0) {
                                            try {
                                                first.close();
                                            } catch (Throwable th4) {
                                                th.addSuppressed(th4);
                                            }
                                        } else {
                                            first.close();
                                        }
                                    }
                                } catch (Throwable th5) {
                                    if (second != null) {
                                        if (th2 != null) {
                                            try {
                                                second.close();
                                            } catch (Throwable th6) {
                                                th2.addSuppressed(th6);
                                            }
                                        } else {
                                            second.close();
                                        }
                                    }
                                    throw th5;
                                }
                            } catch (Throwable th7) {
                                th2 = th7;
                                throw th7;
                            }
                        } catch (Throwable th8) {
                            th = th8;
                            throw th8;
                        }
                    } catch (Throwable th9) {
                        if (first != null) {
                            if (th != null) {
                                try {
                                    first.close();
                                } catch (Throwable th10) {
                                    th.addSuppressed(th10);
                                }
                            } else {
                                first.close();
                            }
                        }
                        throw th9;
                    }
                }
                if (this.m_es != null) {
                    this.m_es.shutdown();
                    try {
                        this.m_es.awaitTermination(365L, TimeUnit.DAYS);
                    } catch (InterruptedException e2) {
                        throw new RuntimeException(e2);
                    }
                }
            } finally {
                this.haplist.clear();
            }
        }

        @Override // org.voltdb.exportclient.ExportDecoderBase
        public boolean processRow(ExportRow exportRow) throws ExportDecoderBase.RestartBlockException {
            try {
                if (this.haplist.isEmpty()) {
                    connect();
                }
                if (this.haplist.isEmpty()) {
                    SocketExporter.m_logger.rateLimitedLog(120L, Level.ERROR, null, "Failed to connect to export socket endpoint %s, some servers may be down.", SocketExporter.this.host);
                    throw new ExportDecoderBase.RestartBlockException(true);
                }
                if (SocketExporter.m_logger.isDebugEnabled()) {
                    SocketExporter.m_logger.debug(this.m_source.tableName + ":P" + this.m_source.partitionId + " sending seqNum: " + exportRow.values[2]);
                }
                byte[] bytes = this.m_decoder.decode2(exportRow.generation, exportRow.tableName, exportRow.types, exportRow.names, (String) null, exportRow.values).concat(CSVWriter.DEFAULT_LINE_END).getBytes();
                ByteBuffer allocate = ByteBuffer.allocate(bytes.length);
                allocate.put(bytes);
                allocate.flip();
                for (Pair<OutputStream, InputStream> pair : this.haplist.values()) {
                    pair.getFirst().write(allocate.array());
                    pair.getFirst().flush();
                }
                return true;
            } catch (Exception e) {
                SocketExporter.m_logger.warn("Unexpected error processing row: " + e.getLocalizedMessage() + ". Row will be retried.");
                this.haplist.clear();
                throw new ExportDecoderBase.RestartBlockException(true);
            }
        }

        @Override // org.voltdb.exportclient.ExportDecoderBase
        public void onBlockCompletion(ExportRow exportRow) throws ExportDecoderBase.RestartBlockException {
            try {
                for (Pair<OutputStream, InputStream> pair : this.haplist.values()) {
                    if (SocketExporter.this.m_syncBlocks) {
                        pair.getFirst().write(SocketExporter.SYNC_BLOCK_BYTES);
                    }
                    pair.getFirst().flush();
                    if (SocketExporter.this.m_syncBlocks && pair.getSecond().read() == -1) {
                        throw new ExportDecoderBase.RestartBlockException("Target may not have received the block", true);
                    }
                }
            } catch (Exception e) {
                SocketExporter.m_logger.rateLimitedLog(120L, Level.WARN, null, "Failed to flush block to socket endpoint %s, some servers may be down. The rows will be retried", SocketExporter.this.host);
                this.haplist.clear();
                throw new ExportDecoderBase.RestartBlockException("Error finishing the block", e, true);
            }
        }
    }

    @Override // org.voltdb.exportclient.ExportClientBase
    public void configure(Properties properties) throws Exception {
        this.host = properties.getProperty("socket.dest", "localhost:5001");
        setRunEverywhere(Boolean.parseBoolean(properties.getProperty("replicated", "false")));
        this.m_skipInternals = Boolean.parseBoolean(properties.getProperty(LoopbackExportClient.Config.SKIP_INTERNALS, "false").trim());
        String trim = properties.getProperty("timezone", "").trim();
        if (!trim.isEmpty()) {
            this.m_timeZone = TimeZone.getTimeZone(trim);
        }
        this.m_syncBlocks = Boolean.parseBoolean(properties.getProperty(SYNC_BLOCK_PROP));
        if (Boolean.parseBoolean(properties.getProperty(ExportManager.CONFIG_CHECK_ONLY, "false"))) {
            return;
        }
        this.serverArray = this.host.split(CatalogUtil.SIGNATURE_DELIMITER);
    }

    static Pair<OutputStream, InputStream> connectToOneServer(String str, int i) throws IOException {
        Socket socket = new Socket(str, i);
        return new Pair<>(socket.getOutputStream(), socket.getInputStream());
    }

    @Override // org.voltdb.exportclient.ExportClientBase
    public ExportDecoderBase constructExportDecoder(AdvertisedDataSource advertisedDataSource) {
        return new SocketExportDecoder(advertisedDataSource);
    }

    static {
        byte[] bArr = null;
        try {
            bArr = "__SYNC_BLOCK__\n".getBytes("UTF-8");
        } catch (UnsupportedEncodingException e) {
        }
        SYNC_BLOCK_BYTES = bArr;
    }
}
