package com.saas.shared.consumer;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.saas.shared.core.TenantContext;
import com.saas.shared.event.WebhookEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

/**
 * Webhook Event Consumer - processes VoIP webhook events asynchronously.
 *
 * <p>Events are read from the {@code voip-webhook-events} Kafka topic, deserialized into
 * {@link WebhookEvent}, and dispatched to a provider-specific handler.
 *
 * <p>Design intent (not verifiable from this class alone): the HTTP webhook endpoint
 * publishes to Kafka and returns quickly, while processing happens here, decoupled from
 * the request.
 *
 * <p><b>Current failure semantics:</b> retry and dead-letter handling are NOT implemented
 * yet. A record that fails to deserialize or process is logged and then acknowledged, so
 * it is dropped rather than redelivered.
 */
@Component
@Slf4j
public class WebhookEventConsumer {

    private final ObjectMapper objectMapper;

    /**
     * @param objectMapper mapper used to deserialize the raw record payload into a
     *                     {@link WebhookEvent}
     */
    public WebhookEventConsumer(ObjectMapper objectMapper) {
        this.objectMapper = objectMapper;
    }

    /**
     * Consumes one webhook event from the {@code voip-webhook-events} topic.
     *
     * <p>Processing flow:
     * <ol>
     *   <li>Deserialize the payload</li>
     *   <li>Set the tenant context when the event carries a tenant id</li>
     *   <li>Dispatch based on provider</li>
     *   <li>Acknowledge the record (manual commit)</li>
     *   <li>On failure: log and acknowledge anyway (no retry / DLQ yet)</li>
     * </ol>
     * The tenant context is always cleared before returning.
     *
     * @param message        raw record value, expected to be a JSON-encoded {@link WebhookEvent}
     * @param partition      partition the record was received from (used for logging)
     * @param offset         offset of the record (used for logging)
     * @param acknowledgment handle used to acknowledge the record
     */
    @KafkaListener(
            topics = "voip-webhook-events",
            groupId = "saas-voip-group",
            containerFactory = "kafkaListenerContainerFactory"
    )
    public void consumeWebhookEvent(
            @Payload String message,
            @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
            @Header(KafkaHeaders.OFFSET) long offset,
            Acknowledgment acknowledgment) {

        try {
            WebhookEvent event = objectMapper.readValue(message, WebhookEvent.class);
            if (event == null) {
                // Jackson maps the JSON literal `null` to a null reference; fail with a
                // descriptive error instead of an anonymous NullPointerException below.
                throw new IllegalArgumentException("Webhook payload deserialized to null");
            }

            log.info("📥 Consuming webhook event: eventId={}, provider={}, type={}, partition={}, offset={}", 
                    event.getEventId(), event.getProvider(), event.getEventType(), partition, offset);

            // Set tenant context for multi-tenancy (events without a tenant id run without one)
            if (event.getTenantId() != null) {
                TenantContext.setTenantId(event.getTenantId());
            }

            // Process based on provider
            processWebhookEvent(event);

            // Acknowledge only after processing completed without an exception
            acknowledgment.acknowledge();

            log.info("✅ Webhook event processed successfully: eventId={}", event.getEventId());

        } catch (Exception e) {
            log.error("❌ Failed to process webhook event at partition={}, offset={}", 
                    partition, offset, e);

            // TODO: Implement retry logic or send to DLQ
            // For now, acknowledge to avoid blocking the queue. NOTE: this means a failed
            // record is dropped, not redelivered.
            acknowledgment.acknowledge();

        } finally {
            // Always reset so the tenant id set for this record does not outlive it
            TenantContext.clear();
        }
    }

    /**
     * Dispatches the event to the handler for its provider.
     *
     * <p>A provider value without a dedicated handler is logged as a warning and skipped.
     *
     * @param event the deserialized event; must not be null
     * @throws IllegalArgumentException if the event has no provider
     */
    private void processWebhookEvent(WebhookEvent event) {
        if (event.getProvider() == null) {
            // Switching on a null enum would throw a bare NullPointerException; raise a
            // descriptive error instead so the failure log explains what was wrong.
            throw new IllegalArgumentException(
                    "Webhook event has no provider: eventId=" + event.getEventId());
        }

        switch (event.getProvider()) {
            case TWILIO:
                processTwilioWebhook(event);
                break;
            case TELNYX:
                processTelnyxWebhook(event);
                break;
            case ZIWO:
                processZiwoWebhook(event);
                break;
            case VAPI_AI:
                processVapiWebhook(event);
                break;
            default:
                log.warn("⚠️ Unknown provider: {}", event.getProvider());
                break;
        }
    }

    /**
     * Handles a Twilio event. Currently only logs at debug level.
     *
     * @param event the Twilio webhook event
     */
    private void processTwilioWebhook(WebhookEvent event) {
        log.debug("Processing Twilio webhook: type={}, callId={}", 
                event.getEventType(), event.getCallId());

        // TODO: Delegate to TwilioWebhookProcessor service
        // - Save call data to tenant DB
        // - Update call status
        // - Trigger follow-up actions (SMS, notifications)
    }

    /**
     * Handles a Telnyx event. Currently only logs at debug level.
     *
     * @param event the Telnyx webhook event
     */
    private void processTelnyxWebhook(WebhookEvent event) {
        log.debug("Processing Telnyx webhook: type={}, callId={}", 
                event.getEventType(), event.getCallId());

        // TODO: Delegate to TelnyxWebhookProcessor service
    }

    /**
     * Handles a Ziwo event. Currently only logs at debug level.
     *
     * @param event the Ziwo webhook event
     */
    private void processZiwoWebhook(WebhookEvent event) {
        log.debug("Processing Ziwo webhook: type={}, callId={}", 
                event.getEventType(), event.getCallId());

        // TODO: Delegate to ZiwoWebhookProcessor service
    }

    /**
     * Handles a Vapi.ai event. Currently only logs at debug level.
     *
     * @param event the Vapi.ai webhook event
     */
    private void processVapiWebhook(WebhookEvent event) {
        log.debug("Processing Vapi.ai webhook: type={}, callId={}", 
                event.getEventType(), event.getCallId());

        // TODO: Delegate to VapiWebhookProcessor service
    }
}
