Skip to content

Commit b85fe66

Browse files
committed
Make comments clearer and add part about custom flag definition cache provider
1 parent 0ff5b86 commit b85fe66

2 files changed

Lines changed: 22 additions & 15 deletions

File tree

examples/celery_integration.py

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -71,23 +71,26 @@ def create_integration() -> PosthogCeleryIntegration:
7171

7272

7373
# --- Worker process setup ---
74-
# Celery's default prefork pool runs tasks in child processes. This example
75-
# runs on a single host, so the inherited PostHog client and Celery
76-
# integration are fork-safe and do not need to be recreated in each child.
77-
# If workers run across multiple hosts, configure PostHog and instrument a
78-
# worker-local integration in worker_process_init.
79-
@worker_process_init.connect
80-
def on_worker_process_init(**kwargs) -> None:
81-
# global integration
82-
83-
# configure_posthog()
84-
# integration = create_integration()
85-
# integration.instrument()
86-
return
74+
# On a single host the forked child inherits the PostHog client and
75+
# integration, so nothing extra is needed. If workers run on different
76+
# hosts, uncomment the signal and handler below to initialise a fresh
77+
# client and integration in each worker process. If using a custom flag
78+
# definition cache provider, reinitialize your client in each worker with
79+
# the custom provider, and reinstrument the integration with that new client
80+
# instance.
81+
# @worker_process_init.connect
82+
# def on_worker_process_init(**kwargs) -> None:
83+
# global integration
84+
# configure_posthog()
85+
# integration = create_integration()
86+
# integration.instrument()
87+
# return
8788

8889

8990
# Use this signal to shutdown the integration and PostHog client
90-
# Calling shutdown() is important to flush any pending events
91+
# in the worker processes. Calling shutdown() is important to flush
92+
# any pending events and is required even if the workers are running
93+
# on the same host as the producer.
9194
@worker_process_shutdown.connect
9295
def on_worker_process_shutdown(**kwargs) -> None:
9396
integration.shutdown()
@@ -186,5 +189,6 @@ def failing_task() -> None:
186189
print("Tasks dispatched. Check your Celery worker logs and PostHog for events.")
187190
print()
188191

192+
# Shut down the integration and client in producer process
189193
integration.shutdown()
190194
posthog.shutdown()

posthog/integrations/celery.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,10 @@ class PosthogCeleryIntegration:
8686
Args:
8787
client: Optional ``Client`` instance. When provided, all events and
8888
exceptions are captured through this client rather than the
89-
global ``posthog`` module.
89+
global ``posthog`` module. Don't skip this if using a custom flag
90+
definition cache provider, and pass the custom ``Client`` instance
91+
here initialized with the custom provider so fork safety for that
92+
provider is handled correctly.
9093
capture_exceptions: Whether to capture task exceptions via
9194
``capture_exception`` (default ``True``).
9295
capture_task_lifecycle_events: Whether to emit lifecycle events of the task

0 commit comments

Comments
 (0)