记录每一次进步

工作生活中的每一次坑,都是程序员的一块勋章

由于在开发过程当中需要获取线程执行的结果,故使用到了Java并发工具包java.util.concurrent下的Future类,最终通过Future的get来获取到线程执行的结果。这里我们来简单解析下,Future的实现原理及使用方式。

Java提供了两个自定义线程的底层接口,一个是Runnable接口一个是Callable接口。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public interface Runnable {
/**
* When an object implementing interface <code>Runnable</code> is used
* to create a thread, starting the thread causes the object's
* <code>run</code> method to be called in that separately executing
* thread.
* <p>
* The general contract of the method <code>run</code> is that it may
* take any action whatsoever.
*
* @see java.lang.Thread#run()
*/
public abstract void run();
}

public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}

通过比较可以看出,Runnable接口的run函数返回时为Void,而Callable的call函数返回值为V,也就是通过泛型传入的类型参数,由此也就为获取线程执行结果提供了实现的支持。线程的执行我们一般通过交由线程池ThreadPoolExecutor来完成。在该类中,我们有多种提交任务的方式,如

1
public void execute(Runnable command)(...)

在ThreadPoolExecutor的父类AbstractExecutorService中,还提供了几个提交任务的函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}

/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}

/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}

这三个函数中,都通过将Runable或Callable包装成RunnableFuture,并交由线程池来执行,那么来看下RunnableFuture的定义

1
2
3
4
5
6
7
public interface RunnableFuture<V> extends Runnable, Future<V> {
/**
* Sets this Future to the result of its computation
* unless it has been cancelled.
*/
void run();
}

该接口提供了两个功能,一个是继承Runnable接口提供了任务的运行功能,一个是继承Future接口,用于获取执行结果。接下来我们看下该处使用的RunnableFuture的具体实现类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* Returns a {@code RunnableFuture} for the given callable task.
*
* @param callable the callable task being wrapped
* @param <T> the type of the callable's result
* @return a {@code RunnableFuture} which, when run, will call the
* underlying callable and which, as a {@code Future}, will yield
* the callable's result as its result and provide for
* cancellation of the underlying task
* @since 1.6
*/
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}

对应Runnable和Callable入参,创建FutureTask。其中由于Runnable是没有返回值的,由第一个函数的javadoc可知,我们可以传递一个T类型的value值,当任务执行完毕时,将会返回该value。
由此,我们大概可以推测出能获取执行结果的任务是如何执行的。首先,通过ThreadPoolExecutor的submit函数提交一个实现了Callable接口或者Runnable接口的实现类给线程池,线程池会将该任务包装成一个RunnableFuture对象(实现了Future接口及Runnable接口),并将该对象提交线程池执行,并返回该对象。在最外层,就可以通过Future的get()来获取线程的执行结果了。如下:

1
2
3
PdfReaderCallable pdfReaderCallable = new PdfReaderCallable(url);
Future<String> task = pdfReadExecutor.submit(pdfReaderCallable);
String result = task.get();

其中PdfReaderCallable实现了Callable接口,pdfReadExecutor为线程池实现类。当执行该逻辑的线程执行了到第三条语句的时候,将被阻塞,一直等到task.get()返回结果才会继续向下执行。
接下来,让我们更深入地来看一下最终提交给线程池执行的FutureTask的实现原理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
public class FutureTask<V> implements RunnableFuture<V> {
...

private Callable<V> callable;
/** The result to return or exception to throw from get() */
private Object outcome; // non-volatile, protected by state reads/writes

...

public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}

public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}

...

public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}

protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
...

public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}

private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
}

这里只列举了几个比较重要的成员变量及成员函数。通过构造函数可以看出,成员变量callable是实际的任务类。在FutureTask的run方法中最终调用了该实际任务类来执行任务并获取结果result,最后调用set函数将result赋值给outcome对象。而在外部,我们通过get()函数来获取结果,如果任务含没有执行完,那么将会通过awaitDone函数阻塞,一直等待任务执行完毕,才能继续执行并最终通过report函数返回执行结果。
至此,获取任务执行结果的Future实现就分析结束。

欢迎关注个人公众号:
个人公号

ELK

