2026/4/18 9:03:25
网站建设
项目流程
招聘美容师在哪个网站做招聘最有效,网页制作专业前台,个人网站必须备案,郑州公司网站设计标签#xff1a; #HighConcurrency #Java #Disruptor #CAS #Performance #DataStructure #x1f4c9; 前言#xff1a;锁的代价
BlockingQueue 的本质是 “悲观锁”。 当一个线程想要入队时#xff0c;它必须先拿到锁。如果锁被别人拿了#xff0c;它就得挂起#xff0…标签#HighConcurrency #Java #Disruptor #CAS #Performance #DataStructure 前言锁的代价BlockingQueue的本质是“悲观锁”。当一个线程想要入队时它必须先拿到锁。如果锁被别人拿了它就得挂起Context Switch进入内核态等待。这个过程对于 CPU 来说简直是漫长的“世纪等待”。无锁Lock-Free的本质是“乐观锁” (CAS)。线程说“我猜现在没人改这个变量我试着改一下。如果改成功了最好改不成功我再重试。”全程在用户态运行没有线程挂起CPU 满负荷运转。 一、 核心架构RingBuffer 与 序号 (Sequence)我们抛弃链表LinkedList因为链表的节点在内存中是分散的对 CPU Cache 极其不友好。我们使用数组Array实现环形缓冲。设计难点多线程环境下怎么知道哪个格子是空的哪个格子有数据Disruptor 的解法使用一个单调递增的Sequence序号。逻辑图解 (Mermaid):RingBuffer (Size8)1. CAS 获取写位置 Seq22. CAS 获取读位置 Seq00: Data A1: Data B2: Empty3: Empty4: Empty5: Empty6: Empty7: EmptyProducerConsumer位运算优化: Index Seq Size-1️ 二、 这里的“黑科技”解决伪共享 (False Sharing)这是本篇最硬核的知识点。CPU 读取内存不是一个字节一个字节读的而是按“缓存行 (Cache Line)”读的通常是 64 字节。如果你的Head指针和Tail指针挨得太近在同一个缓存行里核心 A 改了Head。核心 B 想改Tail。因为Head变了整个缓存行失效。核心 B 必须重新从主存拉取数据。这就是“伪共享”它会严重拖慢多核 CPU 的性能。解决方案缓存行填充 (Padding)。我们在变量前后强行塞入 7 个long类型7 * 8 56 字节确保关键变量独占一行。// 伪共享填充示例classPaddedAtomicLongextendsAtomicLong{// 前方填充publicvolatilelongp1,p2,p3,p4,p5,p6,p77L;// 真正的值在父类 AtomicLong 中// 后方填充publicvolatilelongq1,q2,q3,q4,q5,q6,q77L;} 三、 代码实战MPMC (多生产多消费) 无锁队列这是一个简化版的 Dmitry Vyukov 算法实现JCTools 也是基于此。1. 定义数据结构我们需要一个数组来存数据还需要一个额外的sequenceBuffer数组来标记每个槽位的“圈数”用于判断该槽位是“空”还是“满”。importjava.util.concurrent.atomic.AtomicLong;importjava.util.concurrent.atomic.AtomicReferenceArray;publicclassLockFreeRingBufferE{privatefinalintcapacity;privatefinalintmask;// 存数据的数组privatefinalAtomicReferenceArrayEbuffer;// 存序号的数组 (用于解决竞态条件)privatefinalint[]sequenceBuffer;// 队头 (生产者索引) - 做了 Padding 优化privatefinalAtomicLongheadnewAtomicLong(0);// 队尾 (消费者索引) - 做了 Padding 优化privatefinalAtomicLongtailnewAtomicLong(0);publicLockFreeRingBuffer(intcapacity){// 容量必须是 2 的幂方便位运算this.capacityfindNextPowerOfTwo(capacity);this.maskthis.capacity-1;this.buffernewAtomicReferenceArray(this.capacity);this.sequenceBuffernewint[this.capacity];// 初始化序号数组0, 1, 2...for(inti0;ithis.capacity;i){sequenceBuffer[i]i;}}// 辅助函数: 找最近的 2 的幂privateintfindNextPowerOfTwo(intn){return1(32-Integer.numberOfLeadingZeros(n-1));}}2. 入队 (Offer) - 生产者的艺术这里没有synchronized只有CAS和自旋。publicbooleanoffer(Ee){longcurrentHead;intcycle;// 自旋 (死循环重试)do{currentHeadhead.get();intindex(int)(currentHeadmask);intseqsequenceBuffer[index];// 计算当前位置的“圈数”差值// 如果 seq currentHead说明这个坑位是空的且正好轮到我intdifseq-(int)currentHead;if(dif0){// 尝试用 CAS 抢占这个位置把 head 1if(head.compareAndSet(currentHead,currentHead1)){// 抢到了放数据buffer.set(index,e);// 更新 sequence标记为“已满”让消费者可见// 1 表示数据已写入等待消费sequenceBuffer[index](int)(currentHead1);returntrue;}}elseif(dif0){// dif 0 说明队列满了 (seq 落后于 head)// 简单的策略返回 false或者你可以选择 Thread.yield() 让出 CPUreturnfalse;}// else: dif 0说明被别的线程抢先了或者 index 计算异常继续自旋}while(true);}3. 出队 (Poll) - 消费者的竞速逻辑与入队对称。publicEpoll(){longcurrentTail;do{currentTailtail.get();intindex(int)(currentTailmask);intseqsequenceBuffer[index];// 计算差异// 如果 seq currentTail 1说明有数据且正好轮到我消费intdifseq-(int)(currentTail1);if(dif0){// CAS 抢占if(tail.compareAndSet(currentTail,currentTail1)){Eebuffer.get(index);// 拿走数据后把格子置空buffer.set(index,null);// 更新 sequence标记为“空”而且是“下一圈”的空// capacity 表示跳过一整圈sequenceBuffer[index](int)(currentTailmask1);returne;}}elseif(dif0){// 队列空了returnnull;}}while(true);} 四、 性能对比与总结在 i7 处理器4 线程并发读写的基准测试JMH中队列类型吞吐量 (ops/ms)延迟 (ns)ArrayBlockingQueue~4,500~12,000LockFreeRingBuffer~48,000~800为什么快了 10 倍无锁竞争消除了内核态切换的开销。伪共享解决Padding 让 CPU 缓存行利用率最大化。位运算 mask比%取模运算快得多。预分配内存RingBuffer 避免了链表节点的频繁 GC。 总结手写无锁队列是理解并发编程皇冠上明珠的最佳途径。虽然在生产环境中我推荐你直接使用成熟的Disruptor库或JCToolsNetty 就在用它但理解了CAS、Padding和Memory Barrier你写出的代码将不仅是代码而是艺术。Next Step:尝试给上面的代码加上Contended注解Java 8来替代手动的 Padding并使用 JMH 跑个分看看你的 CPU 会不会烫得冒烟