介绍
Java线程代码示例
Java创建线程的4种方式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| package org.example.thread.demo;
public class MyThread extends Thread{ @Override public void run() { System.out.println("MyThread 线程执行..."); }
public static void main(String[] args) { MyThread thread =new MyThread(); thread.start(); System.out.println("主线程执行..."); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| package org.example.thread.demo;
public class MyRunnable implements Runnable { @Override public void run() {
System.out.println("MyRunnable 线程执行..."); }
public static void main(String[] args) { MyRunnable myRunnable = new MyRunnable(); Thread thread = new Thread(myRunnable); thread.start(); System.out.println("主线程执行..."); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| package org.example.thread.demo;
import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.FutureTask;
public class MyCallable implements Callable<String> {
@Override public String call() throws Exception { System.out.println("MyCallable 线程执行..."); return "result"; }
public static void main(java.lang.String[] args) { MyCallable myCallable = new MyCallable(); FutureTask<String> futureTask = new FutureTask<>(myCallable); Thread thread = new Thread(futureTask); thread.start(); try { System.out.println("futureTask.get() = " + futureTask.get()); } catch (InterruptedException e) { throw new RuntimeException(e); } catch (ExecutionException e) { throw new RuntimeException(e); } System.out.println("主线程执行..."); }
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| package org.example.thread.demo;
import java.util.concurrent.*;
public class MyExecutorService {
public static void main(String[] args) throws Exception { ExecutorService executor = Executors.newCachedThreadPool(); ExecutorService executor1 = Executors.newSingleThreadExecutor(); ExecutorService executor2 = Executors.newFixedThreadPool(10); ScheduledExecutorService executor3 = Executors.newScheduledThreadPool(10); executor3.schedule(new MyRunnable(),5, TimeUnit.SECONDS);
Future<String> future = executor.submit(new MyCallable());
executor.submit(new MyRunnable()); executor.submit(new MyThread()); executor.shutdown(); System.out.println("关闭线程池");
}
}
|
自定义线程池-推荐使用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| package org.example.thread.demo;
import java.util.concurrent.*;
public class CustomExecutorService {
private final static int corePoolSize = 8; private final static int maximumPoolSize = 100; private final static long keepAliveTime = 60;
private static BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(10);
public static void main(String[] args) {
ExecutorService executor = new ThreadPoolExecutor(corePoolSize,maximumPoolSize,keepAliveTime,TimeUnit.SECONDS,workQueue); } }
|
ForkJoin 线程池
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84
| package org.example.thread.demo;
import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinTask; import java.util.concurrent.RecursiveTask;
public class MyForkJoin {
public static class CountTask extends RecursiveTask<Integer> {
private static int THRESHOLD = 100000; private int start; private int end;
public CountTask(int start, int end) { this.start = start; this.end = end; }
@Override protected Integer compute() {
int sum = 0; if (end - start <= THRESHOLD) { for (int i = start; i <= end; i++) {
sum += i; } } else { int middle = (end + start) / 2; CountTask leftTask = new CountTask(start, middle); CountTask rightTask = new CountTask(middle + 1, end); leftTask.fork(); rightTask.fork(); Integer right = rightTask.join(); Integer left = leftTask.join(); sum = left + right; } return sum; } }
public static void forkJsonTask(int val) throws Exception { ForkJoinPool pool = ForkJoinPool.commonPool(); ForkJoinTask<Integer> submit = pool.submit(new CountTask(0, val));
System.out.println("forkJsonTask sum = " + submit.join()); System.out.println("-----------"); }
public static void commonTask(int val) throws Exception { int sum = 0; for (int i = 0; i < val; i++) {
sum += i; } System.out.println("commonTask sum = " + sum); }
public static void main(String[] args) throws Exception { long s = System.currentTimeMillis(); int val = 1000000000; forkJsonTask(val);
long e = System.currentTimeMillis(); System.out.println("花费时间:" + (e - s) + "ms");
}
}
|
简单的 Synchronized
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
| package org.example.thread.demo;
public class MySimpleSynchronized { final static Object object = new Object();
public static class T1 extends Thread { @Override public void run() { synchronized (object) { System.out.println("T1 run..."); try { Thread.sleep(2000); } catch (InterruptedException e) { throw new RuntimeException(e); } System.out.println("T1 run end..."); } System.out.println("T1 exit synchronized..."); } }
public static class T2 extends Thread { @Override public void run() { synchronized (object) { System.out.println("T2 run..."); System.out.println("T2 run end..."); } System.out.println("T1 exit synchronized..."); } }
public static void main(String[] args) { T1 t1 = new T1(); T2 t2 = new T2(); t1.start(); t2.start();
} }
|
简单的 wait notify
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
| package org.example.thread.demo;
public class MySimpleWN { final static Object object = new Object();
public static class T1 extends Thread{ @Override public void run() { synchronized(object){ try { System.out.println("T1 wait..."); object.wait(); } catch (InterruptedException e) { throw new RuntimeException(e); } System.out.println("T1 run..."); } System.out.println("T1 end..."); } }
public static class T2 extends Thread{ @Override public void run() { synchronized(object){ System.out.println("T2 notify..."); object.notify(); System.out.println("T2 run..."); try { Thread.sleep(2000); } catch (InterruptedException e) { throw new RuntimeException(e); } } System.out.println("T2 end..."); } }
public static void main(String[] args) { T1 t1 = new T1(); T2 t2 = new T2(); t1.start(); t2.start();
} }
|
简单的 ReentrantLock
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
| package org.example.thread.demo;
import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock;
public class MySimpleReentrantLock { public static ReentrantLock lock = new ReentrantLock();
public static class T1 extends Thread { @Override public void run() { lock.lock(); System.out.println("T1 run..."); try { Thread.sleep(5000); System.out.println("T1 run end..."); } catch (Exception e) { throw new RuntimeException(e); }finally { lock.unlock(); } System.out.println("T1 exit lock..."); } }
public static class T2 extends Thread { @Override public void run() { try { boolean b = lock.tryLock(5, TimeUnit.SECONDS); lock.lock(); System.out.println("b = " + b); System.out.println("T2 run..."); System.out.println("T2 run end..."); }catch (Exception e) { throw new RuntimeException(e); }finally { lock.unlock(); } System.out.println("T2 exit lock..."); } }
public static void main(String[] args) { T1 t1 = new T1(); T2 t2 = new T2(); t1.start(); try { Thread.sleep(500); } catch (InterruptedException e) { throw new RuntimeException(e); } t2.start();
} }
|
带条件的 ReentrantLock
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
| package org.example.thread.demo;
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock;
public class MyConditionReentrantLock { public static ReentrantLock lock = new ReentrantLock(); public static Condition condition = lock.newCondition();
public static class T1 extends Thread { @Override public void run() { lock.lock(); System.out.println("T1 run..."); try { System.out.println("T1 await..."); condition.await(); Thread.sleep(2000); System.out.println("T1 run end..."); } catch (InterruptedException e) { throw new RuntimeException(e); }finally { lock.unlock(); } System.out.println("T1 exit lock..."); } }
public static class T2 extends Thread { @Override public void run() { lock.lock(); condition.signal(); System.out.println("T2 run..."); System.out.println("T2 run end..."); lock.unlock(); System.out.println("T2 exit lock..."); } }
public static void main(String[] args) { T1 t1 = new T1(); T2 t2 = new T2(); t1.start(); t2.start();
} }
|
简单的读写锁 ReadWriteLock
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78
| package org.example.thread.demo;
import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
public class MySimpleReadWriteLock { public static ReentrantLock lock = new ReentrantLock(); public static ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
public static Integer value;
public static Lock readLock = readWriteLock.readLock();
public static Lock writeLock = readWriteLock.writeLock();
public void setValue(Lock lock, Integer val) { try { lock.lock(); Thread.sleep(1000); System.out.println("CurrentThread name:" + Thread.currentThread().getName() +" setValue1:" + val); value = val; } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } }
public Integer getValue(Lock lock) { try { lock.lock(); System.out.println("CurrentThread name:" + Thread.currentThread().getName() +" getValue:" + value); Thread.sleep(1000); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } return value; }
public static void main(String[] args) throws Exception { ExecutorService executorService = Executors.newFixedThreadPool(10); MySimpleReadWriteLock mySimpleReadWriteLock = new MySimpleReadWriteLock(); long s = System.currentTimeMillis(); int count = 30; for (int i = 0; i < 5; i++) { executorService.submit(new Thread(() -> { Random random = new Random(); int i1 = random.nextInt(); mySimpleReadWriteLock.setValue(writeLock, i1); })); } for (int i = 0; i < count; i++) { executorService.submit(new Thread(() -> { Integer value1 = mySimpleReadWriteLock.getValue(readLock);
})); } executorService.shutdown(); while (!executorService.isTerminated()) { } ; long e = System.currentTimeMillis(); System.out.println("花费时间:" + (e - s) + "ms");
} }
|
简单的倒计时锁 CountDownLatch
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| package org.example.thread.demo;
import java.util.concurrent.CountDownLatch;
public class MySimpleCountDownLatch {
public static CountDownLatch countDownLatch = new CountDownLatch(3);
public static class CheckTask implements Runnable { @Override public void run() { System.out.println("T1 run..."); try { Thread.sleep(2000); } catch (InterruptedException e) { throw new RuntimeException(e); } countDownLatch.countDown(); System.out.println("T1 run end..."); } }
public static void main(String[] args) throws Exception { int count = 3; for (int i = 0; i < count; i++) { new Thread(new CheckTask()).start(); } countDownLatch.await(); System.out.println("全部任务执行完成!");
} }
|
Java多线程实战
基于SpringBoot的多线程调用
功能描述:基于线程池,异步调用调用外部数据接口,完成数据封装,然后将数据返回调用。
初始化项目,导入web依赖
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency>
<dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency>
|
新增@EnableAsync注解,启动异步调用功能
1 2 3 4 5 6 7 8 9
| @EnableAsync @SpringBootApplication public class ThreadDemoApplication {
public static void main(String[] args) { SpringApplication.run(ThreadDemoApplication.class, args); }
}
|
新增Config类,自定义线程池
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| @Configuration public class ThreadPoolConfig {
@Bean("threadPoolTaskExecutor") public ThreadPoolTaskExecutor threadPoolTaskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(10); executor.setMaxPoolSize(50); executor.setQueueCapacity(200); executor.setThreadNamePrefix("threadPoolTaskExecutor-"); executor.initialize(); return executor; }
@Bean("threadPoolTaskExecutor2") public ThreadPoolTaskExecutor threadPoolTaskExecutor2() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(10); executor.setMaxPoolSize(50); executor.setQueueCapacity(200); executor.setThreadNamePrefix("threadPoolTaskExecutor2-"); executor.initialize(); return executor; }
}
|
新建mock控制层:MockController ,模拟外部接口数据:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| package com.example.thread.controller;
import lombok.extern.slf4j.Slf4j; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController;
@Slf4j @RequestMapping("mock") @RestController public class MockController {
@RequestMapping("/getData") public String getData() { try { Thread.sleep(3000); } catch (InterruptedException e) { throw new RuntimeException(e); } return "data1 "; }
@RequestMapping("/getData2") public String getData2() { try { Thread.sleep(1000); } catch (InterruptedException e) { throw new RuntimeException(e); } return "data2 "; }
@RequestMapping("/getData3") public String getData3() { try { Thread.sleep(1000); } catch (InterruptedException e) { throw new RuntimeException(e); } return "data3 "; }
}
|
新建接口调用服务层,在方法上@Async注解,基于RestTemplate进行外部接口调用。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
| package com.example.thread.service;
import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import org.springframework.web.client.RestTemplate;
import java.util.concurrent.CompletableFuture;
@Service public class MyThreadService {
public static RestTemplate restTemplate; static { restTemplate = new RestTemplate(); }
@Async("threadPoolTaskExecutor") public void printHello(){ try { Thread.sleep(2000); } catch (InterruptedException e) { throw new RuntimeException(e); } System.out.println(Thread.currentThread()+":hello"); }
@Async("threadPoolTaskExecutor2") public void printHello2(){ try { Thread.sleep(2000); } catch (InterruptedException e) { throw new RuntimeException(e); } System.out.println(Thread.currentThread()+":hello"); }
@Async("threadPoolTaskExecutor2") public CompletableFuture<String> getData1(){ String result = restTemplate.getForObject("http://localhost:8080/mock/getData", String.class); return CompletableFuture.completedFuture(result); }
@Async("threadPoolTaskExecutor2") public CompletableFuture<String> getData2(){ String result = restTemplate.getForObject("http://localhost:8080/mock/getData2", String.class); return CompletableFuture.completedFuture(result); }
@Async("threadPoolTaskExecutor2") public CompletableFuture<String> getData3(){ String result = restTemplate.getForObject("http://localhost:8080/mock/getData3", String.class); return CompletableFuture.completedFuture(result); } }
|
新建示例控制层,使用/test2进行接口调用。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
| @Slf4j @RequestMapping("demo") @RestController public class DemoController {
@Autowired MyThreadService myThreadService;
@Autowired ThreadPoolTaskExecutor threadPoolTaskExecutor;
@RequestMapping("/test") public String test() { for (int i = 0; i < 100; i++) { myThreadService.printHello(); } System.out.println("test end"); return "success"; }
@RequestMapping("/test2") public String test2() { CompletableFuture<String> future1 = myThreadService.getData1(); CompletableFuture<String> future2 = myThreadService.getData2(); CompletableFuture<String> future3 = myThreadService.getData3(); CompletableFuture<Void> allFutures = CompletableFuture.allOf(future1, future2, future3); try { allFutures.get(10, TimeUnit.SECONDS); } catch (InterruptedException | ExecutionException | TimeoutException e) { throw new RuntimeException(e); } if (future1.isCompletedExceptionally() || future2.isCompletedExceptionally() || future3.isCompletedExceptionally()) { throw new RuntimeException("One or more tasks failed."); }
String result = future1.join() + future2.join() + future3.join(); System.out.println(result); return result; } }
|
调用结果:
使用多线程异步调用,缩短了等待时间。分析:接口串形调用需要花费3秒+,使用线程池后,等待时间仅仅依赖于最慢的接口耗时,该项目中等待时间为1秒+,