import os, csv, io, zipfile, requests, time
from collections import defaultdict
# --- Required credentials -------------------------------------------------
# Subscript access is deliberate: a missing variable raises KeyError right
# away instead of producing an unauthenticated request later.
QT = os.environ["QUALTRICS_TOKEN"]   # sent as the X-API-TOKEN header
DC = os.environ["QUALTRICS_DC"]      # becomes the subdomain of qualtrics.com
MV = os.environ["MAVERA_API_KEY"]    # sent as a Bearer token

# --- API roots --------------------------------------------------------------
Q_BASE = f"https://{DC}.qualtrics.com/API/v3"
MB = "https://app.mavera.io/api/v1"

# --- Request headers (both services exchange JSON) --------------------------
Q_H = {
    "X-API-TOKEN": QT,
    "Content-Type": "application/json",
}
MV_H = {
    "Authorization": f"Bearer {MV}",
    "Content-Type": "application/json",
}

# Survey to export. NOTE(review): the "SV_xxxxx" default looks like a
# placeholder rather than a real survey id; confirm BRAND_TRACKER_ID is set.
SURVEY_ID = os.getenv("BRAND_TRACKER_ID", "SV_xxxxx")
# 1. Export responses (async)
# The export is a server-side job: start it, poll its progress until a file id
# is issued, download the ZIP archive, then parse the CSV it contains.
export_resp = requests.post(
    f"{Q_BASE}/surveys/{SURVEY_ID}/export-responses",
    headers=Q_H,
    json={"format": "csv"},
    timeout=30,
)
# Fail here on auth/permission errors instead of with a KeyError on "result".
export_resp.raise_for_status()
export = export_resp.json()
progress_id = export["result"]["progressId"]

file_id = None
for _ in range(60):  # 60 polls, 5 s apart: give up after roughly 5 minutes
    time.sleep(5)
    status_resp = requests.get(
        f"{Q_BASE}/surveys/{SURVEY_ID}/export-responses/{progress_id}",
        headers=Q_H,
        timeout=30,
    )
    status_resp.raise_for_status()
    status = status_resp.json()
    # Check for failure first: a failed job never yields a usable fileId.
    if status["result"].get("status") == "failed":
        raise RuntimeError(
            f"Qualtrics export {progress_id} failed: {status['result']}"
        )
    if status["result"].get("percentComplete") == 100:
        file_id = status["result"]["fileId"]
        break

if file_id is None:
    # Without this guard the download below would request ".../None/file".
    raise TimeoutError(
        f"Qualtrics export {progress_id} did not complete within the polling window"
    )

download_resp = requests.get(
    f"{Q_BASE}/surveys/{SURVEY_ID}/export-responses/{file_id}/file",
    headers=Q_H,
    timeout=120,
)
download_resp.raise_for_status()
zip_data = download_resp.content

with zipfile.ZipFile(io.BytesIO(zip_data)) as zf:
    csv_names = [n for n in zf.namelist() if n.endswith(".csv")]
    if not csv_names:
        raise RuntimeError("Qualtrics export archive contains no .csv file")
    csv_name = csv_names[0]
    with zf.open(csv_name) as f:
        # "utf-8-sig" drops a leading BOM; newline="" lets the csv module
        # handle line breaks embedded in quoted fields itself.
        reader = csv.DictReader(
            io.TextIOWrapper(f, encoding="utf-8-sig", newline="")
        )
        # Keep only rows whose Finished column is exactly "1".
        # NOTE(review): presumably this also drops any descriptive header
        # rows in the export; verify against a real file.
        rows = [r for r in reader if r.get("Finished") == "1"]
print(f"Brand tracker: {len(rows)} responses")
# 2. Extract brand metrics
# Brand names come from a comma-separated env var; surrounding whitespace is
# stripped and empty entries dropped so "A, B" still matches "B_<metric>".
BRANDS = [
    name.strip()
    for name in os.environ.get(
        "TRACKED_BRANDS", "OurBrand,CompetitorA,CompetitorB,CompetitorC"
    ).split(",")
    if name.strip()
]
METRICS = ["Awareness", "Consideration", "Preference", "NPS"]

