Kafka之消费与激情
|
if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) { client.pollNoWakeup(); } return this.interceptors.onConsume(new ConsumerRecords<>(records)); } final long fetchEnd = time.milliseconds(); elapsedTime += fetchEnd - metadataEnd; } while (elapsedTime < timeoutMs); return ConsumerRecords.empty(); } finally { release(); } } 上述代码中有个方法pollForFetches,它的实现逻辑如下: private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(final long timeoutMs) { final long startMs = time.milliseconds(); long pollTimeout = Math.min(coordinator.timeToNextPoll(startMs), timeoutMs); // if data is available already, return it immediately final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords(); if (!records.isEmpty()) { return records; } // send any new fetches (won't resend pending fetches) fetcher.sendFetches(); (编辑:辽源站长网) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |
- linux – 当我在shell脚本中使用cat命令时,如何在文件末尾保
- 酷狗音乐怎么设置一键强音 手机酷狗音乐一键强音设置方法
- linux – 如何使用automake检查操作系统
- qq好莱坞会员怎么转至微信 qq好莱坞会员转微信账号方法
- 极品飞车17最高通缉修改金钱99999999 无需钛备份和存档
- 罪恶都市十周年即将登入android ios 最新劲爆视频
- nginx的The page you are looking for is temporarily unav
- ireader怎么看本地的书 iReader如何导入本地图书
- CES 2019种草一大波,XESS新品不走寻常路
- linux – 有没有办法一次设置多个ZFS文件系统属性?
