JAVA 并发编程

2019-08-16 10:07:51来源:博客园 阅读 ()

新老客户大回馈,云服务器低至5折

JAVA 并发编程

极客时间课程
管理时间
推荐使用的java库
节假日api

写在前面

一切技术都是纸老虎,技术就是一层膜,捅破了就什么也不是

// 读书推荐
设计模式:
    Head First Design Patterns
    设计模式之禅
// java 并发编程实战
// 深入理解 java 虚拟机
// effective java

// 代码工程化
代码整洁之道

NIO

Non blocked IO  ==> New IO
传统 IO 面向流 是阻塞式的
NIO 面向缓冲区  像是  火车轨道 (Channel) + 火车 (Buffer)
选择器  

NIO 复制文件小例子

package com.ghc.mmall.concurrency.nio;

import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

/**
 * @author :Frank Li
 * @date :Created in 2019/7/31 20:20
 * @description:${description}
 * @modified By:
 * @version: $version$
 */
public class ChannelDemo {
    public static void main(String[] args) throws IOException {
         long start = System.currentTimeMillis();
/*        FileInputStream fileInputStream = new FileInputStream("C:\\Users\\FrankLi\\Desktop\\AI\\timesheet\\favicon.ico");
        FileChannel inChannel = fileInputStream.getChannel();

        FileOutputStream fos = new FileOutputStream("C:\\Users\\FrankLi\\Desktop\\AI\\timesheet\\dist\\favicon.ico");
        FileChannel  outChannel = fos.getChannel();

        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        int len = -1;
        while((len=inChannel.read(byteBuffer)) != -1){
            byteBuffer.flip();
            outChannel.write(byteBuffer);
            byteBuffer.clear();
        }

        outChannel.close();
        inChannel.close();
        fos.close();
        fileInputStream.close();*/
//        copyFile("C:\\Users\\FrankLi\\Desktop\\AI\\timesheet\\favicon.ico","C:\\Users\\FrankLi\\Desktop\\AI\\timesheet\\favicon2.ico");
        copyFile2("C:\\Users\\FrankLi\\Desktop\\AI\\timesheet\\timesheet.py","C:\\Users\\FrankLi\\Desktop\\AI\\timesheet\\timesheet_copied.py");
        long end = System.currentTimeMillis();
        System.out.println(String.format("cost about %d s",end-start));
    }

    public static void copyFile(String src, String tar){
        // 首先获取 读取管道
        FileChannel readChannel = null;
        // 获取 写入管道
        FileChannel writeChannel =null;
        try{
            readChannel = FileChannel.open(Paths.get(src), StandardOpenOption.READ);
            writeChannel = FileChannel.open(Paths.get(tar), StandardOpenOption.WRITE,StandardOpenOption.CREATE);
            ByteBuffer buf = ByteBuffer.allocate(1024);
            while(readChannel.read(buf) != -1){
                buf.flip();
                writeChannel.write(buf);
                buf.clear();
            }
        }catch(IOException ioe){
            ioe.printStackTrace();
        }finally{
            if(readChannel!=null){
                try{
                    readChannel.close();
                }catch(IOException ioe){
                    ioe.printStackTrace();
                }
            }
            if(writeChannel!=null){
                try{
                    writeChannel.close();
                }catch(IOException ioe){
                    ioe.printStackTrace();
                }
            }
        }
    }

    //  使用直接缓冲区完成文件的复制(内存映射文件) ,这样操作可以省去内核与JVM 内存之间的数据 拷贝
    public static void copyFile2(String src, String tar){
        FileChannel inFileChannel = null;
        FileChannel outFileChannel = null;
        try{
            inFileChannel = FileChannel.open(Paths.get(src), StandardOpenOption.READ);
            outFileChannel = FileChannel.open(Paths.get(tar),StandardOpenOption.READ, StandardOpenOption.WRITE, StandardOpenOption.CREATE);


            // 这里 如果 直接通道数据传输会更加方便
            /*inFileChannel.transferTo(0,inFileChannel.size(),outFileChannel);
            outFileChannel.transferFrom(inFileChannel,0,inFileChannel.size());
            */
            
            // 内存映射页
            MappedByteBuffer inMappedByteBuffer = inFileChannel.map(FileChannel.MapMode.READ_ONLY,0,inFileChannel.size());
            MappedByteBuffer outMappedByteBuffer = outFileChannel.map(FileChannel.MapMode.READ_WRITE, 0 , inFileChannel.size());

            // 直接对缓冲区进行数据的读写操作
            byte [] dst = new byte[1024];
            inMappedByteBuffer.get(dst);
            outMappedByteBuffer.put(dst);

        }catch(IOException ioe){
            ioe.printStackTrace();
        }finally{
            if(inFileChannel != null){
                try{
                    inFileChannel.close();
                }catch(IOException ioe){
                    ioe.printStackTrace();
                }
            }
            if(outFileChannel != null){
                try{
                    outFileChannel.close();
                }catch(IOException ioe){
                    ioe.printStackTrace();
                }
            }
        }
    }
}

