package com.ovopark.messagehub.plugins.sms.aliyun;

import com.ovopark.kernel.shared.JSONAccessor;
import com.ovopark.kernel.shared.stream.Stream;
import com.ovopark.messagehub.plugins.bridge.KafkaReply;
import com.ovopark.messagehub.plugins.bridge.MsgContext;
import com.ovopark.messagehub.plugins.bridge.SMSMsg;
import com.ovopark.messagehub.plugins.kernel.ConditionOnSubs;
import com.ovopark.messagehub.plugins.kernel.service.PluginsCfgService;
import com.ovopark.messagehub.plugins.kernel.service.SubsRateLimiterClient;
import com.ovopark.messagehub.sdk.model.Subs;
import com.ovopark.messagehub.sdk.model.internal.PluginsManager;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Iterator;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * Kafka listener that picks up SMS messages addressed to the Aliyun implementation and hands
 * them to an {@link AliSMSSubscriber}.
 *
 * <p>Records are read from the {@code messagehub-plugins-sms} topic. A record is processed only
 * when its {@code sms-impl} header equals {@code aliyun} (case-insensitive); every other record
 * is logged and ignored.
 *
 * <p>NOTE(review): the bean is guarded by {@code @ConditionOnSubs("SMS")}; presumably it is only
 * registered when the SMS subscription is enabled - confirm against the annotation's definition.
 */
@ConditionOnSubs("SMS")
@Component
public class AliSMSInStream {
    private static final Logger log = LoggerFactory.getLogger(AliSMSInStream.class);

    /** Record header that names the SMS implementation a message is addressed to. */
    private static final String SMS_IMPL_HEADER = "sms-impl";

    /** Header value (compared case-insensitively) selecting this implementation. */
    private static final String ALIYUN_IMPL = "aliyun";

    /** MDC keys populated with the message trace id while a message is being processed. */
    private static final String MDC_REQUEST_ID = "requestId";
    private static final String MDC_TRACE_ID = "traceId";

    @Autowired
    private KafkaReply kafkaReply;

    @Autowired
    private SubsRateLimiterClient subsRateLimiterClient;

    @Autowired
    private PluginsCfgService pluginsCfgService;

    @Autowired
    private AliSMSConfig aliSmsConfig;

    /**
     * Handles one record from the SMS topic.
     *
     * <p>Steps, in order: resolve the {@code sms-impl} header, skip the record unless it targets
     * {@code aliyun}, deserialize the JSON payload into an {@link SMSMsg}, acquire a permit from
     * the SMS rate limiter, then run the message through a {@link Stream} subscribed by an
     * {@link AliSMSSubscriber}. A heartbeat carrying the trace id is reported from the stream's
     * {@code doFinally} hook.
     *
     * <p>Any {@link Exception} raised while running the stream is logged and not rethrown;
     * {@link Error}s propagate. The MDC keys are cleared on every exit path. Failures before the
     * stream starts (payload cast/parse, rate limiter) still propagate to the listener container.
     *
     * @param consumerRecord the record received from Kafka; its value is expected to be a JSON
     *                       string describing an {@link SMSMsg}
     */
    @KafkaListener(concurrency = "${messagehub.plugins.sms.kafka.concurrency:10}", properties = {"partition.assignment.strategy:org.apache.kafka.clients.consumer.CooperativeStickyAssignor"}, topicPattern = "messagehub-plugins-sms", groupId = "${messagehub.plugins.sms.kafka.group:messagehub-group-sms-aliyun}", containerFactory = "smsKafkaContainerFactory")
    public void message(ConsumerRecord<String, Object> consumerRecord) {
        log.info("{},topic: {}, partition: {}, offset: {}",
                Thread.currentThread().hashCode(),
                consumerRecord.topic(),
                consumerRecord.partition(),
                consumerRecord.offset());

        String smsImpl = resolveSmsImpl(consumerRecord);
        log.info("smsImpl: {}", smsImpl);
        if (!ALIYUN_IMPL.equalsIgnoreCase(smsImpl)) {
            // Addressed to another SMS implementation (or carries no selector): not ours.
            return;
        }

        Object value = consumerRecord.value();
        if (value == null) {
            // A record without a payload cannot be turned into an SMSMsg; skip it instead of
            // failing with a NullPointerException further down.
            log.warn("Skipping record without payload, topic: {}, partition: {}, offset: {}",
                    consumerRecord.topic(), consumerRecord.partition(), consumerRecord.offset());
            return;
        }

        SMSMsg smsMsg = (SMSMsg) JSONAccessor.impl().read((String) value, SMSMsg.class);
        if (smsMsg == null) {
            log.warn("Skipping record whose payload did not yield an SMSMsg, topic: {}, partition: {}, offset: {}",
                    consumerRecord.topic(), consumerRecord.partition(), consumerRecord.offset());
            return;
        }

        String msgTraceId = smsMsg.getMsgTraceId();
        log.info("{} from KAFKA: {}", msgTraceId, value);

        this.subsRateLimiterClient.get(Subs.SMS).acquire();

        MDC.put(MDC_REQUEST_ID, msgTraceId);
        MDC.put(MDC_TRACE_ID, msgTraceId);
        try {
            Stream.from(new MsgContext(msgTraceId, smsMsg))
                    .doFinally(ignored -> heartbeat(msgTraceId))
                    .subscribe(new AliSMSSubscriber(this.kafkaReply, this.pluginsCfgService, this.aliSmsConfig));
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        } finally {
            // Single cleanup point so the trace ids never leak onto the next record handled by
            // this consumer thread, whichever way the processing ended.
            MDC.remove(MDC_REQUEST_ID);
            MDC.remove(MDC_TRACE_ID);
        }
    }

    /**
     * Returns the value of the first {@code sms-impl} header decoded as UTF-8.
     *
     * @param consumerRecord the record whose headers are inspected
     * @return the header value, or {@code null} when the header is absent or has no value
     */
    private static String resolveSmsImpl(ConsumerRecord<String, Object> consumerRecord) {
        // headers(key) already restricts the iteration to headers with that key.
        for (Header header : consumerRecord.headers().headers(SMS_IMPL_HEADER)) {
            byte[] raw = header.value();
            return raw == null ? null : new String(raw, StandardCharsets.UTF_8);
        }
        return null;
    }

    /**
     * Reports a heartbeat for the SMS subscription, tagged with the given trace id.
     *
     * @param msgTraceId trace id of the message that was just processed
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private static void heartbeat(String msgTraceId) {
        // Kept as a raw HashMap on purpose: the parameter type expected by heartbeat(...) is
        // not visible from this file, and a raw map is assignable to any parameterization.
        HashMap attributes = new HashMap();
        attributes.put("msgTraceId", msgTraceId);
        PluginsManager.getOrCreate().heartbeat(Subs.SMS, attributes);
    }
}
