package com.ovopark.messagehub.cdc.engine;

import com.ovopark.kernel.shared.Config;
import com.ovopark.kernel.shared.JSONAccessor;
import com.ovopark.kernel.shared.Util;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Json;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/ovopark/messagehub/cdc/engine/KafkaBasedStarter.class */
public final class KafkaBasedStarter {
    private static final Logger log = LoggerFactory.getLogger(KafkaBasedStarter.class);
    private static final Set<String> prefixSet = new HashSet();

    /* loaded from: input_file:com/ovopark/messagehub/cdc/engine/KafkaBasedStarter$ChangeEventListener.class */
    public interface ChangeEventListener {
        void onChangeEvent(ChangeEvent<String, String> changeEvent);
    }

    /* loaded from: input_file:com/ovopark/messagehub/cdc/engine/KafkaBasedStarter$DataCapture.class */
    public interface DataCapture {
        void onData(Map<String, Object> map);
    }

    /* loaded from: input_file:com/ovopark/messagehub/cdc/engine/KafkaBasedStarter$SchemaCapture.class */
    public interface SchemaCapture {
        void onSchema(Map<String, Object> map);
    }

    public static void register(String str) {
        Config constructConfig = constructConfig(str);
        Iterator<String> it = prefixSet.iterator();
        while (it.hasNext()) {
            Config constructConfig2 = constructConfig(it.next());
            mustBeDiff("cdc.offset.storage.topic", constructConfig, constructConfig2);
            mustBeDiff("cdc.schema.history.topic", constructConfig, constructConfig2);
            mustBeDiff("cdc.engine", constructConfig, constructConfig2);
            mustBeDiff("cdc.topic.prefix", constructConfig, constructConfig2);
            mustBeDiff("cdc.mysql.serverId", constructConfig, constructConfig2);
        }
        prefixSet.add(str);
    }

    private static void mustBeDiff(String str, Config config, Config config2) {
        String string = config.getString(str, "missing");
        String string2 = config2.getString(str, "missing");
        if ("missing".equals(string) || "missing".equals(string2)) {
            throw new IllegalArgumentException("config is null: " + str);
        }
        if (string.equals(string2)) {
            throw new IllegalArgumentException("duplicate config: " + str);
        }
    }

    private static Config constructConfig(final String str) {
        return new Config() { // from class: com.ovopark.messagehub.cdc.engine.KafkaBasedStarter.1
            public boolean contains(String str2) {
                return Config.ConfigPriority.option().contains(str + "." + str2);
            }

            public Object getObject(String str2, Object obj) {
                return Config.ConfigPriority.option().getObject(str + "." + str2, obj);
            }
        };
    }

    public static DebeziumEngine<ChangeEvent<String, String>> createEngine(String str, final SchemaCapture schemaCapture, final DataCapture dataCapture) {
        return createEngine(str, new ChangeEventListener() { // from class: com.ovopark.messagehub.cdc.engine.KafkaBasedStarter.2
            @Override // com.ovopark.messagehub.cdc.engine.KafkaBasedStarter.ChangeEventListener
            public void onChangeEvent(ChangeEvent<String, String> changeEvent) {
                try {
                    String str2 = (String) changeEvent.value();
                    if (Util.isEmpty(str2)) {
                        KafkaBasedStarter.log.info("delete mark: " + ((String) changeEvent.key()));
                        return;
                    }
                    Map<String, Object> read = JSONAccessor.impl().read(str2);
                    Object obj = read.get("payload");
                    if ((obj instanceof Map) && ((Map) obj).containsKey("before") && ((Map) obj).containsKey("after")) {
                        DataCapture.this.onData((Map) obj);
                    } else {
                        schemaCapture.onSchema(read);
                        KafkaBasedStarter.log.info("ignore schema??? : " + JSONAccessor.impl().format(obj));
                    }
                } catch (Exception e) {
                    KafkaBasedStarter.log.error(e.getMessage(), e);
                    throw Util.convert2RuntimeException(e);
                }
            }
        });
    }

