介绍

Java线程代码示例

Java创建线程的4种方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package org.example.thread.demo;

/**
 * Thread creation, approach 1: subclass {@link Thread} and override {@link #run()}.
 */
public class MyThread extends Thread {

    /** Work performed on the new thread once {@link #start()} has been called. */
    @Override
    public void run() {
        System.out.println("MyThread 线程执行...");
    }

    /**
     * Starts one {@code MyThread} and then prints a message from the main thread.
     * Which of the two messages appears first is up to the scheduler.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        final Thread worker = new MyThread();
        worker.start();
        System.out.println("主线程执行...");
    }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package org.example.thread.demo;

/**
 * Thread creation, approach 2: implement {@link Runnable} and hand the instance to a
 * {@link Thread}.
 */
public class MyRunnable implements Runnable {

    /** Work performed by whichever thread executes this runnable. */
    @Override
    public void run() {
        System.out.println("MyRunnable 线程执行...");
    }

    /**
     * Runs one {@code MyRunnable} on a fresh thread and then prints a message from the main
     * thread. The order of the two messages is not guaranteed.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        final Runnable task = new MyRunnable();
        new Thread(task).start();
        System.out.println("主线程执行...");
    }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package org.example.thread.demo;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

/**
 * Thread creation, approach 3: implement {@link Callable} so the task can hand back a result.
 */
public class MyCallable implements Callable<String> {

    /**
     * Prints a message and produces the task result.
     *
     * @return the constant string {@code "result"}
     */
    @Override
    public String call() throws Exception {
        System.out.println("MyCallable 线程执行...");
        return "result";
    }

    /**
     * Wraps a {@code MyCallable} in a {@link FutureTask}, runs it on a new thread and blocks on
     * {@link FutureTask#get()} until the result is available.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        final FutureTask<String> pendingResult = new FutureTask<>(new MyCallable());
        new Thread(pendingResult).start();

        final String outcome;
        try {
            // get() blocks the main thread until call() has returned.
            outcome = pendingResult.get();
        } catch (InterruptedException | ExecutionException failure) {
            throw new RuntimeException(failure);
        }
        System.out.println("futureTask.get() = " + outcome);
        System.out.println("主线程执行...");
    }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package org.example.thread.demo;

import java.util.concurrent.*;

/**
 * Thread creation, approach 4: submit tasks to an {@link ExecutorService} thread pool.
 */
public class MyExecutorService {

    /**
     * Creates one pool of each common {@link Executors} flavour, submits a few demo tasks and
     * then shuts every pool down so that no idle worker thread keeps the JVM alive.
     *
     * @param args unused
     * @throws Exception kept from the original signature; nothing here throws a checked exception
     */
    public static void main(String[] args) throws Exception {
        // Cached pool: creates threads on demand and reuses idle ones.
        ExecutorService executor = Executors.newCachedThreadPool();
        // Single-thread pool: tasks run one after another on one worker.
        ExecutorService executor1 = Executors.newSingleThreadExecutor();
        // Fixed-size pool.
        ExecutorService executor2 = Executors.newFixedThreadPool(10);
        // Pool that can run tasks after a delay.
        ScheduledExecutorService executor3 = Executors.newScheduledThreadPool(10);

        try {
            // Delayed task: runs once, five seconds from now.
            executor3.schedule(new MyRunnable(), 5, TimeUnit.SECONDS);

            // Submit a Callable; calling get() on the returned Future would block for its result.
            executor.submit(new MyCallable());
            // Submit a Runnable.
            executor.submit(new MyRunnable());
            // A Thread is itself a Runnable: the pool only invokes its run() method on a pool
            // worker, the MyThread instance is never started as a thread of its own.
            executor.submit(new MyThread());
        } finally {
            // shutdown() stops accepting new work but lets submitted tasks finish. For the
            // scheduled pool the already-scheduled delayed task still runs under the default
            // policy, after which its worker exits and the JVM can terminate.
            executor.shutdown();
            executor1.shutdown();
            executor2.shutdown();
            executor3.shutdown();
        }
        System.out.println("关闭线程池");
    }
}

自定义线程池-推荐使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package org.example.thread.demo;

import java.util.concurrent.*;

/**
 * Building a thread pool by hand with {@link ThreadPoolExecutor} — the recommended way to
 * define a pool, because every sizing parameter is explicit.
 */
public class CustomExecutorService {