ELK是由Elasticsearch、Logstash、Kibana三部分组成:

  1. Elasticsearch是个开源分布式搜索引擎,它的特点有:分布式,零配置,自动发现,索引自动分片,索引副本机制,restful风格接口,多数据源,自动搜索负载等。
  2. Logstash是一个完全开源的工具,他可以对你的日志进行收集、过滤,并将其存储供以后使用(如,搜索)。
  3. Kibana 也是一个开源和免费的工具,Kibana可以为 Logstash 和 ElasticSearch 提供的日志分析友好的 Web 界面,可以帮助您汇总、分析和搜索重要数据日志。
    在大多数情况下,我们都是使用ELK来进行日志的收集及分析工作,直接在Kibana中查看数据信息。那么,如果我们需要利用ELK的能力进行二次开发的话,就需要使用net.logstash.logback来完成我们的需求。有关ELK的部署读者可以参考其他的文章来完成,本文重点介绍在SpringBoot中操作ELK数据的方法及过程。

kibana效果图.PNG

SpringBoot中使用ELK

  1. 添加依赖
    要在SpringBoot中使用ELK,需要添加依赖:
    compile group: ‘com.github.vanroy’, name: ‘spring-boot-starter-data-jest’, version: ‘3.0.0.RELEASE’
    在该包中,有我们操作ELK所需要的接口。
  2. 配置ELK路径
    在SpringBoot的配置文件当中,配置ELK的路径:spring.data.jest.uri = http://ipAddress:9200
  3. 注入JestClient对象
    在需要使用到ELK的模块中,注入JestClient对象,建议实现一个公共的接口,提供ELK功能,这样只需要在该代码中进行一次注入即可:
    1
    2
    @Autowired
    private JestClient jestClient;
    接下来,就可以使用jestClient操作ELK的数据了。下面我们将来看看如何通过JestClient操作ELK。

JestClient操作ElasticSearch数据

ElasticSearch使用查询表达是来进行数据的检索。查询表达式(Query DSL)是一种非常灵活又富有表现力的查询语言。Elasticsearch 使用它可以以简单的JSON接口来展现Lucene功能的绝大部分。在应用中,用它来编写查询语句。它可以使查询语句更灵活、更精确、易读和易调试。要使用这种查询表达式,只需将查询语句传递给 query 参数:

1
2
3
4
GET /my_store/products
{
"query": YOUR_QUERY_HERE
}

同理,ElasticSearch中的其他操作,也是使用类似于该表达式的方式来操作的,比如,添加记录的的方式:

1
2
3
4
5
6
7
8
9
POST /my_store/products/_bulk
{ "index": { "_id": 1 }}
{ "price" : 10, "productID" : "XHDK-A-1293-#fJ3" }
{ "index": { "_id": 2 }}
{ "price" : 20, "productID" : "KDKE-B-9947-#kL5" }
{ "index": { "_id": 3 }}
{ "price" : 30, "productID" : "JODL-X-1937-#pV7" }
{ "index": { "_id": 4 }}
{ "price" : 30, "productID" : "QQPX-R-3956-#aD8" }

