Skip to content

Commit cedb855

Browse files
author
Zhang Wenhao
committed
<fix>[cloudbus]: support send confirm
1. send in CloudBus means submit a message send task. function return do not means sending completion. so there is no way to set a limition in sender. add a FutureCompletion to confirm message send. provide a way to limited send. 2. fix mn heartbeat thread interrupt handle 3. refactor duplicate code for getChainInfo, fix synchronized target. This patch is cherry-picked from ZSTAC-71213 (commit: af58834) Resolves: ZSV-11510 Change-Id: I686361627566737a7775706263796b736967666c
1 parent 421e9c8 commit cedb855

15 files changed

Lines changed: 456 additions & 131 deletions

File tree

conf/errorCodes/sys.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,5 +105,10 @@
105105
<id>1090</id>
106106
<description>Multiple reasons</description>
107107
</code>
108+
109+
<code>
110+
<id>2111</id>
111+
<description>CloudBus message sending related error</description>
112+
</code>
108113
</error>
109114

core/src/main/java/org/zstack/core/Platform.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import java.lang.reflect.InvocationTargetException;
5151
import java.lang.reflect.Method;
5252
import java.lang.reflect.Modifier;
53+
import java.net.Inet4Address;
5354
import java.net.InetAddress;
5455
import java.net.NetworkInterface;
5556
import java.net.SocketException;
@@ -817,9 +818,14 @@ private static String getManagementServerIpInternal() {
817818
for (NetworkInterface iface : Collections.list(nets)) {
818819
String name = iface.getName();
819820
if (defaultLine.contains(name)) {
820-
InetAddress ia = iface.getInetAddresses().nextElement();
821-
ip = ia.getHostAddress();
822-
break;
821+
for (InetAddress ia : Collections.list(iface.getInetAddresses())) {
822+
ip = ia.getHostAddress();
823+
if (ia instanceof Inet4Address) {
824+
// we prefer IPv4 address
825+
ip = ia.getHostAddress();
826+
break;
827+
}
828+
}
823829
}
824830
}
825831
} catch (SocketException e) {

core/src/main/java/org/zstack/core/cloudbus/CloudBus.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import org.springframework.http.HttpEntity;
44
import org.zstack.header.Component;
55
import org.zstack.header.Service;
6+
import org.zstack.header.core.FutureCompletion;
67
import org.zstack.header.errorcode.ErrorCode;
78
import org.zstack.header.exception.CloudConfigureFailException;
89
import org.zstack.header.message.*;
@@ -12,14 +13,18 @@
1213
import java.util.function.Consumer;
1314

1415
public interface CloudBus extends Component {
15-
void send(Message msg);
16-
16+
/**
17+
* submit a message sending task into the queue.
18+
* @return {@link FutureCompletion} which can be used to check whether the message was successfully sent or not.
19+
*/
20+
FutureCompletion send(Message msg);
21+
1722
<T extends Message> void send(List<T> msgs);
1823

1924
@Deprecated
2025
void send(APIMessage msg, Consumer<APIEvent> consumer);
2126

22-
void send(NeedReplyMessage msg, CloudBusCallBack callback);
27+
FutureCompletion send(NeedReplyMessage msg, CloudBusCallBack callback);
2328

2429
@Deprecated
2530
void send(List<? extends NeedReplyMessage> msgs, CloudBusListCallBack callBack);

core/src/main/java/org/zstack/core/cloudbus/CloudBusImpl2.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.zstack.core.timeout.ApiTimeoutManager;
2020
import org.zstack.header.Constants;
2121
import org.zstack.header.Service;
22+
import org.zstack.header.core.FutureCompletion;
2223
import org.zstack.header.log.NoLogging;
2324
import org.zstack.header.apimediator.APIIsReadyToGoMsg;
2425
import org.zstack.header.apimediator.APIIsReadyToGoReply;
@@ -106,6 +107,11 @@ public class CloudBusImpl2 implements CloudBus, CloudBusIN, ManagementNodeChange
106107
private final String AMQP_PROPERTY_HEADER__COMPRESSED = "compressed";
107108

108109
private String SERVICE_ID = makeLocalServiceId("cloudbus");
110+
public static final FutureCompletion SEND_CONFIRMED = new FutureCompletion(null);
111+
112+
static {
113+
SEND_CONFIRMED.success();
114+
}
109115

110116
public void setDEFAULT_MESSAGE_TIMEOUT(long timeout) {
111117
this.DEFAULT_MESSAGE_TIMEOUT = timeout;
@@ -1323,8 +1329,9 @@ private void send(Message msg, Boolean noNeedReply) {
13231329
}
13241330

13251331
@Override
1326-
public void send(Message msg) {
1332+
public FutureCompletion send(Message msg) {
13271333
send(msg, true);
1334+
return SEND_CONFIRMED;
13281335
}
13291336

13301337
@Override
@@ -1354,7 +1361,7 @@ private void evaluateMessageTimeout(NeedReplyMessage msg) {
13541361
}
13551362

13561363
@Override
1357-
public void send(final NeedReplyMessage msg, final CloudBusCallBack callback) {
1364+
public FutureCompletion send(final NeedReplyMessage msg, final CloudBusCallBack callback) {
13581365
evaluateMessageTimeout(msg);
13591366

13601367
Envelope e = new Envelope() {
@@ -1405,6 +1412,7 @@ List<Message> getRequests() {
14051412
envelopes.put(msg.getId(), e);
14061413

14071414
send(msg, false);
1415+
return SEND_CONFIRMED;
14081416
}
14091417

14101418
private MessageReply createTimeoutReply(NeedReplyMessage m) {

core/src/main/java/org/zstack/core/cloudbus/CloudBusImpl3.java

Lines changed: 54 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949

5050
import static org.zstack.core.Platform.*;
5151
import static org.zstack.core.cloudbus.CloudBusGlobalProperty.SYNC_CALL_TIMEOUT;
52+
import static org.zstack.header.errorcode.SysErrors.CLOUD_BUS_SEND_ERROR;
5253
import static org.zstack.utils.BeanUtils.getProperty;
5354
import static org.zstack.utils.BeanUtils.setProperty;
5455

@@ -101,6 +102,7 @@ public class CloudBusImpl3 implements CloudBus, CloudBusIN {
101102
private final static TimeoutRestTemplate http = RESTFacade.createRestTemplate(CoreGlobalProperty.REST_FACADE_READ_TIMEOUT, CoreGlobalProperty.REST_FACADE_CONNECT_TIMEOUT);
102103

103104
public static final String HTTP_BASE_URL = "/cloudbus";
105+
public static final FutureCompletion SEND_CONFIRMED = new FutureCompletion(null);
104106

105107
{
106108
if (CloudBusGlobalProperty.MESSAGE_LOG != null) {
@@ -114,6 +116,8 @@ public class CloudBusImpl3 implements CloudBus, CloudBusIN {
114116
CloudBusGlobalProperty.HTTP_CONTEXT_PATH = "";
115117
CloudBusGlobalProperty.HTTP_PORT = 8989;
116118
}
119+
120+
SEND_CONFIRMED.success();
117121
}
118122

119123
public static String getManagementNodeUUIDFromServiceID(String serviceID) {
@@ -125,6 +129,12 @@ public static String getManagementNodeUUIDFromServiceID(String serviceID) {
125129
return ss[0];
126130
}
127131

132+
private FutureCompletion sendFail(ErrorCode errorCode) {
133+
FutureCompletion c = new FutureCompletion(null);
134+
c.fail(errorCode);
135+
return c;
136+
}
137+
128138
private abstract class Envelope {
129139
long startTime;
130140

@@ -250,8 +260,8 @@ public void deActiveService(String id) {
250260
}
251261

252262
@Override
253-
public void send(Message msg) {
254-
send(msg, true);
263+
public FutureCompletion send(Message msg) {
264+
return send(msg, true);
255265
}
256266

257267
@Override
@@ -291,11 +301,11 @@ private MessageReply createTimeoutReply(NeedReplyMessage m) {
291301
}
292302

293303
@Override
294-
public void send(NeedReplyMessage msg, CloudBusCallBack callback) {
304+
public FutureCompletion send(NeedReplyMessage msg, CloudBusCallBack callback) {
295305
evaluateMessageTimeout(msg);
296306
if (msg.getTimeout() <= 1) {
297307
callback.run(createTimeoutReply(msg));
298-
return;
308+
return SEND_CONFIRMED;
299309
}
300310

301311
Envelope e = new Envelope() {
@@ -346,7 +356,7 @@ public void timeout() {
346356

347357
envelopes.put(msg.getId(), e);
348358
msgExts.forEach(m -> m.afterAddEnvelopes(msg.getId()));
349-
send(msg, false);
359+
return send(msg, false);
350360
}
351361

352362
@Override
@@ -454,7 +464,7 @@ public void reply(Message request, MessageReply reply) {
454464
callReplyPreSendingExtensions(reply, (NeedReplyMessage) request);
455465
} catch (Exception e) {
456466
logger.error("failed to call pre-sending reply extension:", e);
457-
reply.setError(operr(e.getMessage()));
467+
reply.setError(err(CLOUD_BUS_SEND_ERROR, e.getMessage()));
458468
}
459469
}
460470

@@ -526,29 +536,36 @@ public MessageSender(Message msg) {
526536
localSend = !CloudBusGlobalProperty.HTTP_ALWAYS && managementNodeId.equals(Platform.getManagementServerId());
527537
}
528538

529-
void send() {
539+
FutureCompletion send() {
530540
try {
531-
doSend();
541+
return doSend();
532542
} catch (Throwable th) {
533-
replyErrorIfNeeded(operr(th.getMessage()));
543+
ErrorCode err = err(CLOUD_BUS_SEND_ERROR, th.getMessage());
544+
replyErrorIfNeeded(err);
545+
546+
FutureCompletion c = new FutureCompletion(null);
547+
c.fail(err);
548+
return c;
534549
}
535550
}
536551

537-
private void doSend() {
552+
private FutureCompletion doSend() {
538553
if (msg instanceof Event) {
539554
eventSend();
540-
return;
555+
return SEND_CONFIRMED;
541556
}
542557

543558
if (localSend) {
544559
localSend();
560+
return SEND_CONFIRMED;
545561
} else {
546-
httpSend();
562+
return httpSend();
547563
}
548564
}
549565

550-
private void httpSendInQueue(String ip) {
551-
thdf.chainSubmit(new ChainTask(null) {
566+
private FutureCompletion httpSendInQueue(String ip) {
567+
FutureCompletion sendCompletion = new FutureCompletion(null);
568+
thdf.chainSubmit(new ChainTask(sendCompletion) {
552569
@Override
553570
public String getSyncSignature() {
554571
return "http-send-in-queue";
@@ -557,6 +574,7 @@ public String getSyncSignature() {
557574
@Override
558575
public void run(SyncTaskChain chain) {
559576
httpSend(ip);
577+
sendCompletion.success();
560578
chain.next();
561579
}
562580

@@ -570,25 +588,30 @@ public String getName() {
570588
return getSyncSignature();
571589
}
572590
});
591+
return sendCompletion;
573592
}
574593

575-
private void httpSend() {
594+
private FutureCompletion httpSend() {
576595
buildSchema(msg);
596+
String ip;
577597
try {
578-
String ip = destMaker.getNodeInfo(managementNodeId).getNodeIP();
598+
ip = destMaker.getNodeInfo(managementNodeId).getNodeIP();
579599
httpSendInQueue(ip);
580600
} catch (ManagementNodeNotFoundException e) {
581-
if (msg instanceof MessageReply) {
582-
if (!deadMessageManager.handleManagementNodeNotFoundError(managementNodeId, msg, () -> {
583-
String ip = destMaker.getNodeInfo(managementNodeId).getNodeIP();
584-
httpSendInQueue(ip);
585-
})) {
586-
throw e;
587-
}
601+
boolean errorHandled = msg instanceof MessageReply &&
602+
deadMessageManager.handleManagementNodeNotFoundError(managementNodeId, msg, () -> {
603+
String otherIp = destMaker.getNodeInfo(managementNodeId).getNodeIP();
604+
logger.warn(String.format("resend the message[id:%s] to node[ip:%s]", msg.getId(), otherIp));
605+
httpSendInQueue(otherIp);
606+
});
607+
if (errorHandled) {
608+
return SEND_CONFIRMED;
588609
} else {
589610
throw e;
590611
}
591612
}
613+
614+
return httpSendInQueue(ip);
592615
}
593616

594617
private void httpSend(String ip) {
@@ -612,12 +635,12 @@ protected ResponseEntity<String> call() {
612635
}.run();
613636

614637
if (!rsp.getStatusCode().is2xxSuccessful()) {
615-
replyErrorIfNeeded(operr("HTTP ERROR, status code: %s, body: %s", rsp.getStatusCode(), rsp.getBody()));
638+
replyErrorIfNeeded(err(CLOUD_BUS_SEND_ERROR, "HTTP ERROR, status code: %s, body: %s", rsp.getStatusCode(), rsp.getBody()));
616639
}
617640
} catch (OperationFailureException e) {
618641
replyErrorIfNeeded(e.getErrorCode());
619642
} catch (Throwable e) {
620-
replyErrorIfNeeded(operr(e.getMessage()));
643+
replyErrorIfNeeded(err(CLOUD_BUS_SEND_ERROR, e.getMessage()));
621644
}
622645
}
623646

@@ -1194,7 +1217,7 @@ private void evalThreadContextToMessage(Message msg) {
11941217
}
11951218
}
11961219

1197-
private void doSendAndCallExtensions(Message msg) {
1220+
private FutureCompletion doSendAndCallExtensions(Message msg) {
11981221
// for unit test finding invocation chain
11991222
MessageCommandRecorder.record(msg.getClass());
12001223

@@ -1209,20 +1232,20 @@ private void doSendAndCallExtensions(Message msg) {
12091232
interceptor.beforeSendMessage(msg);
12101233
}
12111234

1212-
doSend(msg);
1235+
return doSend(msg);
12131236
}
12141237

1215-
private void doSend(Message msg) {
1238+
private FutureCompletion doSend(Message msg) {
12161239
evalThreadContextToMessage(msg);
12171240

12181241
if (logger.isTraceEnabled() && islogMessage(msg)) {
12191242
logger.trace(String.format("[msg send]: %s", dumpMessage(msg)));
12201243
}
12211244

1222-
new MessageSender(msg).send();
1245+
return new MessageSender(msg).send();
12231246
}
12241247

1225-
private void send(Message msg, Boolean noNeedReply) {
1248+
private FutureCompletion send(Message msg, Boolean noNeedReply) {
12261249
if (msg.getServiceId() == null) {
12271250
throw new IllegalArgumentException(String.format("service id cannot be null: %s", msg.getClass().getName()));
12281251
}
@@ -1238,7 +1261,7 @@ private void send(Message msg, Boolean noNeedReply) {
12381261
msg.putHeaderEntry(NO_NEED_REPLY_MSG, noNeedReply.toString());
12391262
}
12401263

1241-
doSendAndCallExtensions(msg);
1264+
return doSendAndCallExtensions(msg);
12421265
}
12431266

12441267
private void restoreFromSchema(Message msg, Map raw) throws ClassNotFoundException {

0 commit comments

Comments
 (0)