并发编程和线程池

并发编程和线程池

练气期(并发编程基础)

练气期一层(this)

synchronized(this)和synchronized方法都是锁当前对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public class Test_01 {
private int count = 0; // 存在堆中
private Object o = new Object(); // 存在堆中

// 多个线程都能找到的,都能访问的对象叫临界资源对象
public void testSync1() {
synchronized (o) {
System.out.println(Thread.currentThread().getName()
+ " count = " + count++);
}
}

public void testSync2() {
synchronized (this) {
System.out.println(Thread.currentThread().getName()
+ " count = " + count++);
}
}

public synchronized void testSync3() {
System.out.println(Thread.currentThread().getName()
+ " count = " + count++);
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

public static void main(String[] args) {
final Test_01 t = new Test_01();
new Thread(new Runnable() {
@Override
public void run() {
t.testSync3();
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
t.testSync3();
}
}).start();
}

}

练气期二层(static)

静态同步方法,锁的是当前类型的类对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class Test_02 {
private static int staticCount = 0;

public static synchronized void testSync4(){
System.out.println(Thread.currentThread().getName()
+ " staticCount = " + staticCount++);
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}

public static void testSync5(){
synchronized(Test_02.class){
System.out.println(Thread.currentThread().getName()
+ " staticCount = " + staticCount++);
}
}

}

练气期三层(原子性)

加锁的目的就是为了保证操作的原子性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class Test_03 implements Runnable {

private int count = 0;

@Override
public /*synchronized*/ void run() {
System.out.println(Thread.currentThread().getName()
+ " count = " + count++);
}

public static void main(String[] args) {
Test_03 t = new Test_03();
for (int i = 0; i < 5; i++) {
new Thread(t, "Thread - " + i).start();
}
}

}

练气期四层(同步与非同步方法间调用)

同步方法只影响锁定同一个锁对象的同步方法。不影响其他线程调用非同步方法,或调用其他锁资源的同步方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
public class Test_04 {
Object o = new Object();

public synchronized void m1() { // 重量级的访问操作。
System.out.println("public synchronized void m1() start");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("public synchronized void m1() end");
}

public void m3() {
synchronized (o) {
System.out.println("public void m3() start");
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("public void m3() end");
}
}

public void m2() {
System.out.println("public void m2() start");
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("public void m2() end");
}

public static class MyThread01 implements Runnable {
public MyThread01(int i, Test_04 t) {
this.i = i;
this.t = t;
}

int i;
Test_04 t;

public void run() {
if (i == 0) {
t.m1();
} else if (i > 0) {
t.m2();
} else {
t.m3();
}
}
}

public static void main(String[] args) {
Test_04 t = new Test_04();
new Thread(new Test_04.MyThread01(0, t)).start();
new Thread(new Test_04.MyThread01(1, t)).start();
new Thread(new Test_04.MyThread01(-1, t)).start();
}

}

练气期五层(存在原子性问题)

同步方法只能保证当前方法的原子性,不能保证多个业务方法之间的互相访问的原子性。一般来说,商业项目中,不考虑业务逻辑上的脏读问题。如你买东西下订单后,提示订单已下,查询时候,可能看不到。一般我们只关注数据脏读。但是在金融领域,保险领域严格要求。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class Test_05 {
private double d = 0.0;
public synchronized void m1(double d){
try {
// 相当于复杂的业务逻辑代码。
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
this.d = d;
}

public double m2(){
return this.d;
}

public static void main(String[] args) {
final Test_05 t = new Test_05();

new Thread(new Runnable() {
@Override
public void run() {
t.m1(100);
}
}).start();
System.out.println(t.m2());
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(t.m2());
}

}

练气期六层(锁可重入)

同一个线程,多次调用同步代码,锁定同一个锁对象,可重入。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class Test_06 {

synchronized void m1(){ // 锁this
System.out.println("m1 start");
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
m2();
System.out.println("m1 end");
}
synchronized void m2(){ // 锁this
System.out.println("m2 start");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("m2 end");
}

public static void main(String[] args) {

new Test_06().m1();

}

}

练气期七层(调用父类的同步方法)

子类同步方法覆盖父类同步方法,可以指定调用父类的同步方法, 相当于锁的重入。父类的方法 <<==>> 本类的方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class Test_07 {

synchronized void m() {
System.out.println("Super Class m start");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Super Class m end");
}

public static void main(String[] args) {
new Sub_Test_07().m();
}

}

class Sub_Test_07 extends Test_07 {
synchronized void m() {
System.out.println("Sub Class m start");
super.m();
System.out.println("Sub Class m end");
}
}

练气期八层(锁与异常)

当同步方法中发生异常的时候,自动释放锁资源,不会影响其他线程的执行。我们需要注意的是在同步业务逻辑中,如果发生异常如何处理——— try/catch 。如存钱时,发送网络中断,查询的时候查到多少钱,存的钱要返还

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public class Test_08 {
int i = 0;

synchronized void m() {
System.out.println(Thread.currentThread().getName() + " - start");
while (true) {
i++;
System.out.println(Thread.currentThread().getName() + " - " + i);
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
/*if(i == 5){
i = 1/0;
}*/
//模拟存钱,中断处理
if (i == 5) {
try {
i = 1 / 0;
} catch (Exception e) {
i = 0;
}
}
}
}

public static void main(String[] args) {
final Test_08 t = new Test_08();
// 锁的是当前对象
new Thread(new Runnable() {
@Override
public void run() {
t.m();
}
}, "t1").start();

new Thread(new Runnable() {
@Override
public void run() {
t.m();
}
}, "t2").start();
}

}

练气期九层(volatile)

cpu默认查询cpu的高速缓存区域,CPU中每一个核都有自己的缓存,当cpu有中断的时候,他可能清空高速缓存区域数据,重新从内存中读取数据。volatile改变内存中的数据,通知底层OS系统,每次使用b的时候,最好看下内存数据是否发生变动。即volatile做的是一个通知OS系统的作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class Test_09 {

volatile boolean b = true; //线程可见性问题

void m(){
System.out.println("start");
while(b){}
System.out.println("end");
}

public static void main(String[] args) {
final Test_09 t = new Test_09();
new Thread(new Runnable() {
@Override
public void run() {
t.m();
}
}).start();

try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

t.b = false; //堆空间的对象,线程共享
}

}

volatile的非原子性问题,只能保证可见性,不能保证原子性。

那什么时候使用volatile?棋牌室的人数,新增的人有一个线程去+1。这是可以使用volatile

join()多个线程在运行结束时,我把多个线程再main线程的位置连在一起,当其他线程都结束,即保证在所有线程循环执行+1后,再执行main线程打印。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class Test_10 {

volatile int count = 0;

/*synchronized*/ void m() { //保证原子性的解决方法是使用synchronized或者是Atomic
for (int i = 0; i < 10000; i++) {
count++;
}
}

public static void main(String[] args) {
final Test_10 t = new Test_10();
List<Thread> threads = new ArrayList<>();
for (int i = 0; i < 10; i++) {
threads.add(new Thread(new Runnable() {
@Override
public void run() {
t.m();
}
}));
}
for (Thread thread : threads) {
thread.start();
}
for (Thread thread : threads) {
try {
thread.join();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
System.out.println(t.count); //理论上是10w。实际少于这个数
}
}

练气期十层(AtomicXxx)

什么时候有原子性,没有可见性?

答:所谓原子性是指多个线程访问一个变量时,其结果必须保证正确性。所谓可见性是指多线程间可以看最终结果的变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class Test_11 {
AtomicInteger count = new AtomicInteger(0);

void m() {
for (int i = 0; i < 10000; i++) {
/*if(count.get() < 1000)*/
count.incrementAndGet(); //相当于++count,count.getAndAccumulate()是count++;
}
}

public static void main(String[] args) {
final Test_11 t = new Test_11();
List<Thread> threads = new ArrayList<>();
for (int i = 0; i < 10; i++) {
threads.add(new Thread(new Runnable() {
@Override
public void run() {
t.m();
}
}));
}
for (Thread thread : threads) {
thread.start();
}
for (Thread thread : threads) {
try {
thread.join();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
System.out.println(t.count.intValue());
}
}

练气期十一层(锁对象变更)

  • 同步代码一旦加锁后,那么会有一个临时的锁引用指向锁对象,和真实的引用无直接关联。在锁未释放之前,修改锁引用,不会影响同步代码的执行。
  • 我们打印的是Test_13中的o。不是锁引用的_O;下面synchronized锁的是两个对象。打印的是同一个对象。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
public class Test_13 {
Object o = new Object(); //变量引用

int i = 0;

int a() {
try {
/*
* return i ->
* int _returnValue = i; // 0;
* return _returnValue;
*/
return i;
} finally {
i = 10;
}
}

void m() {
System.out.println(Thread.currentThread().getName() + " start");
synchronized (o) { //计算变量引用与变量引用不是一回事
while (true) {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " - " + o);
}
}
}

public static void main(String[] args) {
final Test_13 t = new Test_13();
new Thread(new Runnable() {
@Override
public void run() {
t.m();
}
}, "thread1").start();
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
Thread thread2 = new Thread(new Runnable() {
@Override
public void run() {
t.m();
}
}, "thread2");
t.o = new Object();
thread2.start(); //更改临界资源对象

System.out.println(t.i);
System.out.println(t.a());
System.out.println(t.i);
}

}

练气期十二层(CountDownLatch)

  • 不会进入等待队列,可以和锁混合使用,或替代锁的功能。
  • 一次性在门上挂多个锁。
  • 作用如:init对象的时候有一个前后顺序的问题。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public class Test_15 {
CountDownLatch latch = new CountDownLatch(5);

void m1() {
try {
latch.await();// 等待门闩开放。
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("m1() method");
}

void m2() {
for (int i = 0; i < 10; i++) {
if (latch.getCount() != 0) {
System.out.println("latch count : " + latch.getCount());
latch.countDown(); // 减门闩上的锁。
}
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("m2() method : " + i);
}
}

public static void main(String[] args) {
final Test_15 t = new Test_15();
new Thread(new Runnable() {
@Override
public void run() {
t.m1();
}
}).start();

new Thread(new Runnable() {
@Override
public void run() {
t.m2();
}
}).start();
}

}

练气期大圆满

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public class Test_14 {
String s1 = "hello";
String s2 = new String("hello"); // new关键字,一定是在堆中创建一个新的对象。
Integer i1 = 1; // i1与i2是同一个变量,在常量池中,new是放在堆内存
Integer i2 = 1;

void m1() {
synchronized (i1) { //s1与s2
System.out.println("m1()");
while (true) {

}
}
}

void m2() {
synchronized (i2) {
System.out.println("m2()");
while (true) {

}
}
}

public static void main(String[] args) {
final Test_14 t = new Test_14();
new Thread(new Runnable() {
@Override
public void run() {
t.m1();
}
}).start();

new Thread(new Runnable() {
@Override
public void run() {
t.m2();
}
}).start();
}

}

自定义容器,提供新增元素(add)和获取元素数量(size)方法。启动两个线程。线程1向容器中新增10个数据。线程2监听容器元素数量,当容器元素数量为5时,线程2输出信息并终止。

  1. 使用volatile
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class Test_01 {
public static void main(String[] args) {
final Test_01_Container t = new Test_01_Container();
new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
System.out.println("add Object to Container " + i);
t.add(new Object());
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).start();

new Thread(new Runnable() {
@Override
public void run() {
while (true) {
if (t.size() == 5) {
System.out.println("size = 5");
break;
}
}
}
}).start();
}
}

class Test_01_Container {
volatile List<Object> container = new ArrayList<>();

public void add(Object o) {
this.container.add(o);
}

public int size() {
return this.container.size();
}
}
  1. 使用synchronized和wait(), 调用wait()将释放锁,并且进入等待队列中,生产者与消费者模型
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
public class Test_02 {
public static void main(String[] args) {
final Test_02_Container t = new Test_02_Container();
final Object lock = new Object();

new Thread(new Runnable(){
@Override
public void run() {
synchronized (lock) {
if(t.size() != 5){
try {
lock.wait(); // 线程进入等待队列。
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("size = 5");
lock.notifyAll(); // 唤醒其他等待线程
}
}
}).start();

new Thread(new Runnable() {
@Override
public void run() {
synchronized (lock) {
for(int i = 0; i < 10; i++){
System.out.println("add Object to Container " + i);
t.add(new Object());
if(t.size() == 5){
lock.notifyAll();
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}).start();
}
}

class Test_02_Container{
List<Object> container = new ArrayList<>();

public void add(Object o){
this.container.add(o);
}

public int size(){
return this.container.size();
}
}
  1. 使用门闩避免进入等待队列,效率更高。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public class Test_03 {
public static void main(String[] args) {
final Test_03_Container t = new Test_03_Container();
final CountDownLatch latch = new CountDownLatch(1);

new Thread(new Runnable(){
@Override
public void run() {
if(t.size() != 5){
try {
latch.await(); // 等待门闩的开放。 不是进入等待队列
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("size = 5");
}
}).start();

new Thread(new Runnable() {
@Override
public void run() {
for(int i = 0; i < 10; i++){
System.out.println("add Object to Container " + i);
t.add(new Object());
if(t.size() == 5){
latch.countDown(); // 门闩-1
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).start();
}
}

class Test_03_Container{
List<Object> container = new ArrayList<>();

public void add(Object o){
this.container.add(o);
}

public int size(){
return this.container.size();
}
}

小编是一枚Java Coder,业余写文章,现主营微信公众号《Java患者》,喜欢的话关注我的公众号或者加我微信我们一起学习Java

筑基期(ReentrantLock)

筑基初期(lock等待锁)

  • concurrent是jdk1.5后的包,避免synchronized的出现而设计出来的一种锁机制。
  • ReentrantLock 重入锁,在一个对象上加一个标记信息,这个标记信息代表锁机制。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public class Test_01 {
Lock lock = new ReentrantLock();

void m1() {
try {
lock.lock(); // 加锁
for (int i = 0; i < 10; i++) {
TimeUnit.SECONDS.sleep(1);
System.out.println("m1() method " + i);
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock(); // 解锁
}
}

void m2() {
lock.lock();
System.out.println("m2() method");
lock.unlock();
}

public static void main(String[] args) {
final Test_01 t = new Test_01();
new Thread(new Runnable() {
@Override
public void run() {
t.m1();
}
}).start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(new Runnable() {
@Override
public void run() {
t.m2();
}
}).start();
}
}

筑基中期(tryLock尝试锁)

尝试锁有阻塞和非阻塞两种

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
public class Test_02 {
Lock lock = new ReentrantLock();

void m1(){
try{
lock.lock();
for(int i = 0; i < 10; i++){
TimeUnit.SECONDS.sleep(1);
System.out.println("m1() method " + i);
}
}catch(InterruptedException e){
e.printStackTrace();
}finally{
lock.unlock();
}
}

void m2(){
boolean isLocked = false;
try{
// 尝试锁, 如果有锁,无法获取锁标记,返回false。
// 非阻塞,如果获取锁标记,返回true
// isLocked = lock.tryLock();

// 阻塞尝试锁,阻塞参数代表的时长,尝试获取锁标记。
// 如果超时,不等待。直接返回。
isLocked = lock.tryLock(5, TimeUnit.SECONDS);

if(isLocked){
System.out.println("m2() method synchronized");
}else{
System.out.println("m2() method unsynchronized");
}
}catch(Exception e){
e.printStackTrace();
}finally{
if(isLocked){
// 尝试锁在解除锁标记的时候,一定要判断是否获取到锁标记。
// 如果当前线程没有获取到锁标记,会抛出异常。
lock.unlock();
}
}
}

public static void main(String[] args) {
final Test_02 t = new Test_02();
new Thread(new Runnable() {
@Override
public void run() {
t.m1();
}
}).start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
new Thread(new Runnable() {
@Override
public void run() {
t.m2();
}
}).start();
}
}

筑基后期(lockInterruptibly可打断锁)

  • 阻塞状态有3种: 包括普通阻塞(不释放锁),等待队列(释放锁),锁池队列。
    • 普通阻塞: sleep(10000), 可以被打断。调用thread.interrupt()方法,可以打断阻塞状态,抛出异常。
    • 等待队列: wait()方法被调用,也是一种阻塞状态,只能由notify唤醒。无法打断。
    • 锁池队列: 执行过程中,遇到同步代码,无法获取锁标记。不是所有的锁池队列都可被打断。
      • 使用ReentrantLock的lock方法,获取锁标记的时候,如果需要阻塞等待锁标记,无法被打断。
      • 使用ReentrantLock的lockInterruptibly方法,获取锁标记的时候,如果需要阻塞等待,可以被打断。
  • 可打断锁意义:软件锁死了,无响应,去去任务管理器结束任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
public class Test_03 {
Lock lock = new ReentrantLock();

void m1() {
try {
lock.lock();
for (int i = 0; i < 5; i++) {
TimeUnit.SECONDS.sleep(1);
System.out.println("m1() method " + i);
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}

void m2() {
try {
// 线程执行到这里,本来是不能获得锁标记的,要进入等待队列的。
// 当通过调用当前线程的interrupt(),通过打断当前线程,抛出异常,使线程被唤醒,阻塞结束
lock.lockInterruptibly(); // 可尝试打断的,阻塞等待锁。可以被其他的线程打断阻塞状态
System.out.println("m2() method");
} catch (InterruptedException e) {
// 被打断的异常,被打断与唤醒、阻塞结束都是不一样的
// sleep任何一个线程都可以把他打断,强行唤醒
// 如果是lock不可被打断的
// 如果是lockInterruptibly,阻塞等待这把锁,类似sleep,可以通过interrupt()打断
System.out.println("m2() method interrupted");
} finally {
try {
lock.unlock();
} catch (Exception e) {
e.printStackTrace();
}
}
}

public static void main(String[] args) {
final Test_03 t = new Test_03();
new Thread(new Runnable() {
@Override
public void run() {
t.m1();
}
}).start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
t.m2();
}
});
t2.start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
// 不调用interrupt()方法,t2最后可以获得锁,继续执行
t2.interrupt();// 打断t2线程,锁的位置会抛出异常。
}
}

筑基圆满(公平锁)

  • 在cpu和os中本身线程竞争锁标记是不公平的,不考虑线程的等待时间的。
  • 运用在轮询的场景,如打牌。
  • 需要效果一部分的cpu资源计算等待的时间,性能有所降低。要仅能少用,并发量在10之内。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class Test_04 {

public static void main(String[] args) {
TestReentrantlock t = new TestReentrantlock();
//TestSync t = new TestSync();
Thread t1 = new Thread(t);
Thread t2 = new Thread(t);
t1.start();
t2.start();
}
}

class TestReentrantlock extends Thread {
// 定义一个公平锁
private static ReentrantLock lock = new ReentrantLock(true);

public void run() {
for (int i = 0; i < 5; i++) {
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + " get lock");
} finally {
lock.unlock();
}
}
}

}

class TestSync extends Thread {
public void run() {
for (int i = 0; i < 5; i++) {
//不公平的
synchronized (this) {
System.out.println(Thread.currentThread().getName() + " get lock in TestSync");
}
}
}
}

小编是一枚Java Coder,业余写文章,现主营微信公众号《Java患者》,喜欢的话关注我的公众号或者加我微信我们一起学习Java

金丹期

金丹初期(生产者&消费者)

  • ReenTrantLock建议应用在同步方式,相对效率比synchronized高,量级较轻。
  • synchronized在JDK1.5版本尝试优化,到JDK1.7后,优化效率已经非常好了。在绝对效率上不比ReenTrantLock差多少。
  • 使用ReenTrantLock必须释放锁标记。一般在finally代码块释放锁标记的。
1
2
练习(生产者消费者模式):
自定义同步容器,容器容量上限为10。可以在多线程中应用,并保证数据线程安全。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
public class TestContainer01<E> {

private final LinkedList<E> list = new LinkedList<>();
private final int MAX = 10;
private int count = 0;

public synchronized int getCount(){
return count;
}

public synchronized void put(E e){
while(list.size() == MAX){
try {
this.wait();
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}

list.add(e);
count++;
this.notifyAll();
}

public synchronized E get(){
E e = null;
while(list.size() == 0){
try{
this.wait();
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
e = list.removeFirst();
count--;
this.notifyAll();
return e;
}

public static void main(String[] args) {
final TestContainer01<String> c = new TestContainer01<>();
for(int i = 0; i < 10; i++){
new Thread(new Runnable() {
@Override
public void run() {
for(int j = 0; j < 5; j++){
System.out.println(c.get());
}
}
}, "consumer"+i).start();
}
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
for(int i = 0; i < 2; i++){
new Thread(new Runnable() {
@Override
public void run() {
for(int j = 0; j < 25; j++){
c.put("container value " + j);
}
}
}, "producer"+i).start();
}
}

}

使用ReentrantLock完成生产者-消费者

  • Condition, 为Lock增加条件。当条件满足时(生成了或者是被消费),做什么事情,如加锁或解锁。如等待或唤醒
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
public class TestContainer02<E> {

private final LinkedList<E> list = new LinkedList<>();
private final int MAX = 10;
private int count = 0;

private Lock lock = new ReentrantLock();
private Condition producer = lock.newCondition();
private Condition consumer = lock.newCondition();

public int getCount(){
return count;
}

public void put(E e){
lock.lock();
try {
while(list.size() == MAX){
System.out.println(Thread.currentThread().getName() + " 等待。。。");
// 进入等待队列。释放锁标记。
// 借助条件,进入的等待队列。
producer.await();
}
System.out.println(Thread.currentThread().getName() + " put 。。。");
list.add(e);
count++;
// 借助条件,唤醒所有的消费者。
consumer.signalAll();
} catch (InterruptedException e1) {
e1.printStackTrace();
} finally {
lock.unlock();
}
}

public E get(){
E e = null;

lock.lock();
try {
while(list.size() == 0){
System.out.println(Thread.currentThread().getName() + " 等待。。。");
// 借助条件,消费者进入等待队列
consumer.await();
}
System.out.println(Thread.currentThread().getName() + " get 。。。");
e = list.removeFirst();
count--;
// 借助条件,唤醒所有的生产者
producer.signalAll();
} catch (InterruptedException e1) {
e1.printStackTrace();
} finally {
lock.unlock();
}

return e;
}

public static void main(String[] args) {
final TestContainer02<String> c = new TestContainer02<>();
for(int i = 0; i < 10; i++){
new Thread(new Runnable() {
@Override
public void run() {
for(int j = 0; j < 5; j++){
System.out.println(c.get());
}
}
}, "consumer"+i).start();
}
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
for(int i = 0; i < 2; i++){
new Thread(new Runnable() {
@Override
public void run() {
for(int j = 0; j < 25; j++){
c.put("container value " + j);
}
}
}, "producer"+i).start();
}
}

}

金丹中期(锁的底层实现)

Java 虚拟机中的同步(Synchronization)基于进入和退出管程(Monitor)对象实现。同步方法 并不是由 monitor enter 和 monitor exit 指令来实现同步的,而是由方法调用指令读取运行时常量池中方法的ACC_SYNCHRONIZED 标志来隐式实现的。注:monitor enter 和 monitor exit 指令是C语言的内容。

对象的内存模型(一个对象包含3部分,没有方法,方法是在方法区域中的)

对象内存模型

  • 对象头:存储对象的 hashCode、锁信息或分代年龄或 GC 标志,类型指针指向对象的类元数据,JVM 通过这个指针确定该对象是哪个类的实例等信息。(关注锁信息)
  • 实例变量:存放类的属性数据信息,包括父类的属性信息
  • 填充数据:由于虚拟机要求对象起始地址必须是 8 字节的整数倍。填充数据不是必须存在的,仅仅是为了字节对齐
  • monitor在栈中,但不是在线程栈中。
  • _Owner指向线程。

当线程在对象上加锁时,对象头都会指向monitor,记录锁信息。当执行 synchronized 同步方法或同步代码块时,会在对象头中记录锁标记,锁标记指向的是 monitor 对象(也称为管程或监视器锁)的起始地址。每个对象都存在着一个 monitor 与之关联,对象与其 monitor 之间的关系有存在多种实现方式,如 monitor 可以与对象一起创建销毁或当线程试图获取对象锁时自动生成,但当一个 monitor 被某个线程持有后,它便处于锁定状态。

另外的线程想获取对象头中的锁信息的时候,会发现对象头中已经记录一把锁(monitor),他就获取不到。monitor是互斥的,对象头记录的monitor就不会分配给其他线程了,此时这个线程就会进入阻塞状态。当执行中的线程发生异常,或者是释放锁标记,对象头的锁信息就会释放它记录的monitor。阻塞状态的线程就会弹出来争夺对象中的锁信息,重新在锁信息中记录monitor。

ObjectMonitor 中有两个队列,_WaitSet 和 _EntryList,以及_Owner 标记。其中_WaitSet是用于管理等待队列(wait)线程的,_EntryList 是用于管理锁池阻塞线程的,_Owner 标记用于记录当前执行线程。

线程状态图

线程状态图

​ 当多线程并发访问同一个同步代码时,首先会进入_EntryList,当线程获取锁标记后,monitor 中的_Owner 记录此线程,并在 monitor 中的计数器执行递增计算(+1),代表锁定,其他线程在_EntryList 中继续阻塞。若执行线程调用 wait 方法,则 monitor 中的计数器执行赋值为 0 计算,并将_Owner 标记赋值为 null,代表放弃锁,执行线程进如_WaitSet 中阻塞。若执行线程调用 notify/notifyAll 方法,_WaitSet 中的线程被唤醒,进入_EntryList 中阻塞,等待获取锁标记。若执行线程的同步代码执行结束,同样会释放锁标记,monitor 中的_Owner标记赋值为 null,且计数器赋值为 0 计算。

​ interrupt() 方法可以任何打断阻塞状态的线程,以抛异常的代价。

​ InterruptedException异常是阻塞异常。阻塞中的线程抛出的。

锁的重入

​ 在 Java 中,同步锁是可以重入的。只有同一线程调用同步方法或执行同步代码块,对同一个对象加锁时才可重入。
​ 当线程持有锁时,会在 monitor 的计数器中执行递增计算,若当前线程调用其他同步代码,且同步代码的锁对象相同时,monitor 中的计数器继续递增。每个同步代码执行结束,monitor 中的计数器都会递减,直至所有同步代码执行结束,monitor 中的计数器为 0 时,释放锁标记,_Owner 标记赋值为 null。

金丹后期(锁的种类)

  • Java 中锁的种类包括偏向锁,自旋锁,轻量级锁,重量级锁。
  • 锁的使用方式先提供偏向锁,如果不满足的时候,升级为轻量级锁,再不满足,升级为重量级锁。自旋锁是一个过渡的锁状态,不是一种实际的锁类型。锁只能升级,不能降级。
  • 金丹初期提到的就是重量级锁。

偏向锁:

​ 是一种编译解释锁。如果代码中不可能出现多线程并发争抢同一个锁的时候,JVM 编译代码,解释执行的时候,会自动的放弃同步信息。消除 synchronized 的同步代码结果。使用锁标记的形式记录锁状态。在 Monitor 中有变量 ACC_SYNCHRONIZED。当变量值使用的时候,代表偏向锁锁定。可以避免锁的争抢和锁池状态的维护。提高JVM解释效率。

1
2
3
4
5
6
Object o = new Object();
public void m() {
o = new Object();
synchronized (o) {
}
}

轻量级锁:

​ 是一个过渡锁。当偏向锁不满足,也就是有多线程并发访问,锁定同一个对象的时候,先提升为轻量级锁。也是使用标记 ACC_SYNCHRONIZED 标记记录的。ACC_UNSYNCHRONIZED 标记记录未获取到锁信息的线程。就是只有两个线程争抢锁标记的时候,优先使用轻量级锁。A线程和monitor有直接关联的。B线程不记录monitor,是monitor记录B线程,线程A结束后,B两个线程才找到monitor。也可能出现重量级锁。

自旋锁

​ 是一个过渡锁,是偏向锁和轻量级锁的过渡。当获取锁的过程中,未获取到。为了提高效率,JVM 自动执行若干次空循环,再次申请锁,而不是进入阻塞状态的情况。称为自旋锁。自旋锁提高效率就是避免线程状态的变更。

金丹圆满(ThreadLocal)

  • 就是一个Map。key 是Thread.getCurrentThread(),value 是线程需要保存的变量。
  • ThreadLocal.set(value)相当map.put(Thread.getCurrentThread(), value)。
  • ThreadLocal.get() 相当map.get(Thread.getCurrentThread())。
  • 内存问题 : 在并发量高的时候,可能有内存溢出。
  • 使用ThreadLocal的时候,一定注意回收资源问题,每个线程结束之前,将当前线程保存的线程变量一定要删除 ,调用ThreadLocal.remove(),要不会发生泄露。run方法的finally代码块。

在一个操作系统中,线程和进程是有数量上限的。在操作系统中,确定线程和进程唯一性的唯一条件就是线程或进程 ID。操作系统在回收线程或进程的时候,不是一定杀死线程或进程,在繁忙的时候,只会做情况线程或进程栈数据的操作,重复使用线程或进程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class Test_01 {

volatile static String name = "zhangsan";
static ThreadLocal<String> tl = new ThreadLocal<>();

public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(name); // lisi
System.out.println(tl.get()); // null
}
}).start();

new Thread(new Runnable() {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
name = "lisi";
tl.set("wangwu");
}
}).start();
}

}

小编是一枚Java Coder,业余写文章,现主营微信公众号《Java患者》,喜欢的话关注我的公众号或者加我微信我们一起学习Java

元婴期(并发容器)

解决并发情况下的容器线程安全问题的。给多线程环境准备一个线程安全的容器对象。线程安全的容器对象: Vector, Hashtable。线程安全容器对象,都是使用 synchronized方法实现的。
concurrent 包中的同步容器,大多数是使用系统底层技术实现的线程安全。类似 native。Java8 中使用 CAS。

元婴前期(Map/Set)

  • ConcurrentHashMap/ConcurrentHashSet底层哈希实现的同步 Map(Set)。效率高,线程安全。使用系统底层技术实现线程安全。量级较 synchronized 低。key 和 value 不能为 null。
  • ConcurrentSkipListMap/ConcurrentSkipListSet底层跳表(SkipList)实现的同步 Map(Set)。有序,效率比 ConcurrentHashMap 稍低。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class Test_01_ConcurrentMap {

public static void main(String[] args) {
final Map<String, String> map = new Hashtable<>(); // Collections.syncxxxxxx
// final Map<String, String> map = new ConcurrentHashMap<>();
// final Map<String, String> map = new ConcurrentSkipListMap<>(); 数据结构跳表
final Random r = new Random();
Thread[] array = new Thread[100];
final CountDownLatch latch = new CountDownLatch(array.length);

long begin = System.currentTimeMillis();
for (int i = 0; i < array.length; i++) {
array[i] = new Thread(new Runnable() {
@Override
public void run() {
for (int j = 0; j < 10000; j++) {
map.put("key" + r.nextInt(100000), "value" + r.nextInt(100000));
}
latch.countDown();
}
});
}
for (Thread t : array) {
t.start();
}
try {
latch.await(); //等待门闩开放
} catch (InterruptedException e) {
e.printStackTrace();
}
long end = System.currentTimeMillis();
System.out.println("执行时间为 : " + (end - begin) + "毫秒!");
}

}

调表机构:存10、18、15、20、19。

跳表

元婴中期(List)

  • CopyOnWriteArrayList:写时复制集合,效率低,读取效率高。每次写入数据,都会创建一个新的底层数组。
  • 浪费空间保证数据的安全。
  • 初始容量1,每次新增的内容,创建容量+1。
  • 取得时候,取最新的数组。remove最后一个数据,直接用上一个数组。
  • set和remove其他数据,重新创建数组。
  • 存在幻读(写的时候,有读操作,不是最新添加的数据)
  • 存在脏读(写的时候,有读操作,不是最新添加的数据)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class Test_02_CopyOnWriteList {

public static void main(String[] args) {
// final List<String> list = new ArrayList<>(); 线程不安全
// final List<String> list = new Vector<>(); 线程安全 更快
final List<String> list = new CopyOnWriteArrayList<>();
final Random r = new Random();
Thread[] array = new Thread[100];
final CountDownLatch latch = new CountDownLatch(array.length);

long begin = System.currentTimeMillis();
for(int i = 0; i < array.length; i++){
array[i] = new Thread(new Runnable() {
@Override
public void run() {
for(int j = 0; j < 1000; j++){
list.add("value" + r.nextInt(100000));
}
latch.countDown();
}
});
}
for(Thread t : array){
t.start();
}
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
long end = System.currentTimeMillis();
System.out.println("执行时间为 : " + (end-begin) + "毫秒!");
System.out.println("List.size() : " + list.size());
}

}

元婴后期(Queue)

ConcurrentLinkedQueue:基础链表同步队列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class Test_03_ConcurrentLinkedQueue {

public static void main(String[] args) {
Queue<String> queue = new ConcurrentLinkedQueue<>();
for(int i = 0; i < 10; i++){
queue.offer("value" + i);
}

System.out.println(queue);
System.out.println(queue.size());

// peek() -> 查看queue中的首数据
System.out.println(queue.peek());
System.out.println(queue.size());

// poll() -> 获取queue中的首数据
System.out.println(queue.poll());
System.out.println(queue.size());
}

}

LinkedBlockingQueue:阻塞队列,队列容量不足自动阻塞,队列容量为 0 自动阻塞。

  • put自动阻塞, 队列容量满后,自动阻塞。
  • take自动阻塞方法, 队列容量为0后,自动阻塞。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class Test_04_LinkedBlockingQueue {

final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
final Random r = new Random();

public static void main(String[] args) {
final Test_04_LinkedBlockingQueue t = new Test_04_LinkedBlockingQueue();

new Thread(new Runnable() {
@Override
public void run() {
while(true){
try {
t.queue.put("value"+t.r.nextInt(1000));
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}, "producer").start();

for(int i = 0; i < 3; i++){
new Thread(new Runnable() {
@Override
public void run() {
while(true){
try {
System.out.println(Thread.currentThread().getName() +
" - " + t.queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}, "consumer"+i).start();
}
}

}

ArrayBlockingQueue:底层数组实现的有界队列。自动阻塞。根据调用 API(add/put/offer)不同,有不同特性。当容量不足的时候,有阻塞能力。

  • add 方法在容量不足的时候,抛出异常。
  • put 方法在容量不足的时候,阻塞等待。
  • offer 方法,
    • 单参数 offer 方法,不阻塞。容量不足的时候,返回 false。当前新增数据操作放弃。
    • 三参数 offer 方法(offer(value,times,timeunit)),容量不足的时候,阻塞 times 时长(单
      位为 timeunit),如果在阻塞时长内,有容量空闲,新增数据返回 true。如果阻塞时长范围
      内,无容量空闲,放弃新增数据,返回 false。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class Test_05_ArrayBlockingQueue {

final BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);

public static void main(String[] args) {
final Test_05_ArrayBlockingQueue t = new Test_05_ArrayBlockingQueue();

for(int i = 0; i < 5; i++){

// System.out.println("add method : " + t.queue.add("value"+i));
---------------------------------------------------------------------
/*try {
t.queue.put("put"+i);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("put method : " + i);*/
---------------------------------------------------------------------
// System.out.println("offer method : " + t.queue.offer("value"+i));
---------------------------------------------------------------------
try {
System.out.println("offer method : " +
t.queue.offer("value"+i, 1, TimeUnit.SECONDS));
} catch (InterruptedException e) {
e.printStackTrace();
}
}

System.out.println(t.queue);
}

}

DelayQueue:延时队列。根据比较机制,实现自定义处理顺序的队列。常用于定时任务。

  • 通过比较方法,比较排列,获取。
  • 可以保存的对象一定要实现Delayed接口。Delayed接口继承Comparable接口。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
public class Test_06_DelayQueue {

static BlockingQueue<MyTask_06> queue = new DelayQueue<>();

public static void main(String[] args) throws InterruptedException {
long value = System.currentTimeMillis();
MyTask_06 task1 = new MyTask_06(value + 2000);
MyTask_06 task2 = new MyTask_06(value + 1000);
MyTask_06 task3 = new MyTask_06(value + 3000);
MyTask_06 task4 = new MyTask_06(value + 2500);
MyTask_06 task5 = new MyTask_06(value + 1500);

queue.put(task1);
queue.put(task2);
queue.put(task3);
queue.put(task4);
queue.put(task5);

System.out.println(queue);
System.out.println(value);
for(int i = 0; i < 5; i++){
System.out.println(queue.take());
}
}

}

class MyTask_06 implements Delayed {

private long compareValue;

public MyTask_06(long compareValue){
this.compareValue = compareValue;
}

/**
* 比较大小。自动实现升序
* 建议和getDelay方法配合完成。
* 如果在DelayQueue是需要按时间完成的计划任务,必须配合getDelay方法完成。
*/
@Override
public int compareTo(Delayed o) {
return (int)(this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
}

/**
* 获取计划时长的方法。
* 根据参数TimeUnit来决定,如何返回结果值。秒,毫秒
*/
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(compareValue - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}

@Override
public String toString(){
return "Task compare value is : " + this.compareValue;
}

}

LinkedTransferQueue:转移队列,

  • 使用 transfer 方法,没有消费者,就阻塞。必须有消费者(take()方法的调用者),实现数据的即时处理(电话)。
  • 无容量的,放数组,容量为零,这时候要阻塞。等另一个线程来拿,不经过容器的存储来转移数组。
  • 使用 add方法,直接存在容器中。队列会保存数据,不做阻塞等待(短信)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
public class Test_07_TransferQueue {

TransferQueue<String> queue = new LinkedTransferQueue<>();

public static void main(String[] args) {
final Test_07_TransferQueue t = new Test_07_TransferQueue();//为了匿名内部累方法中获得queue引用。

/*new Thread(new Runnable() {
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName() + " thread begin " );
System.out.println(Thread.currentThread().getName() + " - " + t.queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "output thread").start();

try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}

try {
t.queue.transfer("test string");
} catch (InterruptedException e) {
e.printStackTrace();
}*/

new Thread(new Runnable() {

@Override
public void run() {
try {
t.queue.transfer("test string");
// t.queue.add("test string");
System.out.println("add ok");
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();

try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}

new Thread(new Runnable() {
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName() + " thread begin " );
System.out.println(Thread.currentThread().getName() + " - " + t.queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "output thread").start();

}

}

SynchronusQueue:同步队列,是一个容量为 0 的队列。是一个特殊的 TransferQueue。

  • 必须现有消费线程等待,才能使用的队列。
  • add 方法,无阻塞。若没有消费线程阻塞等待数据,则抛出非阻塞异常。
  • put 方法,有阻塞。若没有消费线程阻塞等待数据,则阻塞。
  • 场景:玩家与玩家之间的匹配。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class Test_08_SynchronusQueue {

BlockingQueue<String> queue = new SynchronousQueue<>();

public static void main(String[] args) {
final Test_08_SynchronusQueue t = new Test_08_SynchronusQueue();

new Thread(new Runnable() {
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName() + " thread begin " );
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " - " + t.queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "output thread").start();

/*try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}*/
// t.queue.add("test add");
try {
t.queue.put("test put");
} catch (InterruptedException e) {
e.printStackTrace();
}

System.out.println(Thread.currentThread().getName() + " queue size : " + t.queue.size());
}

}

元婴圆满(线程池)

Executor:线程池顶级接口。定义方法,void execute(Runnable)。方法是用于处理任务的一个服务方法。调用者提供 Runnable 接口的实现,线程池通过线程执行这个 Runnable。服务方法无返回值的。是 Runnable 接口中的 run 方法无返回值。常用方法 - void execute(Runnable),作用是启动线程任务的。

​ 他不是线程池,他是线程池线程池底层处理机制。在使用线程池的时候,底层如何处理本线程的逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class Test_01_MyExecutor implements Executor {
public static void main(String[] args) {
new Test_01_MyExecutor().execute(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " - test executor");
}
});
}

@Override
public void execute(Runnable command) {
new Thread(command).start();
}
}

ExecutorService:Executor 接口的子接口。提供了一个新的服务方法,submit。有返回值(Future 类型)。submit 方法提供了 overload 方法。其中有参数类型为 Runnable 的,不需要提供返回值的;有参数类型为 Callable,可以提供线程执行后的返回值。

  • 他是线程池服务类型。所有的线程池类型都实现这个接口,实现这个接口,代表可以提供线程池能力。

  • Future是 submit 方法的返回值。代表未来,也就是线程执行结束后的一种结果。如返回值。

  • 常见方法

    • void execute(Runnable)
    • Future submit(Callable)
    • Future submit(Runnable)
    • shutdown():优雅关闭。 不是强行关闭线程池,回收线程池中的资源。而是不再处理新的任务,将已接收的任务处理完毕后再关闭。
  • 线程池状态

    • Running - 线程池正在执行中。活动状态。
    • ShuttingDown - 线程池正在关闭过程中。优雅关闭。一旦进入这个状态,线程池不再接收新的任务,处理所有已接收的任务,处理完毕后,关闭线程池。不能执行submit方法和execute方法。
    • Terminated - 线程池已经关闭。不能执行submit方法和execute方法。

Future:未来结果,代表线程任务执行结束后的结果。获取线程执行结果的方式是通过 get 方法获取的。

  • get 无参,阻塞等待线程执行结束,并得到结果。
  • get 有参,阻塞固定时长,等待线程执行结束后的结果,如果在阻塞时长范围内,线程未执行结束,抛出异常。
  • 常用方法:
    • T get()
    • T get(long, TimeUnit)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class Test_03_Future {

public static void main(String[] args) throws InterruptedException, ExecutionException {
/*FutureTask<String> task = new FutureTask<>(new Callable<String>() {
@Override
public String call() throws Exception {
return "first future task";
}
});

new Thread(task).start();

System.out.println(task.get());*/
// 上面代码和下面一模一样的
ExecutorService service = Executors.newFixedThreadPool(1);

Future<String> future = service.submit(new Callable<String>() {
@Override
public String call() {
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("aaa");
return Thread.currentThread().getName() + " - test executor";
}
});
System.out.println(future);
System.out.println(future.isDone()); // 查看线程是否结束, 任务是否完成。 call方法是否执行结束

System.out.println(future.get()); // 获取call方法的返回值。
System.out.println(future.isDone());
}

}

Callable:可执行接口, 类似 Runnable 接口,也是可以启动一个线程的接口。其中定义的方法是
call,call 方法的作用和 Runnable 中的 run 方法完全一致,call 方法有返回值。

  • 接口方法 : Object call();相当于 Runnable 接口中的 run 方法。区别为此方法有返回值。
    不能抛出已检查异常。
  • 和 Runnable 接口的选择 - 需要返回值或需要抛出异常时,使用 Callable,其他情况可
    任意选择。

Executors:工具类型,为 Executor 线程池提供工具方法。

  • 可以快速的提供若干种线程池。如:固定容量的,无限容量的,容量为 1 等各种线程池。
  • 线程池是一个进程级的重量级资源。默认的生命周期和 JVM 一致。当开启线程池后,直到 JVM 关闭为止,是线程池的默认生命周期。如果手工调用 shutdown 方法,那么线程池执行所有的任务后,自动关闭。不调用shutdown方法,程序一直不关闭的。
  • 开始 - 创建线程池。
  • 结束 - JVM 关闭或调用 shutdown 并处理完所有的任务。
  • 类似 Arrays,Collections 等工具类型的功用。

FixedThreadPool:容量固定的线程池。活动状态和线程池容量是有上限的线程池。

  • 所有的线程池中,都有一个任务队列。使用的是 BlockingQueue作为任务的载体。当任务数量大于线程池容量的时候,没有运行的任务保存在任务队列中,当线程有空闲的,自动从队列中取出任务执行。

  • 使用场景: 大多数情况下,使用的线程池,首选推荐 FixedThreadPool。OS 系统和硬件是有线程支持上限。不能随意的无限制提供线程池。

  • 线程池默认的容量上限是 Integer.MAX_VALUE。

  • 常见的线程池容量: PC - 200。 服务器 - 1000~10000

  • queued tasks - 任务队列,completed tasks - 结束任务队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class Test_02_FixedThreadPool {

public static void main(String[] args) {
ExecutorService service =
Executors.newFixedThreadPool(5);
for (int i = 0; i < 6; i++) {
service.execute(new Runnable() {
@Override
public void run() {
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " - test executor");
}
});
}
//[Running, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0]
System.out.println(service);

service.shutdown();
// 是否已经结束, 相当于回收了资源。
System.out.println(service.isTerminated()); // false
// 是否已经关闭, 是否调用过shutdown方法
System.out.println(service.isShutdown()); //true
// [Shutting down, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0]
System.out.println(service);

try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}

// service.shutdown();
System.out.println(service.isTerminated()); //true
System.out.println(service.isShutdown()); //true
// [Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 6]
System.out.println(service);
}

}

CachedThreadPool:缓存的线程池。容量不限(Integer.MAX_VALUE)。自动扩容。容量管理策略:如果线程池中的线程数量不满足任务执行,创建新的线程。每次有新任务无法即时处理的时候,都会创建新的线程。

  • 当线程池中的线程空闲时长达到一定的临界值(默认 60 秒),自动释放线程。
  • 默认线程空闲 60 秒,自动销毁。
  • 应用场景: 内部应用或测试应用。 内部应用,有条件的内部数据瞬间处理时应用,如:电信平台夜间执行数据整理(有把握在短时间内处理完所有工作,且对硬件和软件有足够的信心)。
  • 测试应用,在测试的时候,尝试得到硬件或软件的最高负载量,用于提供FixedThreadPool 容量的指导。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class Test_05_CachedThreadPool {

public static void main(String[] args) {
ExecutorService service = Executors.newCachedThreadPool();
System.out.println(service); // 容量为0

for(int i = 0; i < 5; i++){
service.execute(new Runnable() {
@Override
public void run() {
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " - test executor");
}
});
}

System.out.println(service); // 容量为5

try {
TimeUnit.SECONDS.sleep(65);
} catch (InterruptedException e) {
e.printStackTrace();
}

System.out.println(service);
}

}

ScheduledThreadPool:计划任务线程池。可以根据计划自动执行任务的线程池。

  • scheduleAtFixedRate(Runnable, start_limit, limit, timeunit)
    • runnable - 要执行的任务。
    • start_limit - 第一次任务执行的间隔。
    • limit - 多次任务执行的间隔。
    • timeunit - 多次任务执行间隔的时间单位。
  • 他是阻塞的,效率低下。
  • 他本质就是DelayedQueue
  • 每间隔一定的时间,随机一个线程运行,并且运行完的线程,不会销毁,会继续等待下次选中运行。
  • 使用场景: 计划任务时选用(具体与DelaydQueue比较后选择),如:电信行业中的数据整理,每分钟整理,每消失整理,每天整理等.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class Test_07_ScheduledThreadPool {

public static void main(String[] args) {
ScheduledExecutorService service = Executors.newScheduledThreadPool(3);
System.out.println(service);

// 定时完成任务。 scheduleAtFixedRate(Runnable, start_limit, limit, timeunit)
// runnable - 要执行的任务。
service.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName());
}
}, 0, 300, TimeUnit.MILLISECONDS);

}

}

SingleThreadExceutor:单一容量的线程池。使用场景: 所有任务交给它处理,保证任务顺序时使用。如: 游戏大厅中的公共频道聊天。秒杀。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class Test_06_SingleThreadExecutor {

public static void main(String[] args) {
ExecutorService service = Executors.newSingleThreadExecutor();
System.out.println(service);

for(int i = 0; i < 5; i++){
service.execute(new Runnable() {
@Override
public void run() {
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " - test executor");
}
});
}

}

}

ForkJoinPool:分支合并线程池(mapduce 类似的设计思想,递归思想的运用)。适合用于处理复杂任务。

  • 初始化线程容量与 CPU 核心数相关。
  • 线程池中运行的内容必须是 ForkJoinTask 的子类型(RecursiveTask,RecursiveAction)。
  • ForkJoinPool - 分支合并线程池。 可以递归完成复杂任务。要求可分支合并的任务必须是 ForkJoinTask 类型的子类型。其中提供了分支和合并的能力。ForkJoinTask 类型提供了两个抽象子类型,RecursiveTask 有返回结果的分支合并任务,RecursiveAction 无返回结果的分支合并任务。(Callable/Runnable)compute 方法:就是任务的执行逻辑。
  • ForkJoinPool 没有所谓的容量。默认都是 1 个线程。根据任务自动的分支新的子线程。当子线程任务结束后,自动合并。所谓自动是根据 fork 和 join 两个方法实现的。
  • 应用: 主要是做科学计算或天文计算的。数据分析的.
  • 拿空间换时间,效率高,但要看CPU能力。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
public class Test_08_ForkJoinPool {

final static int[] numbers = new int[1000000];
final static int MAX_SIZE = 50000;
final static Random r = new Random();


static{
for(int i = 0; i < numbers.length; i++){
numbers[i] = r.nextInt(1000);
}
}

static class AddTask extends RecursiveTask<Long>{ // RecursiveAction
int begin, end;
public AddTask(int begin, int end){
this.begin = begin;
this.end = end;
}

//
protected Long compute(){
if((end - begin) < MAX_SIZE){
long sum = 0L;
for(int i = begin; i < end; i++){
sum += numbers[i];
}
// System.out.println("form " + begin + " to " + end + " sum is : " + sum);
return sum;
}else{
int middle = begin + (end - begin)/2;
AddTask task1 = new AddTask(begin, middle);
AddTask task2 = new AddTask(middle, end);
task1.fork();// 就是用于开启新的任务的。 就是分支工作的。 就是开启一个新的线程任务。
task2.fork();
// join - 合并。将任务的结果获取。 这是一个阻塞方法。一定会得到结果数据。
return task1.join() + task2.join();
}
}
}

public static void main(String[] args) throws InterruptedException, ExecutionException, IOException {
long result = 0L;
for(int i = 0; i < numbers.length; i++){
result += numbers[i];
}
System.out.println(result);

ForkJoinPool pool = new ForkJoinPool();
AddTask task = new AddTask(0, numbers.length);

Future<Long> future = pool.submit(task);
System.out.println(future.get());

}

}

ThreadPoolExecutor:线程池底层实现。除 ForkJoinPool 外,其他常用线程池底层都是使用ThreadPoolExecutor实现的。public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue workQueue);

  • corePoolSize: 核心容量,创建线程池的时候,默认有多少线程。也是线程池保持的最少线程数
  • maximumPoolSize: 最大容量,线程池最多有多少线程
  • keepAliveTime:生命周期,0 为永久。当线程空闲多久后,自动回收。
  • unit:生命周期单位,为生命周期提供单位,如:秒,毫秒
  • workQueue:任务队列,阻塞队列。注意,泛型必须是

使用场景: 默认提供的线程池不满足条件时使用。如:初始线程数据 4,最大线程数200,线程空闲周期 30 秒。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class Test_09_ThreadPoolExecutor {

public static void main(String[] args) {
// 模拟fixedThreadPool, 核心线程5个,最大容量5个,线程的生命周期无限。
ExecutorService service =
new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());

for(int i = 0; i < 6; i++){
service.execute(new Runnable() {
@Override
public void run() {
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " - test executor");
}
});
}

System.out.println(service);

service.shutdown();
System.out.println(service.isTerminated());
System.out.println(service.isShutdown());
System.out.println(service);

try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}

service.shutdown();
System.out.println(service.isTerminated());
System.out.println(service.isShutdown());
System.out.println(service);

}

}

1
2
3
练习:
启动若干线程,并行访问同一个容器中的数据。保证获取容器中数据时没有数据错误,且线程安全。
如:售票,秒杀等业务。

使用synchronized

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class Test_01 {

static List<String> list = new ArrayList<>();
// static List<String> list = new Vector<>();

static{
for(int i = 0; i < 10000; i++){
list.add("String " + i);
}
}

public static void main(String[] args) {
for(int i = 0; i < 10; i++){
new Thread(new Runnable() {
@Override
public void run() {
while(list.size() > 0){
System.out.println(Thread.currentThread().getName() + " - " + list.remove(0));
}
}
}, "Thread" + i).start();
}

/*for(int i = 0; i < 10; i++){
new Thread(new Runnable() {
@Override
public void run() {
while(true){
synchronized (list) {
if(list.size() <= 0){
break;
}
System.out.println(Thread.currentThread().getName() + " - " + list.remove(0));
}
}
}
}, "Thread" + i).start();
}*/
}

}

使用queue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class Test_02 {

static Queue<String> list = new ConcurrentLinkedQueue<>();

static{
for(int i = 0; i < 10000; i++){
list.add("String " + i);
}
}

public static void main(String[] args) {
for(int i = 0; i < 10; i++){
new Thread(new Runnable() {
@Override
public void run() {
while(true){
String str = list.poll();
if(str == null){
break;
}
System.out.println(Thread.currentThread().getName() + " - " + str);
}
}
}, "Thread" + i).start();
}
}

}

小编是一枚Java Coder,业余写文章,现主营微信公众号《Java患者》,喜欢的话关注我的公众号或者加我微信我们一起学习Java

化神期(JVM1.7)

化神前期(jvm结构)

jvm基本结构图:

jvm基本结构图

  • 类加载子系统:类加载子系统负责从文件系统或者网络中加载 Class 信息,如ClassLoad这里面的组件。
  • 方法区:加载的类信息存放于一块称为方法区的内存空间。除了类的信息外,方法区中可能还会存放运行时常量池信息,包括字符串字面量和数字常量(这部分常量信息是 Class 文件中常量池部分的内存映射)。
  • Java 堆:java 堆在虚拟机启动的时候建立,它是 java 程序最主要的内存工作区域。几乎所有的java 对象实例都存放在 java 堆中。堆空间是所有线程共享的,这是一块与 java 应用密切相关的内存空间。
  • 直接内存:java 的 NIO 库允许 java 程序使用直接内存。直接内存是在 java 堆外的、直接向系统申请的内存空间。通常访问直接内存的速度会优于 java 堆。因此出于性能的考虑,读写频繁的场合可能会考虑使用直接内存。由于直接内存在 java 堆外,因此它的大小不会直接受限于 Xmx 指定的最大堆大小,但是系统内存是有限的,java 堆和直接内存的总和依然受限于操作系统能给出的最大内存。
  • 垃圾回收系统:垃圾回收系统是 java 虚拟机的重要组成部分,垃圾回收器可以对方法区、java 堆和直接内存进行回收。其中,java 堆是垃圾收集器的工作重点。和 C/C++不同,java 中所有的对象空间释放都是隐式的,也就是说,java 中没有类似 free()或者 delete()这样的函数释放指定的内存区域。对于不再使用的垃圾对象,垃圾回收系统会在后台默默工作,默默查找、标识并释放垃圾对象,完成包括 java 堆、方法区和直接内存中的全自动化管理。
  • Java 栈:每一个 java 虚拟机线程都有一个私有的 java 栈,一个线程的 java 栈在线程创建的时候被创建,java 栈中保存着帧信息,java 栈中保存着局部变量、方法参数,同时和 java 方法的调用、返回密切相关。
  • 本地方法栈:本地方法栈和 java 栈非常类似,最大的不同在于 java 栈用于方法的调用,而本地方法栈则用于本地方法的调用,作为对 java 虚拟机的重要扩展,java 虚拟机允许 java 直接调用本地方法(通常使用 C 编写)。
  • PC 寄存器:PC(Program Counter)寄存器也是每一个线程私有的空间,java 虚拟机会为每一个 java线程创建 PC 寄存器。在任意时刻,一个 java 线程总是在执行一个方法,这个正在被执行的方法称为当前方法。如果当前方法不是本地方法,PC 寄存器就会指向当前正在被执行的指令。如果当前方法是本地方法,那么 PC 寄存器的值就是 undefined。
  • Java HotSpot Client 模式和 Server 模式的区别:当虚拟机运行在-client 模式的时候,使用的是一个代号为 C1 的轻量级编译器, 而-server模式启动的虚拟机采用相对重量级,代号为 C2 的编译器. C2 比 C1 编译器编译的相对彻底,服务起来之后,性能更高。
    • JDK 安装目录/jre/lib/(x86、i386、amd32、amd64)/jvm.cfg文件中的内容,-server 和-client 哪一个配置在上,执行引擎就是哪一个。如果是 JDK1.5版本且是 64 位系统应用时,-client 无效。
    • –64 位系统内容
      -server KNOWN
      -client IGNORE
    • –32 位系统内容
      -server KNOWN
      -client KNOWN
    • 注意 :在部分 JDK1.6 版本和后续的 JDK 版本 (64 位系统 ) 中, -client 参数已经不起作用
      了, Server 模式成为唯一。

化神中期(堆结构及对象分代)

  • 什么是分代,分代的必要性是什么?

​ Java 虚拟机根据对象存活的周期不同,把堆内存划分为几块,一般分为新生代、老年代和永久代(对 HotSpot 虚拟机而言),这就是 JVM 的内存分代策略。
​ 堆内存是虚拟机管理的内存中最大的一块,也是垃圾回收最频繁的一块区域,我们程序所有的对象实例都存放在堆内存中。给堆内存分代是为了提高对象内存分配和垃圾回收的效率。试想一下,如果堆内存没有区域划分,所有的新创建的对象和生命周期很长的对象放在一起,随着程序的执行,堆内存需要频繁进行垃圾收集,而每次回收都要遍历所有的对象,遍历这些对象所花费的时间代价是巨大的,会严重影响我们的 GC 效率,并且会产生碎片。
​ 有了内存分代,情况就不同了,新创建的对象会在新生代中分配内存,经过多次回收仍然存活下来的对象存放在老年代中,静态属性、类信息等存放在永久代中,新生代中的对象存活时间短,只需要在新生代区域中频繁进行 GC,老年代中对象生命周期长,内存回收的频率相对较低,不需要频繁进行回收,永久代中回收效果太差,一般不进行垃圾回收,还可以根据不同年代的特点采用合适的垃圾收集算法。分代收集大大提升了收集效率,这些都是内存分代带来的好处。

  • 分代的划分Java

​ 虚拟机将堆内存划分为 新生代、老年代和永久代 ,永久代是 HotSpot 虚拟机特有的概念(JDK1.8 之后为 metaspace 替代永久代),它采用永久代的方式来实现方法区,其他的虚拟机实现没有这一概念,而且 HotSpot 也有取消永久代的趋势,在 JDK 1.7 中 HotSpot 已经开始了“去永久化”,把原本放在永久代的字符串常量池移出。永久主要存放常量、类信息、静态变量等数据,与垃圾回收关系不大,新生代和老年代是垃圾回收的主要区域。

堆内存简图

  • 新生代(Young Generation)

​ 新生成的对象优先存放在新生代中,新生代对象朝生夕死,存活率很低,在新生代Eden中,常规应用进行一次垃圾收集一般可以回收 70% ~ 95% 的空间,回收效率很高。
​ HotSpot 将新生代划分为三块,一块较大的 Eden(伊甸)空间和两块较小的 Survivor(幸存者)空间,默认比例为 8:1:1。划分的目的是因为 HotSpot 采用复制算法来回收新生代,设置这个比例是为了充分利用内存空间,减少浪费。新生成的对象在 Eden 区分配(大对象除外,大对象直接进入老年代),当 Eden 区没有足够的空间进行分配时,虚拟机将发起一次Minor GC。
​ GC 开始时,对象只会存在于 Eden 区和 From Survivor 区,To Survivor 区是空的(作为保留区域)。GC 进行时,Eden 区中所有存活的对象都会被复制到 To Survivor 区,而在 FromSurvivor 区中,仍存活的对象会根据它们的年龄值决定去向,年龄值达到年龄阀值(默认为15,新生代中的对象每熬过一轮垃圾回收,年龄值就加 1,GC 分代年龄存储在对象的 header中)的对象会被移到老年代中,没有达到阀值的对象会被复制到 To Survivor 区。接着清空Eden 区和 From Survivor 区,新生代中存活的对象都在 To Survivor 区。接着, From Survivor区和 To Survivor 区会交换它们的角色(复制算法减少碎片),也就是新的 To Survivor 区就是上次 GC 清空的 FromSurvivor 区,新的 From Survivor 区就是上次 GC 的 To Survivor 区,总之,不管怎样都会保证To Survivor 区在一轮 GC 后是空的。GC 时当 To Survivor 区没有足够的空间存放上一次新生代收集下来的存活对象时,需要依赖老年代进行分配担保,将这些对象存放在老年代中。

  • 老年代(Old Generationn )

​ 在新生代中经历了多次(具体看虚拟机配置的阀值)GC 后仍然存活下来的对象会进入老年代中。老年代中的对象生命周期较长,存活率比较高,在老年代中进行 GC 的频率相对而言较低,而且回收的速度也比较慢。

  • 永久代(Permanent Generationn)

​ 永久代存储类信息、常量、静态变量、即时编译器编译后的代码等数据,对这一区域而言,Java 虚拟机规范指出可以不进行垃圾收集,一般而言不会进行垃圾回收。

化神后期(垃圾回收算法及分代垃圾)

常见 垃圾回收算法

  • 引用计数 (Reference Counting ):比较古老的回收算法。原理是此对象有一个引用,即增加一个计数,删除一个引用则减少一个计数。垃圾回收时,只用收集计数为 0 的对象。此算法最致命的是无法处理循环引用的问题。
  • 复制(Copying):此算法把内存空间划为两个相等的区域,每次只使用其中一个区域。垃圾回收时,遍历当前使用区域,把正在使用中的对象复制到另外一个区域中。此算法每次只处理正在使用中的对象,因此复制成本比较小,同时复制过去以后还能进行相应的内存整理,不会出现“碎片”问题。当然,此算法的缺点也是很明显的,就是需要两倍内存空间。简图如下:

复制算法

  • 标记- 清除(Mark-Sweep ):最古老的算法,此算法执行分两阶段。第一阶段从引用根节点开始标记所有被引用的对象,第二阶段遍历整个堆,把未标记的对象清除。此算法需要暂停整个应用,同时,会产生内存碎片。简图如下:

标记-清楚算法

  • 标记- 整理(Mark-Compact ):此算法结合了“标记-清除”和“复制”两个算法的优点。也是分两阶段,第一阶段从根节点开始标记所有被引用对象,第二阶段遍历整个堆,把清除未标记对象并且把存活对象“压缩”到堆的其中一块,按顺序排放。此算法避免了“标记-清除”的碎片问题,同时也避免了“复制”算法的空间问题。简图如下:

标记-整理

垃圾收集器的分类

  • 次收集器:Scavenge GC,指发生在新生代的 GC,因为新生代的 Java 对象大多都是朝生夕死,所以
    Scavenge GC 非常频繁,一般回收速度也比较快。

    • 当 Eden 空间不足以为对象分配内存时,会触发 Scavenge GC。
    • 一般情况下,当新对象生成,并且在 Eden 申请空间失败时,就会触发 Scavenge GC,对Eden 区域进行 GC,清除非存活对象,并且把尚且存活的对象移动到 Survivor 区。然后整理Survivor 的两个区。这种方式的 GC 是对年轻代的 Eden 区进行,不会影响到年老代。因为大部分对象都是从 Eden 区开始的,同时 Eden 区不会分配的很大,所以 Eden 区的 GC 会频繁进行。因而,一般在这里需要使用速度快、效率高的算法,使 Eden 去能尽快空闲出来。
    • 当年轻代堆空间紧张时会被触发
    • 相对于全收集而言,收集间隔较短
  • 全收集器:Full GC,指发生在老年代的 GC,出现了 Full GC 一般会伴随着至少一次的 Minor GC(老年代的对象大部分是 Scavenge GC 过程中从新生代进入老年代),比如:分配担保失败。FullGC 的速度一般会比 Scavenge GC 慢 10 倍以上。

    • 当老年代内存不足或者显式调用 System.gc()方法时,会触发 Full GC。
    • 当老年代或者持久代堆空间满了,会触发全收集操作。
    • 可以使用 System.gc()方法来显式的启动全收集,全收集一般根据堆大小的不同,需要的时间不尽相同,但一般会比较长。
  • 垃圾回收器的常规组合使用:

    • Serial、ParNew、Parallel Scabenage构成新生代回收器。
    • Serial Old、Parallel Old、CMS是老年代回收器。
    • G1新老通用

垃圾回收的常见匹配

分代垃圾收集器

  • 串行收集器(Serial ):JDK1.3之前JVM唯一一个次收集器(新生代收集器),1.5版本也是默认次收集器,它是串收集器。
    • Serial 收集器是 Hotspot 运行在 Client 模式下的默认新生代收集器。
    • 它的特点是:只用一个 CPU(计算核心)/一条线程去完成 GC 工作, 且在进行垃圾收集时必须暂停其他所有的工作线程(“Stop The World” -后面简称 STW)。
    • 可以使用-XX:+UseSerialGC 打开。虽然是单线程收集, 但它却简单而高效, 在 VM 管理内存不大的情况下(收集几十 M~一两百 M 的新生代), 停顿时间完全可以控制在几十毫秒~一百多毫秒内。
    • 大多数收集器都是在串行收集器进行优化,减少他停顿的时间。

  • 并行收集器(ParNew ):ParNew 收集器其实是前面 Serial 的多线程版本,考虑用户等待的时间, 除使用多条线程进行 GC外, 包括 Serial可用的所有控制参数、收集算法、STW、对象分配规则、回收策略等都与 Serial 完全一样(也是VM启用 CMS 收集器-XX: +UseConcMarkSweepGC 的默认新生代收集器)。

    由于存在线程切换的开销, ParNew 在单 CPU 的环境中比不上 Serial, 且在通过超线程技术实现的两个 CPU 的环境中也不能 100%保证能超越 Serial. 但随着可用的 CPU 数量的增加,收集效率肯定也会大大增加(ParNew 收集线程数与 CPU 的数量相同, 因此在 CPU 数量过大的环境中, 可用-XX:ParallelGCThreads=参数控制 GC 线程数,一般与CPU的线程数相同)。

  • Parallel Scavenge 收集器:与 ParNew 类似, Parallel Scavenge 也是使用复制算法, 也是并行多线程收集器. 但与其他收集器关注尽可能缩短垃圾收集时间不同, Parallel Scavenge 更关注系统吞吐量:

    • 系统吞吐量=运行用户代码时间/(运行用户代码时间+垃圾收集时间)

    • 停顿时间越短就越适用于用户交互的程序-良好的响应速度能提升用户的体验;

    • 而高吞吐量则适用于后台运算而不需要太多交互的任务-可以最高效率地利用CPU时间,尽快地完成程序的运算任务.

    • Parallel Scavenge 提供了如下参数设置系统吞吐量:

  • Serial Old 收集器:Serial Old 是 Serial 收集器的老年代版本, 同样是单线程收集器,使用“标记-整理”算法

  • Parallel Old 收集器:Parallel Old 是 Parallel Scavenge 收集器的老年代版本, 使用多线程和“标记-整理”算
    法, 吞吐量优先, 主要与 Parallel Scavenge 配合在注重吞吐量及 CPU 资源敏感系统内使用;

  • CMS 收集器 (Concurrent Mark Sweep ):CMS(Concurrent Mark Sweep)收集器是一款具有划时代意义的收集器, 一款真正意义上的并发收集器, 虽然现在已经有了理论意义上表现更好的 G1 收集器, 但现在主流互联网企业线上选用的仍是 CMS(如 Taobao、微店)。

    • 并发(concurrent)包含用户线程,并行(Parallel)不包含。
    • CMS是一种以获取最短回收停顿时间为目标的收集器(CMS又称多并发低暂停的收集器),基于”标记-清除”算法实现, 整个 GC 过程分为以下 4 个步骤:
      • 初始标记(CMS initial mark)
      • 并发标记(CMS concurrent mark: GC Roots Tracing 过程)
      • 重新标记(CMS remark)
      • 并发清除(CMS concurrent sweep: 已死对象将会就地释放, 注意:此处没有压缩)
    • 其中 1,3 两个步骤(初始标记、重新标记)仍需 STW. 但初始标记仅只标记一下 GC Roots能直接关联到的对象, 速度很快; 而重新标记则是为了修正并发标记期间因用户程序继续运行而导致标记产生变动的那一部分对象的标记记录, 虽然一般比初始标记阶段稍长, 但要远小于并发标记时间。CMS 特点:
      • CMS 默认启动的回收线程数=(CPU 数目+3)4,当 CPU 数>4 时, GC线程一般占用不超过 25%的 CPU 资源, 但是当 CPU 数<=4 时, GC线程可能就会过多的占用用户 CPU 资源, 从而导致应用程序变慢, 总吞吐量降低。
      • 无法处理浮动垃圾, 可能出现 Promotion Failure、Concurrent Mode Failure 而导致另一
        次 Full GC 的产生: 浮动垃圾是指在 CMS 并发清理阶段用户线程运行而产生的新垃圾. 由于
        在 GC 阶段用户线程还需运行, 因此还需要预留足够的内存空间给用户线程使用, 导致 CMS
        不 能 像 其 他收 集 器那 样 等到 老 年 代几 乎 填满 了 再进 行 收 集. 因此 CMS 提 供 了
        -XX:CMSInitiatingOccupancyFraction 参 数 来 设 置 GC 的 触 发 百 分 比 ( 以 及
        -XX:+UseCMSInitiatingOccupancyOnly 来启用该触发百分比), 当老年代的使用空间超过该比例
        后 CMS 就会被触发(JDK 1.6 之后默认 92%). 但当 CMS 运行期间预留的内存无法满足程序需
        要, 就会出现上述 Promotion Failure 等失败, 这时 VM 将启动后备预案: 临时启用 Serial Old
        收集器来重新执行Full GC(CMS通常配合大内存使用, 一旦大内存转入串行的Serial GC, 那停
        顿的时间就是大家都不愿看到的了).
      • 最后, 由于 CMS 采用”标记-清除”算法实现, 可能会产生大量内存碎片. 内存碎片过
        多 可 能 会 导 致 无 法 分 配 大 对 象 而 提 前 触 发 Full GC. 因 此 CMS 提 供 了
        -XX:+UseCMSCompactAtFullCollection 开关参数, 用于在 Full GC 后再执行一个碎片整理过程.
        但内存整理是无法并发的, 内存碎片问题虽然没有了, 但停顿时间也因此变长了, 因此 CMS
        还提供了另外一个参数-XX:CMSFullGCsBeforeCompaction 用于设置在执行 N 次不进行内存整
        理的 Full GC 后, 跟着来一次带整理的(默认为 0: 每次进入 Full GC 时都进行碎片整理).

  • 分区收集- G1 收集器:G1(Garbage-First)是一款面向服务端应用的收集器, 主要目标用于配备多颗 CPU 的服务器治理大内存,-XX:+UseG1GC 启用 G1 收集器。

    • 与其他基于分代的收集器不同, G1 将整个 Java 堆划分为多个大小相等的独立区域(Region), 虽然还保留有新生代和老年代的概念, 但新生代和老年代不再是物理隔离的了,它们都是一部分 Region(不需要连续)的集合.如:

    • 每块区域既有可能属于 O 区、也有可能是 Y 区,因此不需要一次就对整个老年代/新生代回收。而是当线程并发寻找可回收的对象时,有些区块包含可回收的对象要比其他区块多很多。 虽然在清理这些区块时 G1 仍然需要暂停应用线程,,但可以用相对较少的时间优先回收垃圾较多的 Region。这种方式保证了 G1 可以在有限的时间内获取尽可能高的收集效率。

    • G1的新生代收集跟ParNew类似: 存活的对象被转移到一个或多个Survivor Regions.,如果存活时间达到阀值, 这部分对象就会被提升到老年代.如图:

    • 其特定是:

      • 一整块堆内存被分为多个 Regions.
      • 存活对象被拷贝到新的 Survivor 区或老年代.
      • 年轻代内存由一组不连续的 heap 区组成, 这种方法使得可以动态调整各代区域尺寸.
      • Young GC 会有 STW 事件, 进行时所有应用程序线程都会被暂停.
      • 多线程并发 GC.
    • G1 老年代 GC 特点如下 :

      • 并发标记阶段
        1. 在与应用程序并发执行的过程中会计算活跃度信息 .
        2. 这些活跃度信息标识出那些 regions 最适合在 STW 期间回收 (which regions will be best
          to reclaim during an evacuation pause).
        3. 不像 CMS 有清理阶段 .
      • 再次标记阶段
        1. 使用 Snapshot-at-the-Beginning(SATB) 算法比 CMS 快得多 .
        2. 空 region 直接被回收 .
      • 拷贝 / 清理阶段 (Copying/Cleanup Phase)
        1. 年轻代与老年代同时回收 .
        2. 老年代内存回收会基于他的活跃度信息

化神圆满(JVM优化)

JDK 常用 JVM 优化相关命令

  • jps

    • jps - l:显示线程 id 和执行线程的主类名
    • jps -v:显示线程 id 和执行线程的主类名和 JVM 配置信息
  • jstat

    • jstat -参数 线程 id 执行时间(单位毫秒) 执行次数
    • 如:jstat -gc 4488 30 10
      • SXC - survivor 初始空间大小,单位字节。(X为survivor中X区域)
      • SXU - survivor 使用空间大小, 单位字节。
      • EC - eden 初始空间大小
      • EU - eden 使用空间大小
      • OC - old 初始空间大小
      • OU - old 使用空间大小
      • PC - permanent 初始空间大小
      • PU - permanent 使用空间大小
      • YGC - youngGC 收集次数
      • YGCT - youngGC 收集使用时长, 单位秒
      • FGC - fullGC 收集次数
      • FGCT - fullGC 收集使用时长
      • GCT - 总计收集使用总时长 YGCT+FGCT
  • jvisualvm:一个 JDK 内置的图形化 VM 监视管理工具。一般我们会在里面安装visualgc 插件。(工具、插件、可用插件),设置编辑url连接地址。

    JVM 常见参数
    配置方式:java [options] MainClass [arguments]
    options - JVM 启动参数。 配置多个参数的时候,参数之间使用空格分隔。
    参数命名: 常见为 -参数名
    参数赋值: 常见为 -参数名=参数值 | -参数名:参数值

1
2
3
4
5
6
7
8
9
10
11
12
13
public class Test {
public static void main(String[] args) {
List<GarbageCollectorMXBean> l = ManagementFactory.getGarbageCollectorMXBeans();
for(GarbageCollectorMXBean b : l) {
System.out.println(b.getName()); // 收集器名称
}
try {
System.in.read();
} catch (IOException e) {
e.printStackTrace();
}
}
}
  • 内存设置

    • -Xms:初始堆大小,JVM 启动的时候,给定堆空间大小。
    • -Xmx:最大堆大小,JVM 运行过程中,如果初始堆空间不足的时候,最大可以扩展到多少。
    • -Xmn:设置年轻代大小。整个堆大小=年轻代大小+年老代大小+持久代大小。持久代一般固定大小为 64m,所以增大年轻代后,将会减小年老代大小。此值对系统性能影响较大,Sun 官方推荐配置为整个堆的 3/8。
    • -Xss: 设置每个线程的 Java 栈大小。JDK5.0 以后每个线程 Java 栈大小为 1M,以前每个线程堆栈大小为 256K。根据应用的线程所需内存大小进行调整。在相同物理内存下,减小这个值能生成更多的线程。但是操作系统对一个进程内的线程数还是有限制的,不能无限生成,经验值在 3000~5000 左右。
    • -XX:NewSize=n:设置年轻代大小
    • -XX:NewRatio=n:设置年轻代和年老代的比值。如:为 3,表示年轻代与年老代比值为 1:3,年轻代占整个年轻代+年老代和的 1/4
    • -XX:SurvivorRatio=n:年轻代中 Eden 区与两个 Survivor 区的比值。注意 Survivor 区有两个。如:3,表示 Eden:Survivor=3:2,一个 Survivor 区占整个年轻代的 1/5
    • -XX:MaxPermSize=n:设置持久代大小
    • -XX:MaxTenuringThreshold:设置垃圾最大年龄。如果设置为 0 的话,则年轻代对象不经过 Survivor区,直接进入年老代。对于年老代比较多的应用,可以提高效率。如果将此值设置为一个较大值,则年轻代对象会在 Survivor 区进行多次复制,这样可以增加对象再年轻代的存活时间,增加在年轻代即被回收的概率。
  • 收集器设置

    • -XX:+UseSerialGC:设置串行收集器,年轻带收集器, 次收集器
    • -XX:+UseParallelGC:设置并行收集器
    • -XX:+UseParNewGC:设置年轻代为并行收集。可与 CMS 收集同时使用。JDK5.0 以上,JVM会根据系统配置自行设置,所以无需再设置此值。
    • -XX:+UseParallelOldGC:设置并行年老代收集器,JDK6.0 支持对年老代并行收集。
    • -XX:+UseConcMarkSweepGC:设置年老代并发收集器,测试中配置这个以后,-XX:NewRatio的配置失效,原因不明。所以,此时年轻代大小最好用-Xmn 设置。
    • -XX:+UseG1GC:设置 G1 收集器
  • 垃圾回收统计信息,类似日志的配置信息。会有控制台相关信息输出。 商业项目上线的时候,使用 loggc

    • -XX:+PrintGC
    • -XX:+Printetails
    • -XX:+PrintGCTimeStamps
    • -Xloggc:filename
  • 并行收集器设置

    • -XX:ParallelGCThreads=n:设置并行收集器收集时最大线程数使用的 CPU 数。并行收集线程数。
    • -XX:MaxGCPauseMillis=n:设置并行收集最大暂停时间,单位毫秒。可以减少 STW 时间。
    • -XX:GCTimeRatio=n:设置垃圾回收时间占程序运行时间的百分比。公式为 1/(1+n)并发收集器设置
    • -XX:+CMSIncrementalMode:设置为增量模式。适用于单 CPU 情况。
    • -XX:+UseAdaptiveSizePolicy:设置此选项后,并行收集器会自动选择年轻代区大小和相应的 Survivor 区比例,以达到目标系统规定的最低相应时间或者收集频率等,此值建议使用并行收集器时,一直打开。
    • -XX:CMSFullGCsBeforeCompaction=n:由于并发收集器不对内存空间进行压缩、整理,所以运行一段时间以后会产生“碎片”,使得运行效率降低。此值设置运行多少次 GC 以后对内存空间进行压缩、整理。
    • -XX:+UseCMSCompactAtFullCollection:打开对年老代的压缩。可能会影响性能,但是可
      以消除碎片
  • 内存设置经验分享

    • JVM 中最大堆大小有三方面限制:
      • 相关操作系统的数据模型(32-bt 还是 64-bit)限制;
      • 系统的可用虚拟内存限制;
      • 系统的可用物理内存限制。
      • 32 位系统 下,一般限制在 1.5G~2G;64 为操作系统对内存无限制。
    • Tomcat 配置方式: 编写 catalina.bat|catalina.sh ,增加 JAVA_OPTS 参数设置。 windows和 linux 配置方式不同。 windows - set “JAVA_OPTS=%JAVA_OPTS% 自定义参数 “ ; linux -JAVA_OPTS=”$JAVA_OPTS 自定义参数 “常见设置:
      • -Xmx3550m -Xms3550m -Xmn2g -Xss128k 适合开发过程的测试应用。要求物理内存大于4G。
        • 设置JVM最大可用内存与初始内存相同,可以避免每次垃圾完成后JVM重新分配内存。
        • -Xmn2g,设置年轻代为2个g,持久代一般固定大小为64m,所以增大年轻代后,将会减小老年代大小。官方推荐设置成整个堆的3/8。
        • -Xss128k ,设置每个线程的堆栈大小,JDK1.5后每个线程栈大小为1M,以前每个线程栈大小为256k。在相同物理内存下,减少这个值能生成更多的线程,但是操作系统对进程内的线程数量是有限的,不能无线生成,经验值在3000~5000左右。
  • 收集器设置经验分享

    • 关于收集器的选择 JVM 给了三种选择:串行收集器、并行收集器、并发收集器,但是串行收集器只适用于小数据量的情况,所以这里的选择主要针对并行收集器和并发收集器。默认情况下,JDK5.0 以前都是使用串行收集器,如果想使用其他收集器需要在启动时加入相应参数。JDK5.0 以后,JVM 会根据当前系统配置进行判断。
    • 常见配置:
      • 并行收集器主要以到达一定的吞吐量为目标,适用于科学计算和后台处理等。
      • -Xmx3800m -Xms3800m -Xmn2g -Xss128k -XX:+UseParallelGC -XX:ParallelGCThreads=20
      • 使用 ParallelGC 作为并行收集器, GC 线程为 20(CPU 核心数>=20 时),内存问题根据硬件配置具体提供。建议使用物理内存的 80%左右作为 JVM 内存容量。
      • -Xmx3550m -Xms3550m -Xmn2g -Xss128k -XX:+UseParallelGC -XX:ParallelGCThreads=20
        -XX:+UseParallelOldGC
      • 指定老年代收集器,在JDK5.0之后的版本,ParallelGC对应的全收集器就是ParallelOldGC。可以忽略
      • -Xmx3550m -Xms3550m -Xmn2g -Xss128k -XX:+UseParallelGC -XX:MaxGCPauseMillis=100
      • 指定 GC 时最大暂停时间。单位是毫秒。每次 GC 最长使用 100 毫秒。可以尽可能提高工作线程的执行资源。
      • -Xmx3550m -Xms3550m -Xmn2g -Xss128k -XX:+UseParallelGC -XX:MaxGCPauseMillis=100
        -XX:+UseAdaptiveSizePolicy
      • UseAdaptiveSizePolicy 是提高年轻代 GC 效率的配置。次收集器执行效率。
      • 并发收集器主要是保证系统的响应时间,减少垃圾收集时的停顿时间。适用于应用服务
        器、电信领域、互联网领域等。
      • -Xmx3550m -Xms3550m -Xmn2g -Xss128k -XX:ParallelGCThreads=20
        -XX:+UseConcMarkSweepGC -XX:+UseParNewGC
      • 指定年轻代收集器为 ParNew,年老代收集器 ConcurrentMarkSweep,并发 GC 线程数为20(CPU 核心>=20),并发 GC 的线程数建议使用(CPU 核心数+3)/4 或 CPU 核心数【不推荐使用】。
      • -Xmx3550m -Xms3550m -Xmn2g -Xss128k -XX:+UseConcMarkSweepGC
        -XX:CMSFullGCsBeforeCompaction=5 -XX:+UseCMSCompactAtFullCollection
      • CMSFullGCsBeforeCompaction=5 执行 5 次 GC 后,运行一次内存的整理。
      • UseCMSCompactAtFullCollection 执行老年代内存整理。可以避免内存碎片,提高 GC 过程中的效率,减少停顿时间。
  • 简单总结

    • 年轻代大小选择
      • 响应时间优先的应用:尽可能设大,直到接近系统的最低响应时间限制(根据实际情况选择)。在此种情况下,年轻代收集发生的频率也是最小的。同时,减少到达年老代的对象。
      • 吞吐量优先的应用:尽可能的设置大,可能到达 Gbit 的程度。因为对响应时间没有要求,垃圾收集可以并行进行,一般适合 8CPU 以上的应用。
    • 年老代大小选择
      • 响应时间优先的应用: 年老代使用并发收集器,所以其大小需要小心设置,一般要考虑并发会话率和会话持续时间等一些参数。如果堆设置小了,可以会造成内存碎片、高回收频率以及应用暂停而使用传统的标记清除方式;如果堆大了,则需要较 长的收集时间。最优化的方案,一般需要参考以下数据获得:
        • 并发垃圾收集信息
        • 持久代并发收集次数
        • 传统 GC 信息
        • 花在年轻代和年老代回收上的时间比例
        • 减少年轻代和年老代花费的时间,一般会提高应用的效率
      • 吞吐量优先的应用:一般吞吐量优先的应用都有一个很大的年轻代和一个较小的年老代。原因是,这样可以尽可能回收掉大部分短期对象,减少中期的对象,而年老代存放长期存活对象。
      • 较小堆引起的碎片问题,因为年老代的并发收集器使用标记、清除算法,所以不会对堆进行压缩。当收集器回收时,他会把相邻的空间进行合并,这样可以分配给较大的对象。但是,当堆空间较小时,运行一段时间以后,就会出现“碎片”,如果并发收集器找不到足够的空间,那么并发收集器将会停止,然后使用传统的标记、整理方式进行回收。如果出现“碎片”,可能需要进行如下配置:
        • -XX:+UseCMSCompactAtFullCollection:使用并发收集器时,开启对年老代的压缩。
        • -XX:CMSFullGCsBeforeCompaction=0:上面配置开启的情况下,这里设置多少次 Full GC后,对年老代进行压缩

小编是一枚Java Coder,业余写文章,现主营微信公众号《Java患者》,喜欢的话关注我的公众号或者加我微信我们一起学习Java

合体期(网络编程)

合体前期(Socket)

首先注意,Socket不是Java中独有的概念,而是一个语言无关标准。任何可以实现网络编程的编程语言都有Socket。

  • 什么是 Socket?

    ​ 网络上的两个程序通过一个双向的通信连接实现数据的交换,这个连接的一端称为一个socket。

    ​ 建立网络通信连接至少要一个端口号。socket 本质是编程接口(API),对 TCP/IP 的封装,TCP/IP 也要提供可供程序员做网络开发所用的接口,这就是 Socket 编程接口;HTTP 是轿车, 提供了封装或者显示数据的具体形式;Socket 是发动机,提供了网络通信的能力。

    Socket 的英文原义是插座。作为 BSD UNIX 的进程通信机制,取后一种意思。通 常也称作套接字,用于描述 IP 地址和端口,是一个通信链的句柄,可以用来实现不同虚 拟机或不同计算机之间的通信。在 Internet 上的主机一般运行了多个服务软件,同时提供几 种服务。每种服务都打开一个 Socket,并绑定到一个端口上,不同的端口对应于不同的服务。Socket 正如其英文原义那样,像一个多孔插座。一台主机犹如布满各种插座的房间,每个插 座有一个编号,有的插座提供 220 伏交流电, 有的提供 110 伏交流电,有的则提供有线电 视节目。 客户软件将插头插到不同编号的插座,就可以得到不同的服务。

  • Socket 连接步骤

    根据连接启动的方式以及本地套接字要连接的目标,套接字之间的连接过程可以分为三个步骤:服务器监听,客户端请求,连接确认。【如果包含数据交互+断开连接,那么一共是 五个步骤】

    1. 服务器监听:是服务器端套接字并不定位具体的客户端套接字,而是处于等待连接的状态,实时监控网络状态。
    2. 客户端请求:是指由客户端的套接字提出连接请求,要连接的目标是服务器端的套接字。为此,客户端的套接字必须首先描述它要连接的服务器的套接字,指出服务器端套接字的地址和端口号,然后就向服务器端套接字提出连接请求。
    3. 连接确认(三层握手):是指当服务器端套接字监听到或者说接收到客户端套接字的连接请求【1】,它就响应客户端套接字的请求,建立一个新的线程,把服务器端套接字的描述发给客户端【2】,一旦客户端确认了此描述,连接就建立好了【3】。而服务器端套接字继续处于监听状态,继续接收其他客户端套接字的连接请求。
    4. 断开连接:客户端向服务发起一个请求关闭消息【1】,服务器根据自己状态,等到可以关闭时候,发一个可以关闭的消息给客户端【2】,并且服务器再向浏览器发一个关闭成功的消息【3】,客户端发一个光笔成功的消息,至于服务器可以收到不管【4】。

  • Java 中的 Socket

    在java.net包是网络编程的基础类库。其中ServerSocket和Socket是网络编程的基础类型ServerSocket是服务端应用类型。Socket是建立连接的类型。当连接建立成功后,服务器和客户端都会有一个 Socket对象示例,可以通过这个Socket 对象示例,完成会话的所有操作。

    对于一个完整的网络连接来说,Socket是平等的,没有服务器客户端分级情况

  • 什么是同步和异步

    ​ 同步和异步是针对应用程序和内核OS的交互而言的,同步指的是用户进程触发 IO 操作并 等待或者轮询的去查看 IO 操作是否就绪,而异步是指用户进程触发 IO 操作以后便开始做自 己的事情,而当 IO 操作已经完成的时候会得到 IO 完成的通知,异步是OS底层支持的一种操作。以银行取款为例:

    • 同步 : 自己亲自出马持银行卡到银行取钱(使用同步 IO 时,Java 自己处理 IO 读写);
    • 异步 : 委托一小弟拿银行卡到银行取钱,然后给你(使用异步 IO 时,JavaIO 读写 委托给 OS 处理,需要将数据缓冲区地址和大小传给 OS(银行卡和密码)OS 需要支持异步 IO 操作 API
  • 什么是阻塞和非阻塞

    ​ 阻塞和非阻塞是针对于进程在访问数据的时候,根据 IO 操作的就绪状态来采取的不同 方式,说白了是一种读取或者写入操作方法的实现方式,阻塞方式下读取或者写入函数将一直等待,而非阻塞方式下,读取或者写入方法会立即返回一个状态值。 以银行取款为例:

    • 阻塞 : ATM 排队取款,你只能等待(使用阻塞 IO 时,Java 调用会一直阻塞到读写完 成才返回);
    • 非阻塞 : 柜台取款,取个号,然后坐在椅子上做其它事,等号广播会通知你办理,没到号你就不能去,你可以不断问大堂经理排到了没有,大堂经理如果说还没到你就不能去(使用非阻塞 IO 时,如果不能读写 Java 调用会马上返回,当 IO 事件分发器通知可读写时再继 续进行读写,不断循环直到读写完成)
    • ajax是异步阻塞的。

合体中期(BIO、NIO 、AIO)

  • BIO 编程Blocking IO同步阻塞的编程方式。

    • BIO 编程方式通常是在 JDK1.4 版本之前常用的编程方式。编程实现过程为:首先在服务端启动一个 ServerSocket 来监听网络请求,客户端启动 Socket 发起网络请求,默认情况下 ServerSocket 回建立一个线程来处理此请求,如果服务端没有线程可用,客户端则会阻塞等待或遭到拒绝。 且建立好的连接,在通讯过程中,是同步的。在并发处理效率上比较低。大致结构如下:

    • 每次请求都要创建一个server socket和一个thread

    • 同步并阻塞,服务器实现模式为一个连接一个线程,即客户端有连接请求时服务器端就 需要启动一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销,当然可 以通过线程池机制改善(有人把这种叫做伪异步,实际上不能实现任何的异步的操作,归根还是同步)。

    • BIO 方式适用于连接数目比较小且固定的架构,这种方式对服务器资源要求比较高,并发局限于应用中,JDK1.4 以前的唯一选择,但程序直观简单易理解。

    • 使用线程池机制改善后的 BIO 模型图如下:

Client:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
public class Client {
public static void main(String[] args) {
String host = null;
int port = 0;
if(args.length > 2){
host = args[0];
port = Integer.parseInt(args[1]);
}else{
host = "127.0.0.1";
port = 9999;
}

Socket socket = null;
BufferedReader reader = null;
PrintWriter writer = null;
Scanner s = new Scanner(System.in);
try{
socket = new Socket(host, port);
String message = null;

reader = new BufferedReader(
new InputStreamReader(socket.getInputStream(), "UTF-8"));
writer = new PrintWriter(
socket.getOutputStream(), true);
while(true){
message = s.nextLine();
if(message.equals("exit")){
break;
}
writer.println(message);
writer.flush();
System.out.println(reader.readLine());
}
}catch(Exception e){
e.printStackTrace();
}finally{
if(socket != null){
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
socket = null;
if(reader != null){
try {
reader.close();
} catch (IOException e) {
e.printStackTrace();
}
}
reader = null;
if(writer != null){
writer.close();
}
writer = null;
}
}
}

server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
public class Server {

public static void main(String[] args) {
int port = genPort(args);

ServerSocket server = null;

try{
server = new ServerSocket(port);
System.out.println("server started!");
while(true){
Socket socket = server.accept();

new Thread(new Handler(socket)).start();
}
}catch(Exception e){
e.printStackTrace();
}finally{
if(server != null){
try {
server.close();
} catch (IOException e) {
e.printStackTrace();
}
}
server = null;
}
}

static class Handler implements Runnable{
Socket socket = null;
public Handler(Socket socket){
this.socket = socket;
}
@Override
public void run() {
BufferedReader reader = null;
PrintWriter writer = null;
try{

reader = new BufferedReader(
new InputStreamReader(socket.getInputStream(), "UTF-8"));
writer = new PrintWriter(
new OutputStreamWriter(socket.getOutputStream(), "UTF-8"));
String readMessage = null;
while(true){
System.out.println("server reading... ");
if((readMessage = reader.readLine()) == null){
break;
}
System.out.println(readMessage);
writer.println("server recive : " + readMessage);
writer.flush();
}
}catch(Exception e){
e.printStackTrace();
}finally{
if(socket != null){
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
socket = null;
if(reader != null){
try {
reader.close();
} catch (IOException e) {
e.printStackTrace();
}
}
reader = null;
if(writer != null){
writer.close();
}
writer = null;
}
}

}

private static int genPort(String[] args){
if(args.length > 0){
try{
return Integer.parseInt(args[0]);
}catch(NumberFormatException e){
return 9999;
}
}else{
return 9999;
}
}

}

ThreadPool版的Server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
public class Server {

public static void main(String[] args) {
int port = genPort(args);

ServerSocket server = null;
ExecutorService service = Executors.newFixedThreadPool(50);

try{
server = new ServerSocket(port);
System.out.println("server started!");
while(true){
Socket socket = server.accept();

service.execute(new Handler(socket));
}
}catch(Exception e){
e.printStackTrace();
}finally{
if(server != null){
try {
server.close();
} catch (IOException e) {
e.printStackTrace();
}
}
server = null;
}
}

static class Handler implements Runnable{
Socket socket = null;
public Handler(Socket socket){
this.socket = socket;
}
@Override
public void run() {
BufferedReader reader = null;
PrintWriter writer = null;
try{

reader = new BufferedReader(
new InputStreamReader(socket.getInputStream(), "UTF-8"));
writer = new PrintWriter(
new OutputStreamWriter(socket.getOutputStream(), "UTF-8"));
String readMessage = null;
while(true){
System.out.println("server reading... ");
if((readMessage = reader.readLine()) == null){
break;
}
System.out.println(readMessage);
writer.println("server recive : " + readMessage);
writer.flush();
}
}catch(Exception e){
e.printStackTrace();
}finally{
if(socket != null){
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
socket = null;
if(reader != null){
try {
reader.close();
} catch (IOException e) {
e.printStackTrace();
}
}
reader = null;
if(writer != null){
writer.close();
}
writer = null;
}
}

}

private static int genPort(String[] args){
if(args.length > 0){
try{
return Integer.parseInt(args[0]);
}catch(NumberFormatException e){
return 9999;
}
}else{
return 9999;
}
}

}
  • NIO 编程 Unblocking IONew IO): 同步非阻塞的编程方式。

    • NIO 本身是基于事件驱动思想来完成的,其主要想解决的是 BIO 的大并发问题,NIO 基于Reactor,当 socket 有流可读或可写入 socket 时,操作系统会相应的通知应用程序进行处理,应用再将流读取到缓冲区或写入操作系统。也就是说,这个时候,已经不是一个连接就要对应一个处理线程了,而是有效的请求,对应一个线程,当连接没有数据时,是没有工作线程来处理的(即一个线程对应多个有效请求)。

    • NIO 的最重要的地方是当一个连接创建后,不需要对应一个线程,这个连接会被注册到多路复用器上面,所以所有的连接只需要一个线程就可以搞定,当这个线程中的多路复用器进行轮询的时候,发现连接上有请求的话,才开启一个线程进行处理,也就是一个请求一个线程模式,并且面向缓存。

    • NIO 的处理方式中,当一个请求来的话,开启线程进行处理,可能会等待后端应用的资源(JDBC 连接等待),其实这个线程就被阻塞了,当并发上来的话,还是会有 BIO 一样的问题。

    • 同步非阻塞,服务器实现模式为一个请求一个通道,即客户端发送的连接请求都会注册 到多路复用器上,多路复用器轮询到连接有 I/O 请求时才启动一个线程进行处理。

    • NIO 方式适用于连接数目多且连接比较短(轻操作)的架构,比如聊天服务器,并发局 限于应用中,编程复杂,JDK1.4 开始支持。

    • Buffer:ByteBuffer,CharBuffer,ShortBuffer,IntBuffer,LongBuffer,FloatBuffer,DoubleBuffer**

    • Channel:SocketChannel,ServerSocketChannel**

    • Selector:Selector,AbstractSelector

    • SelectionKey:OP_READ,OP_WRITE,OP_CONNECT,OP_ACCEPT

client

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
public class NIOClient {

public static void main(String[] args) {
// 远程地址创建
InetSocketAddress remote = new InetSocketAddress("localhost", 9999);
SocketChannel channel = null;

// 定义缓存。
ByteBuffer buffer = ByteBuffer.allocate(1024);

try {
// 开启通道
channel = SocketChannel.open();
// 连接远程服务器。
channel.connect(remote);
Scanner reader = new Scanner(System.in);
while(true){
System.out.print("put message for send to server > ");
String line = reader.nextLine();
if(line.equals("exit")){
break;
}
// 将控制台输入的数据写入到缓存。
buffer.put(line.getBytes("UTF-8"));
// 重置缓存游标
buffer.flip();
// 将数据发送给服务器
channel.write(buffer);
// 清空缓存数据。
buffer.clear();

// 读取服务器返回的数据
int readLength = channel.read(buffer);
if(readLength == -1){
break;
}
// 重置缓存游标
buffer.flip();
byte[] datas = new byte[buffer.remaining()];
// 读取数据到字节数组。
buffer.get(datas);
System.out.println("from server : " + new String(datas, "UTF-8"));
// 清空缓存。
buffer.clear();
}
} catch (IOException e) {
e.printStackTrace();
} finally{
if(null != channel){
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}

}

server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
public class NIOServer implements Runnable {

// 多路复用器, 选择器。 用于注册通道的。
private Selector selector;
// 定义了两个缓存。分别用于读和写。 初始化空间大小单位为字节。
// Buffer1是不安全的,要想安全模仿BIO,独立定义为Handler对象
private ByteBuffer readBuffer = ByteBuffer.allocate(1024);
private ByteBuffer writeBuffer = ByteBuffer.allocate(1024);

public static void main(String[] args) {
new Thread(new NIOServer(9999)).start();
}

public NIOServer(int port) {
init(port);
}

private void init(int port){
try {
System.out.println("server starting at port " + port + " ...");
// 开启多路复用器
this.selector = Selector.open();
// 开启服务通道
ServerSocketChannel serverChannel = ServerSocketChannel.open();
// 非阻塞, 如果传递参数true,为阻塞模式。
serverChannel.configureBlocking(false);
// 绑定端口
serverChannel.bind(new InetSocketAddress(port));
// 注册,并标记当前服务通道状态
/*
* register(Selector, int)
* int - 状态编码
* OP_ACCEPT : 连接成功的标记位。
* OP_READ : 可以读取数据的标记
* OP_WRITE : 可以写入数据的标记
* OP_CONNECT : 连接建立后的标记
*/
serverChannel.register(this.selector, SelectionKey.OP_ACCEPT);
System.out.println("server started.");
} catch (IOException e) {
e.printStackTrace();
}
}

public void run(){
while(true){
try {
// 阻塞方法,当至少一个通道被选中,此方法返回。
// 通道是否选择,由注册到多路复用器中的通道标记决定。
this.selector.select();
// 返回以选中的通道标记集合, 集合中保存的是通道的标记。相当于是通道的ID。
Iterator<SelectionKey> keys = this.selector.selectedKeys().iterator();
while(keys.hasNext()){
SelectionKey key = keys.next();
// 将本次要处理的通道从集合中删除,下次循环根据新的通道列表再次执行必要的业务逻辑
keys.remove();
// 通道是否有效
if(key.isValid()){
// 阻塞状态
try{
if(key.isAcceptable()){
accept(key);
}
}catch(CancelledKeyException cke){
// 断开连接。 出现异常。
key.cancel();
}
// 可读状态
try{
if(key.isReadable()){
read(key);
}
}catch(CancelledKeyException cke){
key.cancel();
}
// 可写状态
try{
if(key.isWritable()){
write(key);
}
}catch(CancelledKeyException cke){
key.cancel();
}
}
}
} catch (IOException e) {
e.printStackTrace();
}

}
}

private void write(SelectionKey key){
this.writeBuffer.clear();
SocketChannel channel = (SocketChannel)key.channel();
Scanner reader = new Scanner(System.in);
try {
System.out.print("put message for send to client > ");
String line = reader.nextLine();
// 将控制台输入的字符串写入Buffer中。 写入的数据是一个字节数组。
writeBuffer.put(line.getBytes("UTF-8"));
writeBuffer.flip();
channel.write(writeBuffer);

channel.register(this.selector, SelectionKey.OP_READ);
} catch (IOException e) {
e.printStackTrace();
}
}

private void read(SelectionKey key){
try {
// 清空读缓存。
this.readBuffer.clear();
// 获取通道
SocketChannel channel = (SocketChannel)key.channel();
// 将通道中的数据读取到缓存中。通道中的数据,就是客户端发送给服务器的数据。
int readLength = channel.read(readBuffer);
// 检查客户端是否写入数据。
if(readLength == -1){
// 关闭通道
key.channel().close();
// 关闭连接
key.cancel();
return;
}
/*
* flip, NIO中最复杂的操作就是Buffer的控制。
* Buffer中有一个游标。游标信息在操作后不会归零,如果直接访问Buffer的话,数据有不一致的可能。
* flip是重置游标的方法。NIO编程中,flip方法是常用方法。
*/
this.readBuffer.flip();
// 字节数组,保存具体数据的。 Buffer.remaining() -> 是获取Buffer中有效数据长度的方法。
byte[] datas = new byte[readBuffer.remaining()];
// 是将Buffer中的有效数据保存到字节数组中。
readBuffer.get(datas);
System.out.println("from " + channel.getRemoteAddress() + " client : " + new String(datas, "UTF-8"));

// 注册通道, 标记为写操作。
channel.register(this.selector, SelectionKey.OP_WRITE);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
try {
key.channel().close();
key.cancel();
} catch (IOException e1) {
e1.printStackTrace();
}
}
}

private void accept(SelectionKey key){
try {
// 此通道为init方法中注册到Selector上的ServerSocketChannel
ServerSocketChannel serverChannel = (ServerSocketChannel)key.channel();
// 阻塞方法,当客户端发起请求后返回。 此通道和客户端一一对应。
SocketChannel channel = serverChannel.accept();
channel.configureBlocking(false);
// 设置对应客户端的通道标记状态,此通道为读取数据使用的。
channel.register(this.selector, SelectionKey.OP_READ);
} catch (IOException e) {
e.printStackTrace();
}

}

}

TestBuffer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
/**
*
* Buffer的应用固定逻辑
* 写操作顺序
* 1. clear()
* 2. put() -> 写操作
* 3. flip() -> 重置游标
* 4. SocketChannel.write(buffer); -> 将缓存数据发送到网络的另一端
* 5. clear()
*
* 读操作顺序
* 1. clear()
* 2. SocketChannel.read(buffer); -> 从网络中读取数据
* 3. buffer.flip() -> 重置游标
* 4. buffer.get() -> 读取数据
* 5. buffer.clear()
*
*/
public class TestBuffer {
public static void main(String[] args) throws Exception {

ByteBuffer buffer = ByteBuffer.allocate(8);

byte[] temp = new byte[]{3,2,1};

// 写入数据之前 : java.nio.HeapByteBuffer[pos=0 lim=8 cap=8]
// pos - 游标位置, lim - 限制数量, cap - 最大容量
System.out.println("写入数据之前 : " + buffer);

// 写入字节数组到缓存
buffer.put(temp);

// 写入数据之后 : java.nio.HeapByteBuffer[pos=3 lim=8 cap=8]
// 游标为3, 限制为8, 容量为8,默认限制与容量一样大小
System.out.println("写入数据之后 : " + buffer);

// 重置游标 , lim = pos ; pos = 0;
buffer.flip();//把这行注释掉后,下面的循环就是5次了。
//在重置一次,pos为0,lim(可读写操作有效数据位数)为0,get会报错,也不让写。要写要clear

// 重置游标之后 : java.nio.HeapByteBuffer[pos=0 lim=3 cap=8]
// 游标为0, 限制为3,
System.out.println("重置游标之后 : " + buffer);

// 清空Buffer, pos = 0; lim = cap;
// buffer.clear();

// get() -> 获取当前游标指向的位置的数据。
// System.out.println(buffer.get());
// remaining是lim-pos
/*for(int i = 0; i < buffer.remaining(); i++){
// get(int index) -> 获取指定位置的数据。
int data = buffer.get(i);
System.out.println(i + " - " + data);
}*/
}
}
  • AIO 编程 AsynchronousIO: 异步非阻塞的编程方式
    • NIO 不同,当进行读写操作时,只须直接调用 APIreadwrite 方法即可。这两种 方法均为异步的,对于读操作而言,当有流可读取时,操作系统会将可读的流传入 read 方 法的缓冲区,并通知应用程序;对于写操作而言,当操作系统将 write 方法传递的流写入完 毕时,操作系统主动通知应用程序。即可以理解为,read/write 方法都是异步的,完成后会 主动调用回调函数。
    • 客户端向服务端发数据,首先是OS接收到,他会将数据写到buffer里,然后通知应用程序代码,数据已经准备好了,可以read拿走了。应用代码write时,也会不数据同步进入OS的Buffer里,通过反向通知告诉应用程序已经写完了。buffer数据会自动地反回给client。client与server交互借助OS实现异步操作。
    • JDK1.7 中,这部分内容被称作 NIO.2,主要在 java.nio.channels 包下增加了下面四个异步通道:
      • AsynchronousSocketChannel
      • AsynchronousServerSocketChannel
      • AsynchronousFileChannel
      • AsynchronousDatagramChannel
    • 异步非阻塞,服务器实现模式为一个有效请求一个线程,客户端的 I/O 请求都是由 OS 先完成了再通知服务器应用去启动线程进行处理。
    • AIO 方式使用于连接数目多且连接比较长(重操作)的架构,比如相册服务器,充分调 用 OS 参与并发操作,编程比较复杂,JDK7 开始支持。

server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
public class AIOServer {

// 线程池, 提高服务端效率。
private ExecutorService service;
// 线程组
// private AsynchronousChannelGroup group;
// 服务端通道, 针对服务器端定义的通道。
private AsynchronousServerSocketChannel serverChannel;

public AIOServer(int port){
init(9999);
}

private void init(int port){
try {
System.out.println("server starting at port : " + port + " ...");
// 定长线程池
service = Executors.newFixedThreadPool(4);
/* 使用线程组
group = AsynchronousChannelGroup.withThreadPool(service);
serverChannel = AsynchronousServerSocketChannel.open(group);
*/
// 开启服务端通道, 通过静态方法创建的。
serverChannel = AsynchronousServerSocketChannel.open();
// 绑定监听端口, 服务器启动成功,但是未监听请求。
serverChannel.bind(new InetSocketAddress(port));
System.out.println("server started.");
// 开始监听
// accept(T attachment, CompletionHandler<AsynchronousSocketChannel, ? super T>)
// AIO开发中,监听是一个类似递归的监听操作。每次监听到客户端请求后,都需要处理逻辑开启下一次的监听。
// 下一次的监听,需要服务器的资源继续支持。this传递到AIOServerHandler的completed方法中
serverChannel.accept(this, new AIOServerHandler());
try {
TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
} catch (InterruptedException e) {
e.printStackTrace();
}
} catch (IOException e) {
e.printStackTrace();
}
}

public static void main(String[] args) {
new AIOServer(9999);
}

public ExecutorService getService() {
return service;
}

public void setService(ExecutorService service) {
this.service = service;
}

public AsynchronousServerSocketChannel getServerChannel() {
return serverChannel;
}

public void setServerChannel(AsynchronousServerSocketChannel serverChannel) {
this.serverChannel = serverChannel;
}

}

AIOServerHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
public class AIOServerHandler implements CompletionHandler<AsynchronousSocketChannel, AIOServer> {

/**
* 业务处理逻辑, 当请求到来后,监听成功,应该做什么。
* 一定要实现的逻辑: 为下一次客户端请求开启监听。accept方法调用。
* result参数 : 就是和客户端直接建立关联的通道。
* 无论BIO、NIO、AIO中,一旦连接建立,两端是平等的。
* result中有通道中的所有相关数据。如:OS操作系统准备好的读取数据缓存,或等待返回数据的缓存。
*/
@Override
public void completed(AsynchronousSocketChannel result, AIOServer attachment) {
// 处理下一次的客户端请求。类似递归逻辑。
attachment.getServerChannel().accept(attachment, this);
doRead(result);
}

/**
* 异常处理逻辑, 当服务端代码出现异常的时候,做什么事情。
*/
@Override
public void failed(Throwable exc, AIOServer attachment) {
exc.printStackTrace();
}

/**
* 真实项目中,服务器返回的结果应该是根据客户端的请求数据计算得到的。不是等待控制台输入的。
* @param result
*/
private void doWrite(AsynchronousSocketChannel result){
try {
ByteBuffer buffer = ByteBuffer.allocate(1024);
System.out.print("enter message send to client > ");
Scanner s = new Scanner(System.in);
String line = s.nextLine();
buffer.put(line.getBytes("UTF-8"));
// 重点:必须复位,必须复位,必须复位
buffer.flip();
// write方法是一个异步操作。具体实现由OS实现。 可以增加get方法,实现阻塞,等待OS的写操作结束。
result.write(buffer);
// result.write(buffer).get(); // 调用get代表服务端线程阻塞,等待写操作完成
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}/* catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}*/
}

private void doRead(final AsynchronousSocketChannel channel){
ByteBuffer buffer = ByteBuffer.allocate(1024);
/*
* 异步读操作, read(Buffer destination, A attachment,
* CompletionHandler<Integer, ? super A> handler)
* destination - 目的地, 是处理客户端传递数据的中转缓存。 可以不使用。
* attachment - 处理客户端传递数据的对象。 通常使用Buffer处理。
* handler - 处理逻辑
*/
channel.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {

/**
* 业务逻辑,读取客户端传输数据
* attachment - 在completed方法执行的时候,OS已经将客户端请求的数据写入到Buffer中了。
* 但是未复位(flip)。 使用前一定要复位。
*/
@Override
public void completed(Integer result, ByteBuffer attachment) {
try {
System.out.println(attachment.capacity());
// 复位
attachment.flip();
System.out.println("from client : " + new String(attachment.array(), "UTF-8"));
doWrite(channel);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}

@Override
public void failed(Throwable exc, ByteBuffer attachment) {
exc.printStackTrace();
}
});
}

}

client

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
public class AIOClient {

private AsynchronousSocketChannel channel;

public AIOClient(String host, int port){
init(host, port);
}

private void init(String host, int port){
try {
// 开启通道
channel = AsynchronousSocketChannel.open();
// 发起请求,建立连接。
channel.connect(new InetSocketAddress(host, port));
} catch (IOException e) {
e.printStackTrace();
}
}

public void write(String line){
try {
ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.put(line.getBytes("UTF-8"));
buffer.flip();
channel.write(buffer);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}

public void read(){
ByteBuffer buffer = ByteBuffer.allocate(1024);
try {
// read方法是异步方法,OS实现的。get方法是一个阻塞方法,会等待OS处理结束后再返回,要不代码不等待,下面没数据打印出来。真实开发可以不加,其结果依靠OS自己去等待,拿到数据再返回通知,
channel.read(buffer).get();
// channel.read(dst, attachment, handler);
buffer.flip();
System.out.println("from server : " + new String(buffer.array(), "UTF-8"));
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}

public void doDestory(){
if(null != channel){
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}

public static void main(String[] args) {
AIOClient client = new AIOClient("localhost", 9999);
try{
System.out.print("enter message send to server > ");
Scanner s = new Scanner(System.in);
String line = s.nextLine();
client.write(line);
client.read();
}finally{
client.doDestory();
}
}

}

合体后期(Netty)

  • 简介

    Netty 是由 JBOSS 提供的一个 java 开源框架。Netty 提供异步的、事件驱动的网络应用 程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。 它是建立再NIO和AIO基础之上的。

    ​ 也就是说,Netty 是一个基于 NIO 的客户、服务器端编程框架,使用 Netty 可以确保你 快速和简单的开发出一个网络应用,例如实现了某种协议的客户,服务端应用。Netty 相当 简化和流线化了网络应用的编程开发过程,例如,TCPUDPsocket 服务开发。

    ​ “快速简单并不用产生维护性或性能上的问题。Netty 是一个吸收了多种协议的实 现经验,这些协议包括 FTP,SMTP,HTTP,各种二进制,文本协议,并经过相当精心设计的项 目,最终,Netty* 成功的找到了一种方式,在保证易于开发的同时还保证了其应用的性能, 稳定性和伸缩性。

    Netty4.x 版本开始,需要使用 JDK1.6 及以上版本提供基础支撑。

    ​ 在设计上:针对多种传输类型的统一接口 - 阻塞和非阻塞;简单但更强大的线程模型; 真正的无连接的数据报套接字支持;链接逻辑支持复用;

    ​ 在性能上:比核心 Java API 更好的吞吐量,较低的延时;资源消耗更少,这个得益于 共享池和重用;减少内存拷贝

    ​ 在健壮性上:消除由于慢,快,或重载连接产生的 OutOfMemoryError;消除经常发现 在 NIO 在高速网络中的应用中的不公平的读/写比

    ​ 在安全上:完整的 SSL / TLSStartTLS 的支持

    ​ 且已得到大量商业应用的真实验证,如:Hadoop 项目的 AvroRPC 框架)、DubboDubboxRPC 框架。

    Netty 的官网是:http://netty.io

    ​ 有 三 方 提 供 的 中 文 翻 译 Netty 用 户 手 册 ( 官 网 提 供 源 信 息 ): http://ifeve.com/netty5-user-guide/

  • Netty 架构

  • 线程模型(acceptor是一个监听线程)

    .jpg)

    • 单线程模型

      ​ 在 ServerBootstrap 调用方法 group 的时候,传递的参数是同一个线程组,且在构造线程 组的时候,构造参数为 1,这种开发方式,就是一个单线程模型。 个人机开发测试使用。不推荐。

      ​ 在处理acceptor和runnable task的线程组合并成一个,并且只有一个线程。

      1
      2
      3
      4
      5
      6
      // 初始化线程组,构建线程组的时候,如果不传递参数,则默认构建的线程组线程数是CPU核心数量。
      acceptorGroup = new NioEventLoopGroup(1);
      // 初始化服务的配置
      bootstrap = new ServerBootstrap();
      // 绑定线程组
      bootstrap.group(acceptorGroup, acceptorGroup);
  • 多线程模型

    ​ 在 ServerBootstrap 调用方法 group 的时候,传递的参数是两个不同的线程组。负责监听 的 acceptor 线程组,线程数为 1,也就是构造参数为 1。负责处理客户端任务的线程组,线 程数大于 1,也就是构造参数大于 1。这种开发方式,就是多线程模型。

    ​ 长连接,且客户端数量较少,连接持续时间较长情况下使用。如:企业内部交流应用。

    1
    2
    3
    4
    5
    6
    7
    // 初始化线程组,构建线程组的时候,如果不传递参数,则默认构建的线程组线程数是CPU核心数量。
    acceptorGroup = new NioEventLoopGroup(1);
    clientGroup = new NioEventLoopGroup(>1);
    // 初始化服务的配置
    bootstrap = new ServerBootstrap();
    // 绑定线程组
    bootstrap.group(acceptorGroup, clientGroup);
  • 主从多线程模型

    ​ 在 ServerBootstrap 调用方法 group 的时候,传递的参数是两个不同的线程组。负责监听 的 acceptor 线程组,线程数大于 1,也就是构造参数大于 1。负责处理客户端任务的线程组, 线程数大于 1,也就是构造参数大于 1。这种开发方式,就是主从多线程模型。

    ​ 长连接,客户端数量相对较多,连接持续时间比较长的情况下使用。如:对外提供服务

    的相册服务器。

案例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
<dependencies>

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>5.0.0.Alpha2</version>
<!-- <version>4.1.24.Final</version> -->
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-http</artifactId>
<version>5.0.0.Alpha2</version>
<!-- <version>4.1.24.Final</version> -->
</dependency>
<!-- 接收处理工具 -->
<dependency>
<groupId>org.jboss.marshalling</groupId>
<artifactId>jboss-marshalling-river</artifactId>
<version>1.4.11.Final</version>
</dependency>
<!-- 序列化处理工具 -->
<dependency>
<groupId>org.jboss.marshalling</groupId>
<artifactId>jboss-marshalling-serial</artifactId>
<version>1.4.11.Final</version>
</dependency>
<!-- 系统信息收集 -->
<dependency>
<groupId>org.hyperic.sigar</groupId>
<artifactId>com.springsource.org.hyperic.sigar</artifactId>
<version>1.6.3</version>
</dependency>
<dependency>
<groupId>org.kaazing</groupId>
<artifactId>sigar.dist</artifactId>
<version>1.0.0.0</version>
<classifier>distribution</classifier>
<type>zip</type>
</dependency>
</dependencies>

工具类

GzipUtils

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
public class GzipUtils {

public static void main(String[] args) throws Exception {
FileInputStream fis = new FileInputStream("D:\\3\\1.jpg");
byte[] temp = new byte[fis.available()];
int length = fis.read(temp);
System.out.println("长度 : " + length);

byte[] zipArray = GzipUtils.zip(temp);
System.out.println("压缩后的长度 : " + zipArray.length);

byte[] unzipArray = GzipUtils.unzip(zipArray);
System.out.println("解压缩后的长度 : " + unzipArray.length);

FileOutputStream fos = new FileOutputStream("D:\\3\\101.jpg");
fos.write(unzipArray);
fos.flush();

fos.close();
fis.close();
}

/**
* 解压缩
* @param source 源数据。需要解压的数据。
* @return 解压后的数据。 恢复的数据。
* @throws Exception
*/
public static byte[] unzip(byte[] source) throws Exception{
ByteArrayOutputStream out = new ByteArrayOutputStream();
ByteArrayInputStream in = new ByteArrayInputStream(source);
// JDK提供的。 专门用于压缩使用的流对象。可以处理字节数组数据。
GZIPInputStream zipIn = new GZIPInputStream(in);
byte[] temp = new byte[256];
int length = 0;
while((length = zipIn.read(temp, 0, temp.length)) != -1){
out.write(temp, 0, length);
}
// 将字节数组输出流中的数据,转换为一个字节数组。
byte[] target = out.toByteArray();

zipIn.close();
out.close();

return target;
}

/**
* 压缩
* @param source 源数据,需要压缩的数据
* @return 压缩后的数据。
* @throws Exception
*/
public static byte[] zip(byte[] source) throws Exception{
ByteArrayOutputStream out = new ByteArrayOutputStream();
// 输出流,JDK提供的,提供解压缩功能。
GZIPOutputStream zipOut = new GZIPOutputStream(out);
// 将压缩信息写入到内存。 写入的过程会实现解压。
zipOut.write(source);
// 结束。
zipOut.finish();
byte[] target = out.toByteArray();

zipOut.close();

return target;
}
}

OSUtils

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
public class OSUtils {


public static void main(String[] args) {
try {
// System信息,从jvm获取
property();
System.out.println("----------------------------------");
// cpu信息
cpu();
System.out.println("----------------------------------");
// 内存信息
memory();
System.out.println("----------------------------------");
// 操作系统信息
os();
System.out.println("----------------------------------");
// 用户信息
who();
System.out.println("----------------------------------");
// 文件系统信息
file();
System.out.println("----------------------------------");
// 网络信息
net();
System.out.println("----------------------------------");
// 以太网信息
ethernet();
System.out.println("----------------------------------");
} catch (Exception e1) {
e1.printStackTrace();
}
}

private static void property() throws UnknownHostException {
Runtime r = Runtime.getRuntime();
Properties props = System.getProperties();
InetAddress addr;
addr = InetAddress.getLocalHost();
String ip = addr.getHostAddress();
Map<String, String> map = System.getenv();
String userName = map.get("USERNAME");// 获取用户名
String computerName = map.get("COMPUTERNAME");// 获取计算机名
String userDomain = map.get("USERDOMAIN");// 获取计算机域名
System.out.println("用户名: " + userName);
System.out.println("计算机名: " + computerName);
System.out.println("计算机域名: " + userDomain);
System.out.println("本地ip地址: " + ip);
System.out.println("本地主机名: " + addr.getHostName());
System.out.println("JVM可以使用的总内存: " + r.totalMemory());
System.out.println("JVM可以使用的剩余内存: " + r.freeMemory());
System.out.println("JVM可以使用的处理器个数: " + r.availableProcessors());
System.out.println("Java的运行环境版本: " + props.getProperty("java.version"));
System.out.println("Java的运行环境供应商: " + props.getProperty("java.vendor"));
System.out.println("Java供应商的URL: " + props.getProperty("java.vendor.url"));
System.out.println("Java的安装路径: " + props.getProperty("java.home"));
System.out.println("Java的虚拟机规范版本: " + props.getProperty("java.vm.specification.version"));
System.out.println("Java的虚拟机规范供应商: " + props.getProperty("java.vm.specification.vendor"));
System.out.println("Java的虚拟机规范名称: " + props.getProperty("java.vm.specification.name"));
System.out.println("Java的虚拟机实现版本: " + props.getProperty("java.vm.version"));
System.out.println("Java的虚拟机实现供应商: " + props.getProperty("java.vm.vendor"));
System.out.println("Java的虚拟机实现名称: " + props.getProperty("java.vm.name"));
System.out.println("Java运行时环境规范版本: " + props.getProperty("java.specification.version"));
System.out.println("Java运行时环境规范供应商: " + props.getProperty("java.specification.vender"));
System.out.println("Java运行时环境规范名称: " + props.getProperty("java.specification.name"));
System.out.println("Java的类格式版本号: " + props.getProperty("java.class.version"));
System.out.println("Java的类路径: " + props.getProperty("java.class.path"));
System.out.println("加载库时搜索的路径列表: " + props.getProperty("java.library.path"));
System.out.println("默认的临时文件路径: " + props.getProperty("java.io.tmpdir"));
System.out.println("一个或多个扩展目录的路径: " + props.getProperty("java.ext.dirs"));
System.out.println("操作系统的名称: " + props.getProperty("os.name"));
System.out.println("操作系统的构架: " + props.getProperty("os.arch"));
System.out.println("操作系统的版本: " + props.getProperty("os.version"));
System.out.println("文件分隔符: " + props.getProperty("file.separator"));
System.out.println("路径分隔符: " + props.getProperty("path.separator"));
System.out.println("行分隔符: " + props.getProperty("line.separator"));
System.out.println("用户的账户名称: " + props.getProperty("user.name"));
System.out.println("用户的主目录: " + props.getProperty("user.home"));
System.out.println("用户的当前工作目录: " + props.getProperty("user.dir"));
}

private static void memory() throws SigarException {
Sigar sigar = new Sigar();
Mem mem = sigar.getMem();
// 内存总量
System.out.println("内存总量: " + mem.getTotal() / 1024L + "K av");
// 当前内存使用量
System.out.println("当前内存使用量: " + mem.getUsed() / 1024L + "K used");
// 当前内存剩余量
System.out.println("当前内存剩余量: " + mem.getFree() / 1024L + "K free");
Swap swap = sigar.getSwap();
// 交换区总量
System.out.println("交换区总量: " + swap.getTotal() / 1024L + "K av");
// 当前交换区使用量
System.out.println("当前交换区使用量: " + swap.getUsed() / 1024L + "K used");
// 当前交换区剩余量
System.out.println("当前交换区剩余量: " + swap.getFree() / 1024L + "K free");
}

private static void cpu() throws SigarException {
Sigar sigar = new Sigar();
CpuInfo infos[] = sigar.getCpuInfoList();
CpuPerc cpuList[] = null;
cpuList = sigar.getCpuPercList();
for (int i = 0; i < infos.length; i++) {// 不管是单块CPU还是多CPU都适用
CpuInfo info = infos[i];
System.out.println("第" + (i + 1) + "块CPU信息");
System.out.println("CPU的总量MHz: " + info.getMhz());// CPU的总量MHz
System.out.println("CPU生产商: " + info.getVendor());// 获得CPU的卖主,如:Intel
System.out.println("CPU类别: " + info.getModel());// 获得CPU的类别,如:Celeron
System.out.println("CPU缓存数量: " + info.getCacheSize());// 缓冲存储器数量
printCpuPerc(cpuList[i]);
}
}

private static void printCpuPerc(CpuPerc cpu) {
System.out.println("CPU用户使用率: " + CpuPerc.format(cpu.getUser()));// 用户使用率
System.out.println("CPU系统使用率: " + CpuPerc.format(cpu.getSys()));// 系统使用率
System.out.println("CPU当前等待率: " + CpuPerc.format(cpu.getWait()));// 当前等待率
System.out.println("CPU当前错误率: " + CpuPerc.format(cpu.getNice()));//
System.out.println("CPU当前空闲率: " + CpuPerc.format(cpu.getIdle()));// 当前空闲率
System.out.println("CPU总的使用率: " + CpuPerc.format(cpu.getCombined()));// 总的使用率
}

private static void os() {
OperatingSystem OS = OperatingSystem.getInstance();
// 操作系统内核类型如: 386、486、586等x86
System.out.println("操作系统: " + OS.getArch());
System.out.println("操作系统CpuEndian(): " + OS.getCpuEndian());//
System.out.println("操作系统DataModel(): " + OS.getDataModel());//
// 系统描述
System.out.println("操作系统的描述: " + OS.getDescription());
// 操作系统类型
// System.out.println("OS.getName(): " + OS.getName());
// System.out.println("OS.getPatchLevel(): " + OS.getPatchLevel());//
// 操作系统的卖主
System.out.println("操作系统的卖主: " + OS.getVendor());
// 卖主名称
System.out.println("操作系统的卖主名: " + OS.getVendorCodeName());
// 操作系统名称
System.out.println("操作系统名称: " + OS.getVendorName());
// 操作系统卖主类型
System.out.println("操作系统卖主类型: " + OS.getVendorVersion());
// 操作系统的版本号
System.out.println("操作系统的版本号: " + OS.getVersion());
}

private static void who() throws SigarException {
Sigar sigar = new Sigar();
Who who[] = sigar.getWhoList();
if (who != null && who.length > 0) {
for (int i = 0; i < who.length; i++) {
// System.out.println("当前系统进程表中的用户名" + String.valueOf(i));
Who _who = who[i];
System.out.println("用户控制台: " + _who.getDevice());
System.out.println("用户host: " + _who.getHost());
// System.out.println("getTime(): " + _who.getTime());
// 当前系统进程表中的用户名
System.out.println("当前系统进程表中的用户名: " + _who.getUser());
}
}
}

private static void file() throws Exception {
Sigar sigar = new Sigar();
FileSystem fslist[] = sigar.getFileSystemList();
try {
for (int i = 0; i < fslist.length; i++) {
System.out.println("分区的盘符名称" + i);
FileSystem fs = fslist[i];
// 分区的盘符名称
System.out.println("盘符名称: " + fs.getDevName());
// 分区的盘符名称
System.out.println("盘符路径: " + fs.getDirName());
System.out.println("盘符标志: " + fs.getFlags());//
// 文件系统类型,比如 FAT32、NTFS
System.out.println("盘符类型: " + fs.getSysTypeName());
// 文件系统类型名,比如本地硬盘、光驱、网络文件系统等
System.out.println("盘符类型名: " + fs.getTypeName());
// 文件系统类型
System.out.println("盘符文件系统类型: " + fs.getType());
FileSystemUsage usage = null;
usage = sigar.getFileSystemUsage(fs.getDirName());
switch (fs.getType()) {
case 0: // TYPE_UNKNOWN :未知
break;
case 1: // TYPE_NONE
break;
case 2: // TYPE_LOCAL_DISK : 本地硬盘
// 文件系统总大小
System.out.println(fs.getDevName() + "总大小: " + usage.getTotal() + "KB");
// 文件系统剩余大小
System.out.println(fs.getDevName() + "剩余大小: " + usage.getFree() + "KB");
// 文件系统可用大小
System.out.println(fs.getDevName() + "可用大小: " + usage.getAvail() + "KB");
// 文件系统已经使用量
System.out.println(fs.getDevName() + "已经使用量: " + usage.getUsed() + "KB");
double usePercent = usage.getUsePercent() * 100D;
// 文件系统资源的利用率
System.out.println(fs.getDevName() + "资源的利用率: " + usePercent + "%");
break;
case 3:// TYPE_NETWORK :网络
break;
case 4:// TYPE_RAM_DISK :闪存
break;
case 5:// TYPE_CDROM :光驱
break;
case 6:// TYPE_SWAP :页面交换
break;
}
System.out.println(fs.getDevName() + "读出: " + usage.getDiskReads());
System.out.println(fs.getDevName() + "写入: " + usage.getDiskWrites());
}
} catch (Exception e) {
// TODO: handle exception
e.printStackTrace();
}

return;
}

private static void net() throws Exception {
Sigar sigar = new Sigar();
String ifNames[] = sigar.getNetInterfaceList();
for (int i = 0; i < ifNames.length; i++) {
String name = ifNames[i];
NetInterfaceConfig ifconfig = sigar.getNetInterfaceConfig(name);
System.out.println("网络设备名: " + name);// 网络设备名
System.out.println("IP地址: " + ifconfig.getAddress());// IP地址
System.out.println("子网掩码: " + ifconfig.getNetmask());// 子网掩码
if ((ifconfig.getFlags() & 1L) <= 0L) {
System.out.println("!IFF_UP...skipping getNetInterfaceStat");
continue;
}
NetInterfaceStat ifstat = sigar.getNetInterfaceStat(name);
System.out.println(name + "接收的总包裹数:" + ifstat.getRxPackets());// 接收的总包裹数
System.out.println(name + "发送的总包裹数:" + ifstat.getTxPackets());// 发送的总包裹数
System.out.println(name + "接收到的总字节数:" + ifstat.getRxBytes());// 接收到的总字节数
System.out.println(name + "发送的总字节数:" + ifstat.getTxBytes());// 发送的总字节数
System.out.println(name + "接收到的错误包数:" + ifstat.getRxErrors());// 接收到的错误包数
System.out.println(name + "发送数据包时的错误数:" + ifstat.getTxErrors());// 发送数据包时的错误数
System.out.println(name + "接收时丢弃的包数:" + ifstat.getRxDropped());// 接收时丢弃的包数
System.out.println(name + "发送时丢弃的包数:" + ifstat.getTxDropped());// 发送时丢弃的包数
}
}

private static void ethernet() throws SigarException {
Sigar sigar = null;
sigar = new Sigar();
String[] ifaces = sigar.getNetInterfaceList();
for (int i = 0; i < ifaces.length; i++) {
NetInterfaceConfig cfg = sigar.getNetInterfaceConfig(ifaces[i]);
if (NetFlags.LOOPBACK_ADDRESS.equals(cfg.getAddress()) || (cfg.getFlags() & NetFlags.IFF_LOOPBACK) != 0
|| NetFlags.NULL_HWADDR.equals(cfg.getHwaddr())) {
continue;
}
System.out.println(cfg.getName() + "IP地址:" + cfg.getAddress());// IP地址
System.out.println(cfg.getName() + "网关广播地址:" + cfg.getBroadcast());// 网关广播地址
System.out.println(cfg.getName() + "网卡MAC地址:" + cfg.getHwaddr());// 网卡MAC地址
System.out.println(cfg.getName() + "子网掩码:" + cfg.getNetmask());// 子网掩码
System.out.println(cfg.getName() + "网卡描述信息:" + cfg.getDescription());// 网卡描述信息
System.out.println(cfg.getName() + "网卡类型" + cfg.getType());//
}
}
}

SerializableFactory4Marshalling

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class SerializableFactory4Marshalling {

/**
* 创建Jboss Marshalling解码器MarshallingDecoder
* @return MarshallingDecoder
*/
public static MarshallingDecoder buildMarshallingDecoder() {
//首先通过Marshalling工具类的精通方法获取Marshalling实例对象 参数serial标识创建的是java序列化工厂对象。
//jboss-marshalling-serial 包提供
final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
//创建了MarshallingConfiguration对象,配置了版本号为5
final MarshallingConfiguration configuration = new MarshallingConfiguration();
// 序列化版本。只要使用JDK5以上版本,version只能定义为5。
configuration.setVersion(5);
//根据marshallerFactory和configuration创建provider
UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);
//构建Netty的MarshallingDecoder对象,俩个参数分别为provider和单个消息序列化后的最大长度
MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024 * 1024 * 1);
return decoder;
}

/**
* 创建Jboss Marshalling编码器MarshallingEncoder
* @return MarshallingEncoder
*/
public static MarshallingEncoder buildMarshallingEncoder() {
final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
final MarshallingConfiguration configuration = new MarshallingConfiguration();
configuration.setVersion(5);
MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);
//构建Netty的MarshallingEncoder对象,MarshallingEncoder用于实现序列化接口的POJO对象序列化为二进制数组
MarshallingEncoder encoder = new MarshallingEncoder(provider);
return encoder;
}

}

HeatbeatMessage

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class HeatbeatMessage implements Serializable {

private static final long serialVersionUID = 2827219147304706826L;
private String ip;
private Map<String, Object> cpuMsgMap;
private Map<String, Object> memMsgMap;
private Map<String, Object> fileSysMsgMap;
@Override
public String toString() {
return "HeatbeatMessage [\nip=" + ip
+ ", \ncpuMsgMap=" + cpuMsgMap
+ ", \nmemMsgMap=" + memMsgMap
+ ", \nfileSysMsgMap=" + fileSysMsgMap + "]";
}

public String getIp() {
return ip;
}
public void setIp(String ip) {
this.ip = ip;
}
public Map<String, Object> getCpuMsgMap() {
return cpuMsgMap;
}
public void setCpuMsgMap(Map<String, Object> cpuMsgMap) {
this.cpuMsgMap = cpuMsgMap;
}
public Map<String, Object> getMemMsgMap() {
return memMsgMap;
}
public void setMemMsgMap(Map<String, Object> memMsgMap) {
this.memMsgMap = memMsgMap;
}
public Map<String, Object> getFileSysMsgMap() {
return fileSysMsgMap;
}
public void setFileSysMsgMap(Map<String, Object> fileSysMsgMap) {
this.fileSysMsgMap = fileSysMsgMap;
}

}

RequestMessage

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class RequestMessage implements Serializable {
private static final long serialVersionUID = 7084843947860990140L;
private Long id;
private String message;
private byte[] attachment;
@Override
public String toString() {
return "RequestMessage [id=" + id + ", message=" + message + "]";
}
public RequestMessage() {
super();
}
public RequestMessage(Long id, String message, byte[] attachment) {
super();
this.id = id;
this.message = message;
this.attachment = attachment;
}
public Long getId() {
return id;
}
public void setId(Long id) {
this.id = id;
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
public byte[] getAttachment() {
return attachment;
}
public void setAttachment(byte[] attachment) {
this.attachment = attachment;
}
}

ResponseMessage

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class ResponseMessage implements Serializable {
private static final long serialVersionUID = -8134313953478922076L;
private Long id;
private String message;
@Override
public String toString() {
return "ResponseMessage [id=" + id + ", message=" + message + "]";
}
public ResponseMessage() {
super();
}
public ResponseMessage(Long id, String message) {
super();
this.id = id;
this.message = message;
}
public Long getId() {
return id;
}
public void setId(Long id) {
this.id = id;
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
}

server端开发:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
/**
* 1. 双线程组
* 2. Bootstrap配置启动信息
* 3. 注册业务处理Handler
* 4. 绑定服务监听端口并启动服务
*/
public class Server4HelloWorld {
// 监听线程组,监听客户端请求
private EventLoopGroup acceptorGroup = null;
// 处理客户端相关操作线程组,负责处理与客户端的数据通讯
private EventLoopGroup clientGroup = null;
// 服务端启动相关配置信息
private ServerBootstrap bootstrap = null;
public Server4HelloWorld(){
init();
}
private void init(){
// 初始化线程组,构建线程组的时候,如果不传递参数,则默认构建的线程组线程数是CPU核心数量。
acceptorGroup = new NioEventLoopGroup(); //监听线程组
clientGroup = new NioEventLoopGroup(); //处理客户端线程组
// 初始化服务的配置
bootstrap = new ServerBootstrap();
// 绑定线程组
bootstrap.group(acceptorGroup, clientGroup);
// 设定通讯模式为NIO, 同步非阻塞
bootstrap.channel(NioServerSocketChannel.class);
// 设定缓冲区大小, 缓存区的单位是字节。
bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
// SO_SNDBUF发送缓冲区,SO_RCVBUF接收缓冲区,SO_KEEPALIVE开启心跳监测(保证连接有效)
bootstrap.option(ChannelOption.SO_SNDBUF, 16*1024)
.option(ChannelOption.SO_RCVBUF, 16*1024)
.option(ChannelOption.SO_KEEPALIVE, true);
}
/**
* 监听处理逻辑。
* @param port 监听端口。
* @param acceptorHandlers 处理器, 如何处理客户端请求。
* @return
* @throws InterruptedException
*/
public ChannelFuture doAccept(int port, final ChannelHandler... acceptorHandlers) throws InterruptedException{

/*
* childHandler是服务的Bootstrap独有的方法。是用于提供处理对象的。
* 可以一次性增加若干个处理逻辑。是类似责任链模式的处理方式。
* 增加A,B两个处理逻辑,在处理客户端请求数据的时候,根据A-》B顺序依次处理。
*
* ChannelInitializer - 用于提供处理器的一个模型对象。
* 其中定义了一个方法,initChannel方法。
* 方法是用于初始化处理逻辑责任链条的。
* 可以保证服务端的Bootstrap只初始化一次处理器,尽量提供处理逻辑的重用。
* 避免反复的创建处理器对象。节约资源开销。
*/
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {

@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(acceptorHandlers);
}
});
// bind方法 - 绑定监听端口的。ServerBootstrap可以绑定多个监听端口。 多次调用bind方法即可
// sync - 开始监听逻辑。 返回一个ChannelFuture。 返回结果代表的是监听成功后的一个对应的未来结果
// 可以使用ChannelFuture实现后续的服务器和客户端的交互。
ChannelFuture future = bootstrap.bind(port).sync();
return future;
}

/**
* shutdownGracefully - 方法是一个安全关闭的方法。可以保证不放弃任何一个已接收的客户端请求。
*/
public void release(){
this.acceptorGroup.shutdownGracefully();
this.clientGroup.shutdownGracefully();
}

public static void main(String[] args){
ChannelFuture future = null;
Server4HelloWorld server = null;
try{
server = new Server4HelloWorld();
future = server.doAccept(9999,new Server4HelloWorldHandler());
System.out.println("server started.");

// 关闭连接的。
future.channel().closeFuture().sync();
}catch(InterruptedException e){
e.printStackTrace();
}finally{
if(null != future){
try {
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//回收线程组资源
if(null != server){
server.release();
}
}
}

}

serverHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
/**
* @Sharable注解 -
* 代表当前Handler是一个可以分享的处理器。也就意味着,服务器注册此Handler后,可以分享给多个客户端同时使用。
* 如果不使用注解描述类型,则每次客户端请求时,必须为客户端重新创建一个新的Handler对象。
* 如果handler是一个Sharable的,一定避免定义可写的实例变量。修改会发生混乱
* bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new XxxHandler());
}
});
*/
package com.sxt.netty.first;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;

@Sharable
public class Server4HelloWorldHandler extends ChannelHandlerAdapter {

/**
* 业务处理逻辑
* 用于处理读取数据请求的逻辑。
* ctx - 上下文对象。其中包含于客户端建立连接的所有资源。 如: 对应的Channel
* msg - 读取到的数据。 默认类型是ByteBuf,是Netty自定义的。是对ByteBuffer的封装。 不需要考虑复位问题。
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 获取读取的数据, 是一个缓冲。
ByteBuf readBuffer = (ByteBuf) msg;
// 创建一个字节数组,用于保存缓存中的数据。
byte[] tempDatas = new byte[readBuffer.readableBytes()];
// 将缓存中的数据读取到字节数组中。
readBuffer.readBytes(tempDatas);
String message = new String(tempDatas, "UTF-8");
System.out.println("from client : " + message);
if("exit".equals(message)){
ctx.close();
return;
}
String line = "server message to client!";
// 写操作自动释放缓存,避免内存溢出问题。
ctx.writeAndFlush(Unpooled.copiedBuffer(line.getBytes("UTF-8")));
// 注意,如果调用的是write方法。不会刷新缓存,缓存中的数据不会发送到客户端,必须再次调用flush方法才行。
// ctx.write(Unpooled.copiedBuffer(line.getBytes("UTF-8")));
// ctx.flush()
}

/**
* 异常处理逻辑, 当客户端异常退出的时候,也会运行。
* ChannelHandlerContext关闭,也代表当前与客户端连接的资源关闭。
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("server exceptionCaught method run...");
// cause.printStackTrace();
ctx.close();
}

}

client

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
/**
* 因为客户端是请求的发起者,不需要监听。
* 只需要定义唯一的一个线程组即可。
*/
public class Client4HelloWorld {

// 处理请求和处理服务端响应的线程组
private EventLoopGroup group = null;
// 客户端启动相关配置信息
private Bootstrap bootstrap = null;

public Client4HelloWorld(){
init();
}

private void init(){
group = new NioEventLoopGroup();
bootstrap = new Bootstrap();
// 绑定线程组
bootstrap.group(group);
// 设定通讯模式为NIO
bootstrap.channel(NioSocketChannel.class);
}

public ChannelFuture doRequest(String host, int port, final ChannelHandler... handlers) throws InterruptedException{
/*
* 客户端的Bootstrap没有childHandler方法。只有handler方法。
* 方法含义等同ServerBootstrap中的childHandler
* 在客户端必须绑定处理器,也就是必须调用handler方法。
* 服务器必须绑定处理器,必须调用childHandler方法。
*/
this.bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(handlers);
}
});
// 建立连接。
ChannelFuture future = this.bootstrap.connect(host, port).sync();
return future;
}

public void release(){
this.group.shutdownGracefully();
}

public static void main(String[] args) {
Client4HelloWorld client = null;
ChannelFuture future = null;
try{
client = new Client4HelloWorld();
future = client.doRequest("localhost", 9999, new Client4HelloWorldHandler());

Scanner s = null;
while(true){
s = new Scanner(System.in);
System.out.print("enter message send to server (enter 'exit' for close client) > ");
String line = s.nextLine();
if("exit".equals(line)){
// addListener - 增加监听,当某条件满足的时候,触发监听器。
// ChannelFutureListener.CLOSE - 关闭监听器,代表ChannelFuture执行返回后,关闭连接。
future.channel().writeAndFlush(Unpooled.copiedBuffer(line.getBytes("UTF-8")))
.addListener(ChannelFutureListener.CLOSE);
break;
}
future.channel().writeAndFlush(Unpooled.copiedBuffer(line.getBytes("UTF-8")));
TimeUnit.SECONDS.sleep(1);
}
}catch(Exception e){
e.printStackTrace();
}finally{
if(null != future){
try {
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if(null != client){
client.release();
}
}
}

}

clientHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class Client4HelloWorldHandler extends ChannelHandlerAdapter {

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try{
ByteBuf readBuffer = (ByteBuf) msg;
byte[] tempDatas = new byte[readBuffer.readableBytes()];
readBuffer.readBytes(tempDatas);
System.out.println("from server : " + new String(tempDatas, "UTF-8"));
}finally{
// 用于释放缓存。避免内存溢出
ReferenceCountUtil.release(msg);
}
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("client exceptionCaught method run...");
// cause.printStackTrace();
ctx.close();
}

/*@Override // 断开连接时执行
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelInactive method run...");
}

@Override // 连接通道建立成功时执行
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelActive method run...");
}

@Override // 每次读取完成时执行
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelReadComplete method run...");
}*/

}

合体圆满(基础代码演示)

拆包粘包问题解决

netty 使用 tcp/ip 协议传输数据。而 tcp/ip 协议是类似水流一样的数据传输方式。多次 访问的时候有可能出现数据粘包的问题。(Netty是NIO的模型,是同步非阻塞的,一定执行read/write时候就继续向下执行了,让底层的代码给我们处理执行数据的准备,怎么去读写操作,如果我们客户端发起多个数据,read方法到底是读几条数据?不知道客户端发送过来到底是几套数据,每一条间到底有什么间隔)解决这种问题的方式如下:

  • 定长数据流 :客户端和服务器,提前协调好,每个消息长度固定。(如:长度 10)。如果客户端或服 务器写出的数据不足 10,则使用空白字符补足(如:使用空格)。

server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
/**
* 1. 双线程组
* 2. Bootstrap配置启动信息
* 3. 注册业务处理Handler
* 4. 绑定服务监听端口并启动服务
*/
public class Server4FixedLength {
// 监听线程组,监听客户端请求
private EventLoopGroup acceptorGroup = null;
// 处理客户端相关操作线程组,负责处理与客户端的数据通讯
private EventLoopGroup clientGroup = null;
// 服务启动相关配置信息
private ServerBootstrap bootstrap = null;
public Server4FixedLength(){
init();
}
private void init(){
acceptorGroup = new NioEventLoopGroup();
clientGroup = new NioEventLoopGroup();
bootstrap = new ServerBootstrap();
// 绑定线程组
bootstrap.group(acceptorGroup, clientGroup);
// 设定通讯模式为NIO
bootstrap.channel(NioServerSocketChannel.class);
// 设定缓冲区大小
bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
// SO_SNDBUF发送缓冲区,SO_RCVBUF接收缓冲区,SO_KEEPALIVE开启心跳监测(保证连接有效)
bootstrap.option(ChannelOption.SO_SNDBUF, 16*1024)
.option(ChannelOption.SO_RCVBUF, 16*1024)
.option(ChannelOption.SO_KEEPALIVE, true);
}
public ChannelFuture doAccept(int port) throws InterruptedException{

bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {

@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelHandler[] acceptorHandlers = new ChannelHandler[3];
// 定长Handler。通过构造参数设置消息长度(单位是字节)。发送的消息长度不足可以使用空格补全。
acceptorHandlers[0] = new FixedLengthFrameDecoder(3);
// 字符串解码器Handler,会自动处理channelRead方法的msg参数,将ByteBuf类型的数据转换为字符串对象
acceptorHandlers[1] = new StringDecoder(Charset.forName("UTF-8"));
acceptorHandlers[2] = new Server4FixedLengthHandler();
ch.pipeline().addLast(acceptorHandlers);
}
});
ChannelFuture future = bootstrap.bind(port).sync();
return future;
}
public void release(){
this.acceptorGroup.shutdownGracefully();
this.clientGroup.shutdownGracefully();
}

public static void main(String[] args){
ChannelFuture future = null;
Server4FixedLength server = null;
try{
server = new Server4FixedLength();

future = server.doAccept(9999);
System.out.println("server started.");
future.channel().closeFuture().sync();
}catch(InterruptedException e){
e.printStackTrace();
}finally{
if(null != future){
try {
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

if(null != server){
server.release();
}
}
}

}

serverHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class Server4FixedLengthHandler extends ChannelHandlerAdapter {

// 业务处理逻辑
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String message = msg.toString();
System.out.println("from client : " + message.trim());
String line = "ok ";
ctx.writeAndFlush(Unpooled.copiedBuffer(line.getBytes("UTF-8")));
}


// 异常处理逻辑
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("server exceptionCaught method run...");
// cause.printStackTrace();
ctx.close();
}

}

client

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
/**
* 1. 单线程组
* 2. Bootstrap配置启动信息
* 3. 注册业务处理Handler
* 4. connect连接服务,并发起请求
*/
public class Client4FixedLength {

// 处理请求和处理服务端响应的线程组
private EventLoopGroup group = null;
// 服务启动相关配置信息
private Bootstrap bootstrap = null;

public Client4FixedLength(){
init();
}

private void init(){
group = new NioEventLoopGroup();
bootstrap = new Bootstrap();
// 绑定线程组
bootstrap.group(group);
// 设定通讯模式为NIO
bootstrap.channel(NioSocketChannel.class);
}

public ChannelFuture doRequest(String host, int port) throws InterruptedException{
this.bootstrap.handler(new ChannelInitializer<SocketChannel>() {

@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelHandler[] handlers = new ChannelHandler[3];
handlers[0] = new FixedLengthFrameDecoder(3);
// 字符串解码器Handler,会自动处理channelRead方法的msg参数,将ByteBuf类型的数据转换为字符串对象
handlers[1] = new StringDecoder(Charset.forName("UTF-8"));
handlers[2] = new Client4FixedLengthHandler();

ch.pipeline().addLast(handlers);
}
});
ChannelFuture future = this.bootstrap.connect(host, port).sync();
return future;
}

public void release(){
this.group.shutdownGracefully();
}

public static void main(String[] args) {
Client4FixedLength client = null;
ChannelFuture future = null;
try{
client = new Client4FixedLength();

future = client.doRequest("localhost", 9999);

Scanner s = null;
while(true){
s = new Scanner(System.in);
System.out.print("enter message send to server > ");
String line = s.nextLine();
byte[] bs = new byte[5];
byte[] temp = line.getBytes("UTF-8");
if(temp.length <= 5){
for(int i = 0; i < temp.length; i++){
bs[i] = temp[i];
}
}
future.channel().writeAndFlush(Unpooled.copiedBuffer(bs));
TimeUnit.SECONDS.sleep(1);
}
}catch(Exception e){
e.printStackTrace();
}finally{
if(null != future){
try {
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if(null != client){
client.release();
}
}
}

}

clientHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class Client4FixedLengthHandler extends ChannelHandlerAdapter {

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try{
String message = msg.toString();
System.out.println("from server : " + message);
}finally{
// 用于释放缓存。避免内存溢出
ReferenceCountUtil.release(msg);
}
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("client exceptionCaught method run...");
// cause.printStackTrace();
ctx.close();
}

注意:此时客户端只要不能满足的数据是3的长度,要可以进行多次发送。服务端才能收到数据,并且是连起来的。当客户端发的是大3的长度是,如发送abc123def4,服务器会把它当成三条数据,4不会出来,客户端在发送2个长度数据,服务端才可以接收到。

​ 中文问题,unicode中,一个汉字可能长度是2也可能是3。如果在上面代码中的客户端发送”中国”二个字就有意思了。服务端会接收到2次,一个字一次。

  • 特殊结束符 :客户端和服务器,协商定义一个特殊的分隔符号,分隔符号长度自定义。如:‘#’、‘$_$’、

    AA@’。在通讯的时候,只要没有发送分隔符号,则代表一条数据没有结束。

server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public ChannelFuture doAccept(int port) throws InterruptedException{

bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {

@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 数据分隔符, 定义的数据分隔符一定是一个ByteBuf类型的数据对象。
ByteBuf delimiter = Unpooled.copiedBuffer("$E$".getBytes());
ChannelHandler[] acceptorHandlers = new ChannelHandler[3];
// 处理固定结束标记符号的Handler。这个Handler没有@Sharable注解修饰,
// 必须每次初始化通道时创建一个新对象
// 使用特殊符号分隔处理数据粘包问题,也要定义每个数据包最大长度。netty建议数据有最大长度。
acceptorHandlers[0] = new DelimiterBasedFrameDecoder(1024, delimiter);
// 字符串解码器Handler,会自动处理channelRead方法的msg参数,将ByteBuf类型的数据转换为字符串对象
acceptorHandlers[1] = new StringDecoder(Charset.forName("UTF-8"));
acceptorHandlers[2] = new Server4DelimiterHandler();
ch.pipeline().addLast(acceptorHandlers);
}
});
ChannelFuture future = bootstrap.bind(port).sync();

serverHandler

1
2
3
4
5
6
7
// 业务处理逻辑
@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String message = msg.toString();
System.out.println("from client : " + message);
//会放回三条字符串
String line = "server message $E$ test delimiter handler!! $E$ second message $E$"; ctx.writeAndFlush(Unpooled.copiedBuffer(line.getBytes("UTF-8")));
}

client和server改动基本一样。此时如果客户端不输入分隔符,可以进行不断输入。

  • 协议 :相对最成熟的数据传递方式。有服务器的开发者提供一个固定格式的协议标准。客户端

    和服务器发送数据和接受数据的时候,都依据协议制定和解析消息。

1
协议格式:HEADcontent-length:xxxxHEADBODYxxxxxxBODY
1
2
3
4
5
6
7
8
9
10
11
12
13
public ChannelFuture doAccept(int port, final ChannelHandler... acceptorHandlers) throws InterruptedException{

bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {

@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8")));
ch.pipeline().addLast(acceptorHandlers);
}
});
ChannelFuture future = bootstrap.bind(port).sync();
return future;
}

serverHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
@Sharable
public class Server4ProtocolHandler extends ChannelHandlerAdapter {

// 业务处理逻辑
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String message = msg.toString();
System.out.println("server receive protocol content : " + message);
message = ProtocolParser.parse(message);
if(null == message){
System.out.println("error request from client");
return ;
}
System.out.println("from client : " + message);
String line = "server message";
line = ProtocolParser.transferTo(line);
System.out.println("server send protocol content : " + line);
ctx.writeAndFlush(Unpooled.copiedBuffer(line.getBytes("UTF-8")));
}

// 异常处理逻辑
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("server exceptionCaught method run...");
cause.printStackTrace();
ctx.close();
}

static class ProtocolParser{
public static String parse(String message){
String[] temp = message.split("HEADBODY");
temp[0] = temp[0].substring(4);
temp[1] = temp[1].substring(0, (temp[1].length()-4));
int length = Integer.parseInt(temp[0].substring(temp[0].indexOf(":")+1));
if(length != temp[1].length()){
return null;
}
return temp[1];
}
public static String transferTo(String message){
message = "HEADcontent-length:" + message.length() + "HEADBODY" + message + "BODY";
return message;
}
}

}

协议限定拆分的格式。解决粘包。

  • 序列化对象JBoss Marshalling 序列化 ,Java 是面向对象的开发语言。传递的数据如果是 Java 对象,应该是最方便且可靠。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public ChannelFuture doAccept(int port, final ChannelHandler... acceptorHandlers) throws InterruptedException{

bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {

@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(SerializableFactory4Marshalling.buildMarshallingDecoder());
ch.pipeline().addLast(SerializableFactory4Marshalling.buildMarshallingEncoder());
ch.pipeline().addLast(acceptorHandlers);
}
});
ChannelFuture future = bootstrap.bind(port).sync();
return future;
}

serverHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Sharable
public class Server4SerializableHandler extends ChannelHandlerAdapter {

// 业务处理逻辑
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("from client : ClassName - " + msg.getClass().getName()
+ " ; message : " + msg.toString());
if(msg instanceof RequestMessage){
RequestMessage request = (RequestMessage)msg;
// 解压缩
// byte[] attachment = GzipUtils.unzip(request.getAttachment());
// System.out.println(new String(attachment));
}
ResponseMessage response = new ResponseMessage(0L, "test response");
ctx.writeAndFlush(response);
}

// 异常处理逻辑
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("server exceptionCaught method run...");
cause.printStackTrace();
ctx.close();
}

}

client

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public ChannelFuture doRequest(String host, int port, final ChannelHandler... handlers) throws InterruptedException{
this.bootstrap.handler(new ChannelInitializer<SocketChannel>() {

@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(SerializableFactory4Marshalling.buildMarshallingDecoder());
ch.pipeline().addLast(SerializableFactory4Marshalling.buildMarshallingEncoder());
ch.pipeline().addLast(handlers);
}
});
ChannelFuture future = this.bootstrap.connect(host, port).sync();
return future;
}

public void release(){
this.group.shutdownGracefully();
}

public static void main(String[] args) {
Client4Serializable client = null;
ChannelFuture future = null;
try{
client = new Client4Serializable();
future = client.doRequest("localhost", 9999, new Client4SerializableHandler());
String attachment = "test attachment";
byte[] attBuf = attachment.getBytes();
// attBuf = GzipUtils.zip(attBuf);
// RequestMessage msg = new RequestMessage(new Random().nextLong(),
// "test", new byte[0]);
// 压缩,有效减少网络中传递的数据
attBuf = GzipUtils.zip(attBuf);
RequestMessage msg = new RequestMessage(new Random().nextLong(),
"test", attBuf);
future.channel().writeAndFlush(msg);
TimeUnit.SECONDS.sleep(1);
future.addListener(ChannelFutureListener.CLOSE);
}catch(Exception e){
e.printStackTrace();
}finally{
if(null != future){
try {
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if(null != client){
client.release();
}
}
}

clientHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class Client4SerializableHandler extends ChannelHandlerAdapter {

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("from server : ClassName - " + msg.getClass().getName()
+ " ; message : " + msg.toString());
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("client exceptionCaught method run...");
cause.printStackTrace();
ctx.close();
}

}
  • 定时断线重连 :客户端断线重连机制。 客户端数量多,且需要传递的数据量级较大。可以周期性的发送数据的时候,使用。要 求对数据的即时性不高的时候,才可使用。 优点: 可以使用数据缓存。不是每条数据进行一次数据交互。可以定时回收资源,对 资源利用率高。相对来说,即时性可以通过其他方式保证。如: 120 秒自动断线。数据变 化 1000 次请求服务器一次。300 秒中自动发送不足 1000 次的变化数据(Timer)。

server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public ChannelFuture doAccept(int port) throws InterruptedException{

bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {

@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(SerializableFactory4Marshalling.buildMarshallingDecoder());
ch.pipeline().addLast(SerializableFactory4Marshalling.buildMarshallingEncoder());
// 定义一个定时断线处理器,当多长时间内,没有任何的可读取数据,自动断开连接。
// 没有@Sharable的
// 构造参数,就是间隔时长。 默认的单位是秒。
// 自定义间隔时长单位。 new ReadTimeoutHandler(long times, TimeUnit unit);
ch.pipeline().addLast(new ReadTimeoutHandler(3));
ch.pipeline().addLast(new Server4TimerHandler());
}
});
ChannelFuture future = bootstrap.bind(port).sync();
return future;
}