关于ElasticSearch的DSL语言部分,有机会的话会单独写一篇文章来总结下,这里只做一个简单的介绍。重要的是,如何通过JestClient来操作ElasticSearch。我们以一个简单的查询操作来看下:

  1. 构建过滤条件:
    1
    2
    3
    BoolQueryBuilder queryBuilder = boolQuery().must(termQuery("dataId.id", 2101987));
    queryBuilder.must(termQuery("dataId.dataSetId", QueryParser.escape(fdmt_main_oper_item)));
    queryBuilder.must(termQuery("operator", operator));
  2. 将条件对象转换位条件语句:
    1
    String queryString = new SearchSourceBuilder().query(queryBuilder).toString();
  3. 使用条件语句进行查询:
    1
    2
    Search search = new Search.Builder(queryString).addIndex(DATA_LIFECYCLE_INDEX).addType(DATA_LIFECYCLE_TYPE).build();
    SearchResult result = jestClient.execute(search);
    来看下第二步操作获取到的字符串:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    {
    "query" : {
    "bool" : {
    "must" : [
    {
    "term" : {
    "dataId.id" : {
    "value" : "2101987",
    "boost" : 1.0
    }
    }
    },
    {
    "term" : {
    "dataId.dataSetId" : {
    "value" : "fdmt_main_oper_item",
    "boost" : 1.0
    }
    }
    }
    ],
    "disable_coord" : false,
    "adjust_pure_negative" : true,
    "boost" : 1.0
    }
    }
    }
    从结构中可以看出,这正是ElasticSearch结构化查询表达式的后半部分,那么,只要把前半部分补充上,不久可以进行查询了吗。而:
    1
    new Search.Builder(queryString).addIndex(DATA_LIFECYCLE_INDEX).addType(DATA_LIFECYCLE_TYPE);
    所做的操作,也就是补全查询表达的查询路径而已。
    接下来我们看看JestClient的execute接口做的操作。在JestClient接口的实现JestHttpClient中,可以看到:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    public <T extends JestResult> T execute(Action<T> clientRequest, RequestConfig requestConfig) throws IOException {
    HttpUriRequest request = prepareRequest(clientRequest, requestConfig);
    CloseableHttpResponse response = null;
    try {
    response = executeRequest(request);
    return deserializeResponse(response, request, clientRequest);
    } catch (HttpHostConnectException ex) {
    throw new CouldNotConnectException(ex.getHost().toURI(), ex);
    } finally {
    if (response != null) {
    try {
    response.close();
    } catch (IOException ex) {
    log.error("Exception occurred while closing response stream.", ex);
    }
    }
    }
    }
    execute接口所做的工作,就是发起Http请求,根据url去访问ElasticSearch。由此我们可以看出,使用JestClient来操作ElasticSearch其实很简单,重要的是将ElasticSearch的Restful风格的操作接口构建出来,而ElasticSearch中的所使用到的相关的关键字,都有对应的函数实现。在上面查询条件的构造当中,我们使用了boolQuery(),其在ElasticSearch的结构化查询语言中其实就是对应着"bool" : {}。boolQuery()方法返回的是一个BoolQueryBuilder的对象,该类继承自AbstractQueryBuilder抽象类,在该抽象类下,有多大几十个的子类:
    查询条件.PNG
    这些子类都对应着ElasticSearch中的某一个关键字,具体的大家可以打看IDEA查看下抽象类的具体子类。
    至此,在SringBoot中操作ElasticSearch的的步骤及方法大致清楚了,关于c查询结果SearchResult,大致也是存在着与查询:
    1
    2
    3
    4
    GET /my_store/products
    {
    "query": YOUR_QUERY_HERE
    }
    的结果的对应关系,具体的可以看文档中介绍了。

欢迎关注个人公众号:
个人公号

在讲解Spring中使用事务管理之前,首先让我们来看下什么是事务。

数据库事务

一个数据库事务通常包含了一个序列的对数据库的读/写操作。它的存在包含有以下两个目的:

  1. 为数据库操作序列提供了一个从失败中恢复到正常状态的方法,同时提供了数据库即使在异常状态下仍能保持一致性的方法。
  2. 当多个应用程序在并发访问数据库时,可以在这些应用程序之间提供一个隔离方法,以防止彼此的操作互相干扰。
    当事务被提交给了DBMS(数据库管理系统),则DBMS(数据库管理系统)需要确保该事务中的所有操作都成功完成且其结果被永久保存在数据库中,如果事务中有的操作没有成功完成,则事务中的所有操作都需要被回滚,回到事务执行前的状态;同时,该事务对数据库或者其他事务的执行无影响,所有的事务都好像在独立的运行。

事务的四个特性

数据库事务拥有以下四个特性,习惯上被称之为ACID特性:

  1. 原子性(Atomicity):事务作为一个整体被执行,包含在其中的对数据库的操作要么全部被执行,要么都不执行。事务的操作如果成功就必须要完全应用到数据库,如果操作失败则不能对数据库有任何影响。
  2. 一致性(Consistency):事务应确保数据库的状态从一个一致状态转变为另一个一致状态。一致状态的含义是数据库中的数据应满足完整性约束。拿转账来说,假设用户A和用户B两者的钱加起来一共是5000,那么不管A和B之间如何转账,转几次账,事务结束后两个用户的钱相加起来应该还得是5000,这就是事务的一致性。
  3. 隔离性(Isolation):多个事务并发执行时,一个事务的执行不应影响其他事务的执行。当多个用户并发访问数据库时,比如操作同一张表时,数据库为每一个用户开启的事务,不能被其他事务的操作所干扰,多个并发事务之间要相互隔离。即要达到这么一种效果:对于任意两个并发的事务T1和T2,在事务T1看来,T2要么在T1开始之前就已经结束,要么在T1结束之后才开始,这样每个事务都感觉不到有其他事务在并发地执行。
  4. 持久性(Durability):持久性是指一个事务一旦被提交了,那么对数据库中的数据的改变就是永久性的,即便是在数据库系统遇到故障的情况下也不会丢失提交事务的操作。

