A runnable, interview‑ready real‑time crypto data warehouse: Binance/Coinbase WebSockets → Kafka → Flink SQL → ClickHouse (facts) + Postgres (dims & governance) → Prometheus/Grafana (observability).
Project Structure & File list
rt-crypto-warehouse/
├─ README.md
├─ docker/
│ ├─ docker-compose.yml
│ ├─ grafana/
│ │ └─ dashboards/crypto.json
│ ├─ prometheus/
│ │ └─ prometheus.yml
│ └─ grafana/provisioning/...
├─ infra/
│ ├─ makefile # 快捷命令(build/run/seed/submit)
│ ├─ .env.example # 本地配置模板
│ └─ ksql/ # 可选:ksql 测试脚本
├─ producers/ # WebSocket → Kafka
│ ├─ binance_producer.py
│ ├─ coinbase_producer.py
│ ├─ common.py
│ └─ requirements.txt
├─ flink/
│ ├─ sql/
│ │ ├─ 00_kafka_connectors.sql
│ │ ├─ 01_raw_tables.sql # 原始层(ODS)
│ │ ├─ 02_cleansed_views.sql # 清洗层(CDM)
│ │ ├─ 03_dwd_trades.sql # 明细事实层(DWD)
│ │ ├─ 04_dwm_candles_1m.sql # 聚合层(DWM - 分钟K线)
│ │ ├─ 05_quality_checks.sql # 数据质量计算
│ │ └─ 06_sinks_clickhouse.sql # 落地 ClickHouse
│ └─ submit.sh # 提交 SQL 作业
├─ warehouse/
│ ├─ clickhouse/
│ │ ├─ 00_databases.sql
│ │ ├─ 10_dim_tables.sql
│ │ ├─ 20_fact_trades.sql
│ │ ├─ 21_fact_orderbook_snapshots.sql
│ │ └─ 30_fact_candles.sql
│ └─ postgres/
│ ├─ 00_databases.sql
│ ├─ 10_dims.sql
│ ├─ 20_governance.sql # 血缘/质量元数据表
│ └─ 90_seed.sql # 初始字典与映射种子数据
├─ governance/
│ ├─ dq_rules.md # 质量规则文档
│ ├─ lineage.md # 简要血缘
│ └─ alerts.md # 报警策略说明
├─ tests/
│ ├─ test_producers.py
│ ├─ test_sql_quality.sql
│ └─ sample_messages.json
└─ scripts/
├─ seed_postgres.sh
├─ init_clickhouse.sh
└─ demo_queries.sql
WebSocket → 标准化 JSON → Kafka 原始主题(raw.binance.trades、raw.coinbase.trades、raw.*.orderbook)
尽量保持“不丢信息、不做重逻辑”,仅做时间戳与必需字段补齐
清洗层(CDM:Conformed Data Model)
Flink SQL 对 ODS 做:字段归一、时区统一(UTC)、数据类型校正、异常值过滤、去重与乱序处理(Watermark)
输出标准化 Topic:cdm.trades、cdm.orderbook
将 CDM 明细写入 ClickHouse fact_trades、fact_orderbook_snapshots(行式+分区/主键设计、去重)
基于 DWD 计算分钟/5分钟/小时 K 线(OHLCV)、成交额、成交笔数等
汇总表写入 ClickHouse:fact_candles_1m 等
Postgres:dim_exchange、dim_symbol、dim_asset、dq_metrics、job_runs、lineage_edges 等。
存放统一符号映射(如 BTCUSDT ⇄ BTC-USD),及质量/血缘元数据。
Grafana + Prometheus 报表、指标与报警
可选)REST/GraphQL 提供查询 API
字段
类型
说明
event_time
TIMESTAMP(3) UTC
事件时间(交易发生)
ingest_time
TIMESTAMP(3) UTC
采集时间
exchange
STRING
BINANCE / COINBASE
symbol_norm
STRING
统一符号,如 BTC-USD
symbol_src
STRING
原始符号,如 BTCUSDT / BTC-USD
trade_id
STRING
交易唯一 ID(exchange + original_id)
side
STRING
BUY/SELL(按成交方向归一)
price
DECIMAL(38, 10)
成交价(USD 或报价币)
qty
DECIMAL(38, 10)
成交量(基础币)
notional
DECIMAL(38, 10)
成交额(= price * qty)
maker
BOOLEAN
是否为 maker
source_ts
TIMESTAMP(3)
原始事件时间,保留审计
OrderBook Snapshot/L2 Model
字段
类型
说明
event_time
TIMESTAMP(3)
exchange
STRING
symbol_norm
STRING
bids
ARRAY<ROW<price DECIMAL(38,10), qty DECIMAL(38,10)>>
Top N
asks
ARRAY<ROW<price DECIMAL(38,10), qty DECIMAL(38,10)>>
seq
BIGINT
序号(源流)
checksum
STRING
选填,用于一致性校验
dim_exchange(exchange_code, name, timezone, url, enabled)
dim_symbol(symbol_norm, base_asset, quote_asset, precision_price, precision_qty, coinbase_id, binance_id, enabled)
dim_asset(asset_code, asset_name)