Skip to content

Commit 060924e

Browse files
committed
feat(asyncpg): Add query source to methods that were missing it
Add add_query_source() calls to execute(), executemany(), and cursor() methods. Previously only fetch() had query source tracking. Handle StreamedSpan and regular span cases separately since StreamedSpan requires the call while still inside the context manager. Add comprehensive test coverage for query source tracking across all wrapped methods.
1 parent 2869f76 commit 060924e

2 files changed

Lines changed: 274 additions & 56 deletions

File tree

sentry_sdk/integrations/asyncpg.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,8 +143,17 @@ async def _inner(*args: "Any", **kwargs: "Any") -> "T":
143143
params_list = args[2] if len(args) > 2 else None
144144
with _record(None, query, params_list, executemany=executemany) as span:
145145
_set_db_data(span, args[0])
146+
146147
res = await f(*args, **kwargs)
147148

149+
if isinstance(span, StreamedSpan):
150+
with capture_internal_exceptions():
151+
add_query_source(span)
152+
153+
if not isinstance(span, StreamedSpan):
154+
with capture_internal_exceptions():
155+
add_query_source(span)
156+
148157
return res
149158

150159
return _inner
@@ -163,8 +172,17 @@ def _inner(*args: "Any", **kwargs: "Any") -> "T": # noqa: N807
163172
executemany=False,
164173
) as span:
165174
_set_db_data(span, args[0])
175+
166176
res = f(*args, **kwargs)
167177

178+
if isinstance(span, StreamedSpan):
179+
with capture_internal_exceptions():
180+
add_query_source(span)
181+
182+
if not isinstance(span, StreamedSpan):
183+
with capture_internal_exceptions():
184+
add_query_source(span)
185+
168186
return res
169187

170188
return _inner

tests/integrations/asyncpg/test_asyncpg.py

