Skip to content

封装阻塞的任务队列

1. 队列选型

多线程环境中,通过队列可以很容易实现数据共享,比如经典的“生产者”和“消费者”模型中,通过队列可以很便利地实现两者之间的数据共享。 假设我们有若干生产者线程,另外又有若干个消费者线程。如果生产者线程需要把准备好的数据共享给消费者线程,利用队列的方式来传递数据, 就可以很方便地解决他们之间的数据共享问题。但如果生产者和消费者在某个时间段内,万一发生数据处理速度不匹配的情况呢? 理想情况下,如果生产者产出数据的速度大于消费者消费的速度,并且当生产出来的数据累积到一定程度的时候, 那么生产者必须暂停等待一下(阻塞生产者线程),以便等待消费者线程把累积的数据处理完毕,反之亦然。

如果我们想要封装一个队列来处理任务,选用的队列类型是非常重要的,接下来就来分析队列的特征并选择合适的类型。

  1. 普通队列(Queue)
  • Queue 本身没有没有阻塞特征,只能进行普通的队列操作(addremove)等。
  • 缺点:
    • 需要自己管理队列是否已满或者是否为空
    • 如果队列已满,调用 add() 可能会抛出 IllegalStateException 异常,如果队列为空,调用 remove() 会抛 NoSuchElementException 异常。
  1. 无阻塞队列(ConcurrentLinkedQueue)
  • ConcurrentLinkedQueue 是一个无阻塞的线程安全队列,它是一个基于 CAS(Compare-And-Swap)算法来实现高效的无锁操作。

  • 优点:

    • 非阻塞队列适合高并发场景,可以在极短的时间内处理大量线程的队列操作,性能非常高。
  • 缺点:

    • 因为没有阻塞,无法有效控制队列的大小,如果消费者速度远低于生产者速度,队列可能会迅速挤压任务,导致资源浪费。
    • 无法自动阻塞生产者线程,在队列满时生产者仍会继续向队列添加任务,可能导致内存问题。
  1. 阻塞队列(BlockingQueue)

在多线程任务队列中,选择 BlockingQueue 主要是因为它简化了线程之间的协作和管理,尤其是在生产者和消费者之间的调度

  • 优点:
    • 可以有效地控制任务的生产和消费速率,防止队列溢出或任务丢失,避免内存过度占用。
    • 当队列满时,生产者会被阻塞;当队列为空时,消费者会被阻塞。这可以防止无效的CPU占用,保持系统的高效运行。
    • 内置的线程安全保证队列在并发环境下操作时更加简单可靠,不需要开发者手动处理锁和同步问题。
    • 通过阻塞机制,代码可以更简洁地实现任务的调度,不需要在生产者和消费者之间显式地协调。

这里的阻塞有两种常见的场景

  1. 当队列中没有数据,消费者端的所有线程都会被自动阻塞,直到有数据放入队列。
  2. 当队列中填满数据,生产者端的所有线程都会被自动阻塞,直到队列中有空位置,线程被自动唤醒。
  1. 固定大小的阻塞队列(ArrayBlockingQueue)

ArrayBlockingQueue 是一个容量固定的阻塞队列,支持生产者在队列已满时被阻塞,消费者在队列为空时被阻塞。

  • 优点:
  • 它的容量是固定的,因此能够有限限制内存的使用。
  • 具备阻塞机制,避免了资源浪费和任务丢失。
  • 任务处理队列不允许超过固定容量,避免了内存溢出。 缺点:
  • 容量固定,如果需要动态扩容,无法实现。
  • 如果生产者产生任务的速度远远超过消费者的速度。队列满了以后生产者会被阻塞,可能导致系统的吞吐量降低。
  1. 容量可选的阻塞队列(LinkedBlockingQueue)