分散读取 聚集写入

public static void main(String[] args) throws IOException {
        copyFile("C:\\Users\\FrankLi\\Desktop\\AI\\timesheet\\timesheet.py","C:\\Users\\FrankLi\\Desktop\\AI\\timesheet\\timesheet_2.py");
    }

    public static void copyFile(String src, String dst) throws IOException{
        RandomAccessFile randomAccessFileReader = new RandomAccessFile(src,"r");
        FileChannel inChannel = randomAccessFileReader.getChannel();

        ByteBuffer byteBufferFirst = ByteBuffer.allocate(512);
        ByteBuffer byteBufferSecond = ByteBuffer.allocate(1024*6);
        ByteBuffer [] bufs = {byteBufferFirst,byteBufferSecond};

        inChannel.read(bufs);
        for(ByteBuffer buf:bufs){
            buf.flip();
            System.out.println(new String(buf.array(),0,buf.limit()));
            System.out.println("---------------=============---------------");
        }

        RandomAccessFile randomAccessFileWriter = new RandomAccessFile(dst,"rw");
        FileChannel outChannel = randomAccessFileWriter.getChannel();
        outChannel.write(bufs);

        inChannel.close();
        outChannel.close();
    }

NIO 字符集

        Map<String, Charset> charsetMap = Charset.availableCharsets();
        for(Map.Entry<String, Charset> entrySet:charsetMap.entrySet()){
            System.out.println(entrySet.getKey()+" <----> "+entrySet.getValue());
        }

读写 JSON

jackson 的强项是灵活可定制, 并且具有了一个生态, yaml 也能完美驾驭
gson是轻量 简洁
fastjson 似乎没有一个好的生态 , 性能也比较好

多线程两种实现方式

  • 继承 Thread 类,实现 run 方法将需要多线程启动的功能代码放在 run 方法内 该方式有 isinterrupted 标志位,
    可以根据该标志位在另一个能够获取到该线程的代码块中that.interrupt 实现中断,但是是否真的中断则由that线程决定
  • 实现 runnable 接口,覆写 run 方法将需要多线程启动的功能代码放在 run 方法内,注意这里没有 isInterrupted 标志位
    实际上在一个线程中停止另一个线程可以用 concurrent 包中的 cancel 方法,这个 跟 python 简直一毛一样啊啊啊
    ExecutorService 接口下固定大小线程池 (Fixed),动态变化(Cached) , 以及只有单个(Single)线程的 线程池
// t1.start() 永远使用 start --》 start0 (本地方法) 去启动线程 而非 调用 run 方法
// 这里记得 t1.join() 是等待t1线程执行完成才会继续往下执行
// t1.setDaemon(true) 设置为守护线程,也就是不那么重要的,JVM 在所有非守护线程执行完成后就会退出,垃圾回收就是一个守护线程
// 虽然我们以后使用 concurrent 包来进行并发,但是基础原理一定要掌握牢固
// 进程 六种状态 
NEW:新建状态 刚刚创建出来,还没有调用start方法之前的状态。
RUNNABLE:可运行状态,可能正在执行,也可能不是正在执行,只有在该种状态下的线程才有资格抢CPU。
BLOCKED:锁阻塞状态? 线程要等待另一个线程释放锁对象。
WAITING:无限等待? 线程调用了wait()方法进入的状态,需要其它线程调用notify方法唤醒。
TIMED_WAITING:计时等待状态? 线程调用了sleep方法获wait(long time)方法进入的状态。
TERMINATED:死亡状态? 线程任务执行完毕或调用了stop方法。


Thread 常用方法
构造方法 Thread(Runnable target,String name)

静态方法:
Thread.currentThread().getName()
Thread.sleep(1000) // java 中 单位是毫秒 所以 1000ms = 1 s,python 中直接是 秒