    public static DebeziumEngine<ChangeEvent<String, String>> createEngine(String str, ChangeEventListener changeEventListener) {
        if (!prefixSet.contains(str)) {
            throw new IllegalArgumentException("prefix is not registered: " + str);
        }
        Config constructConfig = constructConfig(str);
        Object object = constructConfig.getObject("cdc.extendProps", Map.of());
        String string = constructConfig.getString("cdc.kafkaServer", "localhost:9092");
        String string2 = constructConfig.getString("cdc.offset.flushIntervalMs", "5000");
        String string3 = constructConfig.getString("cdc.mysql.hostname", "localhost");
        String string4 = constructConfig.getString("cdc.mysql.port", "3306");
        String string5 = constructConfig.getString("cdc.mysql.user", "root");
        String string6 = constructConfig.getString("cdc.mysql.password", "123456");
        String string7 = constructConfig.getString("cdc.mysql.serverId", "missing");
        String string8 = constructConfig.getString("cdc.mysql.connectionTimeZone", "GMT+8");
        String string9 = constructConfig.getString("cdc.mysql.database.include.list", "local");
        String string10 = constructConfig.getString("cdc.mysql.table.include.list", "local");
        String string11 = constructConfig.getString("cdc.offset.storage.topic", "messagehub-connect-offset-local");
        String string12 = constructConfig.getString("cdc.schema.history.topic", "messagehub-schema-history-local");
        String string13 = constructConfig.getString("cdc.topic.prefix", "my-app-connector-local");
        Properties properties = new Properties();
        if (object != null) {
            ((Map) object).forEach((obj, obj2) -> {
                properties.setProperty(String.valueOf(obj), Util.convert2String(obj2, (String) null));
            });
        }
        properties.setProperty("name", constructConfig.getString("cdc.engine", "engine"));
        properties.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector");
        properties.setProperty("snapshot.mode", constructConfig.getString("cdc.snapshot.mode", "missing"));
        properties.setProperty("snapshot.locking.mode", constructConfig.getString("cdc.snapshot.locking.mode", "none"));
        properties.setProperty("bootstrap.servers", string);
        properties.setProperty("offset.storage", "org.apache.kafka.connect.storage.KafkaOffsetBackingStore");
        properties.setProperty("offset.storage.kafka.bootstrap.servers", string);
        properties.setProperty("offset.storage.topic", string11);
        properties.setProperty("offset.storage.partitions", "1");
        properties.setProperty("offset.storage.replication.factor", "1");
        properties.setProperty("offset.flush.interval.ms", string2);
        properties.setProperty("database.hostname", string3);
        properties.setProperty("database.port", string4);
        properties.setProperty("database.user", string5);
        properties.setProperty("database.password", string6);
        properties.setProperty("database.server.id", string7);
        properties.setProperty("topic.prefix", string13);
        properties.setProperty("database.include.list", string9);
        properties.setProperty("database.connectionTimeZone", string8);
        properties.setProperty("table.include.list", string10);
        properties.setProperty("schema.history.internal.store.only.captured.tables.ddl", "false");
        properties.setProperty("schema.history.internal.retention.ms", "2592000000");
        properties.setProperty("schema.history.internal.kafka.bootstrap.servers", string);
        properties.setProperty("schema.history.internal.kafka.topic", string12);
        properties.setProperty("transforms", "tz");
        properties.setProperty("transforms.tz.type", "io.debezium.transforms.TimezoneConverter");
        properties.setProperty("transforms.tz.converted.timezone", "Asia/Shanghai");
        properties.setProperty("converters", "timestamp2UnixMsConverter");
        properties.setProperty("timestamp2UnixMsConverter.type", Timestamp2UnixMsConverter.class.getName());
        properties.setProperty("timestamp2UnixMsConverter.database", string9);
        properties.forEach((obj3, obj4) -> {
            log.info(String.valueOf(obj3) + " = " + String.valueOf(obj4));
        });
        DebeziumEngine.Builder using = DebeziumEngine.create(Json.class).using(properties);
        Objects.requireNonNull(changeEventListener);
        return using.notifying(changeEventListener::onChangeEvent).build();
    }
}