client

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
public void setHandlers() throws InterruptedException{
this.bootstrap.handler(new ChannelInitializer<SocketChannel>() {

@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(SerializableFactory4Marshalling.buildMarshallingDecoder());
ch.pipeline().addLast(SerializableFactory4Marshalling.buildMarshallingEncoder());
// 写操作自定断线。 在指定时间内,没有写操作,自动断线。
ch.pipeline().addLast(new WriteTimeoutHandler(3));
ch.pipeline().addLast(new Client4TimerHandler());
}
});
}

public ChannelFuture getChannelFuture(String host, int port) throws InterruptedException{
// future是null,我们就要建立连接
if(future == null){
future = this.bootstrap.connect(host, port).sync();
}
// 如果非空,看些连接未来状态中的通道是否有效。
// 如果future非空,但是有个channel,证明已经和服务器断开了,但是future没有被回收
// 重连操作
if(!future.channel().isActive()){
future = this.bootstrap.connect(host, port).sync();
}
return future;
}

public void release(){
this.group.shutdownGracefully();
}

public static void main(String[] args) {
Client4Timer client = null;
ChannelFuture future = null;
try{
client = new Client4Timer();
client.setHandlers();

future = client.getChannelFuture("localhost", 9999);
// 循环3从。睡眠2秒,没有超出时长
for(int i = 0; i < 3; i++){
RequestMessage msg = new RequestMessage(new Random().nextLong(),
"test"+i, new byte[0]);
future.channel().writeAndFlush(msg);
TimeUnit.SECONDS.sleep(2);
}
// 有超出时长
TimeUnit.SECONDS.sleep(5);

future = client.getChannelFuture("localhost", 9999);
RequestMessage msg = new RequestMessage(new Random().nextLong(),
"test", new byte[0]);
future.channel().writeAndFlush(msg);
}catch(Exception e){
e.printStackTrace();
}finally{
if(null != future){
try {
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if(null != client){
client.release();
}
}
}

clientHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class Client4TimerHandler extends ChannelHandlerAdapter {

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("from server : ClassName - " + msg.getClass().getName()
+ " ; message : " + msg.toString());
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("client exceptionCaught method run...");
cause.printStackTrace();
ctx.close();
}

/**
* 当连接建立成功后,出发的代码逻辑。
* 在一次连接中只运行唯一一次。
* 通常用于实现连接确认和资源初始化的。
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("client channel active");
}

server端运行图。有断线有异常,

.jpg)

.jpg)

  • 心跳监测 :使用定时发送消息的方式,实现硬件检测,达到心态检测的目的。 心跳监测是用于检测电脑硬件和软件信息的一种技术。如:CPU 使用率,磁盘使用率, 内存使用率,进程情况,线程情况等。
  • sigar :需要下载一个 zip 压缩包。内部包含若干 sigar 需要的操作系统文件。sigar 插件是通过 JVM 访问操作系统,读取计算机硬件的一个插件库。读取计算机硬件过程中,必须由操作系统提供硬件信息。硬件信息是通过操作系统提供的。zip 压缩包中是 sigar 编写的操作系统文 件,如:windows 中的动态链接库文件。 解压需要的操作系统文件,将操作系统文件赋值到${Java_home}/bin 目录中。

server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
public class Server4Heatbeat {
// 监听线程组,监听客户端请求
private EventLoopGroup acceptorGroup = null;
// 处理客户端相关操作线程组,负责处理与客户端的数据通讯
private EventLoopGroup clientGroup = null;
// 服务启动相关配置信息
private ServerBootstrap bootstrap = null;
public Server4Heatbeat(){
init();
}
private void init(){
acceptorGroup = new NioEventLoopGroup();
clientGroup = new NioEventLoopGroup();
bootstrap = new ServerBootstrap();
// 绑定线程组
bootstrap.group(acceptorGroup, clientGroup);
// 设定通讯模式为NIO
bootstrap.channel(NioServerSocketChannel.class);
// 设定缓冲区大小
bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
// SO_SNDBUF发送缓冲区,SO_RCVBUF接收缓冲区,SO_KEEPALIVE开启心跳监测(保证连接有效)
bootstrap.option(ChannelOption.SO_SNDBUF, 16*1024)
.option(ChannelOption.SO_RCVBUF, 16*1024)
.option(ChannelOption.SO_KEEPALIVE, true);
}
public ChannelFuture doAccept(int port) throws InterruptedException{

bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {

@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(SerializableFactory4Marshalling.buildMarshallingDecoder());
ch.pipeline().addLast(SerializableFactory4Marshalling.buildMarshallingEncoder());
ch.pipeline().addLast(new Server4HeatbeatHandler());
}
});
ChannelFuture future = bootstrap.bind(port).sync();
return future;
}
public void release(){
this.acceptorGroup.shutdownGracefully();
this.clientGroup.shutdownGracefully();
}

public static void main(String[] args){
ChannelFuture future = null;
Server4Heatbeat server = null;
try{
server = new Server4Heatbeat();
future = server.doAccept(9999);
System.out.println("server started.");

future.channel().closeFuture().sync();
}catch(InterruptedException e){
e.printStackTrace();
}finally{
if(null != future){
try {
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

if(null != server){
server.release();
}
}
}

}

serverHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
@Sharable
public class Server4HeatbeatHandler extends ChannelHandlerAdapter {

private static List<String> credentials = new ArrayList<>();
private static final String HEATBEAT_SUCCESS = "SERVER_RETURN_HEATBEAT_SUCCESS";
public Server4HeatbeatHandler(){
// 初始化客户端列表信息。一般通过配置文件读取或数据库读取。
credentials.add("192.168.199.222_WIN-QIUB2JF5TDP");
}

// 业务处理逻辑
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if(msg instanceof String){
this.checkCredential(ctx, msg.toString());
} else if (msg instanceof HeatbeatMessage){
this.readHeatbeatMessage(ctx, msg);
} else {
// 服务器发送的信息是错误的,或者对外暴露的ip和端口号被与系统无关的人访问到了,自动断开连接
ctx.writeAndFlush("wrong message").addListener(ChannelFutureListener.CLOSE);
}
}

private void readHeatbeatMessage(ChannelHandlerContext ctx, Object msg){
HeatbeatMessage message = (HeatbeatMessage) msg;
System.out.println(message);
System.out.println("=======================================");
ctx.writeAndFlush("receive heatbeat message");
}

/**
* 身份检查。检查客户端身份是否有效。
* 客户端身份信息应该是通过数据库或数据文件定制的。
* 身份通过 - 返回确认消息。
* 身份无效 - 断开连接
* @param ctx
* @param credential
*/
private void checkCredential(ChannelHandlerContext ctx, String credential){
System.out.println(credential);
System.out.println(credentials);
if(credentials.contains(credential)){
ctx.writeAndFlush(HEATBEAT_SUCCESS);
}else{
ctx.writeAndFlush("no credential contains").addListener(ChannelFutureListener.CLOSE);
}
}

// 异常处理逻辑
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("server exceptionCaught method run...");
// cause.printStackTrace();
ctx.close();
}

}

clientHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
public class Client4HeatbeatHandler extends ChannelHandlerAdapter {

private ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
private ScheduledFuture heatbeat;
private InetAddress remoteAddr;
private static final String HEATBEAT_SUCCESS = "SERVER_RETURN_HEATBEAT_SUCCESS";

//只运行一次
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 获取本地INET信息
this.remoteAddr = InetAddress.getLocalHost();
// 获取本地计算机名
String computerName = System.getenv().get("COMPUTERNAME");
String credentials = this.remoteAddr.getHostAddress() + "_" + computerName;
System.out.println(credentials);
// 发送到服务器,作为信息比对证书
ctx.writeAndFlush(credentials);
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try{
if(msg instanceof String){
if(HEATBEAT_SUCCESS.equals(msg)){
this.heatbeat = this.executorService.scheduleWithFixedDelay(new HeatbeatTask(ctx), 0L, 2L, TimeUnit.SECONDS);
System.out.println("client receive - " + msg);
}else{
System.out.println("client receive - " + msg);
}
}
}finally{
ReferenceCountUtil.release(msg);
}
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("client exceptionCaught method run...");
// cause.printStackTrace();
// 回收资源
if(this.heatbeat != null){
this.heatbeat.cancel(true);
this.heatbeat = null;
}
ctx.close();
}

