/*
 * Decompiled with CFR 0.152.
 */
package com.ovopark.messagehub.cdc.engine;

import com.ovopark.kernel.shared.Config;
import com.ovopark.kernel.shared.JSONAccessor;
import com.ovopark.kernel.shared.Util;
import com.ovopark.messagehub.cdc.engine.KafkaBasedStarter;
import com.ovopark.messagehub.cdc.engine.Timestamp2UnixMsConverter;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Json;
import io.debezium.relational.history.MemorySchemaHistory;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.function.BiConsumer;
import org.apache.kafka.connect.storage.MemoryOffsetBackingStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Static factory for embedded Debezium MySQL engines that keep their state in memory:
 * {@code offset.storage} is set to {@link MemoryOffsetBackingStore} and
 * {@code schema.history.internal} to {@link MemorySchemaHistory}.
 *
 * <p>Every engine is identified by a configuration prefix. All settings are looked up as
 * {@code <prefix>.<key>} through {@code Config.ConfigPriority.option()}. A prefix has to be
 * {@link #register(String) registered} before an engine can be created for it.
 *
 * <p>The registry is a plain static {@link HashSet} and none of the methods synchronize on it.
 */
public class MemoryBasedStarter {
    private static final Logger log = LoggerFactory.getLogger(MemoryBasedStarter.class);

    /** Sentinel used as the lookup default for properties that have no usable fallback. */
    private static final String MISSING = "missing";

    /** Text logged in place of secret property values. */
    private static final String MASKED = "********";

    /** Prefixes accepted by {@link #register(String)} so far. */
    private static final Set<String> prefixSet = new HashSet<String>();

    /**
     * Registers a configuration prefix so that engines can later be created for it.
     *
     * <p>The new prefix is compared against every prefix registered earlier: the values of
     * {@code cdc.engine}, {@code cdc.topic.prefix} and {@code cdc.mysql.serverId} must be
     * configured on both sides and must differ. When nothing has been registered yet no
     * comparison (and therefore no presence check) takes place.
     *
     * @param prefix configuration prefix of the engine
     * @throws IllegalArgumentException if one of the compared properties is not configured for
     *                                  either prefix, or if both prefixes share the same value
     */
    public static void register(String prefix) {
        Config candidate = constructConfig(prefix);
        for (String registered : prefixSet) {
            Config existing = constructConfig(registered);
            mustBeDiff("cdc.engine", candidate, existing);
            mustBeDiff("cdc.topic.prefix", candidate, existing);
            mustBeDiff("cdc.mysql.serverId", candidate, existing);
        }
        prefixSet.add(prefix);
    }

    /**
     * Ensures that {@code property} is configured in both configs and that the values differ.
     *
     * @param property key to compare (without prefix)
     * @param a        first prefix-scoped config
     * @param b        second prefix-scoped config
     * @throws IllegalArgumentException if the property is absent on either side or identical
     */
    private static void mustBeDiff(String property, Config a, Config b) {
        String left = a.getString(property, MISSING);
        String right = b.getString(property, MISSING);
        if (MISSING.equals(left) || MISSING.equals(right)) {
            throw new IllegalArgumentException("config is null: " + property);
        }
        if (left.equals(right)) {
            throw new IllegalArgumentException("duplicate config: " + property);
        }
    }

    /**
     * Builds a {@link Config} view that prepends {@code prefix + "."} to every key before
     * delegating to {@code Config.ConfigPriority.option()}.
     *
     * @param prefix configuration prefix of the engine
     * @return the prefix-scoped config view
     */
    private static Config constructConfig(final String prefix) {
        return new Config() {

            @Override
            public boolean contains(String s) {
                return Config.ConfigPriority.option().contains(prefix + "." + s);
            }

            @Override
            public Object getObject(String s, Object o) {
                return Config.ConfigPriority.option().getObject(prefix + "." + s, o);
            }
        };
    }

    /**
     * Creates an engine whose JSON change events are parsed and dispatched to the two callbacks.
     *
     * <p>Events with an empty value are only logged. For all other events the value is parsed
     * as JSON; when its {@code payload} is a map containing both {@code before} and
     * {@code after}, the payload goes to {@code dataCapture}, otherwise the whole parsed event
     * goes to {@code schemaCapture}. Any exception raised while handling an event is logged and
     * rethrown as a runtime exception.
     *
     * @param prefix        registered configuration prefix
     * @param schemaCapture receives events that are not row changes
     * @param dataCapture   receives the payload of row-change events
     * @return the built (not yet started) engine
     * @throws IllegalArgumentException see {@link #createEngine(String, KafkaBasedStarter.ChangeEventListener)}
     */
    public static DebeziumEngine<ChangeEvent<String, String>> createEngine(String prefix, final KafkaBasedStarter.SchemaCapture schemaCapture, final KafkaBasedStarter.DataCapture dataCapture) {
        return createEngine(prefix, new KafkaBasedStarter.ChangeEventListener() {

            @Override
            @SuppressWarnings({"rawtypes", "unchecked"}) // parsed JSON is handled as untyped maps
            public void onChangeEvent(ChangeEvent<String, String> changeEvent) {
                try {
                    String value = changeEvent.value();
                    if (Util.isEmpty((CharSequence) value)) {
                        log.info("delete mark: {}", changeEvent.key());
                        return;
                    }
                    Map map = JSONAccessor.impl().read(value);
                    Object payload = map.get("payload");
                    boolean rowChange = payload instanceof Map
                            && ((Map) payload).containsKey("before")
                            && ((Map) payload).containsKey("after");
                    if (rowChange) {
                        dataCapture.onData((Map) payload);
                    } else {
                        schemaCapture.onSchema(map);
                        log.info("ignore schema??? : {}", JSONAccessor.impl().format(payload));
                    }
                } catch (Exception e) {
                    log.error(e.getMessage(), (Throwable) e);
                    throw Util.convert2RuntimeException((Throwable) e);
                }
            }
        });
    }

    /**
     * Creates a Debezium MySQL engine for a registered prefix and wires it to the listener.
     *
     * <p>Entries of the optional {@code cdc.extendProps} map are applied first; the fixed
     * properties set afterwards take precedence over them. {@code cdc.mysql.serverId} and
     * {@code cdc.snapshot.mode} have no fallback and must be configured. The effective
     * properties are logged at INFO level with password values masked.
     *
     * <p>NOTE(review): {@code cdc.mysql.user} / {@code cdc.mysql.password} fall back to
     * {@code root} / {@code 123456}; deployments should always configure them explicitly.
     *
     * @param prefix              registered configuration prefix
     * @param changeEventListener receives every change event emitted by the engine
     * @return the built (not yet started) engine
     * @throws IllegalArgumentException if the prefix was never registered, or if
     *                                  {@code cdc.mysql.serverId} or {@code cdc.snapshot.mode}
     *                                  is not configured
     */
    public static DebeziumEngine<ChangeEvent<String, String>> createEngine(String prefix, KafkaBasedStarter.ChangeEventListener changeEventListener) {
        if (!prefixSet.contains(prefix)) {
            throw new IllegalArgumentException("prefix is not registered: " + prefix);
        }
        Config configProxy = constructConfig(prefix);

        // Fail fast instead of handing the "missing" placeholder to Debezium, which would only
        // surface later as a hard-to-read connector configuration error.
        String mysqlServerId = requireConfigured(configProxy, prefix, "cdc.mysql.serverId");
        String snapshotMode = requireConfigured(configProxy, prefix, "cdc.snapshot.mode");
        String mysqlIncludeDatabase = configProxy.getString("cdc.mysql.database.include.list", "local");

        Properties props = new Properties();
        applyExtendProps(props, configProxy.getObject("cdc.extendProps", Map.of()));

        props.setProperty("name", configProxy.getString("cdc.engine", "engine"));
        props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector");
        props.setProperty("snapshot.mode", snapshotMode);
        props.setProperty("snapshot.locking.mode", configProxy.getString("cdc.snapshot.locking.mode", "none"));
        props.setProperty("offset.storage", MemoryOffsetBackingStore.class.getName());
        props.setProperty("database.hostname", configProxy.getString("cdc.mysql.hostname", "localhost"));
        props.setProperty("database.port", configProxy.getString("cdc.mysql.port", "3306"));
        props.setProperty("database.user", configProxy.getString("cdc.mysql.user", "root"));
        props.setProperty("database.password", configProxy.getString("cdc.mysql.password", "123456"));
        props.setProperty("database.server.id", mysqlServerId);
        props.setProperty("topic.prefix", configProxy.getString("cdc.topic.prefix", "my-app-connector-local"));
        props.setProperty("database.include.list", mysqlIncludeDatabase);
        props.setProperty("database.connectionTimeZone", configProxy.getString("cdc.mysql.connectionTimeZone", "GMT+8"));
        props.setProperty("table.include.list", configProxy.getString("cdc.mysql.table.include.list", "local"));
        props.setProperty("schema.history.internal.store.only.captured.tables.ddl", "false");
        props.setProperty("schema.history.internal", MemorySchemaHistory.class.getName());
        props.setProperty("transforms", "tz");
        props.setProperty("transforms.tz.type", "io.debezium.transforms.TimezoneConverter");
        props.setProperty("transforms.tz.converted.timezone", "Asia/Shanghai");
        props.setProperty("converters", "timestamp2UnixMsConverter");
        props.setProperty("timestamp2UnixMsConverter.type", Timestamp2UnixMsConverter.class.getName());
        props.setProperty("timestamp2UnixMsConverter.database", mysqlIncludeDatabase);

        logEffectiveProperties(props);

        return DebeziumEngine.create(Json.class)
                .using(props)
                .notifying(changeEventListener::onChangeEvent)
                .build();
    }

    /**
     * Reads a property that has no usable default.
     *
     * @param config   prefix-scoped config
     * @param prefix   prefix, used only to report the full key
     * @param property key to read (without prefix)
     * @return the configured value
     * @throws IllegalArgumentException if the property is not configured
     */
    private static String requireConfigured(Config config, String prefix, String property) {
        String value = config.getString(property, MISSING);
        if (MISSING.equals(value)) {
            throw new IllegalArgumentException("config is null: " + prefix + "." + property);
        }
        return value;
    }

    /**
     * Copies the entries of the {@code cdc.extendProps} map into {@code props}.
     *
     * @param props       target properties
     * @param extendProps configured value; expected to be a {@link Map} or {@code null}
     */
    private static void applyExtendProps(Properties props, Object extendProps) {
        if (extendProps == null) {
            return;
        }
        Map<?, ?> extra = (Map<?, ?>) extendProps;
        extra.forEach((k, v) -> {
            String value = Util.convert2String((Object) v, null);
            if (value == null) {
                // Properties rejects null values with a NullPointerException.
                log.warn("skip cdc.extendProps entry without value: {}", k);
                return;
            }
            props.setProperty(String.valueOf(k), value);
        });
    }

    /**
     * Logs every effective property at INFO level, replacing the value of any key that
     * contains "password" (case-insensitive) so credentials never reach the log.
     *
     * @param props properties about to be handed to the engine
     */
    private static void logEffectiveProperties(Properties props) {
        props.forEach((k, v) -> {
            String key = String.valueOf(k);
            boolean secret = key.toLowerCase(java.util.Locale.ROOT).contains("password");
            log.info("{} = {}", key, secret ? MASKED : String.valueOf(v));
        });
    }

    /**
     * Callback for events that are not row changes.
     *
     * <p>Not referenced by the factory methods of this class, which accept the
     * {@code KafkaBasedStarter} variant of this interface.
     */
    public static interface SchemaCapture {
        public void onSchema(Map<String, Object> schemaEvent);
    }

    /**
     * Callback for raw change events.
     *
     * <p>Not referenced by the factory methods of this class, which accept the
     * {@code KafkaBasedStarter} variant of this interface.
     */
    public static interface ChangeEventListener {
        public void onChangeEvent(ChangeEvent<String, String> changeEvent);
    }

    /**
     * Callback for the payload of row-change events.
     *
     * <p>Not referenced by the factory methods of this class, which accept the
     * {@code KafkaBasedStarter} variant of this interface.
     */
    public static interface DataCapture {
        public void onData(Map<String, Object> payload);
    }
}

