Files

140 lines
4.9 KiB
Python

import logging
import requests
from django.db import connections
from django.utils import timezone
logger = logging.getLogger(__name__)
class VibnUmamiService:
"""
Vibn Umami Bridge Service.
Supports querying Umami analytics via either its official REST API or
performing direct, high-performance database JOINs against your self-hosted Postgres tables.
"""
def __init__(self, api_url=None, token=None, website_id=None):
self.api_url = api_url or "https://analytics.vibnai.com/api"
self.token = token
self.website_id = website_id
# ── DRIVER 1: REST API INTEGRATION ────────────────────────────────────────
def _get_headers(self):
return {
"Authorization": f"Bearer {self.token}",
"Content-Type": "application/json",
}
def fetch_website_stats(self, start_at, end_at):
"""
Query website aggregated statistics (pageviews, visitors, bounce_rate)
via Umami REST API.
"""
if not self.website_id or not self.token:
logger.warning("Umami API connection details missing.")
return None
url = f"{self.api_url}/websites/{self.website_id}/stats"
params = {
"startAt": int(start_at.timestamp() * 1000),
"endAt": int(end_at.timestamp() * 1000),
}
try:
response = requests.get(
url, headers=self._get_headers(), params=params, timeout=10
)
response.raise_for_status()
return response.json()
except Exception as e:
logger.error(f"Failed to fetch Umami API stats: {e}")
return None
# ── DRIVER 2: DIRECT POSTGRESQL READ-ONLY QUERIES ─────────────────────────
def get_user_session_click_trail(self, user_id, db_connection_name="umami_db"):
"""
Retrieves raw browsing timeline for a identified user directly from
the self-hosted Umami DB instance using Django cross-database routing.
"""
if db_connection_name not in connections:
logger.warning(
f"Database connection '{db_connection_name}' is not configured."
)
return []
query = """
SELECT
we.created_at,
we.event_name,
we.url_path,
we.url_query,
s.device,
s.browser,
s.country
FROM website_event we
JOIN session s ON we.session_id = s.session_id
WHERE s.user_id = %s
ORDER BY we.created_at DESC
LIMIT 50;
"""
try:
with connections[db_connection_name].cursor() as cursor:
cursor.execute(query, [str(user_id)])
rows = cursor.fetchall()
trail = []
for r in rows:
trail.append(
{
"timestamp": r[0].isoformat()
if hasattr(r[0], "isoformat")
else r[0],
"event": r[1],
"path": r[2],
"query": r[3],
"device": r[4],
"browser": r[5],
"country": r[6],
}
)
return trail
except Exception as e:
logger.error(f"Direct Umami DB query failed: {e}")
return []
def get_aggregated_funnel_for_user(self, user_id, db_connection_name="umami_db"):
"""
Retrieves high-level counts for pageviews, unique sessions, and first-seen dates
directly from the raw Umami tables.
"""
if db_connection_name not in connections:
return None
query = """
SELECT
COUNT(*),
COUNT(DISTINCT we.session_id),
MIN(we.created_at)
FROM website_event we
JOIN session s ON we.session_id = s.session_id
WHERE s.user_id = %s;
"""
try:
with connections[db_connection_name].cursor() as cursor:
cursor.execute(query, [str(user_id)])
row = cursor.fetchone()
if row:
return {
"total_interactions": row[0],
"total_sessions": row[1],
"first_seen_at": row[2].isoformat()
if hasattr(row[2], "isoformat")
else row[2],
}
return None
except Exception as e:
logger.error(f"Failed to fetch aggregated user funnel: {e}")
return None