From 4485492bf80e8bd820829d3411697a4398356af6 Mon Sep 17 00:00:00 2001 From: Matthew Ball Date: Tue, 30 Jun 2026 12:21:58 -0700 Subject: [PATCH 1/2] only return exceptions to the user and fixed the flaky test case found in the ci --- .../texera/amber/engine/common/rpc/AsyncRPCServer.scala | 8 ++++---- .../amber/engine/architecture/worker/WorkerSpec.scala | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/common/rpc/AsyncRPCServer.scala b/amber/src/main/scala/org/apache/texera/amber/engine/common/rpc/AsyncRPCServer.scala index e9a3e2cc455..318b641a850 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/common/rpc/AsyncRPCServer.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/common/rpc/AsyncRPCServer.scala @@ -100,12 +100,12 @@ class AsyncRPCServer( } catch { case err: Throwable => - // if error occurs, return it to the sender. + // Reply to the sender with the error so the caller can handle it. logger.error("Exception occurred", err) returnResult(senderID, id, mkControlError(err)) - // if throw this exception right now, the above message might not be able - // to be sent out. We do not throw for now. - // throw err + // Re-throw Errors (e.g. failed assertions) after replying; only + // Exceptions are returned to the sender and recovered from. + if (err.isInstanceOf[Error]) throw err } } diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/WorkerSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/WorkerSpec.scala index df9cb086a60..25686ade433 100644 --- a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/WorkerSpec.scala +++ b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/WorkerSpec.scala @@ -173,7 +173,7 @@ class WorkerSpec ) val addPort1 = AsyncRPCClient.ControlInvocation( METHOD_ASSIGN_PORT, - AssignPortRequest(mockPortId, input = true, mkSchema(1).toRawSchema, List(""), List()), + AssignPortRequest(mockPortId, input = true, mkSchema(1).toRawSchema, List(), List()), AsyncRPCContext(CONTROLLER, identifier1), 1 ) From 12c1107566f4c1364b11a44a9ce4bca795e34c03 Mon Sep 17 00:00:00 2001 From: "Matthew B." Date: Tue, 30 Jun 2026 15:43:07 -0700 Subject: [PATCH 2/2] Clarify error handling comment in AsyncRPCServer Updated error handling comment for clarity. Signed-off-by: Matthew B. --- .../apache/texera/amber/engine/common/rpc/AsyncRPCServer.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/common/rpc/AsyncRPCServer.scala b/amber/src/main/scala/org/apache/texera/amber/engine/common/rpc/AsyncRPCServer.scala index 318b641a850..b6a5e2594c1 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/common/rpc/AsyncRPCServer.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/common/rpc/AsyncRPCServer.scala @@ -100,7 +100,7 @@ class AsyncRPCServer( } catch { case err: Throwable => - // Reply to the sender with the error so the caller can handle it. + // if error occurs, return it to the sender. logger.error("Exception occurred", err) returnResult(senderID, id, mkControlError(err)) // Re-throw Errors (e.g. failed assertions) after replying; only