线程安全同步机制 synchronized 同步代码快, 同步方法,可重入锁,可重入读写锁

// synchronized 代码块中 可以 wait  , notify() , notifyAll()
// lock.newCondition 也可以实现  await() signal()  signalAll()

加入 synchronized 同步方法, synchronized 这个方式不如 可重入锁安全,被synchronized修饰的要么获得锁,要么永远等待下去

public class Counter {
    private int value;
    public synchronized  void inc(int m){
            this.value+=m;
    }

    public synchronized void dec(int m){
            this.value-=m;
    }

}

引入可重入锁即可以在同一个线程内多次获取该锁

package com.ghc.test;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class Counter {
    private Lock lock = new ReentrantLock();
    private int value;
    public void inc(int m){
            if(lock.tryLock()){
                try{
                    this.value+=m;
                }finally{
                    lock.unlock();
                }
        }
    }

    public void dec(int m){
        if(lock.tryLock()){
            try{
                this.value-=m;
            }finally {
                lock.unlock();
            }
        }
    }

    public int getValue(){
        lock.lock();
            try{
                return this.value;
            }finally {
                lock.unlock();
            }
    }

    public static void main(String [] args) throws InterruptedException{
        System.out.println(Thread.currentThread().getName()+" start...");
        new Thread(()->{
            System.out.println(Thread.currentThread().getName()+" start...");
            try{
                Thread.sleep(1000);
            }catch (InterruptedException e){}
            System.out.println(Thread.currentThread().getName()+" end...");
        },"download thread").start();
        Thread.sleep(500);
        System.out.println(Thread.currentThread().getName()+" end...");
    }

}

引入 可重入 读写锁,因为 可以同时读 , 不可同时写入 或者说不可同时读写

引入 可重入读写锁在同时写入的时候会加锁进行同步,而在同时读取的时候则不会提高并发性能

package com.ghc.test;

import java.util.concurrent.locks.ReentrantReadWriteLock;


public class Counter {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
    private final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
    private int value;
    public void inc(int m){
        // 写 锁
        writeLock.lock();
        try{
            this.value+=m;
        }finally {
            writeLock.unlock();
        }
    }
    
    public void dec(int m){
        // 读锁
        readLock.lock();
        try{
            this.value-=m;
        }finally {
            readLock.unlock();
        }
    }

}

使用 线程池进行并发

package com.ghc.test;

import java.util.concurrent.*;
import java.time.LocalTime;
/**
 * @author :Frank Li
 * @date :Created in 2019/7/11 8:49
 * @description:${description}
 * @modified By:
 * @version: $version$
 */
class PrintThread extends Thread{
    private String taskName;
    PrintThread(String taskName, String threadName){
        this.taskName = taskName;
        this.setName(threadName);
    }
    @Override
    public void run(){
        System.out.println(this.getName()+": Hello, "+this.taskName);
        try{
            Thread.sleep(1000);
            int i = 1/0;
        }catch (InterruptedException e){}
        System.out.println(this.getName()+": Goodbye, "+this.taskName);
    }

}

public class ExecutorServiceTest {

    public static void main(String[] args) {
        // 阅读源码可以发现 ThreadPoolExecutor 才是万物之基

        // 创建 固定大小的 线程池
        ExecutorService executor = Executors.newFixedThreadPool(2);
        executor.submit(new PrintThread("Frank", "t1"));
        executor.submit(new PrintThread("May", "t2"));
        executor.shutdown();

        System.out.println("---***华丽的分割线***---");
        // 创建 弹性动态伸缩线程池
        ExecutorService cachedThreadPool = Executors.newCachedThreadPool(); // 不指定大小
        cachedThreadPool.submit(new PrintThread("SCALA", "t3"));
        cachedThreadPool.submit(new PrintThread("PYTHON", "t4"));
        cachedThreadPool.shutdown();

        // 创建弹性动态伸缩线程池 但是指定最大线程 为 10 个, 线程池中保持 corePoolSize 个线程即使是idle 的
        int corePoolSize = Runtime.getRuntime().availableProcessors();
        ExecutorService cachedThreadMaxSpecifiedPool = new ThreadPoolExecutor(corePoolSize, 10,
                60L, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>());
        Future<PrintThread> future = (Future<PrintThread>) cachedThreadMaxSpecifiedPool.submit(new PrintThread("China", "t5"));
        future.cancel(true); // 取消某个线程
        cachedThreadMaxSpecifiedPool.submit(new PrintThread("World", "t6"));
        cachedThreadMaxSpecifiedPool.shutdown();

        // 创建单个线程的线程池
    /*    Executor singleExecutor = Executors.newSingleThreadExecutor();
        singleExecutor.execute(new PrintThread("single", "dog"));
        singleExecutor.execute(new PrintThread("all the way", "single"));*/
        // 创建一个 定时执行 executor
        ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(2);
        scheduledExecutor.scheduleAtFixedRate(new PrintThread("scheduled task "+LocalTime.now(),"t7"), 2, 3, TimeUnit.SECONDS);  // 每间隔 n 秒 执行一次
        scheduledExecutor.scheduleWithFixedDelay(new PrintThread("scheduled delay task "+LocalTime.now(),"t8"), 2, 2,TimeUnit.SECONDS); // 等待上一次任务运行结束delay 运行下一个
//        scheduledExecutor.shutdown();
    }
}

