Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@
import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalAdapter;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
Expand Down Expand Up @@ -159,6 +160,7 @@

import static org.apache.ignite.IgniteSystemProperties.IGNITE_CACHE_RETRIES_COUNT;
import static org.apache.ignite.internal.GridClosureCallMode.BROADCAST;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.READ;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
import static org.apache.ignite.internal.processors.dr.GridDrType.DR_LOAD;
import static org.apache.ignite.internal.processors.dr.GridDrType.DR_NONE;
Expand Down Expand Up @@ -616,7 +618,8 @@ public void active(boolean active) {

/**
* @param keys Keys to lock.
* @param timeout Lock timeout.
* @param timeout Transaction timeout.
* @param waitTimeout Lock wait timeout.
* @param tx Transaction.
* @param isRead {@code True} for read operations.
* @param retval Flag to return value.
Expand All @@ -629,6 +632,7 @@ public void active(boolean active) {
public abstract IgniteInternalFuture<Boolean> txLockAsync(
Collection<KeyCacheObject> keys,
long timeout,
long waitTimeout,
IgniteTxLocalEx tx,
boolean isRead,
boolean retval,
Expand Down Expand Up @@ -3106,6 +3110,194 @@ public CacheMetricsImpl metrics0() {
}
}

/** {@inheritDoc} */
@Override public boolean lockTxEntry(CacheEntry<K, V> entry, long waitTimeout) throws IgniteCheckedException {
A.notNull(entry, "entry");

return lockTxEntryAsync(entry, waitTimeout).get();
}

/** {@inheritDoc} */
@Override public boolean lockTxEntries(Collection<CacheEntry<K, V>> entries, long waitTimeout)
throws IgniteCheckedException {
A.notNull(entries, "entries");

return lockTxEntriesAsync(entries, waitTimeout).get();
}

/** {@inheritDoc} */
@Override public IgniteInternalFuture<Boolean> lockTxEntryAsync(CacheEntry<K, V> entry, long waitTimeout) {
A.notNull(entry, "entry");

return lockTxEntriesAsync(Collections.singleton(entry), waitTimeout);
}

/** {@inheritDoc} */
@Override public IgniteInternalFuture<Boolean> lockTxEntriesAsync(
Collection<CacheEntry<K, V>> entries,
long waitTimeout
) {
A.notNull(entries, "entries");

GridNearTxLocal tx = tx();

if (tx == null)
return new GridFinishedFuture<>(
new IgniteCheckedException("Failed to acquire transactional lock without transaction."));

if (!tx.pessimistic())
return new GridFinishedFuture<>(
new IgniteCheckedException("Failed to acquire transactional lock in optimistic transaction."));

// Wait for previous per-transaction async operations to finish.
tx.txState().awaitLastFuture();

if (!tx.init())
return new GridFinishedFuture<>(new IgniteTxRollbackCheckedException(
"Failed to acquire transactional lock because transaction has been completed: " + tx));

if (entries.isEmpty())
return new GridFinishedFuture<>(true);

try {
tx.addActiveCache(ctx, false);
}
catch (IgniteCheckedException e) {
return new GridFinishedFuture<>(e);
}

Collection<KeyCacheObject> keys = new ArrayList<>(entries.size());
List<IgniteTxEntry> txEntries = new ArrayList<>(entries.size());
List<GridCacheVersion> expVers = new ArrayList<>(entries.size());
Set<IgniteTxKey> txKeys = new HashSet<>(entries.size());

for (CacheEntry<K, V> entry : entries) {
A.notNull(entry, "entry");

KeyCacheObject key = ctx.toCacheKeyObject(entry.getKey());
IgniteTxKey txKey = ctx.txKey(key);

if (!txKeys.add(txKey))
continue;

IgniteTxEntry lockedTxEntry = tx.entry(txKey);

if (lockedTxEntry != null && lockedTxEntry.locked())
continue;

if (!(entry.version() instanceof GridCacheVersion)) {
tx.removeAndUnlockTxEntries(txEntries);

return new GridFinishedFuture<>(new IgniteCheckedException("Failed to acquire transactional lock for entry with " +
"unsupported version type [entry=" + entry + ", version=" + entry.version() + ']'));
}

CacheObject val = ctx.toCacheObject(entry.getValue());
GridCacheEntryEx entryEx = ctx.isColocated() ? ctx.colocated().entryExx(key, tx.topologyVersion(), true) : entryEx(key);

IgniteTxEntry txEntry = tx.addEntry(
READ,
val,
null,
null,
entryEx,
null,
null,
true,
-1L,
-1L,
null,
false,
false,
false,
false,
CU.isNearEnabled(ctx)
);

keys.add(key);
txEntries.add(txEntry);
expVers.add((GridCacheVersion)entry.version());
}

if (keys.isEmpty())
return new GridFinishedFuture<>(true);

// Acquire transactional lock future from concrete cache implementation. Use txLockAsync which
// delegates to cache-specific lockAllAsync implementations for distributed caches.
long timeout = tx.remainingTime();

IgniteInternalFuture<Boolean> lockFut = txLockAsync(keys,
timeout,
waitTimeout == 0 ? timeout : waitTimeout,
tx,
/*isRead*/true,
/*retval*/false,
tx.isolation(),
/*invalidate*/false,
/*createTtl*/0L,
/*accessTtl*/0L);

IgniteInternalFuture<Boolean> res = new GridEmbeddedFuture<>(
lockFut,
(locked, ex) -> {
if (ex != null)
return new GridFinishedFuture<>(ex);

if (!locked) {
tx.removeAndUnlockTxEntries(txEntries);

return new GridFinishedFuture<>(false);
}

try {
for (int i = 0; i < txEntries.size(); i++) {
GridCacheEntryEx cached = txEntries.get(i).cached();
EntryGetResult getRes = cached.innerGetVersioned(
null,
tx,
/*update-metrics*/false,
/*event*/false,
null,
tx.resolveTaskName(),
null,
false,
null);

if (getRes == null || !expVers.get(i).equals(getRes.version())) {
tx.removeAndUnlockTxEntries(txEntries);

return new GridFinishedFuture<>(false);
}
}

return new GridFinishedFuture<>(true);
}
catch (IgniteCheckedException | GridCacheEntryRemovedException e) {
tx.removeAndUnlockTxEntries(txEntries);

return new GridFinishedFuture<>(e);
}
}
);

// Register this future in transaction's async-holder so that subsequent operations
// that call tx.txState().awaitLastFuture() will wait for it.
GridCacheAdapter.FutureHolder holder = tx.txState().lastAsyncFuture();

if (holder != null) {
holder.lock();

try {
holder.saveFuture(res);
}
finally {
holder.unlock();
}
}

return res;
}

/** {@inheritDoc} */
@Override public boolean isLockedByThread(K key) {
A.notNull(key, "key");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1348,6 +1348,58 @@ public IgniteInternalCache<K, V> delegate() {
}
}

/** {@inheritDoc} */
@Override public boolean lockTxEntry(CacheEntry<K, V> entry, long waitTimeout) throws IgniteCheckedException {
CacheOperationContext prev = gate.enter(opCtx);

try {
return delegate.lockTxEntry(entry, waitTimeout);
}
finally {
gate.leave(prev);
}
}

/** {@inheritDoc} */
@Override public IgniteInternalFuture<Boolean> lockTxEntryAsync(CacheEntry<K, V> entry, long waitTimeout) {
CacheOperationContext prev = gate.enter(opCtx);

try {
return delegate.lockTxEntryAsync(entry, waitTimeout);
}
finally {
gate.leave(prev);
}
}

/** {@inheritDoc} */
@Override public boolean lockTxEntries(Collection<CacheEntry<K, V>> entries, long waitTimeout)
throws IgniteCheckedException {
CacheOperationContext prev = gate.enter(opCtx);

try {
return delegate.lockTxEntries(entries, waitTimeout);
}
finally {
gate.leave(prev);
}
}

/** {@inheritDoc} */
@Override public IgniteInternalFuture<Boolean> lockTxEntriesAsync(
Collection<CacheEntry<K, V>> entries,
long waitTimeout
) {
CacheOperationContext prev = gate.enter(opCtx);

try {
return delegate.lockTxEntriesAsync(entries, waitTimeout);
}
finally {
gate.leave(prev);
}
}

/** {@inheritDoc} */
@Override public boolean isLockedByThread(K key) {
CacheOperationContext prev = gate.enter(opCtx);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1346,6 +1346,61 @@ public boolean lock(K key, long timeout)
*/
public boolean isLocked(K key);

/**
* Acquires a transactional lock for the cached object represented by the given entry if the current cached version
* matches the entry version. This method works only in a {@link TransactionConcurrency#PESSIMISTIC} transaction.
*
* @param entry Entry whose key, value and version should be used.
* @param waitTimeout Timeout in milliseconds to wait for lock to be acquired
* ({@code 0} to use the transaction timeout, {@code -1} for immediate failure if
* lock cannot be acquired immediately).
* @return {@code True} if lock was acquired with the same entry version.
* @throws IgniteCheckedException If lock acquisition resulted in an error.
*/
public boolean lockTxEntry(CacheEntry<K, V> entry, long waitTimeout) throws IgniteCheckedException;

/**
* Acquires transactional locks for the cached objects represented by the given entries if all current cached
* versions match the corresponding entry versions. This method works only in a
* {@link TransactionConcurrency#PESSIMISTIC} transaction.
*
* @param entries Entries whose keys, values and versions should be used.
* @param waitTimeout Timeout in milliseconds to wait for locks to be acquired
* ({@code 0} to use the transaction timeout, {@code -1} for immediate failure if
* locks cannot be acquired immediately).
* @return {@code True} if all locks were acquired with the same entry versions.
* @throws IgniteCheckedException If lock acquisition resulted in an error.
*/
public boolean lockTxEntries(Collection<CacheEntry<K, V>> entries, long waitTimeout) throws IgniteCheckedException;

/**
* Asynchronously acquires a transactional lock for the cached object represented by the given entry if the current
* cached version matches the entry version. This method works only in a
* {@link TransactionConcurrency#PESSIMISTIC} transaction.
*
* @param entry Entry whose key, value and version should be used.
* @param waitTimeout Timeout in milliseconds to wait for lock to be acquired
* ({@code 0} to use the transaction timeout, {@code -1} for immediate failure if
* lock cannot be acquired immediately).
* @return Future that resolves to {@code true} if the lock was acquired and the versions matched, or to
* {@code false} otherwise.
*/
public IgniteInternalFuture<Boolean> lockTxEntryAsync(CacheEntry<K, V> entry, long waitTimeout);

/**
* Asynchronously acquires transactional locks for the cached objects represented by the given entries if all
* current cached versions match the corresponding entry versions. This method works only in a
* {@link TransactionConcurrency#PESSIMISTIC} transaction.
*
* @param entries Entries whose keys, values and versions should be used.
* @param waitTimeout Timeout in milliseconds to wait for locks to be acquired
* ({@code 0} to use the transaction timeout, {@code -1} for immediate failure if
* locks cannot be acquired immediately).
* @return Future that resolves to {@code true} if all locks were acquired and all versions matched, or to
* {@code false} otherwise.
*/
public IgniteInternalFuture<Boolean> lockTxEntriesAsync(Collection<CacheEntry<K, V>> entries, long waitTimeout);

/**
* Checks if current thread owns a lock on this key.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ protected GridDistributedCacheAdapter(GridCacheContext<K, V> ctx, GridCacheConcu
@Override public IgniteInternalFuture<Boolean> txLockAsync(
Collection<KeyCacheObject> keys,
long timeout,
long waitTimeout,
IgniteTxLocalEx tx,
boolean isRead,
boolean retval,
Expand All @@ -112,7 +113,7 @@ protected GridDistributedCacheAdapter(GridCacheContext<K, V> ctx, GridCacheConcu
) {
assert tx != null;

return lockAllAsync(keys, timeout, tx, isInvalidate, isRead, retval, isolation, createTtl, accessTtl);
return lockAllAsync(keys, timeout, waitTimeout, tx, isInvalidate, isRead, retval, isolation, createTtl, accessTtl);
}

/** {@inheritDoc} */
Expand All @@ -121,6 +122,7 @@ protected GridDistributedCacheAdapter(GridCacheContext<K, V> ctx, GridCacheConcu

// Return value flag is true because we choose to bring values for explicit locks.
return lockAllAsync(ctx.cacheKeysView(keys),
timeout,
timeout,
tx,
false,
Expand All @@ -133,7 +135,8 @@ protected GridDistributedCacheAdapter(GridCacheContext<K, V> ctx, GridCacheConcu

/**
* @param keys Keys to lock.
* @param timeout Timeout.
* @param timeout Transaction timeout.
* @param waitTimeout Lock wait timeout.
* @param tx Transaction
* @param isInvalidate Invalidation flag.
* @param isRead Indicates whether value is read or written.
Expand All @@ -145,6 +148,7 @@ protected GridDistributedCacheAdapter(GridCacheContext<K, V> ctx, GridCacheConcu
*/
protected abstract IgniteInternalFuture<Boolean> lockAllAsync(Collection<KeyCacheObject> keys,
long timeout,
long waitTimeout,
@Nullable IgniteTxLocalEx tx,
boolean isInvalidate,
boolean isRead,
Expand Down
Loading
Loading