|
| 1 | +import javax.annotation.PostConstruct; |
| 2 | +import java.io.IOException; |
| 3 | +import java.util.*; |
| 4 | +import java.util.concurrent.*; |
| 5 | + |
| 6 | +public class CompletableFutureTest { |
| 7 | + |
| 8 | + // 模拟的请求数量 |
| 9 | + private static final int THREAD_NUM = 1000; |
| 10 | + |
| 11 | + // 倒计数器 juc包中常用工具类 |
| 12 | + private static CountDownLatch countDownLatch = new CountDownLatch(THREAD_NUM); |
| 13 | + |
| 14 | + // 积攒 请求。(每隔N毫秒批量处理一次) |
| 15 | + private LinkedBlockingQueue<Request> queue = new LinkedBlockingQueue<>(); |
| 16 | + |
| 17 | + public static void main(String[] args) throws IOException { |
| 18 | + // 创建 并不是马上发起请求 |
| 19 | + for (int i = 0; i < THREAD_NUM; i++) { |
| 20 | + final String code = "code-" + (i + 1); // 番号 |
| 21 | + // 多线程模拟用户查询请求 |
| 22 | + Thread thread = new Thread(() -> { |
| 23 | + try { |
| 24 | + // 代码在这里等待,等待countDownLatch为0,代表所有线程都start,再运行后续的代码 |
| 25 | + countDownLatch.await(); |
| 26 | + // http请求,实际上就是多线程调用这个方法 |
| 27 | + CompletableFutureTest queryRpc = new CompletableFutureTest(); |
| 28 | + Map<String, Object> result = queryRpc.queryCommodity(code); |
| 29 | + System.out.println(Thread.currentThread().getName() + " 查询结束,结果是:" + result); |
| 30 | + } catch (Exception e) { |
| 31 | + System.out.println(Thread.currentThread().getName() + " 线程执行出现异常:" + e.getMessage()); |
| 32 | + } |
| 33 | + }); |
| 34 | + thread.setName("price-thread-" + code); |
| 35 | + thread.start(); |
| 36 | + countDownLatch.countDown(); |
| 37 | + } |
| 38 | + |
| 39 | + // 输入任意内容退出 |
| 40 | + System.in.read(); |
| 41 | + } |
| 42 | + |
| 43 | + // 1000 用户请求,1000个线程 |
| 44 | + public Map<String, Object> queryCommodity(String movieCode) throws ExecutionException, InterruptedException { |
| 45 | + // 1000次 怎么样才能变成 更少的接口 |
| 46 | + // 思路: 将不同用户的同类请求合并起来 |
| 47 | + // 并非立刻发起接口调用,请求 收集起来,再进行 |
| 48 | + Request request = new Request(); |
| 49 | + request.commodityCode = movieCode; |
| 50 | + // 异步编程: 获取异步处理的结果 |
| 51 | + CompletableFuture<Map<String, Object>> future = new CompletableFuture<>(); |
| 52 | + request.future = future; |
| 53 | + queue.add(request); |
| 54 | + return future.get(); // 此处get方法,会阻塞线程运行,直到future有返回 |
| 55 | + } |
| 56 | + |
| 57 | + // 定时任务的实现,N秒钟处理一次数据 |
| 58 | + @PostConstruct |
| 59 | + public void init() { |
| 60 | + ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1); |
| 61 | + scheduledExecutorService.scheduleAtFixedRate(() -> { |
| 62 | + // 1、 取出queue的请求,生成一次批量查询 |
| 63 | + int size = queue.size(); |
| 64 | + if (size == 0) { |
| 65 | + return; |
| 66 | + } |
| 67 | + ArrayList<Request> requests = new ArrayList<>(); |
| 68 | + for (int i = 0; i < size; i++) { |
| 69 | + Request request = queue.poll(); |
| 70 | + requests.add(request); |
| 71 | + } |
| 72 | + System.out.println("批量处理数据量:" + size); |
| 73 | + // 2、 组装一个批量查询(一定需要 目的资源能够支持批量查询。 http) |
| 74 | + ArrayList<String> commodityCodes = new ArrayList<>(); |
| 75 | + for (Request request : requests) { |
| 76 | + commodityCodes.add(request.commodityCode); |
| 77 | + } |
| 78 | + QueryRpc queryRpc = new QueryRpc(); |
| 79 | + // 组装一个批量查询 |
| 80 | + List<Map<String, Object>> responses = queryRpc.queryCommodityByCodeBatch(commodityCodes); |
| 81 | + |
| 82 | + // 3、将结果响应 分发给每一个单独的用户请求。 由定时任务处理线程 --> 1000个用户的请求线程 |
| 83 | + HashMap<String, Map<String, Object>> responseMap = new HashMap<>(); |
| 84 | + for (Map<String, Object> response : responses) { |
| 85 | + String code = response.get("code").toString(); |
| 86 | + responseMap.put(code, response); |
| 87 | + } |
| 88 | + for (Request request : requests) { |
| 89 | + // 根据请求中携带的能表示唯一参数,去批量查询的结果中找响应 |
| 90 | + Map<String, Object> result = responseMap.get(request.commodityCode); |
| 91 | + // 将结果返回到对应的请求线程 |
| 92 | + request.future.complete(result); |
| 93 | + } |
| 94 | + }, 0, 10, TimeUnit.MILLISECONDS); |
| 95 | + } |
| 96 | +} |
| 97 | + |
| 98 | +class QueryRpc { |
| 99 | + /** |
| 100 | + * 调用远程的商品信息查询接口 |
| 101 | + * |
| 102 | + * @param code 商品编码 |
| 103 | + * @return 返回商品信息,map格式 |
| 104 | + */ |
| 105 | + public HashMap<String, Object> queryCommodityByCode(String code) { |
| 106 | + try { |
| 107 | + Thread.sleep(50L); |
| 108 | + } catch (InterruptedException e) { |
| 109 | + e.printStackTrace(); |
| 110 | + } |
| 111 | + HashMap<String, Object> hashMap = new HashMap<>(); |
| 112 | + hashMap.put("commodityId", new Random().nextInt(999999999)); |
| 113 | + hashMap.put("code", code); |
| 114 | + hashMap.put("phone", "huawei"); |
| 115 | + hashMap.put("isOk", "true"); |
| 116 | + hashMap.put("price", "4000"); |
| 117 | + return hashMap; |
| 118 | + } |
| 119 | + |
| 120 | + /** |
| 121 | + * 批量查询 - 调用远程的商品信息查询接口 |
| 122 | + * |
| 123 | + * @param codes 多个商品编码 |
| 124 | + * @return 返回多个商品信息 |
| 125 | + */ |
| 126 | + public List<Map<String, Object>> queryCommodityByCodeBatch(List<String> codes) { |
| 127 | + List<Map<String, Object>> result = new ArrayList<>(); |
| 128 | + for (String code : codes) { |
| 129 | + HashMap<String, Object> hashMap = new HashMap<>(); |
| 130 | + hashMap.put("commodityId", new Random().nextInt(999999999)); |
| 131 | + hashMap.put("code", code); |
| 132 | + hashMap.put("phone", "huawei"); |
| 133 | + hashMap.put("isOk", "true"); |
| 134 | + hashMap.put("price", "4000"); |
| 135 | + result.add(hashMap); |
| 136 | + } |
| 137 | + return result; |
| 138 | + } |
| 139 | +} |
| 140 | + |
| 141 | +// 请求包装类 |
| 142 | +class Request { |
| 143 | + String commodityCode; |
| 144 | + CompletableFuture<Map<String, Object>> future; // 接受结果 |
| 145 | + |
| 146 | + public String getCommodityCode() { |
| 147 | + return commodityCode; |
| 148 | + } |
| 149 | + |
| 150 | + public void setCommodityCode(String commodityCode) { |
| 151 | + this.commodityCode = commodityCode; |
| 152 | + } |
| 153 | + |
| 154 | + public CompletableFuture<Map<String, Object>> getFuture() { |
| 155 | + return future; |
| 156 | + } |
| 157 | + |
| 158 | + public void setFuture(CompletableFuture<Map<String, Object>> future) { |
| 159 | + this.future = future; |
| 160 | + } |
| 161 | +} |
0 commit comments