140 lines
4.9 KiB
Python
140 lines
4.9 KiB
Python
import logging
|
|
|
|
import requests
|
|
from django.db import connections
|
|
from django.utils import timezone
|
|
|
|
# Module-level logger, named after this module so its records can be
# filtered and routed per module by the logging configuration.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class VibnUmamiService:
    """
    Vibn Umami Bridge Service.

    Supports querying Umami analytics via either its official REST API or
    direct read-only SQL against the self-hosted Umami Postgres tables
    (reached through a named Django database connection).

    Every public method is best-effort: failures are logged and reported to
    the caller as an empty result (``None`` or ``[]``) instead of raising.
    """

    # Used when the caller does not supply ``api_url``.
    DEFAULT_API_URL = "https://analytics.vibnai.com/api"
    # Upper bound, in seconds, on how long a REST call may block.
    REQUEST_TIMEOUT_SECONDS = 10

    def __init__(self, api_url=None, token=None, website_id=None):
        """
        Store the REST connection details.

        Args:
            api_url: Base URL of the Umami API. Falls back to
                ``DEFAULT_API_URL`` when empty or ``None``.
            token: Bearer token sent in the ``Authorization`` header.
            website_id: Identifier of the website whose stats are queried.
        """
        self.api_url = api_url or self.DEFAULT_API_URL
        self.token = token
        self.website_id = website_id

    # ── SHARED HELPERS ────────────────────────────────────────────────────────

    @staticmethod
    def _isoformat(value):
        """Return ``value.isoformat()`` when available, else ``value`` unchanged.

        Lets date/datetime columns be serialised while passing ``None`` (or
        already-stringified values) straight through.
        """
        return value.isoformat() if hasattr(value, "isoformat") else value

    @staticmethod
    def _connection_configured(db_connection_name):
        """Return True if the Django DB alias exists; log a warning otherwise."""
        if db_connection_name in connections:
            return True
        logger.warning(
            "Database connection '%s' is not configured.", db_connection_name
        )
        return False

    # ── DRIVER 1: REST API INTEGRATION ────────────────────────────────────────

    def _get_headers(self):
        """Build the HTTP headers (bearer auth + JSON content type)."""
        return {
            "Authorization": f"Bearer {self.token}",
            "Content-Type": "application/json",
        }

    def _stats_url(self):
        """Return the stats endpoint URL for the configured website.

        A trailing slash on ``api_url`` is stripped so the result never
        contains a double slash before ``websites``.
        """
        base_url = str(self.api_url).rstrip("/")
        return f"{base_url}/websites/{self.website_id}/stats"

    def fetch_website_stats(self, start_at, end_at):
        """
        Query website aggregated statistics (pageviews, visitors, bounce_rate)
        via Umami REST API.

        Args:
            start_at: Start of the reporting window; must provide
                ``.timestamp()`` (e.g. a ``datetime``).
            end_at: End of the reporting window; same requirement.

        Returns:
            The decoded JSON response body, or ``None`` when the website id or
            token is missing or the request fails for any reason.
        """
        if not self.website_id or not self.token:
            logger.warning("Umami API connection details missing.")
            return None

        # The window is sent as epoch milliseconds.
        params = {
            "startAt": int(start_at.timestamp() * 1000),
            "endAt": int(end_at.timestamp() * 1000),
        }
        try:
            response = requests.get(
                self._stats_url(),
                headers=self._get_headers(),
                params=params,
                timeout=self.REQUEST_TIMEOUT_SECONDS,
            )
            response.raise_for_status()
            return response.json()
        except Exception as exc:
            # Deliberately broad: analytics are best-effort and must never
            # break the caller. logger.exception keeps the traceback.
            logger.exception("Failed to fetch Umami API stats: %s", exc)
            return None

    # ── DRIVER 2: DIRECT POSTGRESQL READ-ONLY QUERIES ─────────────────────────

    def get_user_session_click_trail(
        self, user_id, db_connection_name="umami_db", limit=50
    ):
        """
        Retrieve the most recent events for an identified user directly from
        the Umami database, newest first.

        The query joins ``website_event`` to ``session`` and filters on
        ``session.user_id``.

        Args:
            user_id: Identifier to match; it is converted with ``str()``
                before being bound to the query.
            db_connection_name: Alias of the Django database connection that
                points at the Umami database.
            limit: Maximum number of events to return (default 50).

        Returns:
            A list of dicts with the keys ``timestamp``, ``event``, ``path``,
            ``query``, ``device``, ``browser`` and ``country``. An empty list
            is returned when the connection is not configured or the query
            fails.
        """
        if not self._connection_configured(db_connection_name):
            return []

        query = """
            SELECT
                we.created_at,
                we.event_name,
                we.url_path,
                we.url_query,
                s.device,
                s.browser,
                s.country
            FROM website_event we
            JOIN session s ON we.session_id = s.session_id
            WHERE s.user_id = %s
            ORDER BY we.created_at DESC
            LIMIT %s;
        """
        try:
            with connections[db_connection_name].cursor() as cursor:
                cursor.execute(query, [str(user_id), int(limit)])
                rows = cursor.fetchall()

            # Unpack in SELECT column order.
            return [
                {
                    "timestamp": self._isoformat(created_at),
                    "event": event_name,
                    "path": url_path,
                    "query": url_query,
                    "device": device,
                    "browser": browser,
                    "country": country,
                }
                for (
                    created_at,
                    event_name,
                    url_path,
                    url_query,
                    device,
                    browser,
                    country,
                ) in rows
            ]
        except Exception as exc:
            # Best-effort read: log with traceback and degrade to "no data".
            logger.exception("Direct Umami DB query failed: %s", exc)
            return []

    def get_aggregated_funnel_for_user(self, user_id, db_connection_name="umami_db"):
        """
        Retrieve the total event count, distinct session count and earliest
        event time for a user directly from the raw Umami tables.

        Args:
            user_id: Identifier to match; it is converted with ``str()``
                before being bound to the query.
            db_connection_name: Alias of the Django database connection that
                points at the Umami database.

        Returns:
            A dict with ``total_interactions``, ``total_sessions`` and
            ``first_seen_at`` (ISO-8601 string when the database returns a
            date/datetime, otherwise the raw value). ``None`` is returned when
            the connection is not configured, no row comes back, or the query
            fails.
        """
        if not self._connection_configured(db_connection_name):
            return None

        query = """
            SELECT
                COUNT(*),
                COUNT(DISTINCT we.session_id),
                MIN(we.created_at)
            FROM website_event we
            JOIN session s ON we.session_id = s.session_id
            WHERE s.user_id = %s;
        """
        try:
            with connections[db_connection_name].cursor() as cursor:
                cursor.execute(query, [str(user_id)])
                row = cursor.fetchone()
        except Exception as exc:
            # Best-effort read: log with traceback and degrade to "no data".
            logger.exception("Failed to fetch aggregated user funnel: %s", exc)
            return None

        if not row:
            return None

        total_interactions, total_sessions, first_seen_at = row
        return {
            "total_interactions": total_interactions,
            "total_sessions": total_sessions,
            "first_seen_at": self._isoformat(first_seen_at),
        }
|