/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.jdbc.datasource.transactions.xa;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Deque;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import javax.transaction.xa.Xid;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions;
import org.apache.flink.connector.jdbc.core.datastream.sink.writer.JdbcWriterState;
import org.apache.flink.connector.jdbc.datasource.connections.xa.XaConnectionProvider;
import org.apache.flink.connector.jdbc.datasource.transactions.xa.XaTransactionResult;
import org.apache.flink.connector.jdbc.datasource.transactions.xa.domain.TransactionId;
import org.apache.flink.connector.jdbc.datasource.transactions.xa.exceptions.EmptyTransactionXaException;
import org.apache.flink.connector.jdbc.datasource.transactions.xa.exceptions.TransientXaException;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class XaTransaction
implements Serializable,
AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(XaTransaction.class);
    private final XaConnectionProvider xaConnectionProvider;
    private final JdbcExactlyOnceOptions exactlyOnceOptions;
    private transient List<TransactionId> preparedXids = new ArrayList<TransactionId>();
    private transient Deque<TransactionId> hangingXids = new LinkedList<TransactionId>();
    private transient TransactionId currentTid;
    private final TransactionId baseTransaction;

    public XaTransaction(JdbcExactlyOnceOptions exactlyOnceOptions, TransactionId transactionId, XaConnectionProvider xaFacade) {
        this.xaConnectionProvider = xaFacade;
        this.exactlyOnceOptions = exactlyOnceOptions;
        this.baseTransaction = transactionId;
    }

    public Xid getCurrentXid() {
        return this.currentTid;
    }

    public XaConnectionProvider getConnectionProvider() {
        return this.xaConnectionProvider;
    }

    public JdbcWriterState getState() {
        return JdbcWriterState.of(this.preparedXids, this.hangingXids);
    }

    public void open(JdbcWriterState state) throws IOException {
        try {
            this.xaConnectionProvider.open();
            this.recoverState(state);
            this.hangingXids = new LinkedList<TransactionId>(this.failOrRollback(this.hangingXids).getForRetry());
            this.commitTx();
            if (this.exactlyOnceOptions.isDiscoverAndRollbackOnRecovery()) {
                this.recoverAndRollback();
            }
        }
        catch (Exception e) {
            ExceptionUtils.rethrowIOException((Throwable)e);
        }
    }

    @Override
    public void close() throws Exception {
        if (this.currentTid != null && this.xaConnectionProvider.isOpen()) {
            try {
                LOG.debug("remove current transaction before closing, xid={}", (Object)this.currentTid.getXidValue());
                this.xaConnectionProvider.failAndRollback(this.currentTid);
            }
            catch (Exception e) {
                LOG.warn("unable to fail/rollback current transaction, xid={}", (Object)this.currentTid.getXidValue(), (Object)e);
            }
        }
        this.xaConnectionProvider.close();
        this.currentTid = null;
        this.hangingXids = null;
        this.preparedXids = null;
    }

    public void recoverState(JdbcWriterState state) {
        this.hangingXids = new LinkedList<TransactionId>(state.getHanging());
        this.preparedXids = new ArrayList<TransactionId>(state.getPrepared());
        LOG.info("initialized state: prepared xids: {}, hanging xids: {}", (Object)this.preparedXids.size(), (Object)this.hangingXids.size());
    }

    public void checkState() {
        Preconditions.checkState((this.currentTid != null ? 1 : 0) != 0, (Object)"current xid must not be null");
        Preconditions.checkState((!this.hangingXids.isEmpty() && this.hangingXids.peekLast().equals(this.currentTid) ? 1 : 0) != 0, (Object)"inconsistent internal state");
    }

    public void createTx(long checkpointId) throws IOException {
        try {
            Preconditions.checkState((this.currentTid == null ? 1 : 0) != 0, (Object)"currentXid not null");
            this.currentTid = this.baseTransaction.withBranch(checkpointId);
            this.hangingXids.offerLast(this.currentTid);
            this.xaConnectionProvider.start(this.currentTid);
        }
        catch (Exception e) {
            ExceptionUtils.rethrowIOException((Throwable)e);
        }
    }

    public void prepareTx() throws IOException {
        this.checkState();
        this.hangingXids.pollLast();
        try {
            this.xaConnectionProvider.endAndPrepare(this.currentTid);
            this.preparedXids.add(this.currentTid);
        }
        catch (EmptyTransactionXaException e) {
            LOG.info("empty XA transaction (skip), xid: {}, checkpoint {}", (Object)this.currentTid.getXidValue(), (Object)this.currentTid.getCheckpointId());
        }
        catch (Exception e) {
            ExceptionUtils.rethrowIOException((Throwable)e);
        }
        this.currentTid = null;
    }

    public void commitTx() {
        List<TransactionId> toCommit = this.preparedXids;
        this.preparedXids = new ArrayList<TransactionId>();
        this.preparedXids.addAll(this.commitXids(toCommit));
    }

    public void commitTxUntil(long checkpointId) {
        Tuple2<List<TransactionId>, List<TransactionId>> splittedXids = this.split(this.preparedXids, checkpointId);
        if (((List)splittedXids.f0).isEmpty()) {
            LOG.warn("nothing to commit up to checkpoint: {}", (Object)checkpointId);
        } else {
            this.preparedXids = (List)splittedXids.f1;
            this.preparedXids.addAll(this.commitXids((List)splittedXids.f0));
        }
    }

    public List<TransactionId> commitXids(List<TransactionId> xids) {
        return this.commit(xids, this.exactlyOnceOptions.isAllowOutOfOrderCommits(), this.exactlyOnceOptions.getMaxCommitAttempts()).getForRetry();
    }

    private Tuple2<List<TransactionId>, List<TransactionId>> split(List<TransactionId> list, long checkpointId) {
        return this.split(list, checkpointId, true);
    }

    private Tuple2<List<TransactionId>, List<TransactionId>> split(List<TransactionId> list, long checkpointId, boolean checkpointIntoLo) {
        ArrayList lo = new ArrayList(list.size() / 2);
        ArrayList hi = new ArrayList(list.size() / 2);
        list.forEach(i -> {
            if (i.getCheckpointId() < checkpointId || i.getCheckpointId() == checkpointId && checkpointIntoLo) {
                lo.add(i);
            } else {
                hi.add(i);
            }
        });
        return new Tuple2(lo, hi);
    }

    private XaTransactionResult<TransactionId> commit(List<TransactionId> xids, boolean allowOutOfOrderCommits, int maxCommitAttempts) {
        XaTransactionResult<TransactionId> result = new XaTransactionResult<TransactionId>();
        int origSize = xids.size();
        LOG.debug("commit {} transactions", (Object)origSize);
        Iterator<TransactionId> i = xids.iterator();
        while (i.hasNext() && (result.hasNoFailures() || allowOutOfOrderCommits)) {
            TransactionId x = i.next();
            i.remove();
            try {
                this.xaConnectionProvider.commit(x, x.getRestored());
                result.succeeded(x);
            }
            catch (TransientXaException e) {
                result.failedTransiently(x.withAttemptsIncremented(), e);
            }
            catch (Exception e) {
                result.failed(x, e);
            }
        }
        result.getForRetry().addAll(xids);
        result.throwIfAnyFailed("commit");
        this.throwIfAnyReachedMaxAttempts(result, maxCommitAttempts);
        result.getTransientFailure().ifPresent(f -> LOG.warn("failed to commit {} transactions out of {} (keep them to retry later)", new Object[]{result.getForRetry().size(), origSize, f}));
        return result;
    }

    private XaTransactionResult<TransactionId> failOrRollback(Collection<TransactionId> xids) {
        XaTransactionResult<TransactionId> result = new XaTransactionResult<TransactionId>();
        if (xids.isEmpty()) {
            return result;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("rolling back {} transactions: {}", (Object)xids.size(), xids);
        }
        for (TransactionId x : xids) {
            try {
                this.xaConnectionProvider.failAndRollback(x);
                result.succeeded(x);
            }
            catch (TransientXaException e) {
                LOG.info("unable to fail/rollback transaction, xid={}: {}", (Object)x, (Object)e.getMessage());
                result.failedTransiently(x, e);
            }
            catch (Exception e) {
                LOG.warn("unable to fail/rollback transaction, xid={}: {}", (Object)x, (Object)e.getMessage());
                result.failed(x, e);
            }
        }
        if (!result.getForRetry().isEmpty()) {
            LOG.info("failed to roll back {} transactions", (Object)result.getForRetry().size());
        }
        return result;
    }

    private void recoverAndRollback() {
        Collection<Xid> recovered = this.xaConnectionProvider.recover();
        if (recovered.isEmpty()) {
            return;
        }
        LOG.warn("rollback {} recovered transactions", (Object)recovered.size());
        for (Xid xid : recovered) {
            if (!this.baseTransaction.belongsTo(xid)) continue;
            try {
                this.xaConnectionProvider.rollback(xid);
            }
            catch (Exception e) {
                LOG.info("unable to rollback recovered transaction, xid={}", (Object)xid, (Object)e);
            }
        }
    }

    private void throwIfAnyReachedMaxAttempts(XaTransactionResult<TransactionId> result, int maxAttempts) {
        ArrayList<TransactionId> reached = null;
        for (TransactionId x : result.getForRetry()) {
            if (x.getAttempts() < maxAttempts) continue;
            if (reached == null) {
                reached = new ArrayList<TransactionId>();
            }
            reached.add(x);
        }
        if (reached != null) {
            throw new RuntimeException(String.format("reached max number of commit attempts (%d) for transactions: %s", maxAttempts, reached));
        }
    }
}