class HeatbeatTask implements Runnable{
private ChannelHandlerContext ctx;
public HeatbeatTask(){

}
public HeatbeatTask(ChannelHandlerContext ctx){
this.ctx = ctx;
}
public void run(){
try {
HeatbeatMessage msg = new HeatbeatMessage();
msg.setIp(remoteAddr.getHostAddress());
Sigar sigar = new Sigar();
// CPU信息
CpuPerc cpuPerc = sigar.getCpuPerc();
Map<String, Object> cpuMsgMap = new HashMap<>();
cpuMsgMap.put("Combined", cpuPerc.getCombined());
cpuMsgMap.put("User", cpuPerc.getUser());
cpuMsgMap.put("Sys", cpuPerc.getSys());
cpuMsgMap.put("Wait", cpuPerc.getWait());
cpuMsgMap.put("Idle", cpuPerc.getIdle());

// 内存信息
Map<String, Object> memMsgMap = new HashMap<>();
Mem mem = sigar.getMem();
memMsgMap.put("Total", mem.getTotal());
memMsgMap.put("Used", mem.getUsed());
memMsgMap.put("Free", mem.getFree());

// 文件系统
Map<String, Object> fileSysMsgMap = new HashMap<>();
FileSystem[] list = sigar.getFileSystemList();
fileSysMsgMap.put("FileSysCount", list.length);
List<String> msgList = null;
for(FileSystem fs : list){
msgList = new ArrayList<>();
msgList.add(fs.getDevName() + "总大小: " + sigar.getFileSystemUsage(fs.getDirName()).getTotal() + "KB");
msgList.add(fs.getDevName() + "剩余大小: " + sigar.getFileSystemUsage(fs.getDirName()).getFree() + "KB");
fileSysMsgMap.put(fs.getDevName(), msgList);
}

msg.setCpuMsgMap(cpuMsgMap);
msg.setMemMsgMap(memMsgMap);
msg.setFileSysMsgMap(fileSysMsgMap);

ctx.writeAndFlush(msg);
} catch (Exception e) {
e.printStackTrace();
}
}
}

}
  • HTTP 协议处理 :使用 Netty 服务开发。实现 HTTP 协议处理逻辑。 没有客户端,做一个http协议文件传输到netty服务器