    /** Number of threads kept in the pool even when they are idle. */
    private static final int CORE_POOL_SIZE = 8;

    /** Maximum number of threads the pool may grow to. */
    private static final int MAX_POOL_SIZE = 100;

    /**
     * How long surplus idle threads (those beyond the core size) wait for new work before
     * terminating; interpreted as seconds by the constructor call in {@link #main}.
     */
    private static final long KEEP_ALIVE_SECONDS = 60;

    /** Bounded queue (capacity 10) holding submitted tasks until a worker picks them up. */
    private static BlockingQueue<Runnable> pendingTasks = new ArrayBlockingQueue<>(10);

    /**
     * Constructs the pool from the settings above. Constructor arguments, in order: core pool
     * size, maximum pool size, keep-alive time, keep-alive time unit, work queue.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ExecutorService executor = new ThreadPoolExecutor(
                CORE_POOL_SIZE,
                MAX_POOL_SIZE,
                KEEP_ALIVE_SECONDS,
                TimeUnit.SECONDS,
                pendingTasks);
    }
}

ForkJoin 线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
package org.example.thread.demo;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

/**
 * Divide and conquer with the fork/join framework; suited to CPU-bound work.
 *
 * <p>The demo sums every integer of a closed range, either with fork/join
 * ({@link #forkJsonTask(int)}) or with a plain loop ({@link #commonTask(int)}). Sums are
 * accumulated in {@code long} because the sum of {@code 0..1_000_000_000} does not fit in
 * an {@code int}.
 */
public class MyForkJoin {

    /** Ranges spanning at most this many values are summed directly instead of being split. */
    private static final int THRESHOLD = 100_000;

    /**
     * Sums the integers of {@code [start, end]} with an {@code Integer} result.
     *
     * <p>Kept with its original signature for existing callers. The value returned is the exact
     * sum narrowed to 32 bits, i.e. it wraps around exactly as plain {@code int} addition would
     * once the true sum leaves the {@code int} range.
     */
    public static class CountTask extends RecursiveTask<Integer> {

        private final int start;
        private final int end;

        /**
         * @param start first value of the range (inclusive)
         * @param end   last value of the range (inclusive)
         */
        public CountTask(int start, int end) {
            this.start = start;
            this.end = end;
        }

        @Override
        protected Integer compute() {
            // Delegate to the long-based task; intValue() keeps the low 32 bits of the sum.
            return new RangeSumTask(start, end).compute().intValue();
        }
    }

    /** Sums the integers of {@code [start, end]} without overflow, splitting large ranges. */
    private static class RangeSumTask extends RecursiveTask<Long> {

        private final int start;
        private final int end;

        RangeSumTask(int start, int end) {
            this.start = start;
            this.end = end;
        }

        @Override
        protected Long compute() {
            // Widen before subtracting so extreme bounds cannot overflow the size check.
            if ((long) end - start <= THRESHOLD) {
                long sum = 0;
                // long index: the loop also terminates when end == Integer.MAX_VALUE.
                for (long i = start; i <= end; i++) {
                    sum += i;
                }
                return sum;
            }
            // Midpoint computed in long so that start + end cannot overflow.
            int middle = (int) (start + ((long) end - start) / 2);
            RangeSumTask leftTask = new RangeSumTask(start, middle);
            RangeSumTask rightTask = new RangeSumTask(middle + 1, end);
            // Fork one half and compute the other on the current worker, so this thread does
            // useful work instead of only waiting on two forked children.
            leftTask.fork();
            long right = rightTask.compute();
            long left = leftTask.join();
            return left + right;
        }
    }