获取 返回值的 线程 需要使用 Callable 的 子类 , 否则 可用 Runnable 接口的子类

package com.ghc.test;
import java.util.concurrent.*;

/**
 * @author :Frank Li
 * @date :Created in 2019/7/11 15:31
 * @description:${description}
 * @modified By:
 * @version: $version$
 */

class CallableFuture implements Callable<String> {
    private String taskName;
    CallableFuture(String taskName){
        this.taskName = taskName;
    }
    @Override
    public String call() throws InterruptedException{
        System.out.println(Thread.currentThread().getName()+"--> "+ "start: Hello, "+this.taskName);
        Thread.sleep(3000);
        System.out.println(Thread.currentThread().getName()+"--> "+ "end: Goodbye, "+this.taskName);
        return this.taskName;
    }
}
public class FutureTest {
    public static void main(String [] args){
       Callable<String> callableTask = new CallableFuture("Frank");
        ExecutorService executor = Executors.newFixedThreadPool(4);
        Future<String> future =  executor.submit(callableTask);
       String result = null;
       try{
           result = future.get(2, TimeUnit.SECONDS); // 可能会阻塞 等待 线程完成 获取返回结果 ,设置超时
           /*java.util.concurrent.TimeoutException
           at java.util.concurrent.FutureTask.get(FutureTask.java:205)
           at com.ghc.test.FutureTest.main(FutureTest.java:32)*/
       }catch (InterruptedException interException){
           interException.printStackTrace();
       }catch (ExecutionException ee){
           ee.printStackTrace();
       }catch (TimeoutException te){
           te.printStackTrace();
       }
        System.out.println(result);
       executor.shutdown();
    }
}

ForkJoin 采用分治算法 ,想到快排 分治 + 挖坑 填坑。 思考, 他与 Map Reduce 并行框架的区别

package com.ghc.test;

import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

/**
 * @author :Frank Li
 * @date :Created in 2019/7/11 16:56
 * @description:${description}
 * @modified By:
 * @version: $version$
 */
public class ForkJoinTest extends RecursiveTask<Long> {
    private static final int THRESHOLD = 250;
    long [] array;
    int start;
    int end;