server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/** 
* http协议文件传输
* @author Qixuan.Chen
* 创建时间:2015年5月4日
*/
public class HttpStaticFileServer {


private final int port;//端口

public HttpStaticFileServer(int port) {
this.port = port;
}

public void run() throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup();//线程一 //这个是用于serversocketchannel的event
EventLoopGroup workerGroup = new NioEventLoopGroup();//线程二//这个是用于处理accept到的channel
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new HttpStaticFileServerInitializer());

b.bind(port).sync().channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}

public static void main(String[] args) throws Exception {
int port = 8089;
if (args.length > 0) {
port = Integer.parseInt(args[0]);
} else {
port = 8089;
}
new HttpStaticFileServer(port).run();//启动服务
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class HttpStaticFileServerInitializer extends ChannelInitializer<SocketChannel> {  
@Override
public void initChannel(SocketChannel ch) throws Exception {
// Create a default pipeline implementation.
// 通道的连接点
ChannelPipeline pipeline = ch.pipeline();

// Uncomment the following line if you want HTTPS
//SSLEngine engine = SecureChatSslContextFactory.getServerContext().createSSLEngine();
//engine.setUseClientMode(false);
//pipeline.addLast("ssl", new SslHandler(engine));
/**
* (1)ReadTimeoutHandler,用于控制读取数据的时候的超时,10表示如果10秒钟都没有数据读取了,那么就引发超时,然后关闭当前的channel

(2)WriteTimeoutHandler,用于控制数据输出的时候的超时,构造参数1表示如果持续1秒钟都没有数据写了,那么就超时。

(3)HttpRequestrianDecoder,这个handler用于从读取的数据中将http报文信息解析出来,无非就是什么requestline,header,body什么的。。。

(4)然后HttpObjectAggregator则是用于将上卖解析出来的http报文的数据组装成为封装好的httprequest对象。。

(5)HttpresponseEncoder,用于将用户返回的httpresponse编码成为http报文格式的数据

(6)HttpHandler,自定义的handler,用于处理接收到的http请求。
*/

pipeline.addLast("decoder", new HttpRequestDecoder());// http-request解码器,http服务器端对request解码
pipeline.addLast("aggregator", new HttpObjectAggregator(65536));//对传输文件大少进行限制
pipeline.addLast("encoder", new HttpResponseEncoder());//http-response解码器,http服务器端对response编码
// 向客户端发送数据的一个Handler
pipeline.addLast("chunkedWriter", new ChunkedWriteHandler());

pipeline.addLast("handler", new HttpStaticFileServerHandler(true)); // Specify false if SSL.(如果是ssl,就指定为false)
}
}

serverHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
public class HttpStaticFileServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {  

public static final String HTTP_DATE_FORMAT = "EEE, dd MMM yyyy HH:mm:ss zzz";
public static final String HTTP_DATE_GMT_TIMEZONE = "GMT";
public static final int HTTP_CACHE_SECONDS = 60;

private final boolean useSendFile;

public HttpStaticFileServerHandler(boolean useSendFile) {
this.useSendFile = useSendFile;
}

/**
* 类似channelRead方法。
*/
@Override
public void messageReceived(
ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
// 请求头信息是否正确的
if (!request.decoderResult().isSuccess()) {
sendError(ctx, BAD_REQUEST);
return;
}

if (request.method() != GET) {
sendError(ctx, METHOD_NOT_ALLOWED);
return;
}

final String uri = request.uri();
System.out.println("-----uri----"+uri);
final String path = sanitizeUri(uri);
System.out.println("-----path----"+path);
if (path == null) {
sendError(ctx, FORBIDDEN);
return;
}

File file = new File(path);
if (file.isHidden() || !file.exists()) {
sendError(ctx, NOT_FOUND);
return;
}

if (file.isDirectory()) {
if (uri.endsWith("/")) {
sendListing(ctx, file);
} else {
sendRedirect(ctx, uri + '/');
}
return;
}

if (!file.isFile()) {
sendError(ctx, FORBIDDEN);
return;
}

// Cache Validation
String ifModifiedSince = (String) request.headers().get(IF_MODIFIED_SINCE);
if (ifModifiedSince != null && !ifModifiedSince.isEmpty()) {
SimpleDateFormat dateFormatter = new SimpleDateFormat(HTTP_DATE_FORMAT, Locale.US);
Date ifModifiedSinceDate = dateFormatter.parse(ifModifiedSince);

// Only compare up to the second because the datetime format we send to the client
// does not have milliseconds
long ifModifiedSinceDateSeconds = ifModifiedSinceDate.getTime() / 1000;
long fileLastModifiedSeconds = file.lastModified() / 1000;
if (ifModifiedSinceDateSeconds == fileLastModifiedSeconds) {
sendNotModified(ctx);
return;
}
}
// 文件下载
RandomAccessFile raf;
try {
raf = new RandomAccessFile(file, "r");
} catch (FileNotFoundException fnfe) {
sendError(ctx, NOT_FOUND);
return;
}
long fileLength = raf.length();

HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
//setContentLength(response, fileLength);
HttpHeaderUtil.setContentLength(response, fileLength);
setContentTypeHeader(response, file);
setDateAndCacheHeaders(response, file);
if (HttpHeaderUtil.isKeepAlive(request)) {

response.headers().set(CONNECTION, HttpHeaderValues.KEEP_ALIVE);
}

// Write the initial line and the header.
ctx.write(response);

// Write the content.
ChannelFuture sendFileFuture;
if (useSendFile) {
sendFileFuture =
ctx.write(new DefaultFileRegion(raf.getChannel(), 0, fileLength), ctx.newProgressivePromise());
} else {
sendFileFuture =
ctx.write(new ChunkedFile(raf, 0, fileLength, 8192), ctx.newProgressivePromise());
}

sendFileFuture.addListener(new ChannelProgressiveFutureListener() {
@Override
public void operationProgressed(ChannelProgressiveFuture future, long progress, long total) {
if (total < 0) { // total unknown
System.err.println("Transfer progress: " + progress);
} else {
System.err.println("Transfer progress: " + progress + " / " + total);
}
}

@Override
public void operationComplete(ChannelProgressiveFuture future) throws Exception {
System.err.println("Transfer complete.");
}
});

// Write the end marker
ChannelFuture lastContentFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);

// Decide whether to close the connection or not.
if (!HttpHeaderUtil.isKeepAlive(request)) {
// Close the connection when the whole content is written out.
lastContentFuture.addListener(ChannelFutureListener.CLOSE);
}
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
if (ctx.channel().isActive()) {
sendError(ctx, INTERNAL_SERVER_ERROR);
}
}

private static final Pattern INSECURE_URI = Pattern.compile(".*[<>&\"].*");

/**
* 路径解码
* @param uri
* @return
*/
private static String sanitizeUri(String uri) {
// Decode the path.
try {
uri = URLDecoder.decode(uri, "UTF-8");
} catch (UnsupportedEncodingException e) {
try {
uri = URLDecoder.decode(uri, "ISO-8859-1");
} catch (UnsupportedEncodingException e1) {
throw new Error();
}
}

if (!uri.startsWith("/")) {
return null;
}

// Convert file separators.
uri = uri.replace('/', File.separatorChar);

// Simplistic dumb security check.
// You will have to do something serious in the production environment.
if (uri.contains(File.separator + '.') ||
uri.contains('.' + File.separator) ||
uri.startsWith(".") || uri.endsWith(".") ||
INSECURE_URI.matcher(uri).matches()) {
return null;
}

// Convert to absolute path.
return System.getProperty("user.dir") + File.separator + uri;
}

private static final Pattern ALLOWED_FILE_NAME = Pattern.compile("[A-Za-z0-9][-_A-Za-z0-9\\.]*");

private static void sendListing(ChannelHandlerContext ctx, File dir) {
FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, OK);
response.headers().set(CONTENT_TYPE, "text/html; charset=UTF-8");

StringBuilder buf = new StringBuilder();
String dirPath = dir.getPath();

buf.append("<!DOCTYPE html>\r\n");
buf.append("<html><head><title>");
buf.append("Listing of: ");
buf.append(dirPath);
buf.append("</title></head><body>\r\n");

buf.append("<h3>Listing of: ");
buf.append(dirPath);
buf.append("</h3>\r\n");

buf.append("<ul>");
buf.append("<li><a href=\"../\">..</a></li>\r\n");

for (File f: dir.listFiles()) {
if (f.isHidden() || !f.canRead()) {
continue;
}

String name = f.getName();
if (!ALLOWED_FILE_NAME.matcher(name).matches()) {
continue;
}

buf.append("<li><a href=\"");
buf.append(name);
buf.append("\">");
buf.append(name);
buf.append("</a></li>\r\n");
}

buf.append("</ul></body></html>\r\n");
ByteBuf buffer = Unpooled.copiedBuffer(buf, CharsetUtil.UTF_8);
response.content().writeBytes(buffer);
buffer.release();

// Close the connection as soon as the error message is sent.
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}