LinkedBlockingQueue 是一个支持容量控制的阻塞队列,可以选择是否设置队列的最大容量。如果没有指定容量,则默认容量为 Integer.MAX_VALUE

  • 优点:
    • 如果指定队列大小,它会像 ArrayBlockingQueue 一样进行容量控制,避免了内存溢出。
    • 如果没有指定容量,它能够动态扩展队列的大小,适应不同的负载需求。
    • 支持阻塞操作(puttake) ,保证线程安全。
  • 缺点:
    • 如果队列的容量不受限制,可能就会导致内存消耗过多,特别是在负载过大的情况下。
  1. 优先级阻塞队列(PriorityBlockingQueue)

PriorityBlockingQueueArrayBlockingQueue 一样是基于数组实现的,但是 ArrayBlockingQueue 在初始化时需要指定长度,PriorityBlockingQueue 默认长度是11。

  • PriorityBlockingQueue 是一个真正的无界队列,在队列满的时候会自动进行扩容,界限的最大值是 2147483647
  • PriorityBlockingQueue 是权重队列,可以理解为它能进行排序。但是排序不是从小到大或从大到小排列,是基于数组的堆结构(完全二叉树)。
  • 出队方式也是按照权重进行出队。
  • 其存入的元素必须实现Comparator,或者在创建队列的时候自定义Comparator。
  1. SynchronousQueue

  2. DelayQueue

综上,选用 BlockingQueue 进行任务队列的封装以满足,具体原因已在上述内容中给出。

2. 封装设计

  1. 线程安全:任务队列需要支持多个线程并发操作,保证线程安全。
  2. 优先级控制:支持任务优先级,确保优先级高的任务优先执行。
  3. 流量控制和限制任务队列大小:当队列已满时,生产者线程应该被阻塞,直到有空间可以使用。
  4. 任务回调机制:任务完成后,可以通过回调机制通知任务的执行结果(成功,失败,超时等)。
  5. 异常和超时处理:对任务执行中的异常和超时进行处理,避免任务丢失。
  6. 任务重试机制:对于失败或超时的任务,支持重试功能。

任务模型(Task)

任务类包含任务的基本信息,包括任务ID、优先级、执行逻辑、回调接口等。

kotlin
data class Task {
  val id: String = UUID.randomUUID().toString(), // 任务id
  val taskName: String, // 任务名称
  val priority: Int, // 优先级
  val taskAction: Runnable, // 任务的执行逻辑
  val callback: TaskCallback?, // 任务的回调接口
  val retries: Int // 重试次数
  val maxRetries: Int, // 最大重试次数
  val status: TaskStatus, // 任务状态
  val executionHistory: MultableList<String> = mutableListOf() // 执行历史
}

interface TaskCallback {
  fun onSuccess(taskId: String, result: Any)
  fun onFailure(taskId: String, error: String)
  fun onTimeOut(taskId: String)
}

enum class TaskStatus {
    PENDING, // 等待中
    RUNNING, // 执行中
    COMPLETED, // 完成
    FAILED, // 失败
    TIMED_OUT // 超时
}

队列封装

