diff --git a/README.md b/README.md index 8a295f8..1120a38 100644 --- a/README.md +++ b/README.md @@ -33,8 +33,15 @@ Slack integration: - `SLACK_CLIENT_ID`: Slack OAuth app client ID. - `SLACK_CLIENT_SECRET`: Slack OAuth app client secret. +- `SLACK_SIGNING_SECRET`: Slack App signing secret used for verifying webhook signatures. - `SLACK_REDIRECT_URI`: Optional override for the Slack OAuth callback URL. Defaults to the current request host plus `/api/integrations/slack/callback`. +### Slack Webhooks +For real-time message ingestion, configure "Event Subscriptions" in your Slack App settings to point to: +`https:///api/webhooks/slack` + +Ensure you have subscribed to the `message.channels` bot user event. + GitHub integration: - `GITHUB_CLIENT_ID`: GitHub OAuth app client ID. diff --git a/handlers/integrations.go b/handlers/integrations.go index 4356641..54fa67b 100644 --- a/handlers/integrations.go +++ b/handlers/integrations.go @@ -443,6 +443,231 @@ func GetSlackChannels(w http.ResponseWriter, r *http.Request) { _ = json.NewEncoder(w).Encode(map[string]interface{}{"channels": channels}) } +func SlackWebhookHandler(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + body, err := io.ReadAll(r.Body) + if err != nil { + http.Error(w, "Invalid body", http.StatusBadRequest) + return + } + + // Verify signature if secret is configured + signingSecret := os.Getenv("SLACK_SIGNING_SECRET") + if signingSecret != "" { + signature := r.Header.Get("X-Slack-Signature") + timestamp := r.Header.Get("X-Slack-Request-Timestamp") + if err := slackClient.ValidateWebhookRequest(body, signature, timestamp, signingSecret); err != nil { + log.Printf("Slack webhook signature validation failed: %v", err) + http.Error(w, "Invalid signature", http.StatusUnauthorized) + return + } + } + + var event services.SlackWebhookEvent + if err := json.Unmarshal(body, &event); err != nil { + http.Error(w, "Invalid JSON", http.StatusBadRequest) + return + } + + // Handle URL verification challenge + if event.Type == "url_verification" { + w.Header().Set("Content-Type", "text/plain") + w.Write([]byte(event.Challenge)) + return + } + + // Handle event callbacks + if event.Type == "event_callback" { + var innerEvent services.SlackMessageEvent + if err := json.Unmarshal(event.Event, &innerEvent); err == nil { + if innerEvent.Type == "message" && innerEvent.User != "" { + go handleSlackMessageEvent(event.TeamID, innerEvent) + } + } + } + + w.WriteHeader(http.StatusOK) +} + +func handleSlackMessageEvent(teamID string, event services.SlackMessageEvent) { + // Find integrations for this team + rows, err := database.DB.Query( + "SELECT user_id, workspace_id, metadata FROM external_integrations WHERE provider = 'slack'", + ) + if err != nil { + log.Printf("Failed to query integrations for Slack webhook: %v", err) + return + } + defer rows.Close() + + for rows.Next() { + var userID, workspaceID int + var metadataStr string + if err := rows.Scan(&userID, &workspaceID, &metadataStr); err != nil { + continue + } + + var metadata map[string]interface{} + json.Unmarshal([]byte(metadataStr), &metadata) + + if metadata["team_id"] == teamID { + // Check if channel is selected + selectedChannels, ok := metadata["selected_channels"].([]interface{}) + isChannelSelected := false + if !ok || len(selectedChannels) == 0 { + isChannelSelected = true // Default to all if none selected (as per sync.go) + } else { + for _, ch := range selectedChannels { + if chStr, ok := ch.(string); ok && chStr == event.Channel { + isChannelSelected = true + break + } + } + } + + if isChannelSelected { + msg := services.SlackMessage{ + User: event.User, + Text: event.Text, + TS: event.TS, + } + // Parse timestamp + parts := strings.Split(event.TS, ".") + if len(parts) > 0 { + sec, _ := strconv.ParseInt(parts[0], 10, 64) + msg.Timestamp = sec + } + + if err := services.SaveSlackSignal(database.DB, userID, workspaceID, msg, event.Channel, event.User); err != nil { + log.Printf("Failed to save Slack webhook signal: %v", err) + } + } + } + } +} + +func SlackReplyHandler(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + userID, err := getUserIDFromContext(r) + if err != nil { + http.Error(w, "Unauthorized", http.StatusUnauthorized) + return + } + + workspaceID, statusCode, err := getAuthorizedWorkspaceID(r, userID) + if err != nil { + http.Error(w, err.Error(), statusCode) + return + } + + var req struct { + ChannelID string `json:"channel_id"` + ThreadTS string `json:"thread_ts"` + Text string `json:"text"` + } + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + http.Error(w, "Invalid request body", http.StatusBadRequest) + return + } + + // Fetch token + var encryptedToken string + err = database.DB.QueryRow( + `SELECT access_token FROM external_integrations + WHERE user_id = ? AND workspace_id = ? AND provider = 'slack'`, + userID, workspaceID, + ).Scan(&encryptedToken) + if err != nil { + http.Error(w, "Integration not found", http.StatusNotFound) + return + } + + accessToken, err := tokenEncryptor.Decrypt(encryptedToken) + if err != nil { + http.Error(w, "Failed to decrypt token", http.StatusInternalServerError) + return + } + + rateLimit, err := slackClient.PostMessage(accessToken, req.ChannelID, req.Text, req.ThreadTS) + if err != nil { + if rateLimit != nil && rateLimit.IsRateLimited() { + http.Error(w, "Rate limited", http.StatusTooManyRequests) + return + } + http.Error(w, "Failed to post message: "+err.Error(), http.StatusInternalServerError) + return + } + + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(map[string]string{"status": "ok"}) +} + +func SlackDisconnectHandler(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodDelete { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + userID, err := getUserIDFromContext(r) + if err != nil { + http.Error(w, "Unauthorized", http.StatusUnauthorized) + return + } + + workspaceID, statusCode, err := getAuthorizedWorkspaceID(r, userID) + if err != nil { + http.Error(w, err.Error(), statusCode) + return + } + + if _, err := database.DB.Exec( + "DELETE FROM external_integrations WHERE user_id = ? AND workspace_id = ? AND provider = 'slack'", + userID, workspaceID, + ); err != nil { + http.Error(w, "Failed to disconnect Slack", http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(map[string]string{"status": "disconnected"}) +} + +func SlackSyncHandler(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + userID, err := getUserIDFromContext(r) + if err != nil { + http.Error(w, "Unauthorized", http.StatusUnauthorized) + return + } + + workspaceID, statusCode, err := getAuthorizedWorkspaceID(r, userID) + if err != nil { + http.Error(w, err.Error(), statusCode) + return + } + + go func() { + if err := services.SyncSlackSignals(userID, workspaceID); err != nil { + log.Printf("Manual Slack sync error: %v", err) + } + }() + + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(map[string]string{"status": "sync_started"}) +} + func GmailAuthHandler(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodGet { http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) diff --git a/main.go b/main.go index 5fab5c5..08871a3 100644 --- a/main.go +++ b/main.go @@ -99,6 +99,7 @@ func main() { mux.Handle("/api/integrations", middleware.AuthMiddleware(http.HandlerFunc(handlers.GetIntegrations))) mux.Handle("/api/integrations/slack/auth", middleware.AuthMiddleware(http.HandlerFunc(handlers.SlackAuth))) mux.Handle("/api/integrations/slack/channels", middleware.AuthMiddleware(http.HandlerFunc(handlers.GetSlackChannels))) + mux.Handle("/api/integrations/slack/sync", middleware.AuthMiddleware(http.HandlerFunc(handlers.SlackSyncHandler))) mux.Handle("/api/integrations/github/auth", middleware.AuthMiddleware(http.HandlerFunc(handlers.GitHubAuthHandler))) mux.Handle("/api/integrations/github/repos", middleware.AuthMiddleware(http.HandlerFunc(handlers.GitHubReposHandler))) mux.Handle("/api/integrations/github/sync", middleware.AuthMiddleware(http.HandlerFunc(handlers.GitHubSyncHandler))) @@ -121,6 +122,8 @@ func main() { mux.Handle("/api/integrations/jira/sync", middleware.AuthMiddleware(http.HandlerFunc(handlers.JiraSyncHandler))) mux.Handle("/api/integrations/jira/issues/", middleware.AuthMiddleware(http.HandlerFunc(handlers.JiraIssueActionHandler))) mux.Handle("/api/integrations/jira", middleware.AuthMiddleware(http.HandlerFunc(handlers.JiraDisconnectHandler))) + mux.Handle("/api/integrations/slack/reply", middleware.AuthMiddleware(http.HandlerFunc(handlers.SlackReplyHandler))) + mux.Handle("/api/integrations/slack", middleware.AuthMiddleware(http.HandlerFunc(handlers.SlackDisconnectHandler))) mux.Handle("/api/integrations/status", middleware.AuthMiddleware(http.HandlerFunc(handlers.IntegrationStatusHandler))) mux.Handle("/api/integrations/", middleware.AuthMiddleware(http.HandlerFunc(handlers.DeleteIntegration))) @@ -156,6 +159,7 @@ func main() { // Webhook routes (public, but should verify signature in production) mux.HandleFunc("/api/webhooks/github", handlers.GitHubWebhookHandler) + mux.HandleFunc("/api/webhooks/slack", handlers.SlackWebhookHandler) // Apply CORS and logging middleware handler := loggingMiddleware(middleware.CorsMiddleware(mux)) diff --git a/sentinent.db-shm b/sentinent.db-shm new file mode 100644 index 0000000..fbb3602 Binary files /dev/null and b/sentinent.db-shm differ diff --git a/sentinent.db-wal b/sentinent.db-wal new file mode 100644 index 0000000..c76db70 Binary files /dev/null and b/sentinent.db-wal differ diff --git a/services/slack.go b/services/slack.go index ea5a139..d78182c 100644 --- a/services/slack.go +++ b/services/slack.go @@ -3,6 +3,7 @@ package services import ( "crypto/hmac" "crypto/sha256" + "database/sql" "encoding/hex" "encoding/json" "errors" @@ -319,6 +320,50 @@ func (c *SlackClient) MarkMessageAsRead(accessToken, channelID, timestamp string return rateLimit, nil } +// PostMessage posts a message to a channel, optionally in a thread +func (c *SlackClient) PostMessage(accessToken, channelID, text, threadTS string) (*RateLimitInfo, error) { + data := url.Values{} + data.Set("channel", channelID) + data.Set("text", text) + if threadTS != "" { + data.Set("thread_ts", threadTS) + } + + req, err := http.NewRequest("POST", c.BaseURL+"/chat.postMessage", strings.NewReader(data.Encode())) + if err != nil { + return nil, err + } + req.Header.Set("Content-Type", "application/x-www-form-urlencoded") + req.Header.Set("Authorization", "Bearer "+accessToken) + + resp, err := c.HTTPClient.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + rateLimit := extractRateLimit(resp) + + body, err := io.ReadAll(resp.Body) + if err != nil { + return rateLimit, err + } + + var result struct { + OK bool `json:"ok"` + Error string `json:"error"` + } + if err := json.Unmarshal(body, &result); err != nil { + return rateLimit, err + } + + if !result.OK { + return rateLimit, &SlackAPIError{Code: result.Error} + } + + return rateLimit, nil +} + // extractRateLimit extracts rate limit headers from the response func extractRateLimit(resp *http.Response) *RateLimitInfo { info := &RateLimitInfo{} @@ -423,3 +468,45 @@ func (c *SlackClient) ValidateWebhookRequest(body []byte, signature, timestamp, return nil } + +// SaveSlackSignal saves a Slack message as a signal in the database +func SaveSlackSignal(db *sql.DB, userID, workspaceID int, msg SlackMessage, channelID string, authorName string) error { + sourceID := channelID + ":" + msg.TS + + // Store metadata + msgMetadata := map[string]interface{}{ + "channel_id": channelID, + "ts": msg.TS, + "user_id": msg.User, + } + metadataJSON, _ := json.Marshal(msgMetadata) + + title := truncateString(msg.Text, 100) + if title == "" { + title = "Slack Message" + } + + // Use UPSERT to handle duplicates and updates + _, err := db.Exec( + `INSERT INTO signals + (user_id, workspace_id, source_type, source_id, external_id, title, content, body, author, status, source_metadata, received_at, updated_at) + VALUES (?, ?, 'slack', ?, ?, ?, ?, ?, ?, 'unread', ?, ?, CURRENT_TIMESTAMP) + ON CONFLICT(user_id, source_type, source_id) DO UPDATE SET + title = excluded.title, + content = excluded.content, + body = excluded.body, + author = excluded.author, + source_metadata = excluded.source_metadata, + updated_at = CURRENT_TIMESTAMP`, + userID, workspaceID, sourceID, msg.TS, title, msg.Text, msg.Text, authorName, + string(metadataJSON), time.Unix(msg.Timestamp, 0), + ) + return err +} + +func truncateString(s string, maxLen int) string { + if len(s) <= maxLen { + return s + } + return s[:maxLen] + "..." +} diff --git a/services/sync.go b/services/sync.go index f0b378b..57e1cb3 100644 --- a/services/sync.go +++ b/services/sync.go @@ -202,6 +202,9 @@ func (s *SyncService) syncSlackIntegration(integration *models.ExternalIntegrati continue } + // Cache for user names to avoid redundant API calls + userCache := make(map[string]string) + // Process and store messages for _, msg := range messages { if msg.Type != "message" || msg.User == "" { @@ -212,38 +215,51 @@ func (s *SyncService) syncSlackIntegration(integration *models.ExternalIntegrati // Get user info for author name authorName := msg.User - userResp, _, err := s.slackClient.GetUserInfo(accessToken, msg.User) - if err == nil && userResp != nil { - if userResp.User.RealName != "" { - authorName = userResp.User.RealName - } else if userResp.User.Name != "" { - authorName = userResp.User.Name + if cachedName, ok := userCache[msg.User]; ok { + authorName = cachedName + } else { + userResp, _, err := s.slackClient.GetUserInfo(accessToken, msg.User) + if err == nil && userResp != nil { + if userResp.User.RealName != "" { + authorName = userResp.User.RealName + } else if userResp.User.Name != "" { + authorName = userResp.User.Name + } + userCache[msg.User] = authorName } } - // Check if signal already exists - var existingID int - err = database.DB.QueryRow( - `SELECT id FROM signals - WHERE user_id = ? AND source_type = ? - AND (source_id = ? OR external_id = ?)`, - integration.UserID, models.SourceTypeSlack, sourceID, msg.TS, - ).Scan(&existingID) - - if err == sql.ErrNoRows { - // Insert new signal - _, err = database.DB.Exec( - `INSERT INTO signals (user_id, workspace_id, source_type, source_id, external_id, title, content, author, status, received_at) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, - integration.UserID, integration.WorkspaceID, models.SourceTypeSlack, - sourceID, msg.TS, truncate(msg.Text, 100), msg.Text, authorName, - models.SignalStatusUnread, time.Unix(msg.Timestamp, 0), - ) - if err != nil { - log.Printf("Failed to insert signal: %v", err) - } - } else if err != nil { - log.Printf("Failed to check existing signal: %v", err) + // Store metadata + msgMetadata := map[string]interface{}{ + "channel_id": channelID, + "ts": msg.TS, + "user_id": msg.User, + } + metadataJSON, _ := json.Marshal(msgMetadata) + + title := truncate(msg.Text, 100) + if title == "" { + title = "Slack Message" + } + + // Use UPSERT to handle duplicates and updates + _, err = database.DB.Exec( + `INSERT INTO signals + (user_id, workspace_id, source_type, source_id, external_id, title, content, body, author, status, source_metadata, received_at, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP) + ON CONFLICT(user_id, source_type, source_id) DO UPDATE SET + title = excluded.title, + content = excluded.content, + body = excluded.body, + author = excluded.author, + source_metadata = excluded.source_metadata, + updated_at = CURRENT_TIMESTAMP`, + integration.UserID, integration.WorkspaceID, models.SourceTypeSlack, + sourceID, msg.TS, title, msg.Text, msg.Text, authorName, + models.SignalStatusUnread, string(metadataJSON), time.Unix(msg.Timestamp, 0), + ) + if err != nil { + log.Printf("Failed to upsert Slack signal: %v", err) } } @@ -309,3 +325,35 @@ func (s *SyncService) ManualSync(integrationID int) error { return nil } + +// SyncSlackSignals triggers a manual sync for Slack signals +func SyncSlackSignals(userID, workspaceID int) error { + var integration models.ExternalIntegration + var encryptedToken string + err := database.DB.QueryRow( + `SELECT id, user_id, workspace_id, provider, access_token, metadata + FROM external_integrations + WHERE user_id = ? AND workspace_id = ? AND provider = 'slack'`, + userID, workspaceID, + ).Scan( + &integration.ID, &integration.UserID, &integration.WorkspaceID, + &integration.Provider, &encryptedToken, &integration.Metadata, + ) + if err != nil { + return err + } + + encryptor, err := utils.NewTokenEncryptor() + if err != nil { + return err + } + + accessToken, err := encryptor.Decrypt(encryptedToken) + if err != nil { + return err + } + + s := NewSyncService(encryptor) + s.syncSlackIntegration(&integration, accessToken) + return nil +}