Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,15 @@ Slack integration:

- `SLACK_CLIENT_ID`: Slack OAuth app client ID.
- `SLACK_CLIENT_SECRET`: Slack OAuth app client secret.
- `SLACK_SIGNING_SECRET`: Slack App signing secret used for verifying webhook signatures.
- `SLACK_REDIRECT_URI`: Optional override for the Slack OAuth callback URL. Defaults to the current request host plus `/api/integrations/slack/callback`.

### Slack Webhooks
For real-time message ingestion, configure "Event Subscriptions" in your Slack App settings to point to:
`https://<your-public-domain>/api/webhooks/slack`

Ensure you have subscribed to the `message.channels` bot user event.

GitHub integration:

- `GITHUB_CLIENT_ID`: GitHub OAuth app client ID.
Expand Down
225 changes: 225 additions & 0 deletions handlers/integrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,231 @@ func GetSlackChannels(w http.ResponseWriter, r *http.Request) {
_ = json.NewEncoder(w).Encode(map[string]interface{}{"channels": channels})
}

func SlackWebhookHandler(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}

body, err := io.ReadAll(r.Body)
if err != nil {
http.Error(w, "Invalid body", http.StatusBadRequest)
return
}

// Verify signature if secret is configured
signingSecret := os.Getenv("SLACK_SIGNING_SECRET")
if signingSecret != "" {
signature := r.Header.Get("X-Slack-Signature")
timestamp := r.Header.Get("X-Slack-Request-Timestamp")
if err := slackClient.ValidateWebhookRequest(body, signature, timestamp, signingSecret); err != nil {
log.Printf("Slack webhook signature validation failed: %v", err)
http.Error(w, "Invalid signature", http.StatusUnauthorized)
return
}
}

var event services.SlackWebhookEvent
if err := json.Unmarshal(body, &event); err != nil {
http.Error(w, "Invalid JSON", http.StatusBadRequest)
return
}

// Handle URL verification challenge
if event.Type == "url_verification" {
w.Header().Set("Content-Type", "text/plain")
w.Write([]byte(event.Challenge))
return
}

// Handle event callbacks
if event.Type == "event_callback" {
var innerEvent services.SlackMessageEvent
if err := json.Unmarshal(event.Event, &innerEvent); err == nil {
if innerEvent.Type == "message" && innerEvent.User != "" {
go handleSlackMessageEvent(event.TeamID, innerEvent)
}
}
}

w.WriteHeader(http.StatusOK)
}

func handleSlackMessageEvent(teamID string, event services.SlackMessageEvent) {
// Find integrations for this team
rows, err := database.DB.Query(
"SELECT user_id, workspace_id, metadata FROM external_integrations WHERE provider = 'slack'",
)
if err != nil {
log.Printf("Failed to query integrations for Slack webhook: %v", err)
return
}
defer rows.Close()

for rows.Next() {
var userID, workspaceID int
var metadataStr string
if err := rows.Scan(&userID, &workspaceID, &metadataStr); err != nil {
continue
}

var metadata map[string]interface{}
json.Unmarshal([]byte(metadataStr), &metadata)

if metadata["team_id"] == teamID {
// Check if channel is selected
selectedChannels, ok := metadata["selected_channels"].([]interface{})
isChannelSelected := false
if !ok || len(selectedChannels) == 0 {
isChannelSelected = true // Default to all if none selected (as per sync.go)
} else {
for _, ch := range selectedChannels {
if chStr, ok := ch.(string); ok && chStr == event.Channel {
isChannelSelected = true
break
}
}
}

if isChannelSelected {
msg := services.SlackMessage{
User: event.User,
Text: event.Text,
TS: event.TS,
}
// Parse timestamp
parts := strings.Split(event.TS, ".")
if len(parts) > 0 {
sec, _ := strconv.ParseInt(parts[0], 10, 64)
msg.Timestamp = sec
}

if err := services.SaveSlackSignal(database.DB, userID, workspaceID, msg, event.Channel, event.User); err != nil {
log.Printf("Failed to save Slack webhook signal: %v", err)
}
}
}
}
}

func SlackReplyHandler(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}

userID, err := getUserIDFromContext(r)
if err != nil {
http.Error(w, "Unauthorized", http.StatusUnauthorized)
return
}