事务的隔离

当多个线程都开启事务操作数据库中的数据时,数据库系统要能进行隔离操作,以保证各个线程获取数据的准确性,在介绍数据库提供的各种隔离级别之前,我们先看看如果不考虑事务的隔离性,会发生的几种问题:

脏读

脏读就是指当一个事务正在访问数据,并且对数据进行了修改,而这种修改还没有提交到数据库中,这时,另外一个事务也访问这个数据,然后使用了这个数据。

不可重复读

是指在一个事务内,多次读同一数据。在这个事务还没有结束时,另外一个事务也访问该同一数据。那么,在第一个事务中的两 次读数据之间,由于第二个事务的修改,那么第一个事务两次读到的的数据可能是不一样的。

虚读(幻读)

是指当事务不是独立执行时发生的一种现象,例如第一个事务对一个表中的数据进行了修改,这种修改涉及到表中的全部数据行。 同时,第二个事务也修改这个表中的数据,这种修改是向表中插入一行新数据。那么,以后就会发生操作第一个事务的用户发现表中还有没有修改的数据行,就好象 发生了幻觉一样。

事务隔离级别

数据库为我们提供的四种隔离级别:

  1. Serializable (串行化):可避免脏读、不可重复读、幻读的发生
  2. Repeatable read (可重复读):可避免脏读、不可重复读的发生
  3. Read committed (读已提交):可避免脏读的发生
  4. Read uncommitted (读未提交):最低级别,任何情况都无法保证

Spring中事务级别

在SpringBoot中,要使用事务,方法很简单,这里我们以Jpa为例

1
2
3
4
@Transactional(rollbackFor = Exception.class)
public interface FdmtMoItemRepository extends JpaRepository<FdmtMoItem, Integer> {
void deleteByItemIdIn(List<Long> itemIds);
}

在定义的interface中,需要为类或者函数上加上@Transactional注解。打开@Transactional直接的源代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
public @interface Transactional {
@AliasFor("transactionManager")
String value() default "";

@AliasFor("value")
String transactionManager() default "";

Propagation propagation() default Propagation.REQUIRED;

Isolation isolation() default Isolation.DEFAULT;

int timeout() default TransactionDefinition.TIMEOUT_DEFAULT;

boolean readOnly() default false;

Class<? extends Throwable>[] rollbackFor() default {};

String[] rollbackForClassName() default {};

Class<? extends Throwable>[] noRollbackFor() default {};

String[] noRollbackForClassName() default {};
}

在注解中,上述函数的作用分别是:

  1. value transactionManager 用于设置事务管理器的名字
  2. propagation 设置事务的传播机制。也就是,但需要进行事务操作的时候:
    • PROPAGATION_REQUIRED 如果当前已经存在事务,那么加入该事务,如果不存在事务,创建一个事务,这是默认的传播属性值。
    • PROPAGATION_SUPPORTS 如果当前已经存在事务,那么加入该事务,否则创建一个所谓的空事务(可以认为无事务执行)。
    • PROPAGATION_MANDATORY 当前必须存在一个事务,否则抛出异常。
    • PROPAGATN_REQUIRES_NEW 如果当前存在事务,先把当前事务相关内容封装到一个实体,然后重新创建一个新事务,接受这个实体为参数,用于事务的恢复。更直白的说法就是暂停当前事务(当前无事务则不需要),创建一个新事务。 针对这种情况,两个事务没有依赖关系,可以实现新事务回滚了,但外部事务继续执行。
    • NOT_SUPPORTED 如果当前存在事务,挂起当前事务,然后新的方法在没有事务的环境中执行,没有spring事务的环境下,sql的提交完全依赖于 defaultAutoCommit属性值 。
    • PROPAGATION_NEVER 如果当前存在事务,则抛出异常,否则在无事务环境上执行代码。
    • PROPAGATION_NESTED 如果当前存在事务,则使用 SavePoint 技术把当前事务状态进行保存,然后底层共用一个连接,当NESTED内部出错的时候,自行回滚到 SavePoint这个状态,只要外部捕获到了异常,就可以继续进行外部的事务提交,而不会受到内嵌业务的干扰,但是,如果外部事务抛出了异常,整个大事务都会回滚。
  3. isolation 设置事务的隔离级别,也就是对应了上面提到的数据库的几个事务隔离级别
    • DEFAULT 使用数据库设置的隔离级别 ( 默认 ) ,由 DBA 默认的设置来决定隔离级别。
    • SERIALIZABLE 保证所有的情况不会发生(锁表)
    • REPEATABLE_READ 会出幻读(锁定所读取的所有行)。
    • READ_COMMITTED 会出现不可重复读、幻读问题(锁定正在读取的行)。
    • READ_UNCOMMITTED 会出现脏读、不可重复读、幻读 ( 隔离级别最低,并发性能高 )。
  4. timeout 设置事务的超时时间
  5. readOnly 设置是否为只读事务
  6. rollbackFor 设置事务在法身什么异常的情况下进行回滚
  7. rollbackForClassName 设置当发生对应名字的异常时进行回滚
  8. noRollbackFor 设置发生什么异常的情况下不回滚
  9. noRollbackForClassName 设置当发生对应名字的异常时进行不回滚

