Java实现FIFO任务调度队列策略

2022-07-19,,,,

目录
  • 前言
  • fifo任务调度器架构
  • 示例代码

前言

在工作中,很多高并发的场景中,我们会用到队列来实现大量的任务请求。当任务需要某些特殊资源的时候,我们还需要合理的分配资源,让队列中的任务高效且有序完成任务。熟悉分布式的话,应该了解yarn的任务调度算法。本文主要用java实现一个fifo(先进先出调度器),这也是常见的一种调度方式。

fifo任务调度器架构

主要实现的逻辑可以归纳为:

1、任务队列主要是单队列,所有任务按照顺序进入队列后,也会按照顺序执行。

2、如果任务无法获得资源,则将任务塞回队列原位置。

示例代码

maven依赖如下:

      	<dependency>
            <groupid>org.projectlombok</groupid>
            <artifactid>lombok</artifactid>
            <optional>true</optional>
        </dependency>
                <dependency>
            <groupid>cn.hutool</groupid>
            <artifactid>hutool-all</artifactid>
            <version>5.5.2</version>
        </dependency>

具体的原理就不细说了,通过代码我们看看fifo任务调度策略是什么玩的吧。下面的代码也可以作为参考。我们会使用到一个双向阻塞队列linkedblockingdeque。后面的代码说明会提到。

package ai.guiji.csdn.dispatch;

import cn.hutool.core.thread.threadutil;
import lombok.builder;
import lombok.data;
import lombok.extern.slf4j.slf4j;
import org.springframework.scheduling.concurrent.customizablethreadfactory;

import java.util.random;
import java.util.concurrent.*;
import java.util.concurrent.atomic.atomicinteger;
import java.util.stream.intstream;

/**
 * @program: csdn @classname: fifodemo @author: 剑客阿良_aliang @date: 2021-12-24 21:21 @description:
 * fifo队列 @version: v1.0
 */
@slf4j
public class fifodemo {
  private static final linkedblockingdeque<task> task_queue = new linkedblockingdeque<>();
  private static final concurrenthashmap<integer, linkedblockingqueue<resource>> resource_map =
      new concurrenthashmap<>();
  private static final executorservice task_pool =
      new threadpoolexecutor(
          8,
          16,
          0l,
          timeunit.milliseconds,
          new linkedblockingqueue<>(),
          new customizablethreadfactory("task-thread-"),
          new threadpoolexecutor.abortpolicy());
  private static final scheduledexecutorservice engine_pool =
      executors.newsinglethreadscheduledexecutor(new customizablethreadfactory("engine-"));
  private static final atomicinteger code_builder = new atomicinteger(0);

  @data
  @builder
  private static class resource {
    private integer rid;
    private type type;
  }

  @data
  @builder
  private static class task implements runnable {
    private integer tid;
    private runnable work;
    private type type;
    private resource resource;

    @override
    public void run() {
      log.info("[{}]任务,使用资源编号:[{}]", tid, resource.getrid());
      try {
        work.run();
      } catch (exception exception) {
        exception.printstacktrace();
      } finally {
        log.info("[{}]任务结束,回归资源", tid);
        returnresource(resource);
      }
    }
  }

  private enum type {
    /** 资源类型 */
    a("a资源", 1),
    b("b资源", 2),
    c("c资源", 3);

    private final string desc;
    private final integer code;

    type(string desc, integer code) {
      this.desc = desc;
      this.code = code;
    }

    public string getdesc() {
      return desc;
    }

    public integer getcode() {
      return code;
    }
  }

  public static void initresource() {
    random random = new random();
    int acount = random.nextint(10) + 1;
    int bcount = random.nextint(10) + 1;
    int ccount = random.nextint(10) + 1;
    resource_map.put(type.a.getcode(), new linkedblockingqueue<>());
    resource_map.put(type.b.getcode(), new linkedblockingqueue<>());
    resource_map.put(type.c.getcode(), new linkedblockingqueue<>());
    intstream.rangeclosed(1, acount)
        .foreach(
            a ->
                resource_map
                    .get(type.a.getcode())
                    .add(resource.builder().rid(a).type(type.a).build()));
    intstream.rangeclosed(1, bcount)
        .foreach(
            a ->
                resource_map
                    .get(type.b.getcode())
                    .add(resource.builder().rid(a).type(type.b).build()));
    intstream.rangeclosed(1, ccount)
        .foreach(
            a ->
                resource_map
                    .get(type.c.getcode())
                    .add(resource.builder().rid(a).type(type.c).build()));
    log.info("初始化资源a数量:{},资源b数量:{},资源c数量:{}", acount, bcount, ccount);
  }

  public static resource extractresource(type type) {
    return resource_map.get(type.getcode()).poll();
  }

  public static void returnresource(resource resource) {
    log.info("开始归还资源,rid:{},资源类型:{}", resource.getrid(), resource.gettype().getdesc());
    resource_map.get(resource.gettype().code).add(resource);
    log.info("归还资源完成,rid:{},资源类型:{}", resource.getrid(), resource.gettype().getdesc());
  }

  public static void engindo() {
    engine_pool.scheduleatfixedrate(
        () -> {
          task task = task_queue.poll();
          if (task == null) {
            log.info("任务队列为空,无需要执行的任务");
          } else {
            resource resource = extractresource(task.gettype());
            if (resource == null) {
              log.info("[{}]任务无法获取[{}],返回队列", task.gettid(), task.gettype().getdesc());
              task_queue.addfirst(task);
            } else {
              task.setresource(resource);
              task_pool.submit(task);
            }
          }
        },
        0,
        1,
        timeunit.seconds);
  }

  public static void addtask(runnable runnable, type type) {
    integer tid = code_builder.incrementandget();
    task task = task.builder().tid(tid).type(type).work(runnable).build();
    log.info("提交任务[{}]到任务队列", tid);
    task_queue.add(task);
  }

  public static void main(string[] args) {
    initresource();
    engindo();
    random random = new random();
    threadutil.sleep(5000);
    intstream.range(0, 10)
        .foreach(
            a -> addtask(() -> threadutil.sleep(random.nextint(10) + 1, timeunit.seconds), type.a));
    intstream.range(0, 10)
        .foreach(
            a -> addtask(() -> threadutil.sleep(random.nextint(10) + 1, timeunit.seconds), type.b));
    intstream.range(0, 10)
        .foreach(
            a -> addtask(() -> threadutil.sleep(random.nextint(10) + 1, timeunit.seconds), type.c));
  }
}

代码说明:

1、首先我们构造了任务队列,使用的是linkedblockingdeque,使用双向队列的原因是如果任务无法获取资源,还需要塞到队首,保证任务的有序性。

2、使用concurrenthashmap作为资源映射表,为了保证资源队列使用的均衡性,一旦使用完成的资源会塞到对应资源的队尾处。

3、其中实现了添加任务、提取资源、回归资源几个方法。

4、initresource方法可以初始化资源队列,这里面只是简单的随机了几个资源到a、b、c三种资源,塞入各类别队列。

5、任务私有类有自己的任务标识以及执行完后调用回归资源方法。

6、main方法中会分别提交需要3中资源的10个任务,看看调度情况。

执行结果

我们可以通过结果发现任务有序调度,使用完任务后回归队列。 

以上就是java实现fifo任务调度队列策略的详细内容,更多关于java fifo任务调度的资料请关注其它相关文章!

《Java实现FIFO任务调度队列策略.doc》

下载本文的Word格式文档,以方便收藏与打印。