Lines changed: 256 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -657,7 +657,12 @@ async def test_query_source_enabled(
657657

658658
@pytest.mark.asyncio
659659
@pytest.mark.parametrize("span_streaming", [True, False])
660-
async def test_query_source(sentry_init, capture_events, capture_items, span_streaming):
660+
async def test_query_source_with_module_in_search_path(
661+
sentry_init, capture_events, capture_items, span_streaming
662+
):
663+
"""
664+
Test that query source is relative to the path of the module it ran in
665+
"""
661666
sentry_init(
662667
integrations=[AsyncPGIntegration()],
663668
traces_sample_rate=1.0,
@@ -668,13 +673,16 @@ async def test_query_source(sentry_init, capture_events, capture_items, span_str
668673
},
669674
)
670675

676+
from asyncpg_helpers.helpers import execute_query_in_connection
677+
671678
if span_streaming:
672679
items = capture_items("span")
673680
with sentry_sdk.traces.start_span(name="test_transaction"):
674681
conn: Connection = await connect(PG_CONNECTION_URI)
675682

676-
await conn.execute(
683+
await execute_query_in_connection(
677684
"INSERT INTO users(name, password, dob) VALUES ('Alice', 'secret', '1990-12-25')",
685+
conn,
678686
)
679687

680688
await conn.close()
@@ -698,13 +706,15 @@ async def test_query_source(sentry_init, capture_events, capture_items, span_str
698706
with start_transaction(name="test_transaction", sampled=True):
699707
conn: Connection = await connect(PG_CONNECTION_URI)
700708

701-
await conn.execute(
709+
await execute_query_in_connection(
702710
"INSERT INTO users(name, password, dob) VALUES ('Alice', 'secret', '1990-12-25')",
711+
conn,
703712
)
704713

705714
await conn.close()
706715

707716
(event,) = events
717+
708718
span = event["spans"][-1]
709719
assert span["description"].startswith("INSERT INTO")
710720
data = span.get("data", {})
@@ -719,61 +729,10 @@ async def test_query_source(sentry_init, capture_events, capture_items, span_str
719729

720730
assert type(data.get(lineno_key)) == int
721731
assert data.get(lineno_key) > 0
722-
assert (
723-
data.get(SPANDATA.CODE_NAMESPACE) == "tests.integrations.asyncpg.test_asyncpg"
724-
)
725-
assert data.get(filepath_key).endswith("tests/integrations/asyncpg/test_asyncpg.py")
726-
727-
is_relative_path = data.get(filepath_key)[0] != os.sep
728-
assert is_relative_path
729-
730-
assert data.get(SPANDATA.CODE_FUNCTION) == "test_query_source"
731-
732-
733-
@pytest.mark.asyncio
734-
async def test_query_source_with_module_in_search_path(sentry_init, capture_events):
735-
"""
736-
Test that query source is relative to the path of the module it ran in
737-
"""
738-
sentry_init(
739-
integrations=[AsyncPGIntegration()],
740-
traces_sample_rate=1.0,
741-
enable_db_query_source=True,
742-
db_query_source_threshold_ms=0,
743-
)
744-
745-
events = capture_events()
746-
747-
from asyncpg_helpers.helpers import execute_query_in_connection
748-
749-
with start_transaction(name="test_transaction", sampled=True):
750-
conn: Connection = await connect(PG_CONNECTION_URI)
751-
752-
await execute_query_in_connection(
753-
"INSERT INTO users(name, password, dob) VALUES ('Alice', 'secret', '1990-12-25')",
754-
conn,
755-
)
756-
757-
await conn.close()
758-
759-
(event,) = events
760-
761-
span = event["spans"][-1]
762-
assert span["description"].startswith("INSERT INTO")
763-
764-
data = span.get("data", {})
765-
766-
assert SPANDATA.CODE_LINENO in data
767-
assert SPANDATA.CODE_NAMESPACE in data
768-
assert SPANDATA.CODE_FILEPATH in data
769-
assert SPANDATA.CODE_FUNCTION in data
770-
771-
assert type(data.get(SPANDATA.CODE_LINENO)) == int
772-
assert data.get(SPANDATA.CODE_LINENO) > 0
732+
assert data.get(filepath_key) == "asyncpg_helpers/helpers.py"
773733
assert data.get(SPANDATA.CODE_NAMESPACE) == "asyncpg_helpers.helpers"
774-
assert data.get(SPANDATA.CODE_FILEPATH) == "asyncpg_helpers/helpers.py"
775734

776-
is_relative_path = data.get(SPANDATA.CODE_FILEPATH)[0] != os.sep
735+
is_relative_path = data.get(filepath_key)[0] != os.sep
777736
assert is_relative_path
778737

779738
assert data.get(SPANDATA.CODE_FUNCTION) == "execute_query_in_connection"
@@ -1102,3 +1061,244 @@ def before_send_transaction(event, hint):
11021061

11031062
assert len(spans) == 1
11041063
assert spans[0]["description"] == "filtered"
1064+
1065+
1066+
def _assert_query_source(span, span_streaming, expected_function):
1067+
if span_streaming:
1068+
data = span.get("attributes", {})
1069+
lineno_key = "code.line.number"
1070+
filepath_key = "code.file.path"
1071+
else:
1072+
data = span.get("data", {})
1073+
lineno_key = SPANDATA.CODE_LINENO
1074+
filepath_key = SPANDATA.CODE_FILEPATH
1075+
1076+
assert lineno_key in data
1077+
assert filepath_key in data
1078+
assert SPANDATA.CODE_NAMESPACE in data
1079+
assert SPANDATA.CODE_FUNCTION in data
1080+
1081+
assert type(data.get(lineno_key)) == int
1082+
assert data.get(lineno_key) > 0
1083+
assert data[SPANDATA.CODE_NAMESPACE] == "tests.integrations.asyncpg.test_asyncpg"
1084+
assert data.get(filepath_key).endswith("tests/integrations/asyncpg/test_asyncpg.py")
1085+
assert data.get(filepath_key)[0] != os.sep
1086+
assert data[SPANDATA.CODE_FUNCTION] == expected_function
1087+
1088+
1089+
@pytest.mark.asyncio
1090+
@pytest.mark.parametrize("span_streaming", [True, False])
1091+
async def test_query_source_execute(
1092+
sentry_init, capture_events, capture_items, span_streaming
1093+
):
1094+
sentry_init(
1095+
integrations=[AsyncPGIntegration()],
1096+
traces_sample_rate=1.0,
1097+
enable_db_query_source=True,
1098+
db_query_source_threshold_ms=0,
1099+
_experiments={
1100+
"trace_lifecycle": "stream" if span_streaming else "static",
1101+
},
1102+
)
1103+
1104+
if span_streaming:
1105+
items = capture_items("span")
1106+
with sentry_sdk.traces.start_span(name="test_transaction"):
1107+
conn: Connection = await connect(PG_CONNECTION_URI)
1108+
await conn.execute(
1109+
"INSERT INTO users(name, password, dob) VALUES($1, $2, $3)",
1110+
"Alice",
1111+
"pw",
1112+
datetime.date(1990, 12, 25),
1113+
)
1114+
await conn.close()
1115+
sentry_sdk.flush()
1116+
1117+
spans = [item.payload for item in items]
1118+
assert len(spans) == 3
1119+
1120+
connect_span = spans[0]
1121+
insert_span = spans[1]
1122+
segment = spans[2]
1123+
1124+
assert connect_span["name"] == "connect"
1125+
assert insert_span["name"].startswith("INSERT INTO")
1126+
assert segment["name"] == "test_transaction"
1127+
assert segment["is_segment"] is True
1128+
query_span = spans[1]
1129+
else:
1130+
events = capture_events()
1131+
with start_transaction(name="test_transaction", sampled=True):
1132+
conn: Connection = await connect(PG_CONNECTION_URI)
1133+
await conn.execute(
1134+
"INSERT INTO users(name, password, dob) VALUES($1, $2, $3)",
1135+
"Alice",
1136+
"pw",
1137+
datetime.date(1990, 12, 25),
1138+
)
1139+
await conn.close()
1140+
1141+
(event,) = events
1142+
spans = event["spans"]
1143+
assert len(spans) == 2
1144+
assert spans[0]["description"] == "connect"
1145+
assert spans[1]["description"].startswith("INSERT INTO")
1146+
query_span = spans[1]
1147+
1148+
_assert_query_source(query_span, span_streaming, "test_query_source_execute")
1149+
1150+
1151+
@pytest.mark.asyncio
1152+
@pytest.mark.parametrize("span_streaming", [True, False])
1153+
async def test_query_source_executemany(
1154+
sentry_init, capture_events, capture_items, span_streaming
1155+
):
1156+
sentry_init(
1157+
integrations=[AsyncPGIntegration()],
1158+
traces_sample_rate=1.0,
1159+
enable_db_query_source=True,
1160+
db_query_source_threshold_ms=0,
1161+
_experiments={
1162+
"trace_lifecycle": "stream" if span_streaming else "static",
1163+
},
1164+
)
1165+
1166+
if span_streaming:
1167+
items = capture_items("span")
1168+
with sentry_sdk.traces.start_span(name="test_transaction"):
1169+
conn: Connection = await connect(PG_CONNECTION_URI)
1170+
await conn.executemany(
1171+
"INSERT INTO users(name, password, dob) VALUES($1, $2, $3)",
1172+
[("Bob", "secret_pw", datetime.date(1984, 3, 1))],
1173+
)
1174+
await conn.close()
1175+
sentry_sdk.flush()
1176+
1177+
spans = [item.payload for item in items]
1178+
assert len(spans) == 3
1179+
assert spans[0]["name"] == "connect"
1180+
assert spans[1]["name"].startswith("INSERT INTO")
1181+
assert spans[2]["name"] == "test_transaction"
1182+
query_span = spans[1]
1183+
else:
1184+
events = capture_events()
1185+
with start_transaction(name="test_transaction", sampled=True):
1186+
conn: Connection = await connect(PG_CONNECTION_URI)
1187+
await conn.executemany(
1188+
"INSERT INTO users(name, password, dob) VALUES($1, $2, $3)",
1189+
[("Bob", "secret_pw", datetime.date(1984, 3, 1))],
1190+
)
1191+
await conn.close()
1192+
1193+
(event,) = events
1194+
spans = event["spans"]
1195+
assert len(spans) == 2
1196+
assert spans[0]["description"] == "connect"
1197+
assert spans[1]["description"].startswith("INSERT INTO")
1198+
query_span = spans[1]
1199+
1200+
_assert_query_source(query_span, span_streaming, "test_query_source_executemany")
1201+
1202+
1203+
@pytest.mark.asyncio
1204+
@pytest.mark.parametrize("span_streaming", [True, False])
1205+
async def test_query_source_prepare(
1206+
sentry_init, capture_events, capture_items, span_streaming
1207+
):
1208+
sentry_init(
1209+
integrations=[AsyncPGIntegration()],
1210+
traces_sample_rate=1.0,
1211+
enable_db_query_source=True,
1212+
db_query_source_threshold_ms=0,
1213+
_experiments={
1214+
"trace_lifecycle": "stream" if span_streaming else "static",
1215+
},
1216+
)
1217+
1218+
if span_streaming:
1219+
items = capture_items("span")
1220+
with sentry_sdk.traces.start_span(name="test_transaction"):
1221+
conn: Connection = await connect(PG_CONNECTION_URI)
1222+
await conn.prepare("SELECT * FROM users WHERE name = $1")
1223+
await conn.close()
1224+
sentry_sdk.flush()
1225+
1226+
spans = [item.payload for item in items]
1227+
assert len(spans) == 3
1228+
assert spans[0]["name"] == "connect"
1229+
assert spans[1]["name"] == "SELECT * FROM users WHERE name = $1"
1230+
assert spans[2]["name"] == "test_transaction"
1231+
query_span = spans[1]
1232+
else:
1233+
events = capture_events()
1234+
with start_transaction(name="test_transaction", sampled=True):
1235+
conn: Connection = await connect(PG_CONNECTION_URI)
1236+
await conn.prepare("SELECT * FROM users WHERE name = $1")
1237+
await conn.close()
1238+
1239+
(event,) = events
1240+
spans = event["spans"]
1241+
assert len(spans) == 2
1242+
assert spans[0]["description"] == "connect"
1243+
assert spans[1]["description"] == "SELECT * FROM users WHERE name = $1"
1244+
query_span = spans[1]
1245+
1246+
_assert_query_source(query_span, span_streaming, "test_query_source_prepare")
1247+
1248+
1249+
@pytest.mark.asyncio
1250+
@pytest.mark.parametrize("span_streaming", [True, False])
1251+
async def test_query_source_cursor(
1252+
sentry_init, capture_events, capture_items, span_streaming
1253+
):
1254+
sentry_init(
1255+
integrations=[AsyncPGIntegration()],
1256+
traces_sample_rate=1.0,
1257+
enable_db_query_source=True,
1258+
db_query_source_threshold_ms=0,
1259+
_experiments={
1260+
"trace_lifecycle": "stream" if span_streaming else "static",
1261+
},
1262+
)
1263+
1264+
if span_streaming:
1265+
items = capture_items("span")
1266+
with sentry_sdk.traces.start_span(name="test_transaction"):
1267+
conn: Connection = await connect(PG_CONNECTION_URI)
1268+
async with conn.transaction():
1269+
async for _ in conn.cursor(
1270+
"SELECT * FROM users WHERE dob > $1", datetime.date(1970, 1, 1)
1271+
):
1272+
pass
1273+
await conn.close()
1274+
sentry_sdk.flush()
1275+
1276+
spans = [item.payload for item in items]
1277+
assert len(spans) == 5
1278+
assert spans[0]["name"] == "connect"
1279+
assert spans[1]["name"] == "BEGIN;"
1280+
assert spans[2]["name"] == "SELECT * FROM users WHERE dob > $1"
1281+
assert spans[3]["name"] == "COMMIT;"
1282+
assert spans[4]["name"] == "test_transaction"
1283+
query_span = spans[2]
1284+
else:
1285+
events = capture_events()
1286+
with start_transaction(name="test_transaction", sampled=True):
1287+
conn: Connection = await connect(PG_CONNECTION_URI)
1288+
async with conn.transaction():
1289+
async for _ in conn.cursor(
1290+
"SELECT * FROM users WHERE dob > $1", datetime.date(1970, 1, 1)
1291+
):
1292+
pass
1293+
await conn.close()
1294+
1295+
(event,) = events
1296+
spans = event["spans"]
1297+
assert len(spans) == 4
1298+
assert spans[0]["description"] == "connect"
1299+
assert spans[1]["description"] == "BEGIN;"
1300+
assert spans[2]["description"] == "SELECT * FROM users WHERE dob > $1"
1301+
assert spans[3]["description"] == "COMMIT;"
1302+
query_span = spans[2]
1303+
1304+
_assert_query_source(query_span, span_streaming, "test_query_source_cursor")

0 commit comments

Comments
 (0)