欢迎关注个人公众号:
个人公号

在使用JpaRepository接口进行数据的删除操作过程中发现,当首先使用自定义的deleteBy方法删除某一数据实体,然后再对该数据实体(删除ID的值)进行save时(deleteBy和save的调用是在同一个事务当中且为连续的调用),会发生数据库错误,根据log,知道是违反了唯一键约束。说明deleteBy并没有成功。但是如果在两个方法调用之间插入一个查询,则持久化能够成功。下面分析下原因。
自定义的deleteBy方法已经使用@Transactional以及@Modifying进行注解并且调用该函数的地方也使用了@Transactional进行注解,按理来说应该已经保证了deleteBy方法能够执行,那么为什么会失败呢?其实,由于Jpa底层默认使用的是Hiberanate框架来实现的,所以产生该问题的根源是在Hiberanate。首先,让我们先来看一张图:
数据持久化
其实,在内存中,有一份Hiberanate进行管理的缓存。我们调用Jpa进行deleteBy时,其实是作用在这部分缓存上的,并没有真正地写到数据库当中。Hibernate会尽量将与数据库的操作延迟,直到必须要与数据库进行交互,例如save方法一般会在提交时才真正执行,最终在提交时会以批处理的方式与数据库进行交互,以提高效率。而将操作延迟,就是利用缓存,将最后要处理的操作放到缓存中。
因此,如果要将缓存中的数据持久化到数据库中,则需要调用JpaRepository的flush()方法(内部调用了EntityManager的flush()方法)来完成,flush方法的主要作用就是清理缓存,强制数据库与Hibernate缓存同步,以保证数据的一致性。但即使调用了该函数,数据也还是没有持久化到数据库中。这是因为,前面也提到过,deleteBy和save的调用是在同一个事务当中,而根据Hibernate的机制,flush()方法进行清理缓存的操作,它的主要动作就是向数据库发送一系列的sql语句,并执行这些sql语句,但是不会向数据库提交。而只有到将事务进行提交时,数据库才会去执行这些SQL,将数据持久化到数据库中。因此,即使在事务过程当中调用了flush()方法,也无法保证数据被持久化。而之所以加上查询后能够进行持久化,就是因为,执行查询时,会首先将事务进行提交才执行查询,因此deleteBy才成功了。

欢迎关注个人公众号:
个人公号

概况

线程池,从字面含义来看,是指管理一组同构工作线程的资源池。线程池是与工作队列密切相关的,其中在工作队列中保存了所有等待执行的任务。工作者线程的任务很简单:从工作队列中获取一个任务,执行任务,然后返回线程池并等待下一个任务。在上一节当中,我们介绍了同步容器及并发容器相关的知识,在最后,我们讲到了阻塞队列这类并发容器。线程池的实现中,工作队列就用到了阻塞队列。
类库提供了一个灵活的线程池以及一些有用的默认配置,可以通过调用Executors中的静态工行方法之一来创建一个线程池:线程池创建
从函数名字可以看出,Executors能够创建一下几种类型的线程池:

  • newFixedThreadPool
    创建一个固定长度的线程池,每当提交一个任务时就创建一个线程成,知道达到线程池的最大数量,这是线程池的规模将不再变化
  • newSingleThreadExecutor
    创建一个单线程的线程池。
  • newCachedThreadPool
    创建一个可缓存的线程池,如果线程池当前规模超出了处理器需求,那么将回收空闲的线程,而当需求增加时,则可以添加新的线程,线程池的规模不存在任何限制。
  • ScheduledExecutorService
    创建一个固定长度的线程池,而且以延迟或者定时的方式来执行任务。

