diff --git a/.env.example b/.env.example index 2a03494..3f25b3d 100644 --- a/.env.example +++ b/.env.example @@ -9,7 +9,11 @@ DATABASE_URL=sqlite:///./workmate.db # AI / LLM GEMINI_API_KEY=your-gemini-api-key -VOYAGE_API_KEY=your-voyageai-api-key +VOYAGE_API_KEY=your-voyageai-api-key -# Notion (direct API access — optional, JSON import works without this) +# Notion NOTION_TOKEN=your-notion-api-token +NOTION_OAUTH_CLIENT_ID=your-notion-oauth-client-id +NOTION_OAUTH_CLIENT_SECRET=your-notion-oauth-client-secret +NOTION_REDIRECT_URI=http://localhost:8000/api/notion/callback +NOTION_ENCRYPTION_KEY=your-fernet-encryption-key diff --git a/frontend/src/app/components/Sidebar.tsx b/frontend/src/app/components/Sidebar.tsx index b2fc007..f2d1a65 100644 --- a/frontend/src/app/components/Sidebar.tsx +++ b/frontend/src/app/components/Sidebar.tsx @@ -18,16 +18,10 @@ import { Card } from './ui/card'; import { Button } from './ui/button'; import { useAuth } from '../contexts/AuthContext'; import { useIsAdmin } from '../../hooks/useIsAdmin'; -import { listConversations, deleteConversation, renameConversation } from '../../services/api'; +import { listConversations, deleteConversation, renameConversation, getWorkspaces } from '../../services/api'; import { toast } from 'sonner'; import type { ConversationSummary, NotionWorkspace } from '../../types/chat'; -const mockWorkspaces: NotionWorkspace[] = [ - { id: '1', name: 'Product Requirements', pageCount: 24, connected: true }, - { id: '2', name: 'Engineering Docs', pageCount: 156, connected: true }, - { id: '3', name: 'Team Wiki', pageCount: 89, connected: true }, -]; - interface SidebarProps { activeConversationId: number | null; onSelectConversation: (id: number | null, title?: string) => void; @@ -63,11 +57,15 @@ export function Sidebar({ const [searchQuery, setSearchQuery] = useState(''); const [editingId, setEditingId] = useState(null); const [editingTitle, setEditingTitle] = useState(''); + const [workspaces, setWorkspaces] = useState([]); useEffect(() => { listConversations() .then(setConversations) .catch((err) => console.error('Failed to load conversations:', err)); + getWorkspaces() + .then(setWorkspaces) + .catch(() => {}); // Silently fail if no workspaces }, [refreshKey]); const handleDelete = async (e: React.MouseEvent, id: number) => { @@ -270,25 +268,41 @@ export function Sidebar({

Connected Workspaces