    ForkJoinTest(long [] array, int start, int end){
        this.array = array;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute(){
        if(end-start <= THRESHOLD){
            // 如果任务量足够小,直接计算;
            long sum = 0;
            for(int i = start; i < end; i++){
                sum += this.array[i];
                try{
                    Thread.sleep(2);
                }catch (InterruptedException interExcept){
                    interExcept.printStackTrace();
                }
            }
            return sum;
        }else{
            // 当任务太大, 我们将大任务进行拆分
            int middle = (end + start) / 2;
            System.out.println(String.format("split %d~%d ==> %d~%d, %d~%d", start,end,start,middle,middle,end));
            ForkJoinTest subTask1 = new ForkJoinTest(this.array, start, middle);
            ForkJoinTest subTask2 = new ForkJoinTest(this.array,middle, end);
            invokeAll(subTask1, subTask2);
            Long subResult1 = subTask1.join();
            Long subResult2 = subTask2.join();
            Long result = subResult1 + subResult2;
            System.out.println(String.format("result = %d + %d ==> %d", subResult1,subResult2, subResult1+subResult2));
            return result;
        }
    }

    public static void main(String [] args){
        long [] array = new long[1000];
        long expectedSum = 0;
        for(int i=0; i < array.length; i++){
            array[i] = random();
            expectedSum+=array[i];
        }
        System.out.println("Expected sum: "+ expectedSum);
        ForkJoinTask<Long> task = new ForkJoinTest(array, 0, array.length);
        long startTime = System.currentTimeMillis();
        Long result = ForkJoinPool.commonPool().invoke(task);
        long endTime = System.currentTimeMillis();
        System.out.println("Fork/join sum: "+ result + " in "+(endTime-startTime)+" ms");
    }

    static Random random = new Random(0);
    static long random(){
        return random.nextInt(10000);
    }

}

ThreadLocal 用作单个线程内部,传递变量,必须用 try finally 处理 , 可以避免 变量在每一个调用方法处传递

package com.ghc.test;

/**
 * @author :Frank Li
 * @date :Created in 2019/7/11 18:23
 * @description:${description}
 * @modified By:
 * @version: $version$
 */

class User{
    String name;
    int level;
    User(String name, int level){
        this.name = name;
        this.level = level;
    }
}

class UserContext implements AutoCloseable{
    // 全局唯一静态变量
    private static final ThreadLocal<User> context = new ThreadLocal<>();
    // 获取当前线程的 ThreadLocal User:
    public static User getCurrentUser(){
        return context.get();
    }
    // 初始化 ThreadLocal 的 User
    public UserContext(User user){
        context.set(user);
    }

    @Override
    public void close(){
        context.remove();
    }
}

class ProcessThread extends Thread{
    User user;
    ProcessThread(User user){
        this.user = user;
    }

    public void run(){
        try(UserContext ctx = new UserContext(user)){
            new Greeting().hello();
            Level.checkLevel();

        }
    }

}
class Greeting{
    void hello(){
        User user = UserContext.getCurrentUser();
        System.out.println("hello "+user.name+" !");
    }
}
class Level{
    static void checkLevel(){
        User user = UserContext.getCurrentUser();
        if(user.level > 100){
            System.out.println(user.name+ " is a VIP user");
        }else{
            System.out.println(user.name + " is a registered user.");
        }
    }
}
public class ThreadLocalTest {
    public static void main(String [] args) throws InterruptedException{
        Thread t1 = new ProcessThread(new User("Bob", 120));
        Thread t2 = new ProcessThread(new User("Frank", 98));
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println("Main end...");
    }
}

深入学习 并发编程以及 高并发

CPU 多级缓存机制 MESI (Modified Exclusive Shared Or Invalid) 四种状态 , Localread local write remote read remote write 四种读写

CPU 对运行代码 进行乱序 优化可能带来 实际结果与 逻辑结果不一致



怕什么真理无穷,进一寸有一寸的欢喜。
给以后的自己看

JMM java memory model ==》 java 虚拟机内存模型

JMM 同步操作 八个步骤 lock -> 从主内存 read -> load 到每个线程独享内存-> use --> assign --> store --> write --》unlock

并发的 优缺点

一个 线程不安全的小例子

package com.ghc.mmall.concurrency.test;

import com.ghc.mmall.concurrency.annotations.UnThreadSafe;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
 * @author :Frank Li
 * @date :Created in 2019/7/17 16:27
 * @description:${description}
 * @modified By:
 * @version: $version$
 */
@UnThreadSafe
public class ConcurrencyTest {
    // 请求总数
    public static int clientTotal = 5000;

    // 同时并发执行的线程数
    public static int threadTotal = 200;
    public static int count = 0;
    private static final Logger logger = Logger.getGlobal();
    private static final Lock lock = new ReentrantLock();