workspaceID, statusCode, err := getAuthorizedWorkspaceID(r, userID)
if err != nil {
http.Error(w, err.Error(), statusCode)
return
}

var req struct {
ChannelID string `json:"channel_id"`
ThreadTS string `json:"thread_ts"`
Text string `json:"text"`
}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, "Invalid request body", http.StatusBadRequest)
return
}

// Fetch token
var encryptedToken string
err = database.DB.QueryRow(
`SELECT access_token FROM external_integrations
WHERE user_id = ? AND workspace_id = ? AND provider = 'slack'`,
userID, workspaceID,
).Scan(&encryptedToken)
if err != nil {
http.Error(w, "Integration not found", http.StatusNotFound)
return
}

accessToken, err := tokenEncryptor.Decrypt(encryptedToken)
if err != nil {
http.Error(w, "Failed to decrypt token", http.StatusInternalServerError)
return
}

rateLimit, err := slackClient.PostMessage(accessToken, req.ChannelID, req.Text, req.ThreadTS)
if err != nil {
if rateLimit != nil && rateLimit.IsRateLimited() {
http.Error(w, "Rate limited", http.StatusTooManyRequests)
return
}
http.Error(w, "Failed to post message: "+err.Error(), http.StatusInternalServerError)
return
}

w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(map[string]string{"status": "ok"})
}

func SlackDisconnectHandler(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodDelete {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}

userID, err := getUserIDFromContext(r)
if err != nil {
http.Error(w, "Unauthorized", http.StatusUnauthorized)
return
}

workspaceID, statusCode, err := getAuthorizedWorkspaceID(r, userID)
if err != nil {
http.Error(w, err.Error(), statusCode)
return
}

if _, err := database.DB.Exec(
"DELETE FROM external_integrations WHERE user_id = ? AND workspace_id = ? AND provider = 'slack'",
userID, workspaceID,
); err != nil {
http.Error(w, "Failed to disconnect Slack", http.StatusInternalServerError)
return
}

w.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(map[string]string{"status": "disconnected"})
}

func SlackSyncHandler(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}

userID, err := getUserIDFromContext(r)
if err != nil {
http.Error(w, "Unauthorized", http.StatusUnauthorized)
return
}

workspaceID, statusCode, err := getAuthorizedWorkspaceID(r, userID)
if err != nil {
http.Error(w, err.Error(), statusCode)
return
}

go func() {
if err := services.SyncSlackSignals(userID, workspaceID); err != nil {
log.Printf("Manual Slack sync error: %v", err)
}
}()

w.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(map[string]string{"status": "sync_started"})
}

