import os, requests, time
from google.ads.googleads.client import GoogleAdsClient
# --- Credentials -----------------------------------------------------------
# Both variables are mandatory: a missing one raises KeyError immediately.
MV = os.environ["MAVERA_API_KEY"]
CUSTOMER_ID = os.environ["GOOGLE_ADS_CUSTOMER_ID"]

# --- Mavera REST settings --------------------------------------------------
# Base URL and the headers sent with every Mavera request in this script.
MB = "https://app.mavera.io/api/v1"
MH = {
    "Authorization": "Bearer " + MV,
    "Content-Type": "application/json",
}

# --- Google Ads ------------------------------------------------------------
# Client is built via load_from_env(); the GoogleAdsService handle runs the
# search query further down.
client = GoogleAdsClient.load_from_env()
ga_service = client.get_service("GoogleAdsService")
# GAQL report over ad_group_ad: responsive search ads with at least one
# conversion in the last 30 days, ordered by conversions (descending) and
# capped at 10 rows.
query = """
SELECT
ad_group_ad.ad.responsive_search_ad.headlines,
ad_group_ad.ad.responsive_search_ad.descriptions,
ad_group_ad.ad.final_urls,
metrics.impressions,
metrics.clicks,
metrics.conversions,
metrics.ctr
FROM ad_group_ad
WHERE ad_group_ad.ad.type = RESPONSIVE_SEARCH_AD
AND segments.date DURING LAST_30_DAYS
AND metrics.conversions > 0
ORDER BY metrics.conversions DESC
LIMIT 10
"""


def _ad_record(result_row):
    """Flatten one search result row into a plain dict of ad copy and metrics."""
    search_ad = result_row.ad_group_ad.ad.responsive_search_ad
    headline_texts = [asset.text for asset in search_ad.headlines]
    description_texts = [asset.text for asset in search_ad.descriptions]
    stats = result_row.metrics
    return {
        "headlines": headline_texts,
        "descriptions": description_texts,
        "impressions": stats.impressions,
        "clicks": stats.clicks,
        "conversions": stats.conversions,
        "ctr": stats.ctr,
    }


# Run the report and keep one flattened record per returned row.
response = ga_service.search(customer_id=CUSTOMER_ID, query=query)
ads = [_ad_record(result_row) for result_row in response]
# Persona IDs may be supplied as a comma-separated list in
# FOCUS_GROUP_PERSONA_IDS. Entries are stripped and empty ones dropped, so an
# unset variable, a stray comma (",abc", "abc,") or spaces ("a, b") no longer
# produce blank or whitespace-padded IDs.
PERSONA_IDS = [
    pid.strip()
    for pid in os.environ.get("FOCUS_GROUP_PERSONA_IDS", "").split(",")
    if pid.strip()
]

# No usable IDs configured: create three default personas through the API and
# collect the IDs it returns.
if not PERSONA_IDS:
    for name, desc in [
        ("SaaS CMO", "Chief Marketing Officer at a 200-person SaaS company. Budget-conscious, ROI-driven."),
        ("Growth Marketer", "Mid-level growth marketer. Hands-on, cares about tooling and workflow speed."),
        ("Enterprise IT Buyer", "IT Director evaluating tools for security, compliance, and integration depth."),
    ]:
        persona_response = requests.post(
            f"{MB}/personas",
            headers=MH,
            json={"name": name, "description": desc},
            timeout=30,  # requests has no default timeout; avoid hanging forever
        )
        # Surface HTTP errors directly instead of a later KeyError on "id".
        persona_response.raise_for_status()
        p = persona_response.json()
        PERSONA_IDS.append(p["id"])
        # Short pause between creations (presumably to stay under a rate limit).
        time.sleep(0.3)
# Summarise at most the first five fetched ads for the focus-group prompt.
top_ads = ads[:5]
if not top_ads:
    # With no ads the prompt would be empty, so stop before creating a
    # focus group that has nothing to evaluate.
    raise SystemExit("No ads were returned by the Google Ads query; nothing to validate.")
ad_count = len(top_ads)

# One paragraph per ad: headline metrics, then up to 5 headlines and
# 3 descriptions joined with " | ".
ad_context = "\n\n".join(
    f"Ad {i} (CTR: {a['ctr']:.2%}, Conv: {a['conversions']:.0f}):\n"
    f"Headlines: {' | '.join(a['headlines'][:5])}\n"
    f"Descriptions: {' | '.join(a['descriptions'][:3])}"
    for i, a in enumerate(top_ads, start=1)
)

questions = [
    "Which ad headline would make you click? Why?",
    "Which description feels most credible to you?",
    "What's missing from these ads that would make you convert?",
]
# Ranking only makes sense with two or more ads; the count in the question
# follows the number of ads actually included rather than assuming five.
if ad_count > 1:
    questions.append(
        f"Rank the {ad_count} ads from most to least compelling. "
        f"Explain your #1 and #{ad_count}."
    )

fg_response = requests.post(
    f"{MB}/focus-groups",
    headers=MH,
    json={
        "name": "Google Ads Copy Validation",
        # Skip blank IDs that may be left over from parsing the env variable.
        "persona_ids": [pid for pid in PERSONA_IDS if pid],
        "questions": questions,
        "context": f"Here are {ad_count} top-performing Google Ads from our account:\n\n{ad_context}",
        "responses_per_persona": 2,
    },
    timeout=30,  # requests has no default timeout; avoid hanging forever
)
# Fail here with the HTTP error rather than later on a missing "id" key.
fg_response.raise_for_status()
fg = fg_response.json()
# Poll the focus group until it reports "completed", waiting before each
# request. Total wait is bounded by POLL_ATTEMPTS * POLL_INTERVAL_S seconds.
POLL_ATTEMPTS = 20
POLL_INTERVAL_S = 5

for _ in range(POLL_ATTEMPTS):
    time.sleep(POLL_INTERVAL_S)
    poll_response = requests.get(
        f"{MB}/focus-groups/{fg['id']}",
        headers=MH,
        timeout=30,  # requests has no default timeout; avoid hanging forever
    )
    # Stop on HTTP errors instead of parsing an error body as results.
    poll_response.raise_for_status()
    data = poll_response.json()
    if data.get("status") == "completed":
        break
else:
    # Loop ended without a break: make the timeout visible rather than
    # silently printing an empty or partial result set.
    print(
        f"Warning: focus group {fg['id']} not completed after "
        f"{POLL_ATTEMPTS * POLL_INTERVAL_S}s (last status: {data.get('status')!r}); "
        "showing any responses received so far."
    )

# Print each response: persona and truncated question, then truncated answer.
for resp in data.get("responses", []):
    print(f"[{resp.get('persona_id','?')}] {resp.get('question','')[:50]}")
    print(f" → {resp.get('answer','')[:250]}\n")