diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/common/rpc/AsyncRPCServer.scala b/amber/src/main/scala/org/apache/texera/amber/engine/common/rpc/AsyncRPCServer.scala index e9a3e2cc455..b6a5e2594c1 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/common/rpc/AsyncRPCServer.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/common/rpc/AsyncRPCServer.scala @@ -103,9 +103,9 @@ class AsyncRPCServer( // if error occurs, return it to the sender. logger.error("Exception occurred", err) returnResult(senderID, id, mkControlError(err)) - // if throw this exception right now, the above message might not be able - // to be sent out. We do not throw for now. - // throw err + // Re-throw Errors (e.g. failed assertions) after replying; only + // Exceptions are returned to the sender and recovered from. + if (err.isInstanceOf[Error]) throw err } } diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/WorkerSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/WorkerSpec.scala index df9cb086a60..25686ade433 100644 --- a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/WorkerSpec.scala +++ b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/WorkerSpec.scala @@ -173,7 +173,7 @@ class WorkerSpec ) val addPort1 = AsyncRPCClient.ControlInvocation( METHOD_ASSIGN_PORT, - AssignPortRequest(mockPortId, input = true, mkSchema(1).toRawSchema, List(""), List()), + AssignPortRequest(mockPortId, input = true, mkSchema(1).toRawSchema, List(), List()), AsyncRPCContext(CONTROLLER, identifier1), 1 )