Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 102 additions & 22 deletions src/main/java/org/prebid/server/auction/ExchangeService.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@
import org.prebid.server.util.BidderUtil;
import org.prebid.server.util.HttpUtil;
import org.prebid.server.util.ListUtil;
import org.prebid.server.util.MapUtil;
import org.prebid.server.util.PbsUtil;
import org.prebid.server.util.StreamUtil;

Expand All @@ -114,6 +115,8 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class ExchangeService {
Expand Down Expand Up @@ -257,17 +260,7 @@ private Future<AuctionContext> runAuction(AuctionContext receivedContext) {
.map(receivedContext::with))

.map(context -> updateRequestMetric(context, uidsCookie, aliases, account, requestTypeMetric))
.compose(context -> Future.join(
context.getAuctionParticipations().stream()
.map(auctionParticipation -> processAndRequestBids(
context,
auctionParticipation.getBidderRequest(),
timeout,
aliases)
.map(auctionParticipation::with))
.toList())
// send all the requests to the bidders and gathers results
.map(CompositeFuture::<AuctionParticipation>list)
.compose(context -> processAndRequestBids(context, timeout, aliases)
.map(storedResponseProcessor::updateStoredBidResponse)
.map(auctionParticipations -> storedResponseProcessor.mergeWithBidderResponses(
auctionParticipations,
Expand Down Expand Up @@ -457,13 +450,10 @@ private void makeBidRejectionTrackers(Map<String, BidRejectionTracker> bidReject
bidderToImpIds.computeIfAbsent(bidder, bidderName -> new HashSet<>()).add(impId));
}

bidderToImpIds.forEach((bidder, impIds) -> {
if (bidRejectionTrackers.containsKey(bidder)) {
bidRejectionTrackers.put(bidder, new BidRejectionTracker(bidRejectionTrackers.get(bidder), impIds));
} else {
bidRejectionTrackers.put(bidder, new BidRejectionTracker(bidder, impIds, logSamplingRate));
}
});
bidderToImpIds.forEach((bidder, impIds) -> bidRejectionTrackers.put(bidder,
bidRejectionTrackers.containsKey(bidder)
? BidRejectionTracker.withAdditionalImpIds(bidRejectionTrackers.get(bidder), impIds)
: new BidRejectionTracker(bidder, impIds, logSamplingRate)));
}

private static StoredResponseResult populateStoredResponse(StoredResponseResult storedResponseResult,
Expand Down Expand Up @@ -1132,10 +1122,100 @@ private AuctionContext updateRequestMetric(AuctionContext context,
return context;
}

private Future<BidderResponse> processAndRequestBids(AuctionContext auctionContext,
BidderRequest bidderRequest,
Timeout timeout,
BidderAliases aliases) {
private Future<List<AuctionParticipation>> processAndRequestBids(AuctionContext auctionContext,
Timeout timeout,
BidderAliases aliases) {

// when we ignore Futures from uncompleted secondary bidders,
// they continue running in the background and can write errors to BidRejectionTrackers.
// To prevent any issues, we provide them with a copy of BidRejectionTracker
// and only merge this copy back if secondary bidder has completed in time
final Map<String, BidRejectionTracker> copiedBidRejectionTrackers =
MapUtil.mapValues(auctionContext.getBidRejectionTrackers(), BidRejectionTracker::copyOf);

final Map<String, AuctionParticipation> bidderToAuctionParticipation = auctionContext
.getAuctionParticipations().stream().collect(Collectors.toMap(
AuctionParticipation::getBidder,
Function.identity()));

final Map<String, Future<BidderResponse>> bidderToFutureResponse = MapUtil.mapValues(
bidderToAuctionParticipation,
auctionParticipation -> processAndRequestBidsForSingleBidder(
auctionContext.with(copiedBidRejectionTrackers),
auctionParticipation.getBidderRequest(),
timeout,
aliases));

return buildPrimaryBiddersCompositeFuture(auctionContext, bidderToFutureResponse).transform(ignored -> {
mergeBidRejectionTrackers(auctionContext, copiedBidRejectionTrackers, bidderToFutureResponse);

return Future.succeededFuture(bidderToFutureResponse.entrySet().stream()
.map(MapUtil.mapEntryValueMapper((bidder, futureResponse) ->
futureResponse.isComplete() ? futureResponse : recoverUncompletedSecondaryBidder(bidder)))
.map(MapUtil.mapEntryMapper((bidder, futureResponse) -> futureResponse.map(bidderResponse ->
bidderToAuctionParticipation.get(bidder).with(bidderResponse))))
.map(Future::result)
.filter(Objects::nonNull)
.toList());
});
}

private Future<BidderResponse> recoverUncompletedSecondaryBidder(String bidderName) {
final BidderSeatBid bidderSeatBid = BidderSeatBid.builder()
.warnings(Collections.singletonList(
BidderError.of("secondary bidder timed out, auction proceeded", BidderError.Type.timeout)))
.build();

return Future.succeededFuture(BidderResponse.of(bidderName, bidderSeatBid, 0));
}

private CompositeFuture buildPrimaryBiddersCompositeFuture(
AuctionContext auctionContext,
Map<String, Future<BidderResponse>> bidderToFutureResponse) {

final Set<String> secondaryBidders = Optional.of(auctionContext)
.map(AuctionContext::getAccount)
.map(Account::getAuction)
.map(AccountAuctionConfig::getSecondaryBidders)
.orElse(Collections.emptySet());

final List<Future<BidderResponse>> primaryBiddersFutureResponses = bidderToFutureResponse.keySet().stream()
.filter(Predicate.not(secondaryBidders::contains))
.map(bidderToFutureResponse::get)
.toList();

return Future.join(CollectionUtils.isNotEmpty(primaryBiddersFutureResponses)
? primaryBiddersFutureResponses
: bidderToFutureResponse.values().stream().toList());
}

private void mergeBidRejectionTrackers(AuctionContext auctionContext,
Map<String, BidRejectionTracker> newBidRejectionTrackers,
Map<String, Future<BidderResponse>> bidderToFutureResponse) {

final Map<String, BidRejectionTracker> mergedBidRejectionTrackers = newBidRejectionTrackers.keySet().stream()
.collect(Collectors.toMap(Function.identity(), bidder -> mergeBidRejectionTrackersForSingleBidder(
bidderToFutureResponse.get(bidder),
auctionContext.getBidRejectionTrackers().get(bidder),
newBidRejectionTrackers.get(bidder))));

auctionContext.getBidRejectionTrackers().clear();
auctionContext.getBidRejectionTrackers().putAll(mergedBidRejectionTrackers);
}

private BidRejectionTracker mergeBidRejectionTrackersForSingleBidder(Future<BidderResponse> futureResponse,
BidRejectionTracker oldBidRejectionTracker,
BidRejectionTracker newBidRejectionTracker) {

return futureResponse == null ? oldBidRejectionTracker : futureResponse.isComplete()
? newBidRejectionTracker
: oldBidRejectionTracker.rejectAll(BidRejectionReason.ERROR_TIMED_OUT);
}

private Future<BidderResponse> processAndRequestBidsForSingleBidder(AuctionContext auctionContext,
BidderRequest bidderRequest,
Timeout timeout,
BidderAliases aliases) {

return bidderRequestPostProcessor.process(bidderRequest, aliases, auctionContext)
.compose(result -> invokeHooksAndRequestBids(auctionContext, result.getValue(), timeout, aliases)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,8 +224,8 @@ private List<SeatBid> resolveSeatBids(StoredResponse storedResponse,
Map<String, String> idToStoredResponses,
String impId) {

if (storedResponse instanceof StoredResponse.StoredResponseObject storedResponseObject) {
return Collections.singletonList(storedResponseObject.seatBid());
if (storedResponse instanceof StoredResponse.StoredResponseObject(SeatBid seatBid)) {
return Collections.singletonList(seatBid);
}

final String storedResponseId = ((StoredResponse.StoredResponseId) storedResponse).id();
Expand Down Expand Up @@ -313,7 +313,7 @@ private static AuctionParticipation updateStoredBidResponse(AuctionParticipation
final BidRequest bidRequest = bidderRequest.getBidRequest();

final List<Imp> imps = bidRequest.getImp();
// Аor now, Stored Bid Response works only for bid requests with single imp
// For now, Stored Bid Response works only for bid requests with single imp
if (imps.size() > 1 || StringUtils.isEmpty(bidderRequest.getStoredResponse())) {
return auctionParticipation;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ public AuctionContext with(BidResponse bidResponse) {
return this.toBuilder().bidResponse(bidResponse).build();
}

public AuctionContext with(Map<String, BidRejectionTracker> bidRejectionTrackers) {
return this.toBuilder().bidRejectionTrackers(bidRejectionTrackers).build();
}

public AuctionContext with(List<AuctionParticipation> auctionParticipations) {
return this.toBuilder().auctionParticipations(auctionParticipations).build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,24 +45,49 @@ public BidRejectionTracker(String bidder, Set<String> involvedImpIds, double log
rejections = new HashMap<>();
}

public BidRejectionTracker(BidRejectionTracker anotherTracker, Set<String> additionalImpIds) {
this.bidder = anotherTracker.bidder;
this.logSamplingRate = anotherTracker.logSamplingRate;
this.involvedImpIds = new HashSet<>(anotherTracker.involvedImpIds);
this.involvedImpIds.addAll(additionalImpIds);

this.succeededBidsIds = new HashMap<>(anotherTracker.succeededBidsIds);
this.rejections = new HashMap<>(anotherTracker.rejections);
private BidRejectionTracker(
String bidder,
Set<String> involvedImpIds,
Map<String, Set<String>> succeededBidsIds,
Map<String, List<Rejection>> rejections,
double logSamplingRate
) {
this.bidder = bidder;
this.involvedImpIds = new HashSet<>(involvedImpIds);
this.logSamplingRate = logSamplingRate;

this.succeededBidsIds = MapUtil.mapValues(succeededBidsIds, v -> new HashSet<>(v));
this.rejections = MapUtil.mapValues(rejections, ArrayList::new);
}

public void succeed(Collection<BidderBid> bids) {
public static BidRejectionTracker copyOf(BidRejectionTracker anotherTracker) {
return new BidRejectionTracker(
anotherTracker.bidder,
anotherTracker.involvedImpIds,
anotherTracker.succeededBidsIds,
anotherTracker.rejections,
anotherTracker.logSamplingRate
);
}

public static BidRejectionTracker withAdditionalImpIds(
BidRejectionTracker anotherTracker,
Set<String> additionalImpIds
) {
final BidRejectionTracker newTracker = copyOf(anotherTracker);
newTracker.involvedImpIds.addAll(additionalImpIds);
return newTracker;
}

public BidRejectionTracker succeed(Collection<BidderBid> bids) {
bids.stream()
.map(BidderBid::getBid)
.filter(Objects::nonNull)
.forEach(this::succeed);
return this;
}

private void succeed(Bid bid) {
private BidRejectionTracker succeed(Bid bid) {
final String bidId = bid.getId();
final String impId = bid.getImpid();
if (involvedImpIds.contains(impId)) {
Expand All @@ -73,21 +98,23 @@ private void succeed(Bid bid) {
logSamplingRate);
}
}
return this;
}

public void restoreFromRejection(Collection<BidderBid> bids) {
succeed(bids);
}

public void reject(Collection<Rejection> rejections) {
public BidRejectionTracker reject(Collection<Rejection> rejections) {
rejections.forEach(this::reject);
return this;
}

public void reject(Rejection rejection) {
public BidRejectionTracker reject(Rejection rejection) {
if (rejection instanceof ImpRejection && rejection.reason().getValue() >= 300) {
logger.warn("The rejected imp {} with the code {} equal to or higher than 300 assumes "
+ "that there is a rejected bid that shouldn't be lost");
return;
return this;
}

final String impId = rejection.impId();
Expand All @@ -113,14 +140,18 @@ public void reject(Rejection rejection) {
}
}
}

return this;
}

public void rejectImps(Collection<String> impIds, BidRejectionReason reason) {
public BidRejectionTracker rejectImps(Collection<String> impIds, BidRejectionReason reason) {
impIds.forEach(impId -> reject(ImpRejection.of(impId, reason)));
return this;
}

public void rejectAll(BidRejectionReason reason) {
public BidRejectionTracker rejectAll(BidRejectionReason reason) {
involvedImpIds.forEach(impId -> reject(ImpRejection.of(impId, reason)));
return this;
}

public Set<Rejection> getRejected() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.prebid.server.spring.config.bidder.model.MediaType;

import java.util.Map;
import java.util.Set;

@Builder(toBuilder = true)
@Value
Expand Down Expand Up @@ -65,4 +66,7 @@ public class AccountAuctionConfig {
Integer impressionLimit;

AccountProfilesConfig profiles;

@JsonProperty("secondarybidders")
Set<String> secondaryBidders;
}
22 changes: 22 additions & 0 deletions src/main/java/org/prebid/server/util/MapUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;

public class MapUtil {

Expand All @@ -15,4 +18,23 @@ public static <T, U> Map<T, U> merge(Map<T, U> left, Map<T, U> right) {

return Collections.unmodifiableMap(merged);
}

public static <K, V1, V2> Map<K, V2> mapValues(Map<K, V1> map, Function<V1, V2> transform) {
return mapValues(map, (ignored, value) -> transform.apply(value));
}

public static <K, V1, V2> Map<K, V2> mapValues(Map<K, V1> map, BiFunction<K, V1, V2> transform) {
return map.entrySet().stream().collect(
Collectors.toMap(Map.Entry::getKey, entry -> transform.apply(entry.getKey(), entry.getValue())));
}

public static <K, V1, V2> Function<Map.Entry<K, V1>, Map.Entry<K, V2>> mapEntryValueMapper(
BiFunction<K, V1, V2> transform) {

return mapEntryMapper((key, value) -> Map.entry(key, transform.apply(key, value)));
}

public static <K, V, T> Function<Map.Entry<K, V>, T> mapEntryMapper(BiFunction<K, V, T> transform) {
return entry -> transform.apply(entry.getKey(), entry.getValue());
}
}
Loading