    /**
     * Sums {@code 0..val} (inclusive) on the common fork/join pool and prints the result.
     *
     * @param val upper bound of the range (inclusive)
     * @throws Exception kept from the original signature
     */
    public static void forkJsonTask(int val) throws Exception {
        ForkJoinPool pool = ForkJoinPool.commonPool();
        ForkJoinTask<Long> submitted = pool.submit(new RangeSumTask(0, val));
        System.out.println("forkJsonTask sum = " + submitted.join());
        System.out.println("-----------");
    }

    /**
     * Sums {@code 0..val} (inclusive) with a plain single-threaded loop and prints the result.
     * The range is inclusive so that both variants compute the same quantity.
     *
     * @param val upper bound of the range (inclusive)
     * @throws Exception kept from the original signature
     */
    public static void commonTask(int val) throws Exception {
        long sum = 0;
        for (long i = 0; i <= val; i++) {
            sum += i;
        }
        System.out.println("commonTask sum = " + sum);
    }

    /**
     * Times the fork/join variant; swap in {@link #commonTask(int)} to compare against the
     * sequential loop.
     */
    public static void main(String[] args) throws Exception {
        long s = System.currentTimeMillis();
        int val = 1000000000;
        forkJsonTask(val);
        // commonTask(val);
        long e = System.currentTimeMillis();
        System.out.println("花费时间:" + (e - s) + "ms");
    }
}

简单的 Synchronized

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package org.example.thread.demo;

/**
 * Minimal {@code synchronized} demo: two threads contend for the same monitor, so their
 * critical sections never overlap.
 */
public class MySimpleSynchronized {
    /** Monitor shared by both threads. */
    final static Object object = new Object();

    /** Holds the monitor for two seconds. */
    public static class T1 extends Thread {
        @Override
        public void run() {
            synchronized (object) {
                System.out.println("T1 run...");
                try {
                    // sleep() does not release the monitor, so T2 stays blocked meanwhile.
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    // Restore the flag so code further up the stack can still observe it.
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
                System.out.println("T1 run end...");
            }
            System.out.println("T1 exit synchronized...");
        }
    }

    /** Enters the same monitor and leaves immediately. */
    public static class T2 extends Thread {
        @Override
        public void run() {
            synchronized (object) {
                System.out.println("T2 run...");
                System.out.println("T2 run end...");
            }
            System.out.println("T2 exit synchronized...");
        }
    }

    public static void main(String[] args) {
        T1 t1 = new T1();
        T2 t2 = new T2();
        t1.start();
        t2.start();

        // Typical output when T1 wins the race for the monitor (if T2 wins, its three lines
        // come first; the "exit" lines are printed outside the monitor and may interleave):
        // T1 run...
        // T1 run end...
        // T1 exit synchronized...
        // T2 run...
        // T2 run end...
        // T2 exit synchronized...
    }
}

简单的 wait notify

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package org.example.thread.demo;

/**
 * Minimal {@code wait}/{@code notify} demo: T1 waits on a shared monitor until T2 notifies it.
 *
 * <p>The hand-off is recorded in a flag guarded by the monitor. Without it, T1 would wait
 * forever whenever T2 happened to call {@code notify()} before T1 reached {@code wait()},
 * and a spurious wake-up could let T1 continue too early.
 */
public class MySimpleWN {
    /** Monitor both threads synchronize on. */
    final static Object object = new Object();

    /** Set by T2 once it has notified; read and written only while holding {@link #object}. */
    private static boolean notified;