    public static void main(String [] args) throws InterruptedException {
        logger.setLevel(Level.INFO);
        ExecutorService executor = Executors.newCachedThreadPool();
        final Semaphore  semaphore = new Semaphore(threadTotal);
        final CountDownLatch  countDownLatch = new CountDownLatch(clientTotal);
        for(int i=0;i<clientTotal;i++){
            executor.execute(()->{
                try {
                    semaphore.acquire();
                    add();
                    semaphore.release();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        logger.info("count after countDownLatch...:"+count);
    }

    public synchronized static void add(){
        count++;
    }
}

使用锁机制,手动加锁也是可以保证线程安全的

package com.ghc.mmall.concurrency.test;

import java.util.concurrent.*;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ThreadSafeDemo {
    private static int count = 0;
    private final static int clientTotal = 5000;
    private final static int threadTotal = 200;
    private final static Lock lock = new ReentrantLock();
    private final static Logger logger = Logger.getGlobal();
    public static void main(String [] args) throws InterruptedException {
        logger.setLevel(Level.INFO);
        ExecutorService executor = Executors.newCachedThreadPool();
        Semaphore semaphore = new Semaphore(threadTotal);
        CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for(int i=0;i<clientTotal;i++){
            executor.execute(()->{
                try{
                    semaphore.acquire();
                    add();
                    semaphore.release();
                }catch(InterruptedException e){}
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executor.shutdown();
        logger.info("count: "+count);
    }

    private  static void add() throws InterruptedException{

        /*synchronized(ThreadSafeDemo.class){
            count++;
        }*/

        if(lock.tryLock(1, TimeUnit.SECONDS)){// (可重入锁是JDK层面实现的锁)可以 有效避免死锁, 使用 上面的 synchronized (基于JVM层面的锁) 可能会 死锁
            try{
            count++;
            }finally{
                lock.unlock();
            }
        }
    }
}

下面这个采用 原子操作 所以是线程 安全的

package com.ghc.mmall.concurrency.test;

import com.ghc.mmall.concurrency.annotations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
 * @author :Frank Li
 * @date :Created in 2019/7/17 18:58
 * @description:${description}
 * @modified By:
 * @version: $version$
 */
@Slf4j
@ThreadSafe
public class AtomicBooleanTest {
    private static AtomicInteger count = new AtomicInteger(0);
    private static int clientTotal = 5000;
    private static int threadTotal = 200;
    private static final Logger logger = Logger.getGlobal();
    public static void main(String [] args) throws InterruptedException {
        logger.setLevel(Level.INFO);
        ExecutorService executor = Executors.newCachedThreadPool();
        final Semaphore  semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for(int i=0;i<clientTotal;i++){
            executor.execute(()->{
               try{
                   semaphore.acquire();
                   incr();
                   semaphore.release();
               } catch(InterruptedException e){

               }
               countDownLatch.countDown();
            });

        }
        countDownLatch.await();
        executor.shutdown();
        logger.info("count: "+count);
    }

    public static void incr(){
        count.getAndIncrement();
    }

}


// 改用 LongAdder 来 替代 AtomicLong 有时候会更加高效
package com.ghc.mmall.concurrency.test;

import com.ghc.mmall.concurrency.annotations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.LongAdder;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
 * @author :Frank Li
 * @date :Created in 2019/7/17 18:58
 * @description:${description}
 * @modified By:
 * @version: $version$
 */
@Slf4j
@ThreadSafe
public class AtomicBooleanTest {
    private static LongAdder count = new LongAdder();
    private static int clientTotal = 5000;
    private static int threadTotal = 200;
    private static final Logger logger = Logger.getGlobal();
    public static void main(String [] args) throws InterruptedException {
        logger.setLevel(Level.INFO);
        ExecutorService executor = Executors.newCachedThreadPool();
        final Semaphore  semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for(int i=0;i<clientTotal;i++){
            executor.execute(()->{
               try{
                   semaphore.acquire();
                   incr();
                   semaphore.release();
               } catch(InterruptedException e){

               }
               countDownLatch.countDown();
            });

        }
        countDownLatch.await();
        executor.shutdown();
        logger.info("count: "+count);
    }

    public static void incr(){
        count.increment();
    }

}

安全的线程需要 满足以下特性

  1. 原子性 , CAS (compareAndSwapxxx , compareAndSet xxx 为 Int Long 等) 底层采用循环不断比较本地内存与主内存中的值是否一样,不一样就一直去取直到一样取出来相加
  2. 可见性 一个线程 对住内存的修改可以及时地被其他线程观察到
  3. 有序性
// 特别注意 对于 Long 类型提供了 两个 原子操作的类
AutomicLong , LongAdder (后者更高效但有可能不够精确)  // 不妨优先考虑 后者

可见性

安全发布对象

线程安全的 例子, 推荐使用 枚举方法 创建单例

饿汉式单例模式 虽然简单 但是可能造成资源浪费

package com.ghc.mmall.concurrency.singleton;

import com.ghc.mmall.concurrency.annotations.NotRecommend;
import com.ghc.mmall.concurrency.annotations.ThreadSafe;

@ThreadSafe
@NotRecommend
public class Singleton1 {
    // 饿汉式  单例模式
    // 首先 私有构造方法
     private Singleton1(){}

     private  static Singleton1 singleton = new Singleton1();

     public static Singleton1 getInstance(){
         return singleton;
     }

}

懒汉式单例模式 虽然线程安全但是 书写复杂容易造成 线程不安全 所以也不推荐

package com.ghc.mmall.concurrency.singleton;

import com.ghc.mmall.concurrency.annotations.NotRecommend;
import com.ghc.mmall.concurrency.annotations.ThreadSafe;


@ThreadSafe
@NotRecommend
public class Singleton2 {
    // 懒汉式 单例模式
    // 第一步同样是需要私有化构造方法
    private Singleton2(){}

    // 1  memory= allocate()  分配对象的内存空间
    // 2  ctorInstance() 初始化对象
    // 3  instance = memory 设置 instance 指向刚分配的内存
    // 第二步 延迟给变量赋值 volatile 确保 JVM CPU 优化指令重排序不会影响线程安全
    private static volatile Singleton2 singleton = null;
    // volatile + 双重检测机制 --》 禁止对象指令重排

    // 静态工厂方法
    public static Singleton2 getInstance(){
        if(singleton==null){
            synchronized (Singleton2.class){
                // 双重同步锁 确保线程安全第一点
                if(singleton==null){
                    singleton = new Singleton2();
                }
            }
        }
        return singleton;
    }

}

使用内部枚举类 来利用JVM 控制多线程 运行时侯始终 只有一个实例被创建 推荐使用

package com.ghc.mmall.concurrency.singleton;

import com.ghc.mmall.concurrency.annotations.Recommend;
import com.ghc.mmall.concurrency.annotations.ThreadSafe;

/**
 * 
 */
@ThreadSafe
@Recommend
public class Singleton3 {

    // 使用枚举类确保线程安全
    // 第一步 仍然是私有化构造方法
    private  Singleton3(){}

    // 第二步 提供对外访问的静态公用接口
    public static Singleton3 getInstance(){
        return Singleton.INSTANCE.getInstance();
    }

    public enum  Singleton{
        INSTANCE;
        private Singleton3 singleton = null;
        Singleton(){
            singleton = new Singleton3();
        }

        public Singleton3 getInstance(){
            return singleton;
        }
    }
}

不可变对象 参考 String 类


同步容器

并发容器


ForkJoin

package com.ghc.mmall.concurrency.test;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;

@Slf4j
public class ForkJoinTaskExample extends RecursiveTask<Integer> {
    public static final int threshold = 2;
    private int start;
    private int end;

    public ForkJoinTaskExample(int start, int end){
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        int sum = 0;
        // 如果任务足够小 就计算任务
        boolean canCompute = (end - start) <= threshold;
        if(canCompute){
            for(int i = start; i<= end;i++){
                sum+=i;
            }
        }else{
            // 如果任务大于阈值 , 就分类成两个子任务计算
            int middle = (start + end) / 2;
            ForkJoinTaskExample leftTask = new ForkJoinTaskExample(start, middle);
            ForkJoinTaskExample rightTask = new ForkJoinTaskExample(middle+1, end);

            // 执行子任务
            leftTask.fork();
            rightTask.fork();

            // 等待子任务执行结束合并其结果
            int leftResult = leftTask.join();
            int rightResult = rightTask.join();

            // 合并子任务的结果
            sum = leftResult + rightResult;
        }
        return sum;
    }

    public static void main(String[] args) {
        ForkJoinPool forkJoinPool = new ForkJoinPool();

        // 生成一个计算任务  计算 1+2+3+4
        ForkJoinTaskExample task = new ForkJoinTaskExample(1, 100);
        Future<Integer> result = forkJoinPool.submit(task);
        try{
            log.info("result: {}", result.get());
        }catch(Exception e){
            log.error("exception: ", e);
        }
    }
}

写在最后


原文链接:https://www.cnblogs.com/Frank99/p/11161816.html
如有疑问请与原作者联系

标签:

版权申明:本站文章部分自网络,如有侵权,请联系:west999com@outlook.com
特别注意:本站所有转载文章言论不代表本站观点,本站所提供的摄影照片,插画,设计作品,如需使用,请与原作者联系,版权归原作者所有

上一篇:多线程与高并发(四)volatile关键字

下一篇:自己实现SpringAOP,含AOP实现的步骤分解