0%

Java——并发1

基本的线程机制

  • 并发编程使我们可以将程序划分为多个分离的、独立运行的任务。通过多线程的机制,这些独立的任务(子任务)中的每一个都将由执行线程来驱动,一个线程就是在进程中的一个单一的顺序控制流,单个进程可以拥有多个并发执行的任务,其底层机制就是CPU的时分复用技术。

定义任务

  • 要想定义任务,你可以实现Runnable接口,该接口有一个run()方法,通过实现该方法可以使该任务执行你的命令,下面是一个小例子
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package com.muchlab.concurrence;

import java.util.concurrent.TimeUnit;

class LiftOff implements Runnable {
protected int countDown = 10;
private static int taskCount = 0;
private final int id = taskCount++;

public LiftOff() {
}
public String status(){
return "#" + id + "(" + (countDown > 0 ? countDown : "LiftOff") + ").";
}

@Override
public void run() {
while(countDown-- > 0){
System.out.println(status());
//Thread.yield()是对线程调度器的一种使用方式,可以将CPU从一个线程转移到另一个线程
Thread.yield();
}
}
}
class MainThread{
public static void main(String[] args) {
final LiftOff liftOff = new LiftOff();
liftOff.run();
}
}

Thread类

  • Runnable对象转变成工作任务的传统方式是把它提交给一个Thread构造器,然后使用Thread对象来驱动Runnable对象,下面是一个小例子:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package com.muchlab.concurrence;

public class BasicThreads {
public static void main(String[] args) {
//将Runnable对象转变为工作任务的传统方式就是把它交给一个Thread构造器,使用Thread来驱动LiftOff对象
//这时,创建了一个新的线程
final Thread thread = new Thread(new LiftOff());
/**
* 在Thread调用start前必须对线程进行初始化,然后start会触发Runnable对象中的run方法
* 这成为该新线程启动了该任务
*/
thread.start();
/**
* sout语句是在main线程中执行的,而Runnable对象的任务是在thread线程中进行的,
* 它们两个互不干扰,在thread线程的任务完成前,main线程中的其他操作也会被并发执行,
* 所以可以看到sout的输出并不是在任务输出的下面
*/
System.out.println("Waiting for LiftOff");
}
}
//使用Thread来驱动更多的任务
package com.muchlab.concurrence;

public class MoreBasicThreads {
public static void main(String[] args) throws InterruptedException {
//不同的任务执行在线程被换进换出时混在了一起,这种交换是由线程调度器自动控制的
for (int i = 0; i < 5; i++) {
new Thread(new LiftOff()).start();
System.out.println("Waiting for LiftOff");
}
}
}
  • 多个任务的执行输出说明不同任务的执行在线程被换进换出时混在了一起,这种交换是由线程调度器自动控制的,如果你的机器有多个处理器,线程调度器或在这些处理器之间默默地分发线程。所以程序的运行结果是不确定的。

使用Executor(执行器)

  • 使用Executor可以管理你的Thread对象,从而简化并发编程。它的原理是Executor在客户端和任务执行之间提供了一个间接层;与客户端直接执行任务不同,任务将由Executor这个中介对象来执行。它允许你管理异步任务的执行,而无须显式地管理线程的生命周期,所以,推荐使用Executor来执行任务。
  • 执行器可以创建线程池来管理线程执行任务,下面的例子使用了newCachedThreadPool()方法来创建一个线程池,该线程池会为每个任务提供一个线程。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package com.muchlab.concurrence;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CachedThreadPool {
public static void main(String[] args) {
/**
* Executor在客户端和任务执行之间提供了一个间接层,这样你就可以使用中介的Executor来执行任务
* 它允许你管理异步任务的执行,无须显式地管理线程的生命周期,所以,推荐使用Executor来启动任务
*/
final ExecutorService exec = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++) {
//提交并执行任务
exec.execute(new LiftOff());
}
//防止新的任务提交给exec执行器
//main线程会在shutdown被调用之前继续运行之前提交的所有任务
exec.shutdown();
}
}

FixedThreadPool

  • 你可以很容易地将前面示例中的CachedThreadPool替换为不同类型的ExecutorFixedThreadPool使用了有限的线程集来执行所提交的任务。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
package com.muchlab.concurrence;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FixedThreadPool {
public static void main(String[] args) {
//创建有限的线程池,这里限制了线程池中线程的个数为5,即不用为每个任务都固定地付出创建线程地开销
final ExecutorService executorService = Executors.newFixedThreadPool(5);
// final ExecutorService executorService1 = Executors.newSingleThreadExecutor();
for (int i = 0; i < 10; i++) {
executorService.execute(new LiftOff());
}
executorService.shutdown();
}
}

SingleThreadPool

  • SingleThreadExecutor就是线程数为1的FixedThreadPool,每个任务都会在下一个任务开始之前运行结束,即所有任务将使用相同的线程。若提交了多个任务,那这些任务将会排队。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package com.muchlab.concurrence;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SingleThreadExecutor {
public static void main(String[] args) {
//使用newSingleThreadExecutor方法可以创建SingleThreadExecutor
final ExecutorService executorService = Executors.newSingleThreadExecutor();
for (int i = 0; i < 5; i++) {
executorService.execute(new LiftOff());
}
executorService.shutdown();
}
}
  • SingleThreadExecutor对于在另一个线程中连续运行的任何事务来说,都是很有用的,比如监听进入的套接字连接任务。对于希望在线程中运行的段任务也是很有用的,比如,更新本地或者远程日志的小人物,或事件分发线程。

Callable接口:从任务中产生返回值

  • 如果你希望任务在完成时能够返回一个值,则你可以实现Callable接口而不是Runnable接口,它需要实现一个call()方法,这个方法其实就是run()方法加上返回值,并且必须使用ExecutorService.submit()方法来调用它,下面是一个小例子:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package com.muchlab.concurrence;