func GmailAuthHandler(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
Expand Down
4 changes: 4 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ func main() {
mux.Handle("/api/integrations", middleware.AuthMiddleware(http.HandlerFunc(handlers.GetIntegrations)))
mux.Handle("/api/integrations/slack/auth", middleware.AuthMiddleware(http.HandlerFunc(handlers.SlackAuth)))
mux.Handle("/api/integrations/slack/channels", middleware.AuthMiddleware(http.HandlerFunc(handlers.GetSlackChannels)))
mux.Handle("/api/integrations/slack/sync", middleware.AuthMiddleware(http.HandlerFunc(handlers.SlackSyncHandler)))
mux.Handle("/api/integrations/github/auth", middleware.AuthMiddleware(http.HandlerFunc(handlers.GitHubAuthHandler)))
mux.Handle("/api/integrations/github/repos", middleware.AuthMiddleware(http.HandlerFunc(handlers.GitHubReposHandler)))
mux.Handle("/api/integrations/github/sync", middleware.AuthMiddleware(http.HandlerFunc(handlers.GitHubSyncHandler)))
Expand All @@ -121,6 +122,8 @@ func main() {
mux.Handle("/api/integrations/jira/sync", middleware.AuthMiddleware(http.HandlerFunc(handlers.JiraSyncHandler)))
mux.Handle("/api/integrations/jira/issues/", middleware.AuthMiddleware(http.HandlerFunc(handlers.JiraIssueActionHandler)))
mux.Handle("/api/integrations/jira", middleware.AuthMiddleware(http.HandlerFunc(handlers.JiraDisconnectHandler)))
mux.Handle("/api/integrations/slack/reply", middleware.AuthMiddleware(http.HandlerFunc(handlers.SlackReplyHandler)))
mux.Handle("/api/integrations/slack", middleware.AuthMiddleware(http.HandlerFunc(handlers.SlackDisconnectHandler)))
mux.Handle("/api/integrations/status", middleware.AuthMiddleware(http.HandlerFunc(handlers.IntegrationStatusHandler)))
mux.Handle("/api/integrations/", middleware.AuthMiddleware(http.HandlerFunc(handlers.DeleteIntegration)))

Expand Down Expand Up @@ -156,6 +159,7 @@ func main() {

// Webhook routes (public, but should verify signature in production)
mux.HandleFunc("/api/webhooks/github", handlers.GitHubWebhookHandler)
mux.HandleFunc("/api/webhooks/slack", handlers.SlackWebhookHandler)

// Apply CORS and logging middleware
handler := loggingMiddleware(middleware.CorsMiddleware(mux))
Expand Down
Binary file added sentinent.db-shm
Binary file not shown.
Binary file added sentinent.db-wal
Binary file not shown.
87 changes: 87 additions & 0 deletions services/slack.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package services
import (
"crypto/hmac"
"crypto/sha256"
"database/sql"
"encoding/hex"
"encoding/json"
"errors"
Expand Down Expand Up @@ -319,6 +320,50 @@ func (c *SlackClient) MarkMessageAsRead(accessToken, channelID, timestamp string
return rateLimit, nil
}

// PostMessage posts a message to a channel, optionally in a thread
func (c *SlackClient) PostMessage(accessToken, channelID, text, threadTS string) (*RateLimitInfo, error) {
data := url.Values{}
data.Set("channel", channelID)
data.Set("text", text)
if threadTS != "" {
data.Set("thread_ts", threadTS)
}

req, err := http.NewRequest("POST", c.BaseURL+"/chat.postMessage", strings.NewReader(data.Encode()))
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
req.Header.Set("Authorization", "Bearer "+accessToken)

resp, err := c.HTTPClient.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()

rateLimit := extractRateLimit(resp)

body, err := io.ReadAll(resp.Body)
if err != nil {
return rateLimit, err
}

var result struct {
OK bool `json:"ok"`
Error string `json:"error"`
}
if err := json.Unmarshal(body, &result); err != nil {
return rateLimit, err
}

if !result.OK {
return rateLimit, &SlackAPIError{Code: result.Error}
}

return rateLimit, nil
}

// extractRateLimit extracts rate limit headers from the response
func extractRateLimit(resp *http.Response) *RateLimitInfo {
info := &RateLimitInfo{}
Expand Down Expand Up @@ -423,3 +468,45 @@ func (c *SlackClient) ValidateWebhookRequest(body []byte, signature, timestamp,

return nil
}

// SaveSlackSignal saves a Slack message as a signal in the database
func SaveSlackSignal(db *sql.DB, userID, workspaceID int, msg SlackMessage, channelID string, authorName string) error {
sourceID := channelID + ":" + msg.TS

// Store metadata
msgMetadata := map[string]interface{}{
"channel_id": channelID,
"ts": msg.TS,
"user_id": msg.User,
}
metadataJSON, _ := json.Marshal(msgMetadata)

title := truncateString(msg.Text, 100)
if title == "" {
title = "Slack Message"
}

// Use UPSERT to handle duplicates and updates
_, err := db.Exec(
`INSERT INTO signals
(user_id, workspace_id, source_type, source_id, external_id, title, content, body, author, status, source_metadata, received_at, updated_at)
VALUES (?, ?, 'slack', ?, ?, ?, ?, ?, ?, 'unread', ?, ?, CURRENT_TIMESTAMP)
ON CONFLICT(user_id, source_type, source_id) DO UPDATE SET
title = excluded.title,
content = excluded.content,
body = excluded.body,
author = excluded.author,
source_metadata = excluded.source_metadata,
updated_at = CURRENT_TIMESTAMP`,
userID, workspaceID, sourceID, msg.TS, title, msg.Text, msg.Text, authorName,
string(metadataJSON), time.Unix(msg.Timestamp, 0),
)
return err
}

func truncateString(s string, maxLen int) string {
if len(s) <= maxLen {
return s
}
return s[:maxLen] + "..."
}
Loading
Loading