    /** Waits until T2 has signalled. */
    public static class T1 extends Thread {
        @Override
        public void run() {
            synchronized (object) {
                try {
                    System.out.println("T1 wait...");
                    // Loop on the condition: covers both a notification that was sent before
                    // we got here and spurious wake-ups.
                    while (!notified) {
                        object.wait();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
                System.out.println("T1 run...");
            }
            System.out.println("T1 end...");
        }
    }

    /** Notifies T1, then keeps the monitor for two more seconds. */
    public static class T2 extends Thread {
        @Override
        public void run() {
            synchronized (object) {
                System.out.println("T2 notify...");
                notified = true;
                object.notify();
                System.out.println("T2 run...");
                try {
                    // notify() does not release the monitor; T1 can only resume after this
                    // synchronized block ends.
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
            }
            System.out.println("T2 end...");
        }
    }

    public static void main(String[] args) {
        T1 t1 = new T1();
        T2 t2 = new T2();
        t1.start();
        t2.start();
        // Typical output when T1 reaches wait() first:
        // T1 wait...
        // T2 notify...
        // T2 run...
        // T2 end...
        // T1 run...
        // T1 end...
    }
}

简单的 ReentrantLock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
package org.example.thread.demo;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Minimal {@link ReentrantLock} demo: T1 holds the lock for five seconds while T2 tries to
 * acquire it with a timeout.
 */
public class MySimpleReentrantLock {
    // Non-fair lock.
    public static ReentrantLock lock = new ReentrantLock();
    // Fair lock alternative:
    // public static ReentrantLock lock = new ReentrantLock(true);

    /** Takes the lock unconditionally and keeps it for five seconds. */
    public static class T1 extends Thread {
        @Override
        public void run() {
            lock.lock();
            System.out.println("T1 run...");
            try {
                Thread.sleep(5000);
                System.out.println("T1 run end...");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            } finally {
                lock.unlock();
            }
            System.out.println("T1 exit lock...");
        }
    }

    /** Tries to take the lock for up to five seconds and reports whether it succeeded. */
    public static class T2 extends Thread {
        @Override
        public void run() {
            boolean b = false;
            try {
                // One timed attempt only. Calling lock() again after a successful tryLock()
                // would raise the hold count to two, and the single unlock() below would then
                // leave the lock held forever.
                b = lock.tryLock(5, TimeUnit.SECONDS);
                System.out.println("b = " + b);
                if (b) {
                    System.out.println("T2 run...");
                    System.out.println("T2 run end...");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            } finally {
                // Unlock only what was actually acquired; otherwise unlock() would throw
                // IllegalMonitorStateException.
                if (b) {
                    lock.unlock();
                }
            }
            System.out.println("T2 exit lock...");
        }
    }

    public static void main(String[] args) {
        T1 t1 = new T1();
        T2 t2 = new T2();
        t1.start();
        try {
            // Give T1 a head start so that it owns the lock before T2 tries.
            Thread.sleep(500);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        t2.start();

        // Typical output ("T1 exit lock..." is printed after unlocking and may interleave
        // with T2's lines):
        // T1 run...
        // T1 run end...
        // T1 exit lock...
        // b = true
        // T2 run...
        // T2 run end...
        // T2 exit lock...
    }
}

带条件的 ReentrantLock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
package org.example.thread.demo;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * {@link ReentrantLock} with a {@link Condition}: T1 awaits the condition, T2 signals it.
 *
 * <p>The signal is recorded in a flag guarded by the lock. Without it, T1 would block forever
 * whenever T2 signalled before T1 started awaiting, and a spurious wake-up could let T1
 * continue too early.
 */
public class MyConditionReentrantLock {
    public static ReentrantLock lock = new ReentrantLock();
    public static Condition condition = lock.newCondition();

    /** Set by T2 when it signals; read and written only while holding {@link #lock}. */
    private static boolean signalled;

    /** Waits for the signal, then works for two seconds while holding the lock. */
    public static class T1 extends Thread {
        @Override
        public void run() {
            lock.lock();
            System.out.println("T1 run...");
            try {
                System.out.println("T1 await...");
                // await() releases the lock while waiting and re-acquires it before returning.
                while (!signalled) {
                    condition.await();
                }
                Thread.sleep(2000);
                System.out.println("T1 run end...");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            } finally {
                lock.unlock();
            }
            System.out.println("T1 exit lock...");
        }
    }

    /** Signals the condition. */
    public static class T2 extends Thread {
        @Override
        public void run() {
            lock.lock();
            try {
                signalled = true;
                condition.signal();
                System.out.println("T2 run...");
                System.out.println("T2 run end...");
            } finally {
                lock.unlock();
            }
            System.out.println("T2 exit lock...");
        }
    }

    public static void main(String[] args) {
        T1 t1 = new T1();
        T2 t2 = new T2();
        t1.start();
        t2.start();

        // Typical output when T1 takes the lock first:
        // T1 run...
        // T1 await...
        // T2 run...
        // T2 run end...
        // T2 exit lock...
        // T1 run end...
        // T1 exit lock...
    }
}

简单的读写锁 ReadWriteLock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
package org.example.thread.demo;

import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * Minimal {@link ReentrantReadWriteLock} demo: writers exclude everyone, readers may overlap.
 */
public class MySimpleReadWriteLock {
    // Non-fair plain lock; pass it instead of readLock/writeLock to compare timings.
    public static ReentrantLock lock = new ReentrantLock();
    public static ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();

    /** Shared value protected by whichever lock the caller passes in. */
    public static Integer value;

    public static Lock readLock = readWriteLock.readLock();

    public static Lock writeLock = readWriteLock.writeLock();

    /**
     * Stores {@code val} in {@link #value} after a simulated one-second write, holding
     * {@code lock} throughout. If the thread is interrupted during the pause the write is
     * skipped and the interrupt status is restored.
     *
     * @param lock lock to hold while writing
     * @param val  value to store
     */
    public void setValue(Lock lock, Integer val) {
        // Acquire outside the try block: if lock() itself fails there is nothing to unlock.
        lock.lock();
        try {
            Thread.sleep(1000);
            System.out.println("CurrentThread name:" + Thread.currentThread().getName() +" setValue1:" + val);
            value = val;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            System.err.println(Thread.currentThread().getName() + " interrupted, write skipped");
        } finally {
            lock.unlock();
        }
    }

    /**
     * Reads {@link #value} while holding {@code lock}, then simulates a one-second read. An
     * interrupt during the pause cuts it short and the interrupt status is restored.
     *
     * @param lock lock to hold while reading
     * @return the value read while {@code lock} was held
     */
    public Integer getValue(Lock lock) {
        lock.lock();
        try {
            // Take the snapshot under the lock so the returned value is the one printed.
            final Integer observed = value;
            System.out.println("CurrentThread name:" + Thread.currentThread().getName() +" getValue:" + observed);
            Thread.sleep(1000);
            return observed;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            System.err.println(Thread.currentThread().getName() + " interrupted while reading");
            // Evaluated before the finally block runs, i.e. still under the lock.
            return value;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Submits 5 writers and 30 readers to a pool of 10 threads and prints the elapsed time.
     */
    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        MySimpleReadWriteLock mySimpleReadWriteLock = new MySimpleReadWriteLock();
        long s = System.currentTimeMillis();
        int count = 30;
        for (int i = 0; i < 5; i++) {
            executorService.submit(() -> {
                Random random = new Random();
                mySimpleReadWriteLock.setValue(writeLock, random.nextInt());
            });
        }
        for (int i = 0; i < count; i++) {
            executorService.submit(() -> {
                mySimpleReadWriteLock.getValue(readLock);
            });
        }
        executorService.shutdown();
        // Block until every task has finished instead of spinning on isTerminated().
        while (!executorService.awaitTermination(1, java.util.concurrent.TimeUnit.SECONDS)) {
            // tasks still running; keep waiting
        }
        long e = System.currentTimeMillis();
        System.out.println("花费时间:" + (e - s) + "ms");
    }
}

简单的倒计时锁 CountDownLatch

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package org.example.thread.demo;

import java.util.concurrent.CountDownLatch;

/**
 * Minimal {@link CountDownLatch} demo: the main thread waits until every worker has
 * counted the latch down.
 */
public class MySimpleCountDownLatch {

    /** Number of workers; also the initial latch count, so the two cannot drift apart. */
    private static final int TASK_COUNT = 3;

    public static CountDownLatch countDownLatch = new CountDownLatch(TASK_COUNT);

    /** Simulates two seconds of work, then counts the latch down. */
    public static class CheckTask implements Runnable {
        @Override
        public void run() {
            System.out.println("T1 run...");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            } finally {
                // Count down even when the work fails; otherwise await() in main would
                // never return.
                countDownLatch.countDown();
            }
            System.out.println("T1 run end...");
        }
    }

    /**
     * Starts the workers and blocks until all of them have counted down.
     */
    public static void main(String[] args) throws Exception {
        for (int i = 0; i < TASK_COUNT; i++) {
            new Thread(new CheckTask()).start();
        }
        countDownLatch.await();
        System.out.println("全部任务执行完成!");
    }
}

Java多线程实战

基于SpringBoot的多线程调用

功能描述:基于线程池异步调用多个外部数据接口,完成数据封装后,将结果返回给调用方。

初始化项目,导入web依赖
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
新增@EnableAsync注解,启动异步调用功能
1
2
3
4
5
6
7
8
9
/**
 * Spring Boot entry point of the demo. {@code @EnableAsync} is what activates the
 * {@code @Async} methods used elsewhere in the project.
 */
@SpringBootApplication
@EnableAsync
public class ThreadDemoApplication {

    /**
     * Boots the Spring application context.
     *
     * @param args command-line arguments, passed through to Spring Boot
     */
    public static void main(final String[] args) {
        SpringApplication.run(ThreadDemoApplication.class, args);
    }
}
新增Config类,自定义线程池
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

/**
 * Declares the two custom thread pools that {@code @Async} methods refer to by bean name.
 * Both pools share the same sizing and differ only in their thread-name prefix.
 */
@Configuration
public class ThreadPoolConfig {

    /** @return the pool registered as {@code threadPoolTaskExecutor} */
    @Bean("threadPoolTaskExecutor")
    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
        return newPool("threadPoolTaskExecutor-");
    }

    /** @return the pool registered as {@code threadPoolTaskExecutor2} */
    @Bean("threadPoolTaskExecutor2")
    public ThreadPoolTaskExecutor threadPoolTaskExecutor2() {
        return newPool("threadPoolTaskExecutor2-");
    }

    /**
     * Builds and initializes one pool.
     *
     * @param threadNamePrefix prefix given to the names of the pool's threads
     * @return the initialized executor
     */
    private static ThreadPoolTaskExecutor newPool(String threadNamePrefix) {
        ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
        pool.setCorePoolSize(10);      // core thread count
        pool.setMaxPoolSize(50);       // maximum thread count
        pool.setQueueCapacity(200);    // task queue capacity
        pool.setThreadNamePrefix(threadNamePrefix);
        pool.initialize();
        return pool;
    }
}

新建mock控制层:MockController ,模拟外部接口数据:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package com.example.thread.controller;

import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * Stand-in for slow external services: each endpoint sleeps for a fixed time and then
 * returns a constant string.
 */
@Slf4j
@RequestMapping("mock")
@RestController
public class MockController {

    /** Responds after three seconds. */
    @RequestMapping("/getData")
    public String getData() {
        pause(3000);
        return "data1 ";
    }

    /** Responds after one second. */
    @RequestMapping("/getData2")
    public String getData2() {
        pause(1000);
        return "data2 ";
    }

    /** Responds after one second. */
    @RequestMapping("/getData3")
    public String getData3() {
        pause(1000);
        return "data3 ";
    }

    /**
     * Blocks the calling thread to simulate latency.
     *
     * @param millis how long to sleep, in milliseconds
     * @throws RuntimeException wrapping the {@link InterruptedException} if the sleep is interrupted
     */
    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            // Restore the flag so the thread's owner can still see the interruption.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }
}

新建接口调用服务层,在方法上添加@Async注解,基于RestTemplate进行外部接口调用。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package com.example.thread.service;

import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;

import java.util.concurrent.CompletableFuture;


/**
 * Service whose public methods run asynchronously on the executor named in their
 * {@code @Async} annotation.
 */
@Service
public class MyThreadService {

    /** Common prefix of the mock endpoints called by this service. */
    private static final String MOCK_BASE_URL = "http://localhost:8080/mock";

    public static RestTemplate restTemplate = new RestTemplate();

    /** Sleeps two seconds, then prints the executing thread followed by {@code :hello}. */
    @Async("threadPoolTaskExecutor")
    public void printHello(){
        sleepThenGreet();
    }

    /** Same as {@link #printHello()}, but scheduled on {@code threadPoolTaskExecutor2}. */
    @Async("threadPoolTaskExecutor2")
    public void printHello2(){
        sleepThenGreet();
    }

    /** @return a future holding the body returned by the mock {@code /getData} endpoint */
    @Async("threadPoolTaskExecutor2")
    public CompletableFuture<String> getData1(){
        return fetch("/getData");
    }

    /** @return a future holding the body returned by the mock {@code /getData2} endpoint */
    @Async("threadPoolTaskExecutor2")
    public CompletableFuture<String> getData2(){
        return fetch("/getData2");
    }

    /** @return a future holding the body returned by the mock {@code /getData3} endpoint */
    @Async("threadPoolTaskExecutor2")
    public CompletableFuture<String> getData3(){
        return fetch("/getData3");
    }

    /** Shared body of the two {@code printHello} variants. */
    private static void sleepThenGreet() {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            // Restore the flag so the pool worker can still see the interruption.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        System.out.println(Thread.currentThread()+":hello");
    }

    /**
     * Performs a blocking GET against one mock endpoint and wraps the body in an
     * already-completed future.
     *
     * @param path endpoint path below {@link #MOCK_BASE_URL}, starting with {@code /}
     */
    private static CompletableFuture<String> fetch(String path) {
        String result = restTemplate.getForObject(MOCK_BASE_URL + path, String.class);
        return CompletableFuture.completedFuture(result);
    }
}

新建示例控制层,使用 /demo/test2 进行接口调用。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
/**
 * Demo endpoints showing fire-and-forget async calls and fan-out/fan-in over three async calls.
 */
@Slf4j
@RequestMapping("demo")
@RestController
public class DemoController {

    @Autowired
    MyThreadService myThreadService;

    @Autowired
    ThreadPoolTaskExecutor threadPoolTaskExecutor;


    /**
     * Fire-and-forget: queues 100 async {@code printHello} calls and returns without waiting
     * for them.
     *
     * @return the constant {@code "success"}
     */
    @RequestMapping("/test")
    public String test() {
        for (int i = 0; i < 100; i++) {
            myThreadService.printHello();
        }
        System.out.println("test end");
        return "success";
    }

    /**
     * Starts the three data calls asynchronously, waits up to ten seconds for all of them and
     * returns their results concatenated in call order.
     *
     * @return the three response bodies joined together
     * @throws RuntimeException if any call fails, the wait times out, or the thread is interrupted
     */
    @RequestMapping("/test2")
    public String test2() {
        CompletableFuture<String> future1 = myThreadService.getData1();
        CompletableFuture<String> future2 = myThreadService.getData2();
        CompletableFuture<String> future3 = myThreadService.getData3();
        CompletableFuture<Void> allFutures = CompletableFuture.allOf(future1, future2, future3);
        try {
            // Wait at most ten seconds. If any of the three futures failed, get() throws
            // ExecutionException, so no separate isCompletedExceptionally() check is needed.
            allFutures.get(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // Restore the flag so the request thread's owner can still see the interruption.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException | TimeoutException e) {
            throw new RuntimeException(e);
        }

        // All three futures completed normally, so join() returns immediately.
        String result = future1.join() + future2.join() + future3.join();
        System.out.println(result);
        return result;
    }
}

调用结果:

使用多线程异步调用,缩短了等待时间。分析:三个接口串行调用需要花费5秒+(3秒+1秒+1秒);使用线程池并行调用后,等待时间仅取决于最慢的接口耗时,该项目中等待时间为3秒+。