# brand -> "<metric>_<period>" -> list of raw values (float when parseable,
# otherwise the original string).
brand_data = defaultdict(lambda: defaultdict(list))
for row in rows:
    # First 7 characters of RecordedDate. NOTE(review): despite the name this
    # is a "YYYY-MM"-style prefix (a month, not a quarter) if the column holds
    # ISO timestamps; confirm the intended bucketing.
    # `or ""` guards against None, which csv.DictReader uses for short rows.
    quarter = (row.get("RecordedDate") or "")[:7]
    for brand in BRANDS:
        for metric in METRICS:
            # Column naming is not known up front; try each convention in turn.
            col_variants = (
                f"{brand}_{metric}",
                f"{metric}_{brand}",
                f"Q_{brand}_{metric}",
                f"{brand} - {metric}",
            )
            bucket = brand_data[brand][f"{metric}_{quarter}"]
            for col in col_variants:
                val = (row.get(col) or "").strip()
                if not val:
                    continue
                try:
                    bucket.append(float(val))
                except ValueError:
                    # Non-numeric answer: kept as text; the summary step
                    # ignores anything that is not a number.
                    bucket.append(val)
                break  # first non-empty variant wins for this brand/metric
# 3. Build brand comparison summary
# For each brand, average the numeric values per metric and period, then
# render one "period: mean" trend line per metric.
summary_parts = []
found_metrics = False  # becomes True once any brand yields a numeric value
for brand in BRANDS:
    metrics_by_type = defaultdict(dict)  # metric -> {period: mean}
    for key, values in sorted(brand_data.get(brand, {}).items()):
        # Keys are "<metric>_<period>"; split on the last underscore.
        metric, quarter = key.rsplit("_", 1)
        nums = [v for v in values if isinstance(v, (int, float))]
        if nums:
            metrics_by_type[metric][quarter] = sum(nums) / len(nums)

    section_lines = [f"### {brand}\n"]
    for metric, quarters in metrics_by_type.items():
        trend = " → ".join(f"{q}: {v:.1f}" for q, v in sorted(quarters.items()))
        section_lines.append(f" {metric}: {trend}\n")
    if metrics_by_type:
        found_metrics = True
    summary_parts.append("".join(section_lines))

brand_summary = "\n".join(summary_parts)

# Every section starts with a "### brand" header, so the joined text is never
# blank; the fallback must key off whether any metric was actually found.
if not found_metrics:
    # Skip None keys (DictReader's bucket for surplus fields) and tolerate an
    # export with no completed responses.
    sample_cols = [c for c in rows[0] if c is not None][:20] if rows else []
    brand_summary = f"Raw column sample: {', '.join(sample_cols)}"
    print("Warning: Could not auto-detect brand metric columns. Check column naming.")
# 4. Mave competitive analysis with web search
# Send the summary (capped at 4000 characters) with the analysis brief,
# then print the returned report and up to five of its sources.
prompt = f"""Analyze this brand tracking data and research the underlying market dynamics.
BRAND TRACKING DATA ({len(rows)} respondents):
{brand_summary[:4000]}
Tasks:
1) Identify significant shifts in brand perception — which brands gained/lost and by how much
2) Research recent market events that explain these shifts (product launches, funding, PR, partnerships)
3) Competitive positioning analysis — where does each brand sit in the market map?
4) Identify emerging threats not yet reflected in tracking data
5) Recommend strategic responses to competitive movements
6) Predict next quarter's brand perception based on current trajectories
Use web search to ground your analysis in real market events."""

# Generous timeout: the request asks for web research, which may be slow.
analysis_resp = requests.post(
    f"{MB}/mave/chat",
    headers=MV_H,
    json={"message": prompt},
    timeout=180,
)
# Surface HTTP errors instead of printing an empty report.
analysis_resp.raise_for_status()
analysis = analysis_resp.json()

# NOTE(review): assumes the response is a JSON object with optional
# "content" (str) and "sources" (list of dicts); confirm against the API.
# `or` also covers fields that are present but null.
content = analysis.get("content") or ""
sources = analysis.get("sources") or []

print(f"\n{'='*60}")
print("COMPETITIVE INTELLIGENCE REPORT")
print(f"{'='*60}")
print(content[:3000])
print(f"\nSources: {len(sources)}")
for src in sources[:5]:
    # Prefer the title; fall back to the URL, then to an empty string.
    print(f" - {src.get('title', src.get('url', ''))}")