|
| 1 | +import java.util.ArrayList; |
| 2 | +import java.util.Collections; |
| 3 | +import java.util.List; |
| 4 | +import java.util.concurrent.CountDownLatch; |
| 5 | +import java.util.concurrent.ExecutorService; |
| 6 | +import java.util.concurrent.Executors; |
| 7 | +import java.util.concurrent.Semaphore; |
| 8 | + |
| 9 | +public class ConcurrencyValue1 { |
| 10 | + |
| 11 | + public static void main(String[] args) { |
| 12 | + int count = 10000000; |
| 13 | + List<Long> listKey = new ArrayList<>(); |
| 14 | + for (int i = 0; i < count; i++) { |
| 15 | + listKey.add(114560315500000000L + i); |
| 16 | + } |
| 17 | + ConcurrencyValue1 concurrencyTest = new ConcurrencyValue1(); |
| 18 | + List<Long> valueList1 = concurrencyTest.getValueList1(listKey); |
| 19 | + System.out.println("====>> getValueList1 valueList.size: " + valueList1.size()); |
| 20 | + } |
| 21 | + |
| 22 | + /** |
| 23 | + * 模拟一千万请求数据,创建一个线程池10个线程控制最大并发数,每个线程一次处理10万条,开启100个线程。(用于控制每批次线程的数据量业务场景) |
| 24 | + * |
| 25 | + * @param listKey 请求处理的总数据量 |
| 26 | + * @return |
| 27 | + */ |
| 28 | + public List<Long> getValueList1(List<Long> listKey) { |
| 29 | + |
| 30 | + /** |
| 31 | + (1)newCachedThreadPool 创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。 |
| 32 | + (2)newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。 |
| 33 | + (3)newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。 |
| 34 | + (4)newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。 |
| 35 | + */ |
| 36 | + // 创建一个线程池 |
| 37 | + final ExecutorService executorService = Executors.newFixedThreadPool(10); //用于控制同时并发执行的线程数,使用线程池创建资源提高效率 |
| 38 | + List<Long> list_val = Collections.synchronizedList(new ArrayList<>()); //保证多线程操作的是同一个List |
| 39 | + try { |
| 40 | + long t1 = System.currentTimeMillis(); |
| 41 | + int max_one_batch = 100000; // 一批次最多处理100000条数据 |
| 42 | + List<List<Long>> newList = ListUtils.splitList(listKey, max_one_batch); |
| 43 | + int runSize = newList.size(); // 开启的线程数 |
| 44 | + |
| 45 | + /** |
| 46 | + * CountDownLanch 只需要在子线程执行之前, 赋予初始化countDownLanch, 并赋予线程数量为初始值。 |
| 47 | + * 每个线程执行完毕的时候, 就countDown一下。主线程只需要调用await方法, 可以等待所有子线程执行结束。 |
| 48 | + */ |
| 49 | + final CountDownLatch countDownLatch = new CountDownLatch(runSize); //计数器 |
| 50 | + |
| 51 | + /** |
| 52 | + * Semaphore(信号量)是用来控制同时访问特定资源的线程数量,拿到信号量的线程可以进入,否则就等待。 |
| 53 | + * 通过acquire()和release()获取和释放访问许可。 |
| 54 | + */ |
| 55 | + final Semaphore semaphore = new Semaphore(runSize); //信号量 |
| 56 | + // 循环创建线程 |
| 57 | + for (int j = 0; j < runSize; j++) { |
| 58 | + final int i = j; |
| 59 | + executorService.execute(() -> { |
| 60 | + try { |
| 61 | + semaphore.acquire(); |
| 62 | + // 执行程序 |
| 63 | + List<Long> subList = newList.get(i); |
| 64 | + List<Long> sub_ret = getValue(subList); |
| 65 | + list_val.addAll(sub_ret); |
| 66 | + System.out.println(Thread.currentThread().getName() + ": 当前线程/总线程: [" + i + "/" + runSize + "]" |
| 67 | + + ", 处理的数据条数:" + subList.size()); |
| 68 | + semaphore.release(); |
| 69 | + } catch (Exception e) { |
| 70 | + System.out.println(e.getMessage()); |
| 71 | + } finally { |
| 72 | + // 计数器减一 |
| 73 | + countDownLatch.countDown(); |
| 74 | + } |
| 75 | + }); |
| 76 | + } |
| 77 | + |
| 78 | + // 阻塞主线程等待所有线程执行完成 |
| 79 | + countDownLatch.await(); |
| 80 | + // 所有线程执行完,之后才能执行的部分 |
| 81 | + long t2 = System.currentTimeMillis(); |
| 82 | + System.out.printf("Call getValueList1 success... ret: {} size, threadCount: {}, costs time: {} ms" + list_val.size() + runSize + (t2 - t1)); |
| 83 | + return list_val; |
| 84 | + } catch (Exception e) { |
| 85 | + return null; |
| 86 | + } finally { |
| 87 | + // 关闭线程池 |
| 88 | + executorService.shutdown(); |
| 89 | + } |
| 90 | + } |
| 91 | + |
| 92 | + private List<Long> getValue(List<Long> listKey) { |
| 93 | + //具体操作省略 |
| 94 | + return listKey; |
| 95 | + } |
| 96 | + |
| 97 | +} |
0 commit comments