import java.util.ArrayList;
import java.util.concurrent.*;
/**
* Runnable接口只是执行工作的独立任务,但它不返回任何值,
* Callable跟Runnable一样也是用来定义任务的,但不同的是它能够在任务完成时返回一个值
* 在Callable接口中需要实现的时call方法,并且必须使用ExecutorService.submit()方法来调用
*/
public class TaskWithResult implements Callable<String> {
private int id;

public TaskWithResult(int id) {
this.id = id;
}

@Override
public String call() throws Exception {
return "Result From TaskWithResult" + id;
}
}
class CallableDemo{
public static void main(String[] args) {
final ExecutorService executorService = Executors.newCachedThreadPool();
final ArrayList<Future<String>> results = new ArrayList<>();
for (int i = 0; i < 10; i++) {
//submit方法返回一个Future类型的结果,该结果表示异步操作后的返回结果,可以使用isDone来查询Futurn是否已经完成
results.add(executorService.submit(new TaskWithResult(i)));
}
for (Future<String> result : results) {
try {

//使用get可以获取异步结果中的确切结果
System.out.println(result.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}finally {
executorService.shutdown();
}
}
}
}

休眠和优先级

休眠

  • 影响任务行为的一种简单方法是调用Thread的sleep()方法,JavaSE5引进了更加显式的sleep()版本,使用TimeUnit类,能够指定sleep()延迟的事件单元,可以提高代码的可读性,另外,TimeUnit还可以被用来执行转换。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package com.muchlab.concurrence;

import java.util.concurrent.TimeUnit;

public class SleepingTask extends LiftOff {
@Override
public void run() {
try {
while(countDown-->0)
System.out.println(status());
//指定延迟时间单元为微秒
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

优先级

  • CPU处理现有的线程集的顺序是不确定的,但调度器会倾向于让优先权高的线程先执行。然而,这并不意味着优先权低的线程得不到执行(若一直执行优先权高的,会导致饥饿现象)。优先权低的线程仅仅是执行的频率比较低。
  • 我们可以调用Thread对象中的getPriority()setPriority()两个方法来获取和设置线程的优先级
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package com.muchlab.concurrence;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SimplePriorities implements Runnable {
private int countDown = 5;
//volatile表示不会使用该变量进行运算不会进行任何编译器优化
private volatile double d;
private int priority;

public SimplePriorities(int priority) {
this.priority = priority;
}

@Override
public String toString() {
//这里分别打印了线程名、优先级、线程组名:countDown的值
return Thread.currentThread() + ": " + countDown;

}

@Override
public void run() {
Thread.currentThread().setPriority(priority);
while (true){
//执行了开销相当大的浮点运算,目的是观察优先级的效果
//若没有这些运算,在main线程中优先级低的比较先被执行,将会看不到不同优先级抢占CPU的效果
for (int i = 0; i < 100000; i++) {
d += (Math.PI + Math.E)/(double)i;
if(i%1000 ==0)
//当调用切换时,CPU会有很大概率被切换到优先级高的线程
Thread.yield();
}
System.out.println(this);
if(--countDown==0)return;
}
}

public static void main(String[] args) {
final ExecutorService exec = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++) {
exec.execute(new SimplePriorities(Thread.MIN_PRIORITY));
}
exec.execute(new SimplePriorities(Thread.MAX_PRIORITY));
exec.shutdown();
}
}

后台线程

  • 所谓后台线程,是指在程序运行的时候在后台提供一种通用服务的线程,并且这种线程不必属于程序。当所有的非后台线程结束时,程序也就终止了,同时会杀死进程中所有的后台线程,main线程就是一个非后台线程
  • 后台线程区别于普通线程,普通线程又可以称为用户线程,只完成用户自己想要完成的任务,不提供公共服务。而有时,我们希望编写一段程序,能够提供公共的服务,保证所有用户针对该线程的请求都能有响应。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package com.muchlab.concurrence;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

//通过编写线程工厂可以定制由Executor创建的线程属性:优先级、后、名称
class DaemonFactory implements ThreadFactory{

@Override
public Thread newThread(Runnable r) {
final Thread thread = new Thread(r);
thread.setDaemon(true);
return thread;
}
}

public class DaemonFromFactory implements Runnable {

@Override
public void run() {
try {
while (true){
TimeUnit.MILLISECONDS.sleep(100);
System.out.println(Thread.currentThread()+" "+this);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws InterruptedException {
//向线程池传入工厂对象可以使执行器在执行任务时执行工厂类中的初始化信息。
final ExecutorService exec = Executors.newCachedThreadPool(new DaemonFactory());
for (int i = 0; i < 10; i++) {
exec.execute(new DaemonFromFactory());
}
System.out.println("All daemons started");
TimeUnit.MILLISECONDS.sleep(500);

}
}

继承Thread类来定义任务

  • 除了实现Runnable接口来定义任务外,也可以继承Thread类来定义任务,这两种方式可以说完全一致的,但一般情况下,推荐实现Runnable接口来定义任务,因为Runnable可以继承其他的类,而Thread则不行。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package com.muchlab.concurrence;

public class SimpleThread extends Thread {
private int countDown = 5;
private static int threadCount = 0;

public SimpleThread() {
super("thread" + ++threadCount);
//启动任务
start();
}

@Override
public String toString() {
return "SimpleThread{" +getName()+
", countDown=" + countDown +
'}';
}

@Override
public void run() {
while(true){
System.out.println(this);
if(--countDown==0)
return;
}
}

public static void main(String[] args) {
for (int i = 0; i < 5; i++) {
final SimpleThread simpleThread = new SimpleThread();
}
}
}
-------------本文结束感谢您的阅读-------------