File tree Expand file tree Collapse file tree
src/main/java/com/timgroup/statsd Expand file tree Collapse file tree Original file line number Diff line number Diff line change @@ -92,11 +92,9 @@ public void run() {
9292 @ Override
9393 boolean send (final String message ) {
9494 try {
95- long threadId = Thread .currentThread ().getId ();
96- // modulo reduction alternative to: long shard = threadID % this.lockShardGrain;
97- // ref: https://lemire.me/blog/2016/06/27/a-fast-alternative-to-the-modulo-reduction/
98- int shard = (int )((threadId * (long )this .lockShardGrain ) >> 32 );
99- int processQueue = (int )((threadId * (long )this .workers ) >> 32 );
95+ int threadId = getThreadId ();
96+ int shard = threadId % lockShardGrain ;
97+ int processQueue = threadId % workers ;
10098
10199 if (!shutdown ) {
102100 messages [shard ].put (message );
Original file line number Diff line number Diff line change @@ -102,11 +102,9 @@ public void run() {
102102 @ Override
103103 boolean send (final String message ) {
104104 if (!shutdown ) {
105- long threadId = Thread .currentThread ().getId ();
106- // modulo reduction alternative to: long shard = threadID % [shard]this.lockShardGrain;
107- // ref: https://lemire.me/blog/2016/06/27/a-fast-alternative-to-the-modulo-reduction/
108- int shard = (int )((threadId * (long )this .lockShardGrain ) >> 32 );
109- int processQueue = (int )((threadId * (long )this .workers ) >> 32 );
105+ int threadId = getThreadId ();
106+ int shard = threadId % lockShardGrain ;
107+ int processQueue = threadId % workers ;
110108
111109 if (qsize [shard ].get () < qcapacity ) {
112110 messages [shard ].offer (message );
Original file line number Diff line number Diff line change @@ -17,12 +17,21 @@ public abstract class StatsDProcessor implements Runnable {
1717 protected static final String MESSAGE_TOO_LONG = "Message longer than size of sendBuffer" ;
1818 protected static final int WAIT_SLEEP_MS = 10 ; // 10 ms would be a 100HZ slice
1919
20+ // Atomic integer containing the next thread ID to be assigned
21+ private static final AtomicInteger nextId = new AtomicInteger (0 );
22+
2023 protected final StatsDClientErrorHandler handler ;
2124
2225 protected final BufferPool bufferPool ;
2326 protected final BlockingQueue <ByteBuffer > outboundQueue ; // FIFO queue with outbound buffers
2427 protected final ExecutorService executor ;
2528 protected final CountDownLatch endSignal ;
29+ protected static final ThreadLocal <Integer > threadId = new ThreadLocal <Integer >() {
30+ @ Override
31+ protected Integer initialValue () {
32+ return nextId .getAndIncrement ();
33+ }
34+ };
2635
2736 protected final int workers ;
2837 protected final int lockShardGrain ;
@@ -54,6 +63,11 @@ public BlockingQueue<ByteBuffer> getOutboundQueue() {
5463 return this .outboundQueue ;
5564 }
5665
66+ // Returns the current thread's unique ID, assigning it if necessary
67+ public static int getThreadId () {
68+ return threadId .get ().intValue ();
69+ }
70+
5771 @ Override
5872 public abstract void run ();
5973
You can’t perform that action at this time.
0 commit comments