|
1 | 1 | #include <postgres.h> |
2 | | -#include <curl/multi.h> |
3 | | -#include <stdbool.h> |
| 2 | +#include <stddef.h> |
| 3 | +#include <errno.h> |
4 | 4 | #include <unistd.h> |
5 | 5 |
|
6 | | -#include <sys/epoll.h> |
7 | | -#include <sys/timerfd.h> |
8 | | - |
9 | 6 | #include "event.h" |
10 | 7 |
|
| 8 | +#ifdef WAIT_USE_EPOLL |
| 9 | + |
11 | 10 | static int timerfd = 0; |
12 | 11 | static bool timer_created = false; |
13 | 12 |
|
@@ -120,3 +119,113 @@ int get_curl_event(event ev){ |
120 | 119 | int get_socket_fd(event ev){ |
121 | 120 | return ev.data.fd; |
122 | 121 | } |
| 122 | + |
| 123 | +#else |
| 124 | + |
| 125 | +typedef struct { |
| 126 | + curl_socket_t sockfd; |
| 127 | + int action; |
| 128 | +} SocketInfo ; |
| 129 | + |
| 130 | +int inline wait_event(int fd, event *events, size_t maxevents, int wait_milliseconds){ |
| 131 | + return kevent(fd, NULL, 0, events, maxevents, &(struct timespec){.tv_sec = wait_milliseconds/1000}); |
| 132 | +} |
| 133 | + |
| 134 | +int inline event_monitor(){ |
| 135 | + return kqueue(); |
| 136 | +} |
| 137 | + |
| 138 | +void ev_monitor_close(LoopState *lstate){ |
| 139 | + close(lstate->epfd); |
| 140 | +} |
| 141 | + |
| 142 | +int multi_timer_cb(CURLM *multi, long timeout_ms, LoopState *lstate) { |
| 143 | + elog(DEBUG2, "multi_timer_cb: Setting timeout to %ld ms\n", timeout_ms); |
| 144 | + event timer_event; |
| 145 | + int id = 1; |
| 146 | + |
| 147 | + if (timeout_ms > 0) { |
| 148 | + EV_SET(&timer_event, id, EVFILT_TIMER, EV_ADD, 0, timeout_ms, NULL); //0 means milliseconds (the default) |
| 149 | + } else if (timeout_ms == 0){ |
| 150 | + /* libcurl wants us to timeout now, however setting both fields of |
| 151 | + * new_value.it_value to zero disarms the timer. The closest we can |
| 152 | + * do is to schedule the timer to fire in 1 ns. */ |
| 153 | + EV_SET(&timer_event, id, EVFILT_TIMER, EV_ADD, NOTE_NSECONDS, 1, NULL); |
| 154 | + } else { |
| 155 | + // libcurl passes a -1 to indicate the timer should be deleted |
| 156 | + EV_SET(&timer_event, id, EVFILT_TIMER, EV_DELETE, 0, 0, NULL); |
| 157 | + } |
| 158 | + |
| 159 | + if (kevent(lstate->epfd, &timer_event, 1, NULL, 0, NULL) < 0) { |
| 160 | + int save_errno = errno; |
| 161 | + ereport(ERROR, errmsg("kevent with EVFILT_TIMER failed: %s", strerror(save_errno))); |
| 162 | + } |
| 163 | + |
| 164 | + return 0; |
| 165 | +} |
| 166 | + |
| 167 | +int multi_socket_cb(CURL *easy, curl_socket_t sockfd, int what, LoopState *lstate, void *socketp) { |
| 168 | + static char *whatstrs[] = { "NONE", "CURL_POLL_IN", "CURL_POLL_OUT", "CURL_POLL_INOUT", "CURL_POLL_REMOVE" }; |
| 169 | + elog(DEBUG2, "multi_socket_cb: sockfd %d received %s", sockfd, whatstrs[what]); |
| 170 | + |
| 171 | + SocketInfo *sock_info = (SocketInfo *)socketp; |
| 172 | + struct kevent ev[2]; |
| 173 | + int count = 0; |
| 174 | + |
| 175 | + if (what == CURL_POLL_REMOVE) { |
| 176 | + if (sock_info->action & CURL_POLL_IN) |
| 177 | + EV_SET(&ev[count++], sockfd, EVFILT_READ, EV_DELETE, 0, 0, sock_info); |
| 178 | + |
| 179 | + if (sock_info->action & CURL_POLL_OUT) |
| 180 | + EV_SET(&ev[count++], sockfd, EVFILT_WRITE, EV_DELETE, 0, 0, sock_info); |
| 181 | + |
| 182 | + curl_multi_assign(lstate->curl_mhandle, sockfd, NULL); |
| 183 | + pfree(sock_info); |
| 184 | + } else { |
| 185 | + if (!sock_info) { |
| 186 | + sock_info = palloc(sizeof(SocketInfo)); |
| 187 | + sock_info->sockfd = sockfd; |
| 188 | + sock_info->action = what; |
| 189 | + curl_multi_assign(lstate->curl_mhandle, sockfd, sock_info); |
| 190 | + } |
| 191 | + |
| 192 | + if (what & CURL_POLL_IN) |
| 193 | + EV_SET(&ev[count++], sockfd, EVFILT_READ, EV_ADD, 0, 0, sock_info); |
| 194 | + |
| 195 | + if (what & CURL_POLL_OUT) |
| 196 | + EV_SET(&ev[count++], sockfd, EVFILT_WRITE, EV_ADD, 0, 0, sock_info); |
| 197 | + } |
| 198 | + |
| 199 | + Assert(count <= 2); |
| 200 | + |
| 201 | + if (kevent(lstate->epfd, &ev[0], count, NULL, 0, NULL) < 0) { |
| 202 | + int save_errno = errno; |
| 203 | + ereport(ERROR, errmsg("kevent with %s failed for sockfd %d: %s", whatstrs[what], sockfd, strerror(save_errno))); |
| 204 | + } |
| 205 | + |
| 206 | + return 0; |
| 207 | +} |
| 208 | + |
| 209 | +bool is_timer(event ev){ |
| 210 | + return ev.filter == EVFILT_TIMER; |
| 211 | +} |
| 212 | + |
| 213 | +int get_curl_event(event ev){ |
| 214 | + int ev_bitmask = 0; |
| 215 | + if (ev.filter == EVFILT_READ) |
| 216 | + ev_bitmask |= CURL_CSELECT_IN; |
| 217 | + else if (ev.filter == EVFILT_WRITE) |
| 218 | + ev_bitmask |= CURL_CSELECT_OUT; |
| 219 | + else |
| 220 | + ev_bitmask = CURL_CSELECT_ERR; |
| 221 | + |
| 222 | + return ev_bitmask; |
| 223 | +} |
| 224 | + |
| 225 | +int get_socket_fd(event ev){ |
| 226 | + SocketInfo *sock_info = (SocketInfo *) ev.udata; |
| 227 | + |
| 228 | + return sock_info->sockfd; |
| 229 | +} |
| 230 | + |
| 231 | +#endif |
0 commit comments