Java多线程读写(生产者消费者模型)
概述
一个线程读取一个excel文件,储存到内存(此处为Queue),另一个线程从队列中读取文件,并写入新的excel文件,即生产者消费者模型。
代码
文件结构
pom导入依赖
1 |
|
Java多线程读写
故意设计为一个线程读,两个线程写
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103package org;
import cn.hutool.poi.excel.ExcelReader;
import cn.hutool.poi.excel.ExcelUtil;
import cn.hutool.poi.excel.ExcelWriter;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class App
{
// 更改为本地路径
private String readFileName = "E:/Pro/MultiThreadRW/src/file/test.xlsx";
private String writeFileName = "E:/Pro/MultiThreadRW/src/file/test2.xlsx";
private boolean end = false;
private final BlockingQueue<List<String>> queue = new LinkedBlockingQueue<>();
// 生产者
class Producer extends Thread{
@Override
public void run(){
// 读取excel文件
ExcelReader reader = ExcelUtil.getReader(readFileName);
List<List<Object>> lists = reader.read();
// 将对象按行写入队列
for (List<Object> list:lists ) {
try{
queue.put(cast(list));
System.out.println("[" + Thread.currentThread().getName() + "]生产者生产一行数据...");
}
catch (InterruptedException e){
e.printStackTrace();
}
}
end = true;
reader.close();
}
// object 转 String
public List<String> cast(List<Object> list){
List<String> tmp = new ArrayList<>();
for (Object li :list ) {
tmp.add((String) li);
}
return tmp;
}
}
// 消费者
class Consumer extends Thread{
@Override
public void run(){
ExcelWriter writer = ExcelUtil.getWriter(writeFileName);
// 当输入流完成或者队列为空
while (!end || !queue.isEmpty()){
try {
// 拿出一条数据
List<String> list = queue.take();
List<List<String>> row = new ArrayList<>();
row.add(list);
// 写入到输出流中
writer.write(row,true);
System.out.println("["+Thread.currentThread().getName()+"]消费者消费一行数据...");
}
catch (InterruptedException e){
e.printStackTrace();
}
}
writer.close();
}
}
// 主函数
public static void main( String[] args ) {
App app = new App();
Producer producer = app.new Producer();
Consumer consumer = app.new Consumer();
Consumer consumer2 = app.new Consumer();
producer.start();
consumer.start();
consumer2.start();
try {
producer.join();
consumer.join();
consumer2.join();
System.out.println("["+Thread.currentThread().getName()+"]完成...");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
运行结果
1 |
|