package com.saas.shared.service;

import com.saas.shared.event.CallDataEvent;
import com.saas.shared.event.WebhookEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;

import java.util.UUID;
import java.util.concurrent.CompletableFuture;

/**
 * Event Publisher Service - Publishes events to Kafka topics
 * 
 * Usage in webhook controllers:
 * eventPublisher.publishWebhookEvent(webhookEvent);
 * return ResponseEntity.ok("Accepted"); // Fast 200 OK
 */
@Service
@Slf4j
public class EventPublisherService {

    private final KafkaTemplate<String, Object> kafkaTemplate;

    private static final String WEBHOOK_TOPIC = "voip-webhook-events";
    private static final String CALL_DATA_TOPIC = "call-data-events";
    private static final String VAPI_WEBHOOK_TOPIC = "vapi-webhook-events";

    public EventPublisherService(KafkaTemplate<String, Object> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    /**
     * Publish generic webhook event (Twilio, Telnyx, Ziwo, Retell)
     * 
     * @param event WebhookEvent with provider-specific payload
     * @return Future for async tracking
     */
    public CompletableFuture<SendResult<String, Object>> publishWebhookEvent(WebhookEvent event) {
        if (event.getEventId() == null) {
            event.setEventId(UUID.randomUUID().toString());
        }
        
        log.info("📤 Publishing webhook event: eventId={}, provider={}, type={}, tenantId={}", 
                event.getEventId(), event.getProvider(), event.getEventType(), event.getTenantId());
        
        CompletableFuture<SendResult<String, Object>> future = 
                kafkaTemplate.send(WEBHOOK_TOPIC, event.getCallId(), event);
        
        future.whenComplete((result, ex) -> {
            if (ex != null) {
                log.error("❌ Failed to publish webhook event: {}", event.getEventId(), ex);
            } else {
                log.debug("✅ Webhook event published successfully: partition={}, offset={}", 
                        result.getRecordMetadata().partition(), 
                        result.getRecordMetadata().offset());
            }
        });
        
        return future;
    }

    /**
     * Publish Vapi.ai specific webhook event
     * 
     * @param event WebhookEvent for Vapi.ai webhooks
     * @return Future for async tracking
     */
    public CompletableFuture<SendResult<String, Object>> publishVapiWebhookEvent(WebhookEvent event) {
        if (event.getEventId() == null) {
            event.setEventId(UUID.randomUUID().toString());
        }
        
        log.info("📤 Publishing Vapi webhook event: eventId={}, type={}, tenantId={}", 
                event.getEventId(), event.getEventType(), event.getTenantId());
        
        CompletableFuture<SendResult<String, Object>> future = 
                kafkaTemplate.send(VAPI_WEBHOOK_TOPIC, event.getCallId(), event);
        
        future.whenComplete((result, ex) -> {
            if (ex != null) {
                log.error("❌ Failed to publish Vapi webhook event: {}", event.getEventId(), ex);
            } else {
                log.debug("✅ Vapi webhook event published successfully");
            }
        });
        
        return future;
    }

    /**
     * Publish call data event for dual-save (Admin DB + Tenant DB)
     * 
     * @param event CallDataEvent with call details
     * @return Future for async tracking
     */
    public CompletableFuture<SendResult<String, Object>> publishCallDataEvent(CallDataEvent event) {
        if (event.getEventId() == null) {
            event.setEventId(UUID.randomUUID().toString());
        }
        
        log.info("📤 Publishing call data event: eventId={}, tenantId={}, callSid={}", 
                event.getEventId(), event.getTenantId(), event.getCallSid());
        
        CompletableFuture<SendResult<String, Object>> future = 
                kafkaTemplate.send(CALL_DATA_TOPIC, event.getTenantId(), event);
        
        future.whenComplete((result, ex) -> {
            if (ex != null) {
                log.error("❌ Failed to publish call data event: {}", event.getEventId(), ex);
            } else {
                log.debug("✅ Call data event published successfully");
            }
        });
        
        return future;
    }

    /**
     * Publish event synchronously (for critical operations)
     * 
     * @param topic Kafka topic
     * @param key Message key (for partitioning)
     * @param event Event object
     */
    public void publishSync(String topic, String key, Object event) {
        try {
            SendResult<String, Object> result = kafkaTemplate.send(topic, key, event).get();
            log.info("✅ Event published synchronously to topic={}, partition={}, offset={}", 
                    topic, 
                    result.getRecordMetadata().partition(), 
                    result.getRecordMetadata().offset());
        } catch (Exception e) {
            log.error("❌ Failed to publish event synchronously to topic={}", topic, e);
            throw new RuntimeException("Failed to publish event", e);
        }
    }
}