private static void sendRedirect(ChannelHandlerContext ctx, String newUri) {
FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, FOUND);
response.headers().set(LOCATION, newUri);

// Close the connection as soon as the error message is sent.
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}

private static void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) {
FullHttpResponse response = new DefaultFullHttpResponse(
HTTP_1_1, status, Unpooled.copiedBuffer("Failure: " + status.toString() + "\r\n", CharsetUtil.UTF_8));
response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8");

// Close the connection as soon as the error message is sent.
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}

/**
* When file timestamp is the same as what the browser is sending up, send a "304 Not Modified"
*
* @param ctx
* Context
*/
private static void sendNotModified(ChannelHandlerContext ctx) {
FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, NOT_MODIFIED);
setDateHeader(response);

// Close the connection as soon as the error message is sent.
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}

/**
* Sets the Date header for the HTTP response
*
* @param response
* HTTP response
*/
private static void setDateHeader(FullHttpResponse response) {
SimpleDateFormat dateFormatter = new SimpleDateFormat(HTTP_DATE_FORMAT, Locale.US);
dateFormatter.setTimeZone(TimeZone.getTimeZone(HTTP_DATE_GMT_TIMEZONE));

Calendar time = new GregorianCalendar();
response.headers().set(DATE, dateFormatter.format(time.getTime()));
}

