Semaphore 信号量如何实现文件导出限流?

情景导入我想各位小伙伴一定都做过导出功能 , 就算没做过 , 那肯定也用过 。
如果你既没有吃过猪肉 , 也没有见过猪跑 。 那这篇文章也可以读一读 , 可以补充点知识 。
导出作为一个非常常见的功能 , 也是稍有不慎就会导致系统压力剧增的问题之一 。
有类似苦恼的小伙伴可以阅读下我以前写的文章:
java 导出 excel 最佳实践 , java 大文件 excel 避免OOM(内存溢出) excel 工具框架
cpu 又报警了 , 快看看为什么?记得前不久 , 一个本该和平的上午 , 报警群里忽然就炸了 , 报警信息不断地轰炸过来 。
“快看看怎么了?” , 项目经理赶紧让各位同事查查问题 。
“CPU 太高了” , 查了下 , 有人回到 , “不知道是谁在一直导出大文件 。 ”
我们知道 excel 的导出是非常消耗较器性能的 , 为了限制范围 , 以前已经加了时间范围 1 个月 , 还做了很多优化 。
但是现在操作员可能在同时导出 , 导致机器压力还是太大了 。
“看一下能不能解决这个问题” , 项目经理说 , “这么报警也不是办法 。 ”
怎么解决?各位小伙伴 , 如果是你来解决这个问题 , 你会怎么做呢?
我们今天来介绍一种比较常用的解决方案 , 需要用到今天的主角 Semaphore 信号量 。
小伙伴有其他想法也欢迎评论区和大家分享讨论一下 。
Semaphore 信号量如何实现文件导出限流?文章插图
Semaphore 介绍计数信号量(Counting Semaphore)用来控制同时访问的某个特定资源的操作数量 , 或者同时执行某个指定操作的数量 。
ps: 我们可以根据这个特性 , 灵活地限制同时导出的执行数量 。
计算信号量还可以用来实现某种资源池 , 或者对容器施加边界 。
Semaphore 中管理着一组虚拟许可(permit) , 许可的初始量可通过构造函数来指定 。
在执行操作时可以首先获得许可(只要还有剩余的许可) , 并在使用以后释放许可 。
如果没有许可 , 那么acquire将阻塞直到有许可(或者直到被中断或者操作超时) 。
release方法将返回一个许可给信号量 。 计算信号量的一种简化形式是二值信号量 , 即初始化值为1的Semaphore 。 二值信号量可以用做互斥体(mutex) , 并具备不可重入的加锁语义:谁拥有这个唯一的许可 , 谁就拥有了互斥锁 。
使用例子public class TestSemaphore {public static void main(String[] args) {// 线程池ExecutorService exec = Executors.newCachedThreadPool();// 只能5个线程同时访问final Semaphore semp = new Semaphore(5);// 模拟20个客户端访问for (int index = 0; index < 20; index++) {final int NO = index;Runnable run = new Runnable() {public void run() {try {// 获取许可semp.acquire();System.out.println("Accessing: " + NO);Thread.sleep((long) (Math.random() * 10000));// 访问完后 , 释放semp.release();System.out.println("-----------------" + semp.availablePermits());} catch (InterruptedException e) {e.printStackTrace();}}};exec.execute(run);}// 退出线程池exec.shutdown();}}使用说明声明信号量:
// 只能5个线程同时访问final Semaphore semp = new Semaphore(5);获取许可:
// 获取许可semp.acquire();释放许可:
// 访问完后 , 释放semp.release();测试日志Accessing: 1Accessing: 3Accessing: 0Accessing: 2Accessing: 5-----------------1Accessing: 4-----------------1Accessing: 6-----------------1Accessing: 7-----------------1Accessing: 18-----------------1Accessing: 11-----------------1Accessing: 12-----------------1Accessing: 15-----------------1Accessing: 9-----------------1Accessing: 10-----------------1Accessing: 13-----------------1Accessing: 16-----------------1Accessing: 14-----------------1Accessing: 17-----------------1Accessing: 8-----------------1Accessing: 19-----------------1-----------------2-----------------3-----------------4-----------------5可以看到一开始前 5 个客户端都拿到了访问权限 , 但是后面的就需要等待了 。
等到后面逐渐释放锁 , 锁的 semp.availablePermits() 又恢复到了 5 个 。
Semaphore 信号量如何实现文件导出限流?文章插图
源码解析jdk 版本不同的 jdk 版本源码可能存在差异 , 老马这次梳理的 jdk 信息如下:
java version "1.8.0_191"Java(TM) SE Runtime Environment (build 1.8.0_191-b12)Java HotSpot(TM) 64-Bit Server VM (build 25.191-b12, mixed mode)类定义Semaphore 实现了 Serializable 接口 。
Sync 是整个实现最核心的部分 。
public class Semaphore implements java.io.Serializable {private static final long serialVersionUID = -3222578661600680210L;/** All mechanics via AbstractQueuedSynchronizer subclass */private final Sync sync;/*** Creates a {@code Semaphore} with the given number of* permits and nonfair fairness setting.** @param permits the initial number of permits available.*This value may be negative, in which case releases*must occur before any acquires will be granted.*/public Semaphore(int permits) {sync = new NonfairSync(permits);}/*** Creates a {@code Semaphore} with the given number of* permits and the given fairness setting.** @param permits the initial number of permits available.*This value may be negative, in which case releases*must occur before any acquires will be granted.* @param fair {@code true} if this semaphore will guarantee*first-in first-out granting of permits under contention,*else {@code false}*/public Semaphore(int permits, boolean fair) {sync = fair ? new FairSync(permits) : new NonfairSync(permits);}}