
Unique Multithreading Concepts in Java Programming
Explore advanced multithreading concepts in Java programming such as synchronized, ReentrantLock, volatile, CountDownLatch, FutureTask, and more. Dive into the intricacies of thread management, lock mechanisms, and condition queues to enhance your Java expertise.
Download Presentation

Please find below an Image/Link to download the presentation.
The content on the website is provided AS IS for your information and personal use only. It may not be sold, licensed, or shared on other websites without obtaining consent from the author. If you encounter any issues during the download, it is possible that the publisher has removed the file from their server.
You are allowed to download the files provided on this website for personal or commercial use, subject to the condition that they are used lawfully. All files are the property of their respective owners.
The content on the website is provided AS IS for your information and personal use only. It may not be sold, licensed, or shared on other websites without obtaining consent from the author.
E N D
Presentation Transcript
1. 1.1.synchronized ReentrantLock 1.2. volatile ThreadLocal 1.3. CountDownLatch FutureTask Semaphore CyclicBarrier 2. 2.1.Executor 2.2.BoundQueue 2.3. 3. 3.1. 3.2.CAS 3.3.ABA AtomicStampedReference 3.4. 3.5.AQS 4. 4.1.DoubleCheck
1. 1.1.synchronized ReentrantLock 1.2. volatile ThreadLocal 1.3. CountDownLatch FutureTask Semaphore CyclicBarrier !
1.1.synchronized ReentrantLock 1.synchronized( ) JVM jdk1.6 jdk1.6 object.markword lockrecord ( thread lock)-> (CAS lockrecored->markword)-> Condition queue
1.1.synchronized ReentrantLock 1.synchronized( ) demo1 -> public synchronized void method() { System.out.println(Thread.currentThread().getId()); } -> class public synchronized static void statisMethod() { System.out.println(Thread.currentThread().getId()); } -> lock private Object lock = new Object(); public void method1() { synchronized (lock) { } }
1.1.synchronized ReentrantLock 1.synchronized( ) demo2 Condition queue public class ConditionQueue implements DemoRun { private boolean isOk = false; @Override public void runTest() throws ExecutionException, InterruptedException, CustomException, BrokenBarrierException { Thread thread1 = new Thread(() -> { synchronized (this) { while (!isOk) { try { System.out.println(String.format("t1:%s wait", Thread.currentThread().getId())); this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(String.format("t1:%s ", Thread.currentThread().getId())); } } }); thread1.start(); t1:12 wait t2:13 wait t2:13 t1:12 Thread thread2 = new Thread(() -> { synchronized (this) { while (!isOk) { try { System.out.println(String.format("t2:%s wait", Thread.currentThread().getId())); this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(String.format("t2:%s ", Thread.currentThread().getId())); } } }); thread2.start(); Thread.sleep(2000); synchronized (this) { isOk = true; this.notifyAll(); } } }
1.1.synchronizedLock 2.ReentranLock( ) AQS Interruption trylock Condition queue
1.1.synchronizedLock 2.ReentranLock( ) demo1 trylock lock: /** * nonFair fair */ private ReentrantLock lock = new ReentrantLock(true); private void tryLockTest() throws InterruptedException { Thread thread1 = new Thread(() -> { lock.lock(); System.out.println("thread1:" + Thread.currentThread().getId()); try { Thread.sleep(20000); } catch (InterruptedException e) { e.printStackTrace(); } }); thread1.start(); thread1:12 main:1 Thread thread2 = new Thread(() -> { try { if (lock.tryLock(5, TimeUnit.SECONDS)) { System.out.println("thread2:" + Thread.currentThread().getId()); try { } catch (Exception ignored) { } finally { lock.unlock(); } } } catch (InterruptedException e) { e.printStackTrace(); } }); thread2.start(); System.out.println("main:" + Thread.currentThread().getId()); Thread.sleep(10000); }
1.1.synchronizedLock 2.ReentranLock( ) demo2 Interruption /** * nonFair fair */ private ReentrantLock lock = new ReentrantLock(true); private void lockInterrupted() { Thread thread1 = new Thread(() -> { lock.lock(); try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } }); thread1.start(); thread2:java.lang.InterruptedException Thread thread2 = new Thread(() -> { try { lock.lockInterruptibly(); lock.lock(); } catch (InterruptedException e) { System.out.println(String.format("thread2:%s", e)); } }); thread2.start(); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } thread2.interrupt(); }
1.1.synchronizedLock 2.ReentranLock( ) demo3 ConditionQueue public class ConditionDemo implements DemoRun { private Lock lock = new ReentrantLock(); private Condition condition1 = lock.newCondition(); private Condition condition2 = lock.newCondition(); private class CustomThread extends Thread { public CustomThread(String threadName, Runnable runnable) { super(runnable); this.setName(threadName); } } @Override public void runTest() throws ExecutionException, InterruptedException, CustomException, BrokenBarrierException { for (int i = 0; i < 10; i++) { CustomThread thread1 = new CustomThread(String.valueOf(i), () -> { try { lock.lock(); condition1.await(); System.out.println(String.format("t:%s ", Thread.currentThread().getName())); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } }); thread1.start(); t:0 t:1 t:2 t:3 t:4 t:5 t:6 t:7 t:8 t:9 t2:0 t2:1 t2:2 t2:3 t2:4 t2:5 t2:6 t2:7 t2:8 t2:9 CustomThread thread2 = new CustomThread(String.valueOf(i), () -> { try { lock.lock(); condition2.await(); System.out.println(String.format("t2:%s ", Thread.currentThread().getName())); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } }); thread2.start(); } Thread.sleep(3000); lock.lock(); condition1.signalAll(); lock.unlock(); Thread.sleep(3000); lock.lock(); condition2.signalAll(); lock.unlock();
1.2.volatileThreadLocal 1. 2. 3.JMM:Happens-before 1. 2. 3.Volatile 4. 5. 6. 7. 8. 4. Volatile 5.ThreadLocal
1.2.volatileThreadLocal 1. ThreadLocal demo public class ThreadLocalDemo implements DemoRun { private ThreadLocal<Product> threadLocal = new ThreadLocal(); @Override public void runTest() { for (int i = 0; i < 1000; i++) { new Thread(() -> { Product product = new Product(Thread.currentThread().getId()); this.threadLocal.set(product); System.out.println("set.threadId:" + product.getThreadId() + "." + Thread.currentThread().getId()); product = this.threadLocal.get(); System.out.println("get.threadId:" + product.getThreadId() + "." + Thread.currentThread().getId()); }).start(); } } }
1.3. CountDownLatchFutureTaskSemaphoreCyclicBarrier 1.CountDownLatch demo1 public class CountDownLatchDemo implements DemoRun { private CountDownLatch countDownLatch = new CountDownLatch(3); public void runTest() throws InterruptedException { for (int i = 0; i < 3; i++) { Thread thread = new Thread(() -> { 12.begin run. 13.begin run. 14.begin run. 14.end run. 13.end run. 12.end run. 1master thread run. 1master thread end. try { System.out.println(Thread.currentThread().getId() + ".begin run."); Thread.sleep(1000); System.out.println(Thread.currentThread().getId() + ".end run."); countDownLatch.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } }); thread.start(); } countDownLatch.await(); System.out.println(Thread.currentThread().getId() + "master thread run."); System.out.println(Thread.currentThread().getId() + "master thread end."); } }
1.3. CountDownLatchFutureTaskSemaphoreCyclicBarrier 1. FutureTask demo2 Future Interface Runnable Interface Future<T> Interface RunnableFuture<T> FutureTask FutureTask<Product> futureTask = new FutureTask<>(() -> new Product(Thread.currentThread().getId())); futureTask.run(); System.out.println(futureTask.get()); demo.entities.Product@682a0b20 ExecutorService ExecutorService threadPool = Executors.newCachedThreadPool(); Future<Product> future = threadPool.submit(() -> new Product(Thread.currentThread().getId())); System.out.println(future.get()); demo.entities.Product@7ba4f24f
1.3. CountDownLatchFutureTaskSemaphoreCyclicBarrier 1. Semaphore demo3 public class SemaphoreDemo implements DemoRun { Semaphore semaphore = new Semaphore(2); int count = 0; @Override public void runTest() throws ExecutionException, InterruptedException { for (int i = 1; i <= 10; i++) { new Thread(() -> { try { this.semaphore.acquire(); count += 1; System.out.println(Thread.currentThread().getId() + "_" + count); Thread.sleep(3000); this.semaphore.release(); 12_2 13_2 14_3 16_4 15_5 17_6 18_7 19_8 } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } } }
1.3. CountDownLatchFutureTaskSemaphoreCyclicBarrier 1. CyclicBarrier demo4 public class CyclicBarrierDemo implements DemoRun { private CyclicBarrier cyclicBarrier = new CyclicBarrier(6); public void runTest() throws InterruptedException { for (int i = 0; i < 6; i++) { int finalI = i; Thread thread = new Thread(() -> { try { System.out.println(Thread.currentThread().getId() + "await."); cyclicBarrier.await(); 12await. 15await. 16await. 17await. 14await. 13await. 12go. 13go. 14go. 15go. 16go. 17go. Thread.sleep(finalI * 1000); System.out.println(Thread.currentThread().getId() + "go."); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } }); thread.start(); } } }
1. 1.1.synchronized ReentrantLock 1.2. volatile ThreadLocal 1.3. CountDownLatch FutureTask Semaphore CyclicBarrier !
2. 2.1.Executor 2.2.BoundQueue 2.2. !
2.1.Executor 1. 1. 2. FIFO LIFO 3. 4. 5. 6.
2.1.Executor 2. Core Pool Size Maximum Pool Size Bound Queue
2.1.Executor 3. ExecutorService newCachedExecutor = Executors.newCachedThreadPool(); ExecutorService singleExecutor = Executors.newSingleThreadExecutor(); ExecutorService workExecutor = Executors.newWorkStealingPool(); ExecutorService fiexdService = Executors.newFixedThreadPool(2); ScheduledExecutorService scheduleExecutor = Executors.newScheduledThreadPool(10);
2.2.BoundQueue 1.BoundQueue ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1, 1000, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1)); SynchronousQueue
2.3. 1. ExecutorService fiexdService = Executors.newFixedThreadPool(2); ((ThreadPoolExecutor) fiexdService). setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()); AbortPolicy( ) CallerRunsPolicy( ) DiscardPolicy( ) DiscardOldestPolicy( )
2. 2.1.Executor 2.2.BoundQueue 2.3. !
3. 3.1. 3.2.CAS 3.3.ABA AtomicStampedReference 3.4. 3.5.AQS !
3.1. java.util.concurrent.atomic AtomicInteger AtomicLong AtomicReference AtomicBoolean AtomicStampedReference AtomicMarkableReference AtomicReferenceFieldUpdater
3.2.CAS 1. (compare and swap) 2.java unsafe final native sun.misc public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5); public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5); public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);
3.2.CAS 3.2.CAS demo AtomicInteger: public class CASDemo implements DemoRun { private AtomicInteger atomicInteger = new AtomicInteger(); @Override public void runTest() { for (int i = 1; i < 3; i++) { new Thread(() -> { int count = 1; while (!atomicInteger.compareAndSet(0, 1)) { System.out.println( String.format("ThreadId:%s, :%s", Thread.currentThread().getId(), count)); count += 1; try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } try { Thread.sleep(3000); atomicInteger.compareAndSet(1, 0); System.out.println(Thread.currentThread().getId()); return; } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } } } ThreadId:13, :1 ThreadId:13, :2 ThreadId:13, :3 12 13
3.3.ABAAtomicStampedReference Thread 1 Thread 2 Thread 1 A A A C B D D D
3.3.ABAAtomicStampedReference 3.3.ABA AtomicStampedReference demo: public class ABA_AtomicStampedReferenceDemo implements DemoRun { private AtomicInteger atomicInteger = new AtomicInteger(100); private AtomicStampedReference atomicStampedReference = new AtomicStampedReference(100, 0); @Override public void runTest() throws ExecutionException, InterruptedException, CustomException, BrokenBarrierException { /** atomicInteger */ this.atomicInteger.compareAndSet(100, 100); /** atomicStampedReference */ Integer stamp = this.atomicStampedReference.getStamp(); this.atomicStampedReference.compareAndSet(100, 100, atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1); Thread thread = new Thread(() -> { boolean isOk = this.atomicInteger.compareAndSet(100, 200); if (isOk) { System.out.println("atomic integer ok ."); } }); thread.start(); Thread thread2 = new Thread(() -> { boolean isOk = this.atomicStampedReference.compareAndSet(101, 200, stamp, stamp + 1); if (isOk) { System.out.println("atomic stamped reference ok."); } else { System.out.println("atomic stamped reference fail."); } }); thread2.start(); } } atomic integer ok . atomic stamped reference fail.
3.4. 1. 1 2 2. 1 3 2 3. 1 3 2
3.5.AQS state Thread private class sync extends AbstractQueuedSynchronizer { @Override protected boolean isHeldExclusively() { return getState() == 1; } @Override protected boolean tryAcquire(int arg) { assert arg == 1; if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } @Override protected boolean tryRelease(int arg) { assert arg == 1; if (getState() == 0) { throw new IllegalMonitorStateException(); } setExclusiveOwnerThread(null); setState(0); return true; } }
3.5.AQS AQS public class CustomLockImpl implements demo.concurrentdemo.CustomLock { private volatile sync sync = new sync(); @Override public void lock() { this.sync.acquire(1); } @Override public void unLock() { this.sync.release(1); } @Override public boolean tryLock() { return this.sync.tryAcquire(1); } @Override public boolean tryRelease() { return this.sync.tryRelease(1); } @Override public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); }
3.5.AQS 3.5.AQS demo public class CustomLockTest implements DemoRun { private CustomLock customLock = new CustomLockImpl(); @Override public void runTest() throws ExecutionException, InterruptedException, CustomException, BrokenBarrierException { for (int i = 0; i < 100; i++) { Thread thread = new Thread(() -> { this.customLock.lock(); try { Thread.sleep(3000); System.out.println(String.format("t:%s,sleep.", Thread.currentThread().getId())); } catch (InterruptedException e) { e.printStackTrace(); } finally { this.customLock.unLock(); } }); thread.start(); } } } t:12,sleep. t:13,sleep. t:14,sleep. t:15,sleep. t:16,sleep.
3.5.AQS 3.5.AQS
3.5.AQS 3.5.AQS 1 public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } 2 protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); } 3 private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; } http://ifeve.com/introduce-abstractqueuedsynchronizer/
3.5.AQS 3.5.AQS 4 final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } 3 private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } } 5 static void selfInterrupt() { Thread.currentThread().interrupt(); } http://ifeve.com/introduce-abstractqueuedsynchronizer/
3. 3.1. 3.2.CAS 3.3.ABA AtomicStampedReference 3.4. 3.5.AQS !
4. 4.1.DoubleCheck
4.1.DoubleCheck 1.DoubleCheck demo: public class DoubleCheckedLocking { private static DoubleCheckedLocking resource; public static DoubleCheckedLocking getResource() { if (resource == null) { synchronized (DoubleCheckedLocking.class) { if (resource == null) { resource = new DoubleCheckedLocking(); } } } ? return resource; } }
End Thanks ! http://ifeve.com/ JAVA Doug Lea