/**
* Sets the Date and Cache headers for the HTTP Response
*
* @param response
* HTTP response
* @param fileToCache
* file to extract content type
*/
private static void setDateAndCacheHeaders(HttpResponse response, File fileToCache) {
SimpleDateFormat dateFormatter = new SimpleDateFormat(HTTP_DATE_FORMAT, Locale.US);
dateFormatter.setTimeZone(TimeZone.getTimeZone(HTTP_DATE_GMT_TIMEZONE));

// Date header
Calendar time = new GregorianCalendar();
response.headers().set(DATE, dateFormatter.format(time.getTime()));

// Add cache headers
time.add(Calendar.SECOND, HTTP_CACHE_SECONDS);
response.headers().set(EXPIRES, dateFormatter.format(time.getTime()));
response.headers().set(CACHE_CONTROL, "private, max-age=" + HTTP_CACHE_SECONDS);
response.headers().set(
LAST_MODIFIED, dateFormatter.format(new Date(fileToCache.lastModified())));
}

/**
* Sets the content type header for the HTTP Response
*
* @param response
* HTTP response
* @param file
* file to extract content type
*/
private static void setContentTypeHeader(HttpResponse response, File file) {
MimetypesFileTypeMap mimeTypesMap = new MimetypesFileTypeMap();
response.headers().set(CONTENT_TYPE, mimeTypesMap.getContentType(file.getPath()));
}

}

流数据的传输处理

​ 在基于流的传输里比如 TCP/IP,接收到的数据会先被存储到一个 socket 接收缓冲里。不 幸的是,基于流的传输并不是一个数据包队列,而是一个字节队列。即使你发送了 2 个独立 的数据包,操作系统也不会作为 2 个消息处理而仅仅是作为一连串的字节而言。因此这是不 能保证你远程写入的数据就会准确地读取。所以一个接收方不管他是客户端还是服务端,都 应该把接收到的数据整理成一个或者多个更有意思并且能够让程序的业务逻辑更好理解的 数据。

在处理流数据粘包拆包时,可以使用下述处理方式:

使用定长数据处理,如:每个完整请求数据长度为 8 字节等。(FixedLengthFrameDecoder

使用特殊分隔符的方式处理,如:每个完整请求数据末尾使用’\0’作为数据结束标记。(DelimiterBasedFrameDecoder

使用自定义协议方式处理,如:http 协议格式等。

使用 POJO 来替代传递的流数据,如:每个完整的请求数据都是一个 RequestMessage 对象,在 Java 语言中,使用 POJO 更符合语种特性,推荐使用。

转载至微信公众号《Java患者》。

0%