#!/usr/bin/env python3 """ SendGrid Live Test - Direct API test without Mailpit """ import os import sys import asyncio import uuid from datetime import datetime # Add backend to path sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'backend')) async def test_sendgrid_direct(): """Test SendGrid directly using the API key from environment.""" # Get SendGrid API key from environment sendgrid_api_key = os.getenv("SENDGRID_API_KEY") if not sendgrid_api_key: print("āŒ SENDGRID_API_KEY not found in environment") return False print(f"āœ… SendGrid API key found (length: {len(sendgrid_api_key)})") try: from sendgrid import SendGridAPIClient from sendgrid.helpers.mail import Mail, Content # Create test email test_id = str(uuid.uuid4())[:8] test_email = f"sf-test-{test_id}@example.com" # Using example.com for test # Create email message message = Mail( from_email="test@servicefinder.hu", to_emails=test_email, subject=f"SendGrid Live Test - {test_id}", html_content=f"""

SendGrid Live Fire Test

Test ID: {test_id}

Timestamp: {datetime.utcnow().isoformat()}

This is a test email to verify SendGrid integration is working.

If you receive this, SendGrid is properly configured and sending emails.

""" ) # Send email print(f"šŸ“§ Sending test email to: {test_email}") sg = SendGridAPIClient(sendgrid_api_key) response = sg.send(message) print(f"āœ… Email sent! Status code: {response.status_code}") print(f"Response headers: {response.headers}") # Check response if response.status_code in [200, 202]: print("\nšŸŽ‰ SUCCESS: SendGrid API accepted the email!") print("Note: Email sent to example.com (not real inbox)") print("For full live test, use Mail7.io with real disposable email") return True else: print(f"āŒ SendGrid returned error: {response.status_code}") print(f"Response body: {response.body}") return False except Exception as e: print(f"āŒ Error testing SendGrid: {e}") import traceback traceback.print_exc() return False async def test_email_service(): """Test using the EmailService with SendGrid provider.""" print("\n" + "="*60) print("Testing EmailService with SendGrid configuration") print("="*60) try: # Temporarily set environment to use SendGrid os.environ["EMAIL_PROVIDER"] = "sendgrid" from app.services.email_manager import EmailManager from app.db.session import AsyncSessionLocal test_id = str(uuid.uuid4())[:8] test_email = f"sf-service-test-{test_id}@example.com" print(f"Testing EmailService with recipient: {test_email}") variables = { "first_name": "TestUser", "link": f"https://servicefinder.hu/verify?token=TEST-{test_id}", "token": f"TEST-{test_id}", } async with AsyncSessionLocal() as db: result = await EmailManager.send_email( recipient=test_email, template_key="verification", variables=variables, lang="en", db=db ) print(f"EmailService result: {result}") if result and result.get("status") == "success": print("āœ… EmailService sent email successfully") return True else: print("āŒ EmailService failed to send email") return False except Exception as e: print(f"āŒ Error testing EmailService: {e}") import traceback traceback.print_exc() return False async def main(): """Run all tests.""" print("šŸš€ Starting SendGrid Live Fire Tests") print("="*60) # Test 1: Direct SendGrid API print("\n1. Testing Direct SendGrid API...") direct_success = await test_sendgrid_direct() # Test 2: EmailService print("\n2. Testing EmailService integration...") service_success = await test_email_service() # Summary print("\n" + "="*60) print("TEST SUMMARY") print("="*60) print(f"Direct SendGrid API: {'āœ… PASS' if direct_success else 'āŒ FAIL'}") print(f"EmailService Integration: {'āœ… PASS' if service_success else 'āŒ FAIL'}") if direct_success: print("\nšŸŽ‰ SendGrid is properly configured and can send emails!") print("For complete live delivery verification:") print("1. Get Mail7.io API credentials") print("2. Update tests/fire_drill_email.py with MAIL7_API_KEY/SECRET") print("3. Run: python tests/fire_drill_email.py") else: print("\nāŒ SendGrid configuration issues detected") print("Check SENDGRID_API_KEY environment variable") return direct_success and service_success if __name__ == "__main__": success = asyncio.run(main()) sys.exit(0 if success else 1)