在这几个静态构造函数中,其实实质上都调用了ThreadPoolExecutor类的构造函数来创建一个线程池。我们来看下,包java.util.concurrent下ThreadPoolExecutor类的构造函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

在该构造函数当中,corePoolSize表示线程池中线程的数量,maximumPoolSize表示线程池中最大线程数据,keepAliveTime表示的是,当线程池中的线程数量大于corePoolSize时,闲置线程终结前等待任务分配的最大等待时间(如果超过这个时间线程还没有被分配任务,该线程将终结)。workQueue,handler,threadFactory是线程池的核心内容,我们将具体来讲解一下这三个属性的作用。

工作队列(BlockingQueue

首先,来看一下该属性的定义:

/**
 * The queue used for holding tasks and handing off to worker
 * threads.  We do not require that workQueue.poll() returning
 * null necessarily means that workQueue.isEmpty(), so rely
 * solely on isEmpty to see if the queue is empty (which we must
 * do for example when deciding whether to transition from
 * SHUTDOWN to TIDYING).  This accommodates special-purpose
 * queues such as DelayQueues for which poll() is allowed to
 * return null even if it may later return non-null when delays
 * expire.
 */
private final BlockingQueue<Runnable> workQueue;

BlockingQueue就是在上一篇文中当中介绍到的阻塞队列。具体的实现类,则需要根据具体的业务场景以及其他的配置参数做出选择。从同步容器与并发容器类简介中我们可以知道,阻塞队列可粗分为三类,无界队列,有界队列,以及同步移交(Synchronous Handoff)。对于无界队列,不存在队列的饱和情况,而对于游街队列及同步移交来说,当任务的数量大于固定队列的大小时,如果处理这部分对于的任务,则需要根据饱和策略来决定。

饱和策略(RejectedExecutionHandler)

当有界队列被填满后,饱和策略开始发挥作用。JDK提供了几种不同的RejectedExecutionHandler实现,每种实现都包含了不同的饱和策略:

  • AbortPolicy
    “中止”策略是默认的饱和策略。该策略将抛出未检查的RejectedExecutionException异常。
  • CallerRunsPolicy
    “调用者运行”策略实现了一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量。它不会在线程池的某个线程中执行新提交的任务,而是在一个调用了excute的线程中执行该任务。
  • DiscardPolicy
    当新提交的任务无法保存到队列中等待执行时,“抛弃(Discard)”策略会悄悄抛弃该任务。
  • DiscardOldestPolicy
    “抛弃最旧的”策略会抛弃下一个将被执行的任务。

线程工厂(ThreadFactory)

每当线程池需要创建一个线程时,都是通过线程工厂方法来完成了。默认的线程工厂将创建一个新的,非守护的线程,并不包含特殊的配置信息。因此,当我没需要创建特殊的线程(不如在创建过程当中打印日志,给线程修改名字)时,就可以提供我们自己的线程工厂实例就可以了。自定义一个线程工厂很简单,只需要实现ThreadFactory接口即可:

1
2
3
4
5
6
7
8
9
10
11
12
public interface ThreadFactory {

/**
* Constructs a new {@code Thread}. Implementations may also initialize
* priority, name, daemon status, {@code ThreadGroup}, etc.
*
* @param r a runnable to be executed by new thread instance
* @return constructed thread, or {@code null} if the request to
* create a thread is rejected
*/
Thread newThread(Runnable r);
}

该接口只有一个函数,我们可以通过该函数,定制化我们自己的线程。
线程池的介绍就到这里,在程序代码中,强烈建议,不要显示地定义线程去执行任务,而是通过线程池来执行任务,防止不可预料的错误发生。

欢迎关注个人公众号:
个人公号

0%