-
Notifications
You must be signed in to change notification settings - Fork 15
Expand file tree
/
Copy pathevents_async.py
More file actions
95 lines (80 loc) · 2.93 KB
/
events_async.py
File metadata and controls
95 lines (80 loc) · 2.93 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
import asyncio
import os
import resend
if not os.environ["RESEND_API_KEY"]:
raise EnvironmentError("RESEND_API_KEY is missing")
async def main() -> None:
# Create a contact to use with event sends
print("--- Create contact ---")
contact: resend.Contacts.CreateContactResponse = await resend.Contacts.create_async(
{
"email": "test-events-async@example.com",
"first_name": "Test",
"last_name": "User",
}
)
contact_id = contact["id"]
print(f"Contact: {contact_id}")
# --- Create event ---
print("\n--- Create event ---")
created: resend.Events.CreateResponse = await resend.Events.create_async(
{
"name": "user.signed_up",
"schema": {
"plan": "string",
"trial_days": "number",
"is_enterprise": "boolean",
"upgraded_at": "date",
},
}
)
event_id = created["id"]
print(f"Created event: {event_id}")
# --- Get event by ID ---
print("\n--- Get event by ID ---")
event: resend.Event = await resend.Events.get_async(event_id)
print(f"Name: {event['name']}, schema: {event['schema']}")
# --- Get event by name ---
print("\n--- Get event by name ---")
event_by_name: resend.Event = await resend.Events.get_async("user.signed_up")
print(f"Found by name: {event_by_name['name']}")
# --- Update event schema ---
print("\n--- Update event schema ---")
updated: resend.Events.UpdateResponse = await resend.Events.update_async(
{
"identifier": "user.signed_up",
"schema": {"plan": "string", "source": "string"},
}
)
print(f"Updated event: {updated['id']}")
# --- Send event with contact_id ---
print("\n--- Send event with contact_id ---")
sent: resend.Events.SendResponse = await resend.Events.send_async(
{
"event": "user.signed_up",
"contact_id": contact_id,
"payload": {"plan": "pro"},
}
)
print(f"Sent event: {sent['event']}")
# --- Send event with email ---
print("\n--- Send event with email ---")
sent_email: resend.Events.SendResponse = await resend.Events.send_async(
{
"event": "user.signed_up",
"email": "test-events-async@example.com",
}
)
print(f"Sent event: {sent_email['event']}")
# --- List events ---
print("\n--- List events ---")
list_resp: resend.Events.ListResponse = await resend.Events.list_async()
print(f"Total: {len(list_resp['data'])}, has_more: {list_resp['has_more']}")
# --- Cleanup ---
print("\n--- Delete event ---")
deleted: resend.Events.DeleteResponse = await resend.Events.remove_async(event_id)
print(f"Deleted: {deleted['deleted']}")
print("\n--- Delete contact ---")
await resend.Contacts.remove_async(id=contact_id)
print(f"Deleted contact: {contact_id}")
asyncio.run(main())