封装阻塞的任务队列
1. 队列选型
多线程环境中,通过队列可以很容易实现数据共享,比如经典的“生产者”和“消费者”模型中,通过队列可以很便利地实现两者之间的数据共享。 假设我们有若干生产者线程,另外又有若干个消费者线程。如果生产者线程需要把准备好的数据共享给消费者线程,利用队列的方式来传递数据, 就可以很方便地解决他们之间的数据共享问题。但如果生产者和消费者在某个时间段内,万一发生数据处理速度不匹配的情况呢? 理想情况下,如果生产者产出数据的速度大于消费者消费的速度,并且当生产出来的数据累积到一定程度的时候, 那么生产者必须暂停等待一下(阻塞生产者线程),以便等待消费者线程把累积的数据处理完毕,反之亦然。
如果我们想要封装一个队列来处理任务,选用的队列类型是非常重要的,接下来就来分析队列的特征并选择合适的类型。
- 普通队列(Queue)
Queue
本身没有没有阻塞特征,只能进行普通的队列操作(add
、remove
)等。- 缺点:
- 需要自己管理队列是否已满或者是否为空
- 如果队列已满,调用
add()
可能会抛出IllegalStateException
异常,如果队列为空,调用remove()
会抛NoSuchElementException
异常。
- 无阻塞队列(ConcurrentLinkedQueue)
ConcurrentLinkedQueue 是一个无阻塞的线程安全队列,它是一个基于 CAS(Compare-And-Swap)算法来实现高效的无锁操作。
优点:
- 非阻塞队列适合高并发场景,可以在极短的时间内处理大量线程的队列操作,性能非常高。
缺点:
- 因为没有阻塞,无法有效控制队列的大小,如果消费者速度远低于生产者速度,队列可能会迅速挤压任务,导致资源浪费。
- 无法自动阻塞生产者线程,在队列满时生产者仍会继续向队列添加任务,可能导致内存问题。
- 阻塞队列(BlockingQueue)
在多线程任务队列中,选择 BlockingQueue
主要是因为它简化了线程之间的协作和管理,尤其是在生产者和消费者之间的调度
- 优点:
- 可以有效地控制任务的生产和消费速率,防止队列溢出或任务丢失,避免内存过度占用。
- 当队列满时,生产者会被阻塞;当队列为空时,消费者会被阻塞。这可以防止无效的CPU占用,保持系统的高效运行。
- 内置的线程安全保证队列在并发环境下操作时更加简单可靠,不需要开发者手动处理锁和同步问题。
- 通过阻塞机制,代码可以更简洁地实现任务的调度,不需要在生产者和消费者之间显式地协调。
这里的阻塞有两种常见的场景:
- 当队列中没有数据,消费者端的所有线程都会被自动阻塞,直到有数据放入队列。
- 当队列中填满数据,生产者端的所有线程都会被自动阻塞,直到队列中有空位置,线程被自动唤醒。
- 固定大小的阻塞队列(ArrayBlockingQueue)
ArrayBlockingQueue
是一个容量固定的阻塞队列,支持生产者在队列已满时被阻塞,消费者在队列为空时被阻塞。
- 优点:
- 它的容量是固定的,因此能够有限限制内存的使用。
- 具备阻塞机制,避免了资源浪费和任务丢失。
- 任务处理队列不允许超过固定容量,避免了内存溢出。 缺点:
- 容量固定,如果需要动态扩容,无法实现。
- 如果生产者产生任务的速度远远超过消费者的速度。队列满了以后生产者会被阻塞,可能导致系统的吞吐量降低。
- 容量可选的阻塞队列(LinkedBlockingQueue)
LinkedBlockingQueue
是一个支持容量控制的阻塞队列,可以选择是否设置队列的最大容量。如果没有指定容量,则默认容量为 Integer.MAX_VALUE
。
- 优点:
- 如果指定队列大小,它会像
ArrayBlockingQueue
一样进行容量控制,避免了内存溢出。 - 如果没有指定容量,它能够动态扩展队列的大小,适应不同的负载需求。
- 支持阻塞操作(
put
和take
) ,保证线程安全。
- 如果指定队列大小,它会像
- 缺点:
- 如果队列的容量不受限制,可能就会导致内存消耗过多,特别是在负载过大的情况下。
- 优先级阻塞队列(PriorityBlockingQueue)
PriorityBlockingQueue
和 ArrayBlockingQueue
一样是基于数组实现的,但是 ArrayBlockingQueue
在初始化时需要指定长度,PriorityBlockingQueue
默认长度是11。
PriorityBlockingQueue
是一个真正的无界队列,在队列满的时候会自动进行扩容,界限的最大值是2147483647
。PriorityBlockingQueue
是权重队列,可以理解为它能进行排序。但是排序不是从小到大或从大到小排列,是基于数组的堆结构(完全二叉树)。- 出队方式也是按照权重进行出队。
- 其存入的元素必须实现Comparator,或者在创建队列的时候自定义Comparator。
SynchronousQueue
DelayQueue
综上,选用 BlockingQueue
进行任务队列的封装以满足,具体原因已在上述内容中给出。
2. 封装设计
- 线程安全:任务队列需要支持多个线程并发操作,保证线程安全。
- 优先级控制:支持任务优先级,确保优先级高的任务优先执行。
- 流量控制和限制任务队列大小:当队列已满时,生产者线程应该被阻塞,直到有空间可以使用。
- 任务回调机制:任务完成后,可以通过回调机制通知任务的执行结果(成功,失败,超时等)。
- 异常和超时处理:对任务执行中的异常和超时进行处理,避免任务丢失。
- 任务重试机制:对于失败或超时的任务,支持重试功能。
任务模型(Task)
任务类包含任务的基本信息,包括任务ID、优先级、执行逻辑、回调接口等。
data class Task {
val id: String = UUID.randomUUID().toString(), // 任务id
val taskName: String, // 任务名称
val priority: Int, // 优先级
val taskAction: Runnable, // 任务的执行逻辑
val callback: TaskCallback?, // 任务的回调接口
val retries: Int // 重试次数
val maxRetries: Int, // 最大重试次数
val status: TaskStatus, // 任务状态
val executionHistory: MultableList<String> = mutableListOf() // 执行历史
}
interface TaskCallback {
fun onSuccess(taskId: String, result: Any)
fun onFailure(taskId: String, error: String)
fun onTimeOut(taskId: String)
}
enum class TaskStatus {
PENDING, // 等待中
RUNNING, // 执行中
COMPLETED, // 完成
FAILED, // 失败
TIMED_OUT // 超时
}
队列封装
class BlockingTaskQueue(
private val maxQueueSize: Int = 10, // 最大队列容量
private val maxConcurrency: Int = 3, // 最大并发数
private val taskTimeoutMs: Long = 5000L, // 任务超时
private val retryDelayMs: Long = 1000L // 重试延迟
){
private val taskQueue: BlockingQueue<Task> = PriorityBlockingQueue(maxQueueSize, compareBy<Task> { it.priority })
private val executor: ExecutorService = ThreadPoolExecutor(
maxConcurrency
maxConcurrency,
60L,
TimeUnit.SECONDS,
LinkedBlockingQueue<Runnable> ()
)
// 启动任务消费线程
init {
startTaskConsumer()
}
private fun startTaskConsumer(){
executor.submit {
while(true){
try {
val task = taskQueue.take() // 阻塞获取任务
processTask(task)
} catch(e: InterruptedExecution) {
Thread.currentThread().interrupt()
break
}
}
}
}
private fun processTask(task: Task) {
task.executionHistory.add("Task started at ${System.currentTimeMillis()}")
// 执行任务并处理异常和超时
val future = executor.submit(task.taskAction)
try {
// 等待任务执行结果,控制超时
future.get(taskTimeoutMs, TimeUnit.MILLISECONDS)
updateTaskStatus(task.id, TaskStatus.COMPLETED)
task.callback?.onSuccess(task.id, "Task completed successfully")
task.executionHistory.add("Task completed at ${System.currentTimeMillis()}")
} catch (e: TimeoutException) {
if (task.retries < task.maxRetries) {
task.retries++
taskQueue.offer(task) // 任务重试
task.callback?.onTimeout(task.id)
task.executionHistory.add("Task timed out and retried at ${System.currentTimeMillis()}")
} else {
updateTaskStatus(task.id, TaskStatus.TIMED_OUT)
task.callback?.onTimeout(task.id)
task.executionHistory.add("Task timed out and reached max retries")
}
} catch (e: Exception) {
updateTaskStatus(task.id, TaskStatus.FAILED)
task.callback?.onFailure(task.id, e.message ?: "Unknown error")
task.executionHistory.add("Task failed at ${System.currentTimeMillis()}")
}
}
private fun updateTaskStatus(taskId: String, status: TaskStatus) {
// 更新任务的状态
taskManager[taskId]?.status = status
taskManager[taskId]?.executionHistory?.add("Status changed to $status at ${System.currentTimeMillis()}")
}
// 提交任务到队列
fun submitTask(taskName: String, priority: Int, taskAction: Runnable, callback: TaskCallback): Boolean {
val task = Task(taskName = taskName, priority = priority, taskAction = taskAction, callback = callback)
return try {
val success = taskQueue.offer(task, 1, TimeUnit.SECONDS)
if (success) {
logger.info("Task added to queue: ${task.id}")
} else {
logger.warning("Task queue is full. Could not add task: ${task.id}")
}
success
} catch (e: InterruptedException) {
logger.severe("Task submission interrupted: ${e.message}")
Thread.currentThread().interrupt()
false
}
}
// 获取任务的状态
fun getTaskStatus(taskId: String): Task? {
return taskManager[taskId]
}
// 关闭队列
fun shutdown() {
executor.shutdown()
try {
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
executor.shutdownNow()
}
} catch (e: InterruptedException) {
executor.shutdownNow()
}
logger.info("Task queue shutdown complete.")
}
}
3. 使用
fun main() {
val taskQueue = BlockingTaskQueue(
maxQueueSize = 10, // 最大队列容量
maxConcurrency = 3, // 最大并发线程数
taskTimeoutMs = 5000L, // 每个任务的超时时间,单位为毫秒
retryDelayMs = 1000L // 任务超时后的重试延迟,单位为毫秒
)
// 创建回调接口的实现
val callback = object : TaskCallback {
override fun onSuccess(taskId: String, result: Any) {
println("Task $taskId succeeded: $result")
}
override fun onFailure(taskId: String, error: String) {
println("Task $taskId failed: $error")
}
override fun onTimeout(taskId: String) {
println("Task $taskId timed out")
}
}
// 提交任务并传递回调
repeat(5) { i ->
taskQueue.submitTask("Task $i", priority = i) {
println("Executing task $i")
if (i % 2 == 0) Thread.sleep(3000) // 模拟任务超时
else println("Task $i completed successfully")
}, callback)
}
// 等待任务完成
Thread.sleep(10000)
// 输出队列状态
taskQueue.getTaskStatus("Task 0")?.let { task ->
println("Task ID: ${task.id}")
println("Status: ${task.status}")
println("Execution History: ${task.executionHistory.joinToString()}")
}
// 关闭队列
taskQueue.shutdown()
}