import openai
import aiohttp
import asyncio
import json
import os
import logging
from typing import Dict, Any
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Load API keys and configuration
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
LEMON_API_KEY = os.getenv("LEMON_API_KEY")
LEMON_API_URL = "https://app.xn--lemn-sqa.com/api/transactional/send"
# Configuration (could be moved to a separate config file)
EMAIL_CONFIG = {
"from_name": "Your Real Estate Platform",
"from_email": "noreply@realestateplatform.com",
"subject_template": "Hi {}, Here's How to Level Up Your Real Estate Strategy"
}
async def analyze_and_generate_email(user_name: str, user_email: str, conversation_history: str) -> str:
openai.api_key = OPENAI_API_KEY
prompt = f"""
You are an AI assistant specialized in real estate investing. Based on the following conversation, generate a personalized and engaging email for the user. Provide actionable tips that align with their goals and include a subtle call to action to upgrade to the paid plan.
User Name: {user_name}
Conversation History: {conversation_history}
Guidelines:
- Use a friendly and professional tone.
- Include 2-3 actionable tips or suggestions.
- End with a clear, compelling call to action for upgrading the user's plan.
"""
try:
response = await openai.Completion.acreate(
model="text-davinci-003",
prompt=prompt,
max_tokens=300,
temperature=0.8
)
return response.choices[0].text.strip()
except Exception as e:
logger.error(f"Error generating email content: {str(e)}")
raise
async def send_email_via_lemon(user_name: str, user_email: str, subject: str, body: str) -> None:
headers = {
"Content-Type": "application/json",
"X-Auth-APIKey": LEMON_API_KEY
}
payload = {
"fromname": EMAIL_CONFIG["from_name"],
"fromemail": EMAIL_CONFIG["from_email"],
"to": user_email,
"subject": subject,
"body": f"<html><body>{body}</body></html>"
}
async with aiohttp.ClientSession() as session:
try:
async with session.post(LEMON_API_URL, headers=headers, json=payload) as response:
if response.status == 200:
logger.info(f"Email successfully sent to {user_email}")
else:
logger.error(f"Failed to send email. Status Code: {response.status}, Response: {await response.text()}")
except Exception as e:
logger.error(f"Error sending email: {str(e)}")
raise
async def end_of_chat_webhook(user_name: str, user_email: str, conversation_history: str) -> None:
try:
email_content = await analyze_and_generate_email(user_name, user_email, conversation_history)
subject = EMAIL_CONFIG["subject_template"].format(user_name)
await send_email_via_lemon(user_name, user_email, subject, email_content)
except Exception as e:
logger.error(f"Error in end_of_chat_webhook: {str(e)}")
async def main() -> None:
# Example conversation data
user_name = "Olivia"
user_email = "olivia@example.com"
conversation_history = """
Olivia: I'm interested in real estate investing but not sure where to start.
AI: Have you considered researching local markets to understand property trends?
Olivia: Not yet. I also want to know how to build a lead generation strategy.
"""
# Trigger the webhook after chat ends
await end_of_chat_webhook(user_name, user_email, conversation_history)
if __name__ == "__main__":
asyncio.run(main())