diff --git a/pom.xml b/pom.xml index 16a7262..97e779c 100644 --- a/pom.xml +++ b/pom.xml @@ -44,7 +44,6 @@ https://github.com/Commonjava/weft UTF-8 false - 1.9.2 5.1.7.Final 3.0.0 2.0.1 @@ -60,20 +59,10 @@ pom import - - org.commonjava.util - o11yphant-metrics-api - ${o11yphantVersion} - - - - org.commonjava.util - o11yphant-metrics-api - jakarta.annotation jakarta.annotation-api diff --git a/src/main/java/org/commonjava/cdi/util/weft/PoolWeftExecutorService.java b/src/main/java/org/commonjava/cdi/util/weft/PoolWeftExecutorService.java index b7f537c..535d6ab 100644 --- a/src/main/java/org/commonjava/cdi/util/weft/PoolWeftExecutorService.java +++ b/src/main/java/org/commonjava/cdi/util/weft/PoolWeftExecutorService.java @@ -16,8 +16,6 @@ package org.commonjava.cdi.util.weft; import org.commonjava.cdi.util.weft.exception.PoolOverloadException; -import org.commonjava.o11yphant.metrics.api.MetricRegistry; -import org.commonjava.o11yphant.metrics.api.Timer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,18 +38,12 @@ import java.util.function.Function; import java.util.stream.Collectors; -import static org.commonjava.o11yphant.metrics.util.NameUtils.name; - /** * Created by jdcasey on 1/3/17. */ public class PoolWeftExecutorService implements WeftExecutorService, ScheduledExecutorService { - private static final String TIMER = "timer"; - - private static final String METER = "meter"; - private static final int DEFAULT_THREAD_COUNT = 2; private static final float DEFAULT_LOAD_FACTOR = 10f; @@ -68,10 +60,6 @@ public class PoolWeftExecutorService private final boolean loadSensitive; - private final MetricRegistry metricRegistry; - - private final String metricPrefix; - private Set contextualizers; private final AtomicLong load = new AtomicLong( 0L ); @@ -79,21 +67,19 @@ public class PoolWeftExecutorService public PoolWeftExecutorService( String name, ThreadPoolExecutor delegate ) { - this( name, delegate, DEFAULT_THREAD_COUNT, DEFAULT_LOAD_FACTOR, DEFAULT_LOAD_SENSITIVE, null, null, + this( name, delegate, DEFAULT_THREAD_COUNT, DEFAULT_LOAD_FACTOR, DEFAULT_LOAD_SENSITIVE, Collections.emptySet() ); } public PoolWeftExecutorService( final String name, ThreadPoolExecutor delegate, final Integer threadCount, - final Float maxLoadFactor, boolean loadSensitive, - final MetricRegistry metricRegistry, final String metricPrefix ) + final Float maxLoadFactor, boolean loadSensitive ) { - this( name, delegate, threadCount, maxLoadFactor, loadSensitive, metricRegistry, metricPrefix, + this( name, delegate, threadCount, maxLoadFactor, loadSensitive, Collections.emptySet() ); } public PoolWeftExecutorService( final String name, ThreadPoolExecutor delegate, final Integer threadCount, final Float maxLoadFactor, boolean loadSensitive, - final MetricRegistry metricRegistry, final String metricPrefix, Iterable contextualizers ) { this.name = name; @@ -101,8 +87,6 @@ public PoolWeftExecutorService( final String name, ThreadPoolExecutor delegate, this.threadCount = threadCount; this.maxLoadFactor = maxLoadFactor; this.loadSensitive = loadSensitive; - this.metricRegistry = metricRegistry; - this.metricPrefix = metricPrefix; this.contextualizers = new HashSet<>(); contextualizers.forEach( c -> this.contextualizers.add( c ) ); } @@ -175,7 +159,7 @@ private void verifyLoad() throw new PoolOverloadException( getName(), getLoadFactor(), getCurrentLoad(), maxLoadFactor, getThreadCount() ); } } - + @Override public Future submit( Callable callable ) { @@ -313,52 +297,6 @@ private ScheduledFuture asScheduled( Function Callable timeCallable( Callable callable ) - { - return (Callable) ()->{ - if( metricRegistry != null ) - { - metricRegistry.meter( name( metricPrefix, "call", METER ) ).mark(); - Timer.Context context = metricRegistry.timer( name( metricPrefix, "call", TIMER ) ).time(); - try - { - return callable.call(); - } - finally - { - context.stop(); - } - } - else - { - return callable.call(); - } - }; - } - - private Runnable timeRunnable( Runnable runnable ) - { - return ()->{ - if( metricRegistry != null ) - { - metricRegistry.meter( name( metricPrefix, "run", METER ) ).mark(); - Timer.Context context = metricRegistry.timer( name( metricPrefix, "run", TIMER ) ).time(); - try - { - runnable.run(); - } - finally - { - context.stop(); - } - } - else - { - runnable.run(); - } - }; - } - private Collection> wrapAll( Collection> collection ) { ThreadContext ctx = ThreadContext.getContext( false ); @@ -369,7 +307,7 @@ private Collection> wrapAll( Collection> c setContext( extractedContext ); Logger logger = LoggerFactory.getLogger( getClass() ); logger.debug( "Using ThreadContext: {} (saving: {}) in {}", ctx, old, Thread.currentThread().getName() ); - return timeCallable((Callable) () -> { + return (Callable) () -> { try { return callable.call(); @@ -381,7 +319,7 @@ private Collection> wrapAll( Collection> c clearBridgedContext(); load.decrementAndGet(); } - }); + }; } ).collect( Collectors.toList() ); } @@ -390,7 +328,7 @@ private Runnable wrapRunnable( Runnable runnable ) ThreadContext ctx = ThreadContext.getContext( false ); Map extractedContext = extractContext(); load.incrementAndGet(); - return timeRunnable(()->{ + return ()->{ ThreadContext old = ThreadContext.setContext( ctx ); setContext( extractedContext ); Logger logger = LoggerFactory.getLogger( getClass() ); @@ -407,7 +345,7 @@ private Runnable wrapRunnable( Runnable runnable ) clearBridgedContext(); load.decrementAndGet(); } - }); + }; } private Callable wrapCallable( Callable callable ) @@ -415,7 +353,7 @@ private Callable wrapCallable( Callable callable ) ThreadContext ctx = ThreadContext.getContext( false ); Map extractedContext = extractContext(); load.incrementAndGet(); - return timeCallable((Callable) ()->{ + return (Callable) ()->{ ThreadContext old = ThreadContext.setContext( ctx ); setContext( extractedContext ); Logger logger = LoggerFactory.getLogger( getClass() ); @@ -431,7 +369,7 @@ private Callable wrapCallable( Callable callable ) clearBridgedContext(); load.decrementAndGet(); } - }); + }; } private void clearBridgedContext() diff --git a/src/main/java/org/commonjava/cdi/util/weft/WeftPoolBoy.java b/src/main/java/org/commonjava/cdi/util/weft/WeftPoolBoy.java index d2ae0e7..c2f91f7 100644 --- a/src/main/java/org/commonjava/cdi/util/weft/WeftPoolBoy.java +++ b/src/main/java/org/commonjava/cdi/util/weft/WeftPoolBoy.java @@ -16,12 +16,9 @@ package org.commonjava.cdi.util.weft; import org.commonjava.cdi.util.weft.config.WeftConfig; -import org.commonjava.o11yphant.metrics.api.Gauge; -import org.commonjava.o11yphant.metrics.api.MetricRegistry; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; import jakarta.enterprise.context.ApplicationScoped; import jakarta.enterprise.inject.Instance; @@ -36,11 +33,9 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import static org.apache.commons.lang3.StringUtils.isBlank; import static org.commonjava.cdi.util.weft.config.DefaultWeftConfig.DEFAULT_MAX_LOAD_FACTOR; import static org.commonjava.cdi.util.weft.config.DefaultWeftConfig.DEFAULT_PRIORITY; import static org.commonjava.cdi.util.weft.config.DefaultWeftConfig.DEFAULT_THREADS; -import static org.commonjava.o11yphant.metrics.util.NameUtils.name; @ApplicationScoped public class WeftPoolBoy @@ -54,29 +49,14 @@ public class WeftPoolBoy @Inject private WeftConfig config; - @Inject - private Instance metricRegistryInstance; - - private MetricRegistry metricRegistry; - @Inject private Instance contextualizers; protected WeftPoolBoy(){} - public WeftPoolBoy( WeftConfig config, MetricRegistry registry ) + public WeftPoolBoy( WeftConfig config ) { this.config = config; - this.metricRegistry = registry; - } - - @PostConstruct - public void init() - { - if ( !metricRegistryInstance.isUnsatisfied() ) - { - this.metricRegistry = metricRegistryInstance.get(); - } } public WeftExecutorService getPool( final String key ) @@ -140,7 +120,7 @@ public synchronized WeftExecutorService getPool( final ExecutorConfig ec, final { int threadCount = ec.threads(); String name = ec.named(); - if ( isBlank( name ) ) + if ( name == null || name.trim().isEmpty() ) { name = DUMMY_NAME; } @@ -215,34 +195,18 @@ else if ( threadCount > 0 ) svc = (ThreadPoolExecutor) Executors.newCachedThreadPool( fac ); } - String metricPrefix = name( config.getNodePrefix(), "weft.ThreadPoolExecutor", name ); - - service = new PoolWeftExecutorService( name, svc, threadCount, maxLoadFactor, loadSensitive, metricRegistry, - metricPrefix, contextualizers ); + service = new PoolWeftExecutorService( name, svc, threadCount, maxLoadFactor, loadSensitive, + contextualizers ); // TODO: Wrapper ThreadPoolExecutor that wraps Runnables to store/copy MDC when it gets created/started. addPool( service ); - registerMetrics( metricPrefix, service ); } return service; } - private void registerMetrics( String prefix, WeftExecutorService pool ) - { - if ( metricRegistry != null ) - { - metricRegistry.register( name( prefix, "corePoolSize" ), (Gauge) () -> pool.getCorePoolSize() ); - metricRegistry.register( name( prefix, "activeThreads" ), (Gauge) () -> pool.getActiveCount() ); - metricRegistry.register( name( prefix, "loadFactor" ), (Gauge) () -> pool.getLoadFactor() ); - metricRegistry.register( name( prefix, "currentLoad" ), (Gauge) () -> pool.getCurrentLoad() ); - - metricRegistry.registerHealthCheck( name( prefix, pool.getName() ), new WeftPoolHealthCheck( pool ) ); - } - } - public Map getPools() { Map result = new HashMap<>( pools ); diff --git a/src/main/java/org/commonjava/cdi/util/weft/WeftPoolHealthCheck.java b/src/main/java/org/commonjava/cdi/util/weft/WeftPoolHealthCheck.java deleted file mode 100644 index 4adf3a4..0000000 --- a/src/main/java/org/commonjava/cdi/util/weft/WeftPoolHealthCheck.java +++ /dev/null @@ -1,88 +0,0 @@ -/** - * Copyright (C) 2013-2022 Red Hat, Inc. (https://github.com/Commonjava/weft) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.commonjava.cdi.util.weft; - -import org.commonjava.o11yphant.metrics.api.healthcheck.HealthCheck; - -import java.util.Date; -import java.util.HashMap; -import java.util.Map; - -public class WeftPoolHealthCheck - implements HealthCheck -{ - private static final String POOL_SIZE = "pool-size"; - - private static final String CURRENT_LOAD = "current-load"; - - private static final String LOAD_FACTOR = "load-factor"; - - private WeftExecutorService pool; - - public WeftPoolHealthCheck( final WeftExecutorService pool ) - { - this.pool = pool; - } - - @Override - public Result check() throws Exception - { - boolean healthy = false; - final String timestamp = new Date().toString(); - final Map details = new HashMap<>(); - if ( pool != null ) - { - healthy = pool.isHealthy(); - details.put( POOL_SIZE, pool.getThreadCount() ); - details.put( CURRENT_LOAD, pool.getCurrentLoad() ); - details.put( LOAD_FACTOR, pool.getLoadFactor() ); - } - - final boolean isHealthy = healthy; - return new Result() - { - @Override - public boolean isHealthy() - { - return isHealthy; - } - - @Override - public String getMessage() - { - return null; - } - - @Override - public Throwable getError() - { - return null; - } - - @Override - public String getTimestamp() - { - return timestamp; - } - - @Override - public Map getDetails() - { - return details; - } - }; - } -}