- {mockWorkspaces.map((workspace) => ( - -
-
-
-
-

{workspace.name}

+ Connect a Notion workspace + + ) : ( + workspaces.map((workspace) => ( + + +
+
+
+
+

+ {workspace.workspace_name} +

+
+

+ {workspace.sync_status === 'syncing' ? 'Syncing...' : + workspace.last_synced_at ? `Synced ${formatTime(workspace.last_synced_at)}` : 'Pending sync'} +

-

- {workspace.pageCount} pages indexed -

+
- -
-
- ))} + + + )) + )}
diff --git a/frontend/src/app/pages/SettingsPage.tsx b/frontend/src/app/pages/SettingsPage.tsx index 2ad2509..92a5f6a 100644 --- a/frontend/src/app/pages/SettingsPage.tsx +++ b/frontend/src/app/pages/SettingsPage.tsx @@ -1,6 +1,6 @@ -import { useState } from 'react'; -import { useNavigate } from 'react-router-dom'; -import { Sun, Moon, Monitor, Link2 } from 'lucide-react'; +import { useState, useEffect, useRef } from 'react'; +import { useNavigate, useSearchParams } from 'react-router-dom'; +import { Sun, Moon, Monitor, Link2, RefreshCw, Unplug, Plus, Loader2 } from 'lucide-react'; import { toast } from 'sonner'; import { Button } from '../components/ui/button'; import { Card } from '../components/ui/card'; @@ -20,14 +20,125 @@ import { import { useAuth } from '../contexts/AuthContext'; import { useTheme } from 'next-themes'; import { updateProfile, deleteAccount } from '../../services/auth'; +import { getNotionAuthUrl, getWorkspaces, disconnectWorkspace, syncWorkspace } from '../../services/api'; +import type { NotionWorkspace } from '../../types/chat'; export function SettingsPage() { const { user, logout, updateUser } = useAuth(); const { theme, setTheme } = useTheme(); const navigate = useNavigate(); + const [searchParams, setSearchParams] = useSearchParams(); const [name, setName] = useState(user?.name ?? ''); const [saving, setSaving] = useState(false); + const [workspaces, setWorkspaces] = useState([]); + const [loadingWorkspaces, setLoadingWorkspaces] = useState(true); + const [connecting, setConnecting] = useState(false); + const [syncingId, setSyncingId] = useState(null); + + useEffect(() => { + loadWorkspaces(); + // Handle redirect from Notion OAuth callback + if (searchParams.get('notion') === 'connected') { + toast.success('Notion workspace connected! Ingestion is running in the background.'); + searchParams.delete('notion'); + setSearchParams(searchParams, { replace: true }); + } + }, []); // eslint-disable-line react-hooks/exhaustive-deps + + // Poll for status updates while any workspace is syncing + const isSyncingRef = useRef(false); + isSyncingRef.current = workspaces.some((w) => w.sync_status === 'syncing'); + + useEffect(() => { + if (!isSyncingRef.current) return; + + const poll = () => { + getWorkspaces().then((ws) => { + setWorkspaces(ws); + if (!ws.some((w) => w.sync_status === 'syncing')) { + const failed = ws.find((w) => w.sync_status === 'error'); + if (failed) { + toast.error(`Sync failed for ${failed.workspace_name}`); + } else { + toast.success('Sync complete!'); + } + } + }).catch(() => {}); + }; + + // Poll immediately, then every 5 seconds + const timeout = setTimeout(poll, 2000); + const interval = setInterval(poll, 5000); + + return () => { + clearTimeout(timeout); + clearInterval(interval); + }; + }, [isSyncingRef.current]); // eslint-disable-line react-hooks/exhaustive-deps + + const loadWorkspaces = async () => { + try { + const ws = await getWorkspaces(); + setWorkspaces(ws); + } catch { + // Silently fail — user may not have any workspaces + } finally { + setLoadingWorkspaces(false); + } + }; + + const handleConnectNotion = async () => { + setConnecting(true); + try { + const url = await getNotionAuthUrl(); + window.location.href = url; + } catch { + toast.error('Failed to initiate Notion connection'); + setConnecting(false); + } + }; + + const handleDisconnect = async (wsId: number) => { + try { + await disconnectWorkspace(wsId); + setWorkspaces((prev) => prev.filter((w) => w.id !== wsId)); + toast.success('Workspace disconnected'); + } catch { + toast.error('Failed to disconnect workspace'); + } + }; + + const handleSync = async (wsId: number) => { + setSyncingId(wsId); + try { + const result = await syncWorkspace(wsId); + if (result.status === 'already_syncing') { + toast.info('Sync is already in progress'); + } else { + toast.success('Sync started — this may take a few minutes'); + setWorkspaces((prev) => + prev.map((w) => (w.id === wsId ? { ...w, sync_status: 'syncing' } : w)) + ); + } + } catch { + toast.error('Failed to start sync'); + } finally { + setSyncingId(null); + } + }; + + const formatTimeAgo = (dateStr: string | null) => { + if (!dateStr) return 'Never'; + const diff = Date.now() - new Date(dateStr).getTime(); + const mins = Math.floor(diff / 60000); + if (mins < 1) return 'Just now'; + if (mins < 60) return `${mins}m ago`; + const hours = Math.floor(mins / 60); + if (hours < 24) return `${hours}h ago`; + const days = Math.floor(hours / 24); + return `${days}d ago`; + }; const handleSaveName = async () => { const trimmed = name.trim(); @@ -147,17 +258,95 @@ export function SettingsPage() {

Connected Accounts

-
-
- -
-

Notion

-

Coming soon

+
+ {loadingWorkspaces ? ( +
+
-
- + ) : ( + <> + {workspaces.map((ws) => ( +
+
+ +
+

+ {ws.workspace_name} +

+

+ Connected {formatTimeAgo(ws.connected_at)} + {ws.last_synced_at && ` · Last synced ${formatTimeAgo(ws.last_synced_at)}`} + {ws.sync_status === 'syncing' && ( + · Syncing... + )} + {ws.sync_status === 'error' && ( + · Sync failed + )} +

+
+
+
+ + + + + + + + Disconnect {ws.workspace_name}? + + This will remove your connection to this Notion workspace. + If no other users are connected, the workspace data will also be removed. + + + + Cancel + handleDisconnect(ws.id)} + className="bg-red-600 hover:bg-red-700 text-white" + > + Disconnect + + + + +
+
+ ))} + + + )}
diff --git a/frontend/src/services/api.ts b/frontend/src/services/api.ts index 81acd30..0c9c760 100644 --- a/frontend/src/services/api.ts +++ b/frontend/src/services/api.ts @@ -213,14 +213,38 @@ export async function uploadFiles(files: File[]): Promise { return res.json(); } -/** - * Fetch connected Notion workspaces. - * Currently returns mock data. - */ +// --- Notion workspace endpoints --- + +export async function getNotionAuthUrl(): Promise { + const res = await fetch(`${BASE_URL}/notion/connect`, { + headers: authHeaders(), + }); + if (!res.ok) throw new Error('Failed to get Notion auth URL'); + const data = await res.json(); + return data.authorization_url; +} + export async function getWorkspaces(): Promise { - return [ - { id: '1', name: 'Product Requirements', pageCount: 24, connected: true }, - { id: '2', name: 'Engineering Docs', pageCount: 156, connected: true }, - { id: '3', name: 'Team Wiki', pageCount: 89, connected: true }, - ]; + const res = await fetch(`${BASE_URL}/notion/workspaces`, { + headers: authHeaders(), + }); + if (!res.ok) throw new Error('Failed to fetch workspaces'); + return res.json(); +} + +export async function disconnectWorkspace(workspaceId: number): Promise { + const res = await fetch(`${BASE_URL}/notion/workspaces/${workspaceId}`, { + method: 'DELETE', + headers: authHeaders(), + }); + if (!res.ok) throw new Error('Failed to disconnect workspace'); +} + +export async function syncWorkspace(workspaceId: number): Promise<{ status: string }> { + const res = await fetch(`${BASE_URL}/notion/workspaces/${workspaceId}/sync`, { + method: 'POST', + headers: authHeaders(), + }); + if (!res.ok) throw new Error('Failed to sync workspace'); + return res.json(); } diff --git a/frontend/src/types/chat.ts b/frontend/src/types/chat.ts index a74a9a7..7df8641 100644 --- a/frontend/src/types/chat.ts +++ b/frontend/src/types/chat.ts @@ -20,10 +20,13 @@ export interface ChatMessage { } export interface NotionWorkspace { - id: string; - name: string; - pageCount: number; - connected: boolean; + id: number; + workspace_id: string; + workspace_name: string; + workspace_icon: string | null; + sync_status: string; + last_synced_at: string | null; + connected_at: string; } export interface RecentProject { diff --git a/src/Notion/__init__.py b/src/Notion/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/Notion/notion_fetcher/fetchers/__init__.py b/src/Notion/notion_fetcher/fetchers/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/Notion/notion_fetcher/models/__init__.py b/src/Notion/notion_fetcher/models/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/Notion/notion_fetcher/parsers/__init__.py b/src/Notion/notion_fetcher/parsers/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/backend/app.py b/src/backend/app.py index 4a0005b..3b72960 100644 --- a/src/backend/app.py +++ b/src/backend/app.py @@ -3,7 +3,7 @@ from src.backend.config import settings from src.backend.database import Base, engine -from src.backend.routers import admin, auth, conversations, upload +from src.backend.routers import admin, auth, conversations, notion, upload def create_app() -> FastAPI: @@ -23,5 +23,6 @@ def create_app() -> FastAPI: app.include_router(admin.router) app.include_router(conversations.router) app.include_router(upload.router) + app.include_router(notion.router) return app diff --git a/src/backend/config.py b/src/backend/config.py index f48c471..28ec115 100644 --- a/src/backend/config.py +++ b/src/backend/config.py @@ -13,6 +13,10 @@ class Settings(BaseSettings): GEMINI_API_KEY: str = "" VOYAGE_API_KEY: str = "" NOTION_TOKEN: str = "" + NOTION_OAUTH_CLIENT_ID: str = "" + NOTION_OAUTH_CLIENT_SECRET: str = "" + NOTION_REDIRECT_URI: str = "http://localhost:8000/api/notion/callback" + NOTION_ENCRYPTION_KEY: str = "" model_config = {"env_file": ".env", "extra": "ignore"} diff --git a/src/backend/dependencies/auth.py b/src/backend/dependencies/auth.py index 431fc3c..e12943b 100644 --- a/src/backend/dependencies/auth.py +++ b/src/backend/dependencies/auth.py @@ -19,6 +19,17 @@ def create_access_token(data: dict) -> str: return jwt.encode(to_encode, settings.JWT_SECRET_KEY, algorithm=settings.JWT_ALGORITHM) +def verify_token(token: str) -> dict | None: + """Decode a JWT token and return the payload, or None if invalid.""" + try: + payload = jwt.decode( + token, settings.JWT_SECRET_KEY, algorithms=[settings.JWT_ALGORITHM] + ) + return payload + except JWTError: + return None + + def get_current_user( credentials: HTTPAuthorizationCredentials = Depends(security), db: Session = Depends(get_db), diff --git a/src/backend/dependencies/workspace.py b/src/backend/dependencies/workspace.py new file mode 100644 index 0000000..84e867d --- /dev/null +++ b/src/backend/dependencies/workspace.py @@ -0,0 +1,17 @@ +from sqlalchemy.orm import Session, joinedload + +from src.backend.models.notion import NotionConnection + + +def get_workspace_filter(user_id: int, db: Session) -> dict | None: + """Build a ChromaDB where filter for the user's connected workspaces.""" + connections = ( + db.query(NotionConnection) + .options(joinedload(NotionConnection.workspace)) + .filter(NotionConnection.user_id == user_id) + .all() + ) + workspace_ids = [conn.workspace.workspace_id for conn in connections] + if not workspace_ids: + return None # No filter — allow legacy data access + return {"workspace_id": {"$in": workspace_ids}} diff --git a/src/backend/load/bm25_manager.py b/src/backend/load/bm25_manager.py index 2d06100..eb9e47b 100644 --- a/src/backend/load/bm25_manager.py +++ b/src/backend/load/bm25_manager.py @@ -32,18 +32,21 @@ def build_index(self, chunks: list[str], metadatas: list[dict], ids: list[str]): self.index.index(tokenized_corpus) logger.info(f"BM25 index built with {len(chunks)} documents") - def search(self, query: str, top_k: int = 10) -> list[dict]: + def search(self, query: str, top_k: int = 10, where: dict | None = None) -> list[dict]: if self.index is None: logger.warning("BM25 index not built, returning empty results") return [] - k = min(top_k, len(self.chunks)) + # Over-fetch when filtering to compensate for filtered-out results + fetch_k = min(top_k * 3 if where else top_k, len(self.chunks)) query_tokens = bm25s.tokenize([query.lower()]) - results, _ = self.index.retrieve(query_tokens, k=k) + results, _ = self.index.retrieve(query_tokens, k=fetch_k) output = [] for idx in results[0]: meta = self.metadatas[idx] + if where and not self._matches_filter(meta, where): + continue output.append({ "chunk_id": self.ids[idx], "text": self.chunks[idx], @@ -51,8 +54,22 @@ def search(self, query: str, top_k: int = 10) -> list[dict]: "section": meta.get("parent_title", ""), **meta, }) + if len(output) >= top_k: + break return output + @staticmethod + def _matches_filter(meta: dict, where: dict) -> bool: + """Check if metadata matches a ChromaDB-style where filter.""" + for key, condition in where.items(): + if isinstance(condition, dict): + if "$in" in condition: + if meta.get(key) not in condition["$in"]: + return False + elif meta.get(key) != condition: + return False + return True + def save(self, path: str = BM25_INDEX_PATH): os.makedirs(os.path.dirname(path), exist_ok=True) with open(path, "wb") as f: diff --git a/src/backend/load/chroma_manager.py b/src/backend/load/chroma_manager.py index d8f792b..8379528 100644 --- a/src/backend/load/chroma_manager.py +++ b/src/backend/load/chroma_manager.py @@ -55,7 +55,7 @@ def add_documents(self, documents, metadatas, ids, batch_size=20): try: # Generate embeddings with Google Embedder embeddings = self.embedder(batch_docs) - self.collection.add( + self.collection.upsert( documents=batch_docs, metadatas=batch_meta, ids=batch_ids, @@ -96,6 +96,15 @@ def get_by_parent(self, parent_id, limit=20): ) return results + def delete_by_workspace(self, workspace_id: str): + """Delete all chunks belonging to a specific workspace.""" + try: + self.collection.delete(where={"workspace_id": workspace_id}) + print(f"Deleted all chunks for workspace '{workspace_id}'") + except Exception as e: + logger.error(f"Error deleting chunks for workspace {workspace_id}: {e}") + raise + def reset(self): """ DANGER: Deletes and recreates the collection. diff --git a/src/backend/load/hybrid_retriever.py b/src/backend/load/hybrid_retriever.py index 00bbe9d..224c19a 100644 --- a/src/backend/load/hybrid_retriever.py +++ b/src/backend/load/hybrid_retriever.py @@ -17,15 +17,16 @@ def search( vector_top_k: int = 15, bm25_top_k: int = 15, final_top_k: int = 15, + where: dict | None = None, ) -> list[dict]: - vector_results = self._query_chroma(query, vector_top_k) - bm25_results = self.bm25.search(query, top_k=bm25_top_k) + vector_results = self._query_chroma(query, vector_top_k, where=where) + bm25_results = self.bm25.search(query, top_k=bm25_top_k, where=where) merged = self.reciprocal_rank_fusion([vector_results, bm25_results]) return merged[:final_top_k] - def _query_chroma(self, query: str, top_k: int) -> list[dict]: - results = self.chroma.query(query, n_results=top_k) + def _query_chroma(self, query: str, top_k: int, where: dict | None = None) -> list[dict]: + results = self.chroma.query(query, n_results=top_k, where=where) output = [] if results and results.get("documents") and results["documents"][0]: docs = results["documents"][0] diff --git a/src/backend/models/__init__.py b/src/backend/models/__init__.py index d87a78b..eafc8e5 100644 --- a/src/backend/models/__init__.py +++ b/src/backend/models/__init__.py @@ -1,2 +1,3 @@ from src.backend.models.conversation import Conversation, MessageRecord # noqa: F401 +from src.backend.models.notion import NotionConnection, NotionWorkspace # noqa: F401 from src.backend.models.user import User # noqa: F401 diff --git a/src/backend/models/notion.py b/src/backend/models/notion.py new file mode 100644 index 0000000..705a688 --- /dev/null +++ b/src/backend/models/notion.py @@ -0,0 +1,59 @@ +from datetime import datetime, timezone + +from sqlalchemy import DateTime, ForeignKey, String, UniqueConstraint +from sqlalchemy.orm import Mapped, mapped_column, relationship + +from src.backend.database import Base + + +class NotionWorkspace(Base): + __tablename__ = "notion_workspaces" + + id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True) + workspace_id: Mapped[str] = mapped_column( + String, unique=True, index=True, nullable=False + ) + workspace_name: Mapped[str] = mapped_column(String, nullable=False) + workspace_icon: Mapped[str | None] = mapped_column(String, nullable=True) + bot_id: Mapped[str] = mapped_column(String, nullable=False) + sync_status: Mapped[str] = mapped_column(String, default="idle", nullable=False) + last_synced_at: Mapped[datetime | None] = mapped_column( + DateTime(timezone=True), nullable=True + ) + created_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), + default=lambda: datetime.now(timezone.utc), + nullable=False, + ) + + connections: Mapped[list["NotionConnection"]] = relationship( + "NotionConnection", back_populates="workspace", cascade="all, delete-orphan" + ) + + +class NotionConnection(Base): + __tablename__ = "notion_connections" + + id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True) + user_id: Mapped[int] = mapped_column( + ForeignKey("users.id"), nullable=False, index=True + ) + workspace_id: Mapped[int] = mapped_column( + ForeignKey("notion_workspaces.id"), nullable=False, index=True + ) + access_token: Mapped[str] = mapped_column(String, nullable=False) + token_type: Mapped[str] = mapped_column(String, default="bearer", nullable=False) + connected_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), + default=lambda: datetime.now(timezone.utc), + nullable=False, + ) + + user: Mapped["User"] = relationship("User", back_populates="notion_connections") # noqa: F821 + workspace: Mapped["NotionWorkspace"] = relationship( + "NotionWorkspace", back_populates="connections" + ) + + __table_args__ = ( + UniqueConstraint("user_id", "workspace_id", name="uq_user_workspace"), + ) diff --git a/src/backend/models/user.py b/src/backend/models/user.py index a491a70..1578f29 100644 --- a/src/backend/models/user.py +++ b/src/backend/models/user.py @@ -30,3 +30,6 @@ class User(Base): conversations: Mapped[list["Conversation"]] = relationship( # noqa: F821 "Conversation", back_populates="user" ) + notion_connections: Mapped[list["NotionConnection"]] = relationship( # noqa: F821 + "NotionConnection", back_populates="user", cascade="all, delete-orphan" + ) diff --git a/src/backend/routers/chat.py b/src/backend/routers/chat.py deleted file mode 100644 index 17c4724..0000000 --- a/src/backend/routers/chat.py +++ /dev/null @@ -1,124 +0,0 @@ -# import logging - -# from fastapi import APIRouter, Depends, HTTPException - -# from src.backend.dependencies.services import get_chroma_manager, get_gemini_client -# from src.backend.load.chroma_manager import ChromaManager -# from src.backend.llm.gemini_client import GeminiClient -# from src.backend.schemas.chat import ChatRequest, ChatResponse - -# router = APIRouter(prefix="/api/chat", tags=["Chat"]) -# logger = logging.getLogger(__name__) - -# MAX_CONTEXT_CHARS = 4000 - -# @router.post("/ask", response_model=ChatResponse) -# async def ask_question( -# request: ChatRequest, -# chroma: ChromaManager = Depends(get_chroma_manager), -# gemini: GeminiClient = Depends(get_gemini_client), -# ): -# """ -# RAG Pipeline Endpoint: -# 1. Retrieves relevant documents from the ChromaDB vector store. -# 2. Expands context by fetching sibling chunks from the same parent. -# 3. Combines them into a structured chunks list. -# 4. Calls Gemini LLM using the WorkMate prompt. -# """ -# try: -# # Step 1: Initial Retrieval (Fetch 10 to balance page coverage and API limits) -# results = chroma.query(request.question, n_results=10) - -# # Collect initial results into a dict keyed by chunk ID for deduplication -# seen_ids = set() -# chunks_with_meta = [] # list of (doc_text, metadata, chunk_id) -# source_titles = set() - -# if results and results.get("documents") and results["documents"][0]: -# docs = results["documents"][0] -# metas = ( -# results["metadatas"][0] -# if results.get("metadatas") -# else [{}] * len(docs) -# ) -# ids = ( -# results["ids"][0] -# if results.get("ids") -# else [str(i) for i in range(len(docs))] -# ) - -# for doc, meta, chunk_id in zip(docs, metas, ids): -# if chunk_id not in seen_ids: -# seen_ids.add(chunk_id) -# chunks_with_meta.append((doc, meta, chunk_id)) - -# # Step 2: Sibling Expansion — only for very short chunks (likely incomplete) -# parent_ids_to_expand = set() -# for doc_text, meta, chunk_id in chunks_with_meta: -# if len(doc_text.strip()) < 100: # Only expand short/incomplete chunks -# parent_id = meta.get("parent_id") -# if parent_id: -# parent_ids_to_expand.add(parent_id) - -# for parent_id in parent_ids_to_expand: -# sibling_results = chroma.get_by_parent(parent_id, limit=5) -# if sibling_results and sibling_results.get("documents"): -# sib_docs = sibling_results["documents"] -# sib_metas = sibling_results.get("metadatas", [{}] * len(sib_docs)) -# sib_ids = sibling_results.get("ids", [str(i) for i in range(len(sib_docs))]) - -# for doc, meta, chunk_id in zip(sib_docs, sib_metas, sib_ids): -# if chunk_id not in seen_ids: -# seen_ids.add(chunk_id) -# chunks_with_meta.append((doc, meta, chunk_id)) - -# # Step 3: Build formatted context list -# all_chunks = [] -# for doc, meta, chunk_id in chunks_with_meta: -# clean_doc = doc.strip().replace("\r\n", "\n") -# all_chunks.append({ -# "chunk_id": chunk_id, -# "page_title": meta.get("title", "Unknown Source"), -# "section": meta.get("parent_title", ""), -# "text": clean_doc, -# }) - -# # Step 4: LLM Re-ranking (Filter out irrelevant project proposals) -# filtered_chunks = gemini.filter_chunks(all_chunks, request.question) - -# # Enforce maximum of 3 chunks and context cap -# final_chunks = [] -# total_chars = 0 -# for chunk in filtered_chunks[:3]: -# # Respect context cap -# text = chunk["text"] -# if total_chars + len(text) > MAX_CONTEXT_CHARS: -# remaining = MAX_CONTEXT_CHARS - total_chars -# if remaining > 100: -# chunk["text"] = text[:remaining] + "... [truncated]" -# else: -# break - -# final_chunks.append(chunk) -# source_titles.add(chunk["page_title"]) -# total_chars += len(chunk["text"]) - -# sources_list = sorted(source_titles) - -# # Step 5: Generation using only the filtered, relevant chunks -# answer = gemini.ask_workmate( -# chunks=final_chunks, -# user_question=request.question, -# debug=request.debug, -# ) - -# return ChatResponse( -# answer=answer, -# sources=sources_list, -# chunks=final_chunks, -# unfiltered_chunks=all_chunks, -# ) - -# except Exception as e: -# logger.error(f"Error processing RAG query: {e}") -# raise HTTPException(status_code=500, detail=str(e)) diff --git a/src/backend/routers/conversations.py b/src/backend/routers/conversations.py index 62759b0..52dfca5 100644 --- a/src/backend/routers/conversations.py +++ b/src/backend/routers/conversations.py @@ -9,11 +9,11 @@ from src.backend.database import get_db from src.backend.dependencies.auth import get_current_user from src.backend.dependencies.services import get_gemini_client, get_hybrid_retriever, get_voyage_reranker +from src.backend.dependencies.workspace import get_workspace_filter from src.backend.load.hybrid_retriever import HybridRetriever from src.backend.llm.gemini_client import GeminiClient from src.backend.llm.voyage_reranker import VoyageReranker from src.backend.models.conversation import Conversation, MessageRecord -MAX_CONTEXT_CHARS = 5000 from src.backend.models.user import User from src.backend.schemas.conversation import ( ConversationDetail, @@ -26,6 +26,8 @@ router = APIRouter(prefix="/api/conversations", tags=["conversations"]) logger = logging.getLogger(__name__) +MAX_CONTEXT_CHARS = 5000 + @router.post("/", response_model=ConversationSummary) async def create_conversation( @@ -104,7 +106,11 @@ async def send_message( # RAG pipeline try: # Step 1: Hybrid Retrieval (vector + BM25, merged via RRF) - all_chunks = hybrid.search(request.question, vector_top_k=15, bm25_top_k=15, final_top_k=15) + where_filter = get_workspace_filter(current_user.id, db) + all_chunks = hybrid.search( + request.question, vector_top_k=15, bm25_top_k=15, final_top_k=15, + where=where_filter, + ) # Step 2: Sibling Expansion seen_ids = {chunk["chunk_id"] for chunk in all_chunks} @@ -258,7 +264,11 @@ async def stream_message( # RAG retrieval try: # Step 1: Hybrid Retrieval (vector + BM25, merged via RRF) - all_chunks = hybrid.search(request.question, vector_top_k=15, bm25_top_k=15, final_top_k=15) + where_filter = get_workspace_filter(current_user.id, db) + all_chunks = hybrid.search( + request.question, vector_top_k=15, bm25_top_k=15, final_top_k=15, + where=where_filter, + ) # Step 2: Sibling Expansion seen_ids = {chunk["chunk_id"] for chunk in all_chunks} diff --git a/src/backend/routers/notion.py b/src/backend/routers/notion.py new file mode 100644 index 0000000..b60c2bc --- /dev/null +++ b/src/backend/routers/notion.py @@ -0,0 +1,311 @@ +import logging +from datetime import datetime, timezone +from urllib.parse import urlencode + +import httpx +from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, status +from fastapi.responses import RedirectResponse +from sqlalchemy.orm import Session, joinedload + +from src.backend.config import settings +from src.backend.database import SessionLocal, get_db +from src.backend.dependencies.auth import get_current_user, verify_token +from src.backend.dependencies.services import get_chroma_manager +from src.backend.load.chroma_manager import ChromaManager +from src.backend.models.notion import NotionConnection, NotionWorkspace +from src.backend.models.user import User +from src.backend.schemas.notion import NotionAuthURL, NotionWorkspaceResponse +from src.backend.utils.encryption import decrypt_token, encrypt_token + +router = APIRouter(prefix="/api/notion", tags=["notion"]) +logger = logging.getLogger(__name__) + +NOTION_AUTH_URL = "https://api.notion.com/v1/oauth/authorize" +NOTION_TOKEN_URL = "https://api.notion.com/v1/oauth/token" + + +def _ingest_workspace(workspace_db_id: int, access_token: str, notion_workspace_id: str): + """Background task: fetch from Notion, chunk, and store in ChromaDB.""" + import os + import sys + + # notion_fetcher uses bare imports (e.g. "from notion_fetcher.client import ...") + # that resolve relative to src/Notion/, so it must be on sys.path. + notion_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "../../Notion")) + if notion_dir not in sys.path: + sys.path.insert(0, notion_dir) + + from src.Notion.notion_fetcher.Notion_Fetcher import NotionFetcher + from src.backend.transform.notion_ingestory import NotionIngestor + + db = SessionLocal() + try: + workspace = db.query(NotionWorkspace).get(workspace_db_id) + if not workspace: + logger.error(f"Workspace {workspace_db_id} not found for ingestion") + return + + workspace.sync_status = "syncing" + db.commit() + + logger.info(f"Starting ingestion for workspace: {workspace.workspace_name}") + + # Fetch documents from Notion using the user's access token + fetcher = NotionFetcher(access_token) + documents = fetcher.fetch_all() + + # Convert NotionDocument objects to dicts for the ingestor + raw_docs = [doc.to_dict() for doc in documents] + + # Run the ingestion pipeline with workspace_id tagging + ingestor = NotionIngestor(workspace_id=notion_workspace_id) + ingestor.run_pipeline_from_docs(raw_docs) + + workspace.sync_status = "idle" + workspace.last_synced_at = datetime.now(timezone.utc) + db.commit() + + logger.info( + f"Ingestion complete for workspace: {workspace.workspace_name} " + f"({len(raw_docs)} documents)" + ) + except Exception as e: + logger.error(f"Ingestion failed for workspace {workspace_db_id}: {e}") + try: + workspace = db.query(NotionWorkspace).get(workspace_db_id) + if workspace: + workspace.sync_status = "error" + db.commit() + except Exception: + pass + finally: + db.close() + + +@router.get("/connect", response_model=NotionAuthURL) +def notion_connect(current_user: User = Depends(get_current_user)): + """Return the Notion OAuth authorization URL. + Embeds the user's JWT in the state parameter so we can identify them in the callback. + """ + from src.backend.dependencies.auth import create_access_token + + jwt_token = create_access_token({"sub": str(current_user.id)}) + + params = { + "client_id": settings.NOTION_OAUTH_CLIENT_ID, + "redirect_uri": settings.NOTION_REDIRECT_URI, + "response_type": "code", + "owner": "user", + "state": jwt_token, + } + authorization_url = f"{NOTION_AUTH_URL}?{urlencode(params)}" + return NotionAuthURL(authorization_url=authorization_url) + + +@router.get("/callback") +async def notion_callback( + code: str, + state: str, + background_tasks: BackgroundTasks, + db: Session = Depends(get_db), +): + """Handle the Notion OAuth callback. + Exchanges the code for an access token, stores workspace + connection, + and triggers background ingestion if the workspace is new. + """ + # Verify the JWT from the state parameter to identify the user + payload = verify_token(state) + if payload is None: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Invalid or expired state token", + ) + user_id = int(payload.get("sub")) + user = db.query(User).filter(User.id == user_id).first() + if not user: + raise HTTPException(status_code=404, detail="User not found") + + # Exchange the authorization code for an access token + async with httpx.AsyncClient() as client: + token_response = await client.post( + NOTION_TOKEN_URL, + json={ + "grant_type": "authorization_code", + "code": code, + "redirect_uri": settings.NOTION_REDIRECT_URI, + }, + auth=(settings.NOTION_OAUTH_CLIENT_ID, settings.NOTION_OAUTH_CLIENT_SECRET), + ) + + if token_response.status_code != 200: + logger.error(f"Notion token exchange failed: {token_response.text}") + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Failed to exchange code for Notion access token", + ) + + token_data = token_response.json() + access_token = token_data.get("access_token") + bot_id = token_data.get("bot_id", "") + workspace_info = token_data.get("workspace", {}) + notion_workspace_id = token_data.get("workspace_id", "") + workspace_name = token_data.get("workspace_name", workspace_info.get("name", "Untitled Workspace")) + workspace_icon = token_data.get("workspace_icon", workspace_info.get("icon", None)) + + # Check if this workspace already exists in our DB + workspace = ( + db.query(NotionWorkspace) + .filter(NotionWorkspace.workspace_id == notion_workspace_id) + .first() + ) + is_new_workspace = workspace is None + + if is_new_workspace: + workspace = NotionWorkspace( + workspace_id=notion_workspace_id, + workspace_name=workspace_name, + workspace_icon=workspace_icon, + bot_id=bot_id, + sync_status="idle", + ) + db.add(workspace) + db.commit() + db.refresh(workspace) + + # Check if the user already has a connection to this workspace + existing_connection = ( + db.query(NotionConnection) + .filter( + NotionConnection.user_id == user_id, + NotionConnection.workspace_id == workspace.id, + ) + .first() + ) + + if existing_connection: + # Update the access token (user re-authorized) + existing_connection.access_token = encrypt_token(access_token) + db.commit() + else: + connection = NotionConnection( + user_id=user_id, + workspace_id=workspace.id, + access_token=encrypt_token(access_token), + ) + db.add(connection) + db.commit() + + # Trigger background ingestion if this is a new workspace + if is_new_workspace: + background_tasks.add_task( + _ingest_workspace, workspace.id, access_token, notion_workspace_id + ) + + redirect_url = f"{settings.FRONTEND_URL}/settings?notion=connected" + return RedirectResponse(url=redirect_url) + + +@router.get("/workspaces", response_model=list[NotionWorkspaceResponse]) +def list_workspaces( + current_user: User = Depends(get_current_user), + db: Session = Depends(get_db), +): + """List all Notion workspaces connected by the current user.""" + connections = ( + db.query(NotionConnection) + .options(joinedload(NotionConnection.workspace)) + .filter(NotionConnection.user_id == current_user.id) + .all() + ) + results = [] + for conn in connections: + ws = conn.workspace + results.append( + NotionWorkspaceResponse( + id=ws.id, + workspace_id=ws.workspace_id, + workspace_name=ws.workspace_name, + workspace_icon=ws.workspace_icon, + sync_status=ws.sync_status, + last_synced_at=ws.last_synced_at, + connected_at=conn.connected_at, + ) + ) + return results + + +@router.post("/workspaces/{workspace_id}/sync") +def sync_workspace( + workspace_id: int, + background_tasks: BackgroundTasks, + current_user: User = Depends(get_current_user), + db: Session = Depends(get_db), + chroma: ChromaManager = Depends(get_chroma_manager), +): + """Trigger a manual re-sync for a connected workspace.""" + connection = ( + db.query(NotionConnection) + .filter( + NotionConnection.user_id == current_user.id, + NotionConnection.workspace_id == workspace_id, + ) + .first() + ) + if not connection: + raise HTTPException(status_code=404, detail="Workspace connection not found") + + workspace = connection.workspace + if workspace.sync_status == "syncing": + return {"status": "already_syncing"} + + # Delete existing chunks for this workspace before re-ingesting + chroma.delete_by_workspace(workspace.workspace_id) + + workspace.sync_status = "syncing" + db.commit() + + access_token = decrypt_token(connection.access_token) + background_tasks.add_task( + _ingest_workspace, workspace.id, access_token, workspace.workspace_id + ) + + return {"status": "syncing"} + + +@router.delete("/workspaces/{workspace_id}", status_code=status.HTTP_204_NO_CONTENT) +def disconnect_workspace( + workspace_id: int, + current_user: User = Depends(get_current_user), + db: Session = Depends(get_db), + chroma: ChromaManager = Depends(get_chroma_manager), +): + """Disconnect the current user from a Notion workspace. + If this is the last connected user, also delete workspace data from ChromaDB. + """ + connection = ( + db.query(NotionConnection) + .filter( + NotionConnection.user_id == current_user.id, + NotionConnection.workspace_id == workspace_id, + ) + .first() + ) + if not connection: + raise HTTPException(status_code=404, detail="Workspace connection not found") + + workspace = connection.workspace + db.delete(connection) + db.commit() + + # Check if any other users are still connected to this workspace + remaining = ( + db.query(NotionConnection) + .filter(NotionConnection.workspace_id == workspace_id) + .count() + ) + + if remaining == 0: + # Last user disconnected — purge workspace data + chroma.delete_by_workspace(workspace.workspace_id) + db.delete(workspace) + db.commit() diff --git a/src/backend/schemas/chat.py b/src/backend/schemas/chat.py deleted file mode 100644 index 9e3345f..0000000 --- a/src/backend/schemas/chat.py +++ /dev/null @@ -1,30 +0,0 @@ -# from typing import Optional - -# from pydantic import BaseModel, Field - - -# class ChatRequest(BaseModel): -# question: str = Field( -# ..., description="The user's question to ask the RAG pipeline." -# ) -# debug: bool = Field( -# False, description="Enable debug mode to include chunk IDs in the response." -# ) - - -# class ChatResponse(BaseModel): -# answer: str = Field( -# ..., description="The generated context-aware answer from the Gemini LLM." -# ) -# sources: Optional[list[str]] = Field( -# default=None, -# description="List of Notion page titles used as context for the answer.", -# ) -# chunks: Optional[list[dict]] = Field( -# default=None, -# description="The raw context chunks sent to the LLM.", -# ) -# unfiltered_chunks: Optional[list[dict]] = Field( -# default=None, -# description="The raw context chunks retrieved from ChromaDB before LLM filtering.", -# ) diff --git a/src/backend/schemas/notion.py b/src/backend/schemas/notion.py new file mode 100644 index 0000000..d44aaca --- /dev/null +++ b/src/backend/schemas/notion.py @@ -0,0 +1,19 @@ +from datetime import datetime + +from pydantic import BaseModel + + +class NotionAuthURL(BaseModel): + authorization_url: str + + +class NotionWorkspaceResponse(BaseModel): + id: int + workspace_id: str + workspace_name: str + workspace_icon: str | None + sync_status: str + last_synced_at: datetime | None + connected_at: datetime + + model_config = {"from_attributes": True} diff --git a/src/backend/transform/notion_ingestory.py b/src/backend/transform/notion_ingestory.py index 2ef51e2..adaf681 100644 --- a/src/backend/transform/notion_ingestory.py +++ b/src/backend/transform/notion_ingestory.py @@ -4,13 +4,14 @@ from src.backend.load.bm25_manager import BM25Manager, BM25_INDEX_PATH class NotionIngestor: - def __init__(self, file_path="./notion_data.json", chunk_size=1000, chunk_overlap=200): + def __init__(self, file_path="./notion_data.json", chunk_size=1000, chunk_overlap=200, workspace_id=None): """ Initializes the ingestion pipeline, database connection, and splitters. """ self.file_path = file_path self.chunk_size = chunk_size self.chunk_overlap = chunk_overlap + self.workspace_id = workspace_id # Instantiate the database manager self.db = ChromaManager() @@ -33,6 +34,24 @@ def _load_data(self): with open(self.file_path, 'r', encoding='utf-8') as f: return json.load(f) + def _deduplicate_docs(self, raw_docs): + """ + Remove duplicate documents by ID. + Notion's search API returns database rows as both pages and database rows, + so the same document can appear twice. Prefer the 'database_row' version + since it contains richer property metadata. + """ + seen = {} + for doc in raw_docs: + doc_id = doc.get("id") + if doc_id in seen: + # Keep the database_row version over the page version + if doc.get("source_type") == "database_row": + seen[doc_id] = doc + else: + seen[doc_id] = doc + return list(seen.values()) + def _build_parent_child_maps(self, raw_docs): """ Build lookup maps for parent-child relationships. @@ -105,9 +124,11 @@ def _process_document(self, doc, id_to_doc, parent_to_children): "url": doc.get("url"), "source_type": doc.get("source_type", "page"), "chunk_index": i, - "section_header": "", } + if self.workspace_id: + chunk_metadata["workspace_id"] = self.workspace_id + if parent_title: chunk_metadata["parent_title"] = parent_title if parent_id: @@ -122,6 +143,7 @@ def run_pipeline(self): """Public method to execute the full ingestion pipeline.""" print(f"Loading data from {self.file_path}...") raw_docs = self._load_data() + raw_docs = self._deduplicate_docs(raw_docs) # Build parent-child relationship maps print("Building parent-child relationship maps...") id_to_doc, parent_to_children = self._build_parent_child_maps(raw_docs) @@ -150,6 +172,37 @@ def run_pipeline(self): else: print("No chunks were created.") + def run_pipeline_from_docs(self, raw_docs): + """Run the ingestion pipeline from in-memory document dicts (no file I/O).""" + raw_docs = self._deduplicate_docs(raw_docs) + print("Building parent-child relationship maps...") + id_to_doc, parent_to_children = self._build_parent_child_maps(raw_docs) + print(f" Found {len(parent_to_children)} parent documents with children") + + all_chunks = [] + all_metadatas = [] + all_ids = [] + + print(f"Running Hybrid Chunking on {len(raw_docs)} documents (with context enrichment)...") + + for doc in raw_docs: + chunks, metadatas, ids = self._process_document(doc, id_to_doc, parent_to_children) + all_chunks.extend(chunks) + all_metadatas.extend(metadatas) + all_ids.extend(ids) + + if all_chunks: + print(f"Storing {len(all_chunks)} chunks into Chroma...") + self.db.add_documents(all_chunks, all_metadatas, all_ids) + + print("Building BM25 index...") + bm25 = BM25Manager() + bm25.build_index(all_chunks, all_metadatas, all_ids) + bm25.save(BM25_INDEX_PATH) + print(f"BM25 index saved to {BM25_INDEX_PATH}") + else: + print("No chunks were created.") + # --- Execution --- if __name__ == "__main__": # The instantiation is clean, and the parameters can easily be swapped for testing. diff --git a/src/backend/utils/__init__.py b/src/backend/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/backend/utils/encryption.py b/src/backend/utils/encryption.py new file mode 100644 index 0000000..fe9affd --- /dev/null +++ b/src/backend/utils/encryption.py @@ -0,0 +1,26 @@ +from cryptography.fernet import Fernet + +from src.backend.config import settings + +_fernet = None + + +def _get_fernet() -> Fernet: + global _fernet + if _fernet is None: + key = settings.NOTION_ENCRYPTION_KEY + if not key: + raise RuntimeError( + "NOTION_ENCRYPTION_KEY is not set. " + "Generate one with: python -c \"from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())\"" + ) + _fernet = Fernet(key.encode()) + return _fernet + + +def encrypt_token(plain: str) -> str: + return _get_fernet().encrypt(plain.encode()).decode() + + +def decrypt_token(cipher: str) -> str: + return _get_fernet().decrypt(cipher.encode()).decode()