kotlin
class BlockingTaskQueue(
    private val maxQueueSize: Int = 10, // 最大队列容量
    private val maxConcurrency: Int = 3, // 最大并发数
    private val taskTimeoutMs: Long = 5000L, // 任务超时
    private val retryDelayMs: Long = 1000L // 重试延迟
){
  private val taskQueue: BlockingQueue<Task> = PriorityBlockingQueue(maxQueueSize, compareBy<Task> { it.priority })
  private val executor: ExecutorService = ThreadPoolExecutor(
    maxConcurrency
    maxConcurrency,
    60L,
    TimeUnit.SECONDS,
    LinkedBlockingQueue<Runnable> ()
  )

  // 启动任务消费线程
  init {
    startTaskConsumer()
  }

  private fun startTaskConsumer(){
    executor.submit {
      while(true){
        try {
          val task = taskQueue.take() // 阻塞获取任务
          processTask(task)
        } catch(e: InterruptedExecution) {
          Thread.currentThread().interrupt()
          break
        }
      }
    }
  }

    private fun processTask(task: Task) {
        task.executionHistory.add("Task started at ${System.currentTimeMillis()}")

        // 执行任务并处理异常和超时
        val future = executor.submit(task.taskAction)

        try {
            // 等待任务执行结果,控制超时
            future.get(taskTimeoutMs, TimeUnit.MILLISECONDS)
            updateTaskStatus(task.id, TaskStatus.COMPLETED)
            task.callback?.onSuccess(task.id, "Task completed successfully")
            task.executionHistory.add("Task completed at ${System.currentTimeMillis()}")
        } catch (e: TimeoutException) {
            if (task.retries < task.maxRetries) {
                task.retries++
                taskQueue.offer(task)  // 任务重试
                task.callback?.onTimeout(task.id)
                task.executionHistory.add("Task timed out and retried at ${System.currentTimeMillis()}")
            } else {
                updateTaskStatus(task.id, TaskStatus.TIMED_OUT)
                task.callback?.onTimeout(task.id)
                task.executionHistory.add("Task timed out and reached max retries")
            }
        } catch (e: Exception) {
            updateTaskStatus(task.id, TaskStatus.FAILED)
            task.callback?.onFailure(task.id, e.message ?: "Unknown error")
            task.executionHistory.add("Task failed at ${System.currentTimeMillis()}")
        }
    }

    private fun updateTaskStatus(taskId: String, status: TaskStatus) {
        // 更新任务的状态
        taskManager[taskId]?.status = status
        taskManager[taskId]?.executionHistory?.add("Status changed to $status at ${System.currentTimeMillis()}")
    }

    // 提交任务到队列
    fun submitTask(taskName: String, priority: Int, taskAction: Runnable, callback: TaskCallback): Boolean {
        val task = Task(taskName = taskName, priority = priority, taskAction = taskAction, callback = callback)

        return try {
            val success = taskQueue.offer(task, 1, TimeUnit.SECONDS)
            if (success) {
                logger.info("Task added to queue: ${task.id}")
            } else {
                logger.warning("Task queue is full. Could not add task: ${task.id}")
            }
            success
        } catch (e: InterruptedException) {
            logger.severe("Task submission interrupted: ${e.message}")
            Thread.currentThread().interrupt()
            false
        }
    }

    // 获取任务的状态
    fun getTaskStatus(taskId: String): Task? {
        return taskManager[taskId]
    }

    // 关闭队列
    fun shutdown() {
        executor.shutdown()
        try {
            if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                executor.shutdownNow()
            }
        } catch (e: InterruptedException) {
            executor.shutdownNow()
        }
        logger.info("Task queue shutdown complete.")
    }
}

3. 使用

kotlin
fun main() {
    val taskQueue = BlockingTaskQueue(    
      maxQueueSize = 10,        // 最大队列容量
      maxConcurrency = 3,       // 最大并发线程数
      taskTimeoutMs = 5000L,    // 每个任务的超时时间,单位为毫秒
      retryDelayMs = 1000L     // 任务超时后的重试延迟,单位为毫秒
    )

    // 创建回调接口的实现
    val callback = object : TaskCallback {
        override fun onSuccess(taskId: String, result: Any) {
            println("Task $taskId succeeded: $result")
        }

        override fun onFailure(taskId: String, error: String) {
            println("Task $taskId failed: $error")
        }

        override fun onTimeout(taskId: String) {
            println("Task $taskId timed out")
        }
    }

    // 提交任务并传递回调
    repeat(5) { i ->
        taskQueue.submitTask("Task $i", priority = i) {
            println("Executing task $i")
            if (i % 2 == 0) Thread.sleep(3000) // 模拟任务超时
            else println("Task $i completed successfully")
        }, callback)
    }

    // 等待任务完成
    Thread.sleep(10000)

    // 输出队列状态
    taskQueue.getTaskStatus("Task 0")?.let { task ->
        println("Task ID: ${task.id}")
        println("Status: ${task.status}")
        println("Execution History: ${task.executionHistory.joinToString()}")
    }

    // 关闭队列
    taskQueue.shutdown()
}

Released under the MIT License.