-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathkafka_consumer.py
More file actions
66 lines (53 loc) · 2.33 KB
/
kafka_consumer.py
File metadata and controls
66 lines (53 loc) · 2.33 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
import json
import pandas as pd
from kafka import KafkaConsumer
import joblib
from scripts.prepare_labeled_data import parse_meta
from scripts.neo4j_updater import push_anomalies_to_neo4j
# Serialized (model, feature_columns) pair produced by the training step.
MODEL_PATH = "trained_fraud_model.pkl"

# NOTE(review): joblib.load unpickles the file, which can execute arbitrary
# code -- MODEL_PATH must only ever point at a trusted, locally built artifact.
model, feature_cols = joblib.load(MODEL_PATH)

# Independent copy of the expected feature columns (same container type as
# feature_cols -- presumably a list, not a set), so later code can use it
# without touching feature_cols itself.
required_columns = feature_cols.copy()
# One-hot encoded fields; must match the encoding used at training time.
_CATEGORICAL_COLS = ["transaction_type", "transaction_currency", "terminal_uid"]
# Columns shown on the console when a transaction is flagged as fraud.
_DISPLAY_COLS = ["first_name", "last_name", "transaction_amount", "transaction_dttm"]


def _deserialize_value(raw):
    """Decode a Kafka message value as UTF-8 JSON.

    Args:
        raw: The raw message value bytes, or None (e.g. a tombstone record).

    Returns:
        The decoded JSON value, or None when the value is missing or is not
        valid UTF-8 JSON. Returning None instead of raising keeps a single
        malformed message from propagating out of the consumer iterator and
        stopping the whole loop.
    """
    if raw is None:
        return None
    try:
        return json.loads(raw.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError) as exc:
        print(f"[WARN] Skipping undecodable message: {exc}")
        return None


def _build_frames(record):
    """Turn one transaction dict into (enriched frame, model feature frame).

    Args:
        record: One decoded transaction message.

    Returns:
        A pair ``(df, features)``. ``df`` is the single-row frame with
        meta_data flattened, time features added and categoricals one-hot
        encoded. ``features`` holds exactly ``feature_cols``, in order.

    Raises:
        KeyError: if "transaction_dttm" or a categorical column is missing.
    """
    df = pd.DataFrame([record])

    # Flatten the meta_data payload into top-level columns.
    if "meta_data" in df.columns:
        meta_df = df["meta_data"].apply(parse_meta).apply(pd.Series)
        df = pd.concat([df.drop(columns=["meta_data"]), meta_df], axis=1)

    # Time-based features; an unparseable timestamp becomes NaT (NaN hour/day).
    df["transaction_dttm"] = pd.to_datetime(df["transaction_dttm"], errors="coerce")
    df["hour"] = df["transaction_dttm"].dt.hour
    df["dayofweek"] = df["transaction_dttm"].dt.dayofweek

    df = pd.get_dummies(df, columns=_CATEGORICAL_COLS, prefix=_CATEGORICAL_COLS)

    # A single row carries only one category per field, so most one-hot
    # columns the model expects are absent: fill them with 0. reindex also
    # orders the columns as in training and drops one-hot columns for
    # categories the model never saw.
    features = df.reindex(columns=feature_cols, fill_value=0)
    return df, features


def _score_transaction(record):
    """Score one transaction, print the verdict, and push fraud to Neo4j.

    Args:
        record: One decoded transaction message (a dict).

    Raises:
        Whatever feature building, ``model.predict`` or
        ``push_anomalies_to_neo4j`` raise.
    """
    df, features = _build_frames(record)

    # Keep the non-feature columns (names, parsed timestamp, ids, ...) next to
    # the model features. Reindexing the whole frame down to the feature
    # columns used to drop them, so the fraud printout raised KeyError (unless
    # those fields happened to be features) and Neo4j only saw model inputs.
    result = pd.concat(
        [df.drop(columns=features.columns, errors="ignore"), features], axis=1
    )
    result["predicted_label"] = model.predict(features)

    if result["predicted_label"].iloc[0] == 1:
        print("🚨 Fraud detected!")
        print(result[[col for col in _DISPLAY_COLS if col in result.columns]])
        push_anomalies_to_neo4j(result)
    else:
        print("✅ Transaction is normal.")


consumer = KafkaConsumer(
    'transactions',
    bootstrap_servers='localhost:9092',
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='fraud-detector',
    value_deserializer=_deserialize_value,
)
print("✅ Kafka Consumer listening on topic 'transactions'...")

for message in consumer:
    record = message.value
    # Undecodable values arrive as None; JSON that is not an object cannot be
    # turned into a transaction row either.
    if not isinstance(record, dict):
        print(f"[WARN] Skipping non-object message at offset {message.offset}")
        continue

    print(f"\n📥 Received transaction: {record.get('user_uid')} - {record.get('transaction_amount')}")
    try:
        _score_transaction(record)
    except Exception as exc:  # per-message boundary: log and keep consuming
        # A single bad record (missing fields, unexpected feature values,
        # a Neo4j failure) must not take the whole consumer down.
        print(f"[ERROR] Could not score transaction {record.get('user_uid')}: {exc!r}")