@@ -42,45 +42,45 @@ type LocalMessage struct {
4242 ResponseCh chan interface {} // Channel to send response back
4343}
4444
45- type worker struct {
45+ type frankenphpWorker struct {
4646 requestChan chan jetstream.Msg // Channel for NATS messages
4747 localMessageChan chan * LocalMessage // Channel for local synchronous requests
4848 running bool
4949}
5050
51- var globalWorkerInstance * worker
51+ var globalWorkerInstance * frankenphpWorker
5252
53- func (w * worker ) Name () string {
53+ func (w * frankenphpWorker ) Name () string {
5454 return "m#durable-php"
5555}
5656
57- func (w * worker ) FileName () string {
57+ func (w * frankenphpWorker ) FileName () string {
5858 // check if target exists
59- if _ , err := os .Stat ("src/Glue/worker .php" ); ! os .IsNotExist (err ) {
60- return "src/Glue/worker .php"
59+ if _ , err := os .Stat ("src/Glue/frankenphpWorker .php" ); ! os .IsNotExist (err ) {
60+ return "src/Glue/frankenphpWorker .php"
6161 }
6262
63- return "vendor/bottledcode/durable-php/src/Glue/worker .php"
63+ return "vendor/bottledcode/durable-php/src/Glue/frankenphpWorker .php"
6464}
6565
66- func (w * worker ) Env () frankenphp.PreparedEnv {
66+ func (w * frankenphpWorker ) Env () frankenphp.PreparedEnv {
6767 return frankenphp.PreparedEnv {}
6868}
6969
70- func (w * worker ) GetMinThreads () int {
70+ func (w * frankenphpWorker ) GetMinThreads () int {
7171 return 4
7272}
7373
74- func (w * worker ) ThreadActivatedNotification (threadId int ) {
74+ func (w * frankenphpWorker ) ThreadActivatedNotification (threadId int ) {
7575}
7676
77- func (w * worker ) ThreadDrainNotification (threadId int ) {
77+ func (w * frankenphpWorker ) ThreadDrainNotification (threadId int ) {
7878}
7979
80- func (w * worker ) ThreadDeactivatedNotification (threadId int ) {
80+ func (w * frankenphpWorker ) ThreadDeactivatedNotification (threadId int ) {
8181}
8282
83- func (w * worker ) ProvideRequest () * frankenphp.WorkerRequest {
83+ func (w * frankenphpWorker ) ProvideRequest () * frankenphp.WorkerRequest {
8484 // Select between NATS messages and local messages
8585 select {
8686 case msg := <- w .requestChan :
@@ -156,15 +156,15 @@ func processMessage(msg jetstream.Msg) *frankenphp.WorkerRequest {
156156 kind = ids .Orchestration
157157 }
158158
159- // Create worker context for this message
159+ // Create frankenphpWorker context for this message
160160 worker := & Worker {
161161 kind : kind ,
162162 currentMsg : msg ,
163163 }
164164
165165 worker .currentCtx = lib .GetCorrelationId (ctx , nil , & headers )
166166
167- // Set the current worker for PHP access BEFORE processing/authorization
167+ // Set the current frankenphpWorker for PHP access BEFORE processing/authorization
168168 helpers .Logger .Info ("About to call SetCurrentWorker" )
169169 SetCurrentWorker (worker )
170170 helpers .Logger .Info ("SetCurrentWorker completed" )
@@ -277,7 +277,7 @@ func init() {
277277 frankenphp .RegisterExtension (unsafe .Pointer (& C .ext_module_entry ))
278278
279279 // initialize the workers
280- globalWorkerInstance = & worker {
280+ globalWorkerInstance = & frankenphpWorker {
281281 requestChan : make (chan jetstream.Msg , 100 ), // Buffer for 100 messages
282282 localMessageChan : make (chan * LocalMessage , 10 ), // Buffer for 10 local messages
283283 }
@@ -309,7 +309,7 @@ func SendLocalMessage(method, stateId string, context map[string]interface{}) (i
309309 }
310310}
311311
312- // StartNATSMessageFeeder starts goroutines that feed NATS messages to the worker channel
312+ // StartNATSMessageFeeder starts goroutines that feed NATS messages to the frankenphpWorker channel
313313func StartNATSMessageFeeder (ctx context.Context , cfg * config.Config , logger * zap.Logger ) {
314314 js := helpers .Js
315315
@@ -346,7 +346,7 @@ func StartNATSMessageFeeder(ctx context.Context, cfg *config.Config, logger *zap
346346 break // Break inner loop to recreate consumer
347347 }
348348
349- // Send message to worker channel
349+ // Send message to frankenphpWorker channel
350350 select {
351351 case globalWorkerInstance .requestChan <- msg :
352352 // Message sent successfully
@@ -359,37 +359,6 @@ func StartNATSMessageFeeder(ctx context.Context, cfg *config.Config, logger *zap
359359 }
360360}
361361
362- func Authorize (ctx context.Context , ev * glue.EventMessage , from * ids.StateId , preventCreation bool , operation auth.Operation ) (bool , error ) {
363- if ! helpers .Config .Extensions .Authz .Enabled {
364- return true , nil
365- }
366-
367- rm := auth .GetResourceManager (ctx , helpers .Js )
368- r , err := rm .DiscoverResource (ctx , ids .ParseStateId (ev .Destination ), from , helpers .Logger , preventCreation )
369- if err != nil {
370- helpers .Logger .Error ("AUTHORIZATION FAILURE: Request blocked during resource discovery" ,
371- zap .String ("error" , err .Error ()),
372- zap .String ("destination" , ev .Destination ),
373- zap .String ("phase" , "resource-discovery" ),
374- zap .String ("action" , "request-rejected" ))
375- return false , err
376- }
377- if r == nil {
378- return false , nil
379- }
380-
381- if ! r .WantTo (operation , ctx ) {
382- helpers .Logger .Error ("AUTHORIZATION FAILURE: Operation not permitted" ,
383- zap .String ("operation" , string (operation )),
384- zap .String ("destination" , ev .Destination ),
385- zap .String ("phase" , "operation-check" ),
386- zap .String ("action" , "request-rejected" ))
387- return false , errors .New ("user is not authorised" )
388- }
389-
390- return true , nil
391- }
392-
393362//export go_init_module
394363func go_init_module () {
395364 cfg , err := config .GetProjectConfig ()
@@ -664,7 +633,7 @@ func go_init_module() {
664633 }
665634 }
666635
667- // Start NATS message feeder for worker
636+ // Start NATS message feeder for frankenphpWorker
668637 StartNATSMessageFeeder (ctx , cfg , logger )
669638}
670639
@@ -792,48 +761,6 @@ func create_Worker_object() C.uintptr_t {
792761 obj := & Worker {}
793762 return registerGoObject (obj )
794763}
795- func (w * Worker ) startEventLoop (kindStr * C.zend_string ) {
796- kind := ids .IdKind (frankenphp .GoString (unsafe .Pointer (kindStr )))
797-
798- switch kind {
799- case ids .Activity :
800- case ids .Entity :
801- case ids .Orchestration :
802- default :
803- helpers .LogError ("Invalid event kind" )
804- return
805- }
806- w .kind = kind
807-
808- if w .started {
809- helpers .LogError ("Event loop already running" )
810- return
811- }
812-
813- ctx , done := context .WithCancel (helpers .Ctx )
814-
815- c := & helpers.Consumer {
816- Context : ctx ,
817- Done : done ,
818- }
819-
820- stream , err := helpers .Js .Stream (ctx , helpers .Config .Stream )
821- if err != nil {
822- helpers .LogError (err .Error ())
823- return
824- }
825-
826- c .Msg = lib .StartConsumer (ctx , helpers .Config , stream , helpers .Logger , w .kind )
827- w .consumer = c
828- }
829-
830- func (w * Worker ) drainEventLoop () {
831- if ! w .started {
832- return
833- }
834- w .consumer .Msg .Drain ()
835- w .started = false
836- }
837764
838765func (w * Worker ) __destruct () {
839766 w .consumer .Msg .Stop ()
@@ -868,6 +795,16 @@ func (w *Worker) getUser() unsafe.Pointer {
868795 return nil
869796}
870797
798+ func (w * Worker ) setUser (userArr * C.zval ) {
799+ if userArr == nil {
800+ w .currentCtx = context .WithValue (w .currentCtx , appcontext .CurrentUserKey , nil )
801+ }
802+
803+ arrVal := frankenphp .GoArray (unsafe .Pointer (userArr ))
804+ user := helpers .GetUserContext (arrVal )
805+ w .currentCtx = context .WithValue (w .currentCtx , appcontext .CurrentUserKey , user )
806+ }
807+
871808func (w * Worker ) getSource () unsafe.Pointer {
872809 sourceId := ids .ParseStateId (w .currentMsg .Headers ().Get (string (glue .HeaderEmittedBy )))
873810 return frankenphp .PHPString (sourceId .String (), false )
@@ -999,7 +936,18 @@ func delete_wrapper(handle C.uintptr_t) {
999936 structObj .delete ()
1000937}
1001938
1002- // SetCurrentWorker sets the current worker context for the PHP extension
939+ //export setUser_wrapper
940+ func setUser_wrapper (handle C.uintptr_t , user * C.zval ) {
941+ obj := getGoObject (handle )
942+ if obj == nil {
943+ return
944+ }
945+
946+ structObj := obj .(* Worker )
947+ structObj .setUser (user )
948+ }
949+
950+ // SetCurrentWorker sets the current frankenphpWorker context for the PHP extension
1003951func SetCurrentWorker (worker * Worker ) {
1004952 if worker == nil {
1005953 C .clear_current_worker ()
@@ -1009,17 +957,3 @@ func SetCurrentWorker(worker *Worker) {
1009957 handle := registerGoObject (worker )
1010958 C .set_current_worker_handle (C .uintptr_t (handle ))
1011959}
1012-
1013- // ClearCurrentWorker clears the current worker context
1014- func ClearCurrentWorker () {
1015- C .clear_current_worker ()
1016- }
1017-
1018- // GetWorkerInstance returns the global worker instance for feeding requests
1019- func GetWorkerInstance () * worker {
1020- return globalWorkerInstance
1021- }
1022-
1023- // InjectWorkerRequest removed - worker now pulls directly from NATS consumers
1024-
1025- // StartWorkerConsumer removed - NATS consumer logic now integrated into ProvideRequest method
0 commit comments