25 #include <citrusleaf/cf_ll.h>
28 #if defined(AS_USE_LIBEV)
30 #elif defined(AS_USE_LIBUV)
33 #elif defined(AS_USE_LIBEVENT)
34 #include <event2/event.h>
// AS_ASYNC_STATE_* : state codes, numbered consecutively from 0 to 12.
// The names follow the order unregistered -> registered -> delay queue ->
// connect -> TLS connect -> authentication (write, read header, read body) ->
// command (write, read header, read body), followed by two extra codes for
// a queue error and a retry.
// NOTE(review): the field that stores these values and the code that moves
// between them are not visible in this excerpt — confirm against the command
// struct before relying on the ordering.
46 #define AS_ASYNC_STATE_UNREGISTERED 0
47 #define AS_ASYNC_STATE_REGISTERED 1
48 #define AS_ASYNC_STATE_DELAY_QUEUE 2
49 #define AS_ASYNC_STATE_CONNECT 3
50 #define AS_ASYNC_STATE_TLS_CONNECT 4
51 #define AS_ASYNC_STATE_AUTH_WRITE 5
52 #define AS_ASYNC_STATE_AUTH_READ_HEADER 6
53 #define AS_ASYNC_STATE_AUTH_READ_BODY 7
54 #define AS_ASYNC_STATE_COMMAND_WRITE 8
55 #define AS_ASYNC_STATE_COMMAND_READ_HEADER 9
56 #define AS_ASYNC_STATE_COMMAND_READ_BODY 10
57 #define AS_ASYNC_STATE_QUEUE_ERROR 11
58 #define AS_ASYNC_STATE_RETRY 12
// AS_ASYNC_FLAGS_* : each value is a distinct power of two (1 through 128),
// so any combination fits in a single byte and individual flags can be
// tested with a bitwise AND.
// NOTE(review): presumably OR-ed together in a per-command flags field; that
// field is not visible in this excerpt — confirm its width is at least 8 bits.
60 #define AS_ASYNC_FLAGS_DESERIALIZE 1
61 #define AS_ASYNC_FLAGS_READ 2
62 #define AS_ASYNC_FLAGS_HAS_TIMER 4
63 #define AS_ASYNC_FLAGS_USING_SOCKET_TIMER 8
64 #define AS_ASYNC_FLAGS_EVENT_RECEIVED 16
65 #define AS_ASYNC_FLAGS_FREE_BUF 32
66 #define AS_ASYNC_FLAGS_LINEARIZE 64
67 #define AS_ASYNC_FLAGS_HEAP_REC 128
// NOTE(review): the use of this constant is not visible in this excerpt;
// the name suggests it relates to the authentication response — confirm at
// the call site.
69 #define AS_ASYNC_AUTH_RETURN_CODE 1
// AS_EVENT_CONNECTION_* : three distinct result codes (complete, pending,
// error). NOTE(review): presumably returned by the connection routines —
// the producer and consumer are not visible here; confirm.
71 #define AS_EVENT_CONNECTION_COMPLETE 0
72 #define AS_EVENT_CONNECTION_PENDING 1
73 #define AS_EVENT_CONNECTION_ERROR 2
// Initial capacity constant for an event queue (256).
// NOTE(review): presumably passed as the capacity argument when the queue is
// initialized; the unit (items vs. bytes) is not established here — confirm.
75 #define AS_EVENT_QUEUE_INITIAL_CAPACITY 256
81 #if defined(AS_USE_LIBEV)
84 #elif defined(AS_USE_LIBUV)
86 struct as_uv_tls* tls;
93 #elif defined(AS_USE_LIBEVENT)
117 #if defined(AS_USE_LIBEV)
118 struct ev_timer timer;
119 #elif defined(AS_USE_LIBUV)
121 #elif defined(AS_USE_LIBEVENT)
300 #if defined(AS_USE_LIBEV)
302 void as_ev_timer_cb(
struct ev_loop* loop, ev_timer* timer,
int revents);
303 void as_ev_repeat_cb(
struct ev_loop* loop, ev_timer* timer,
int revents);
333 conn->socket.last_used = cf_getns();
339 ev_timer_init(&cmd->timer, as_ev_timer_cb, (
double)timeout / 1000.0, 0.0);
340 cmd->timer.data = cmd;
348 ev_init(&cmd->timer, as_ev_repeat_cb);
349 cmd->timer.repeat = (double)repeat / 1000.0;
350 cmd->timer.data = cmd;
392 #elif defined(AS_USE_LIBUV)
394 void as_uv_timer_cb(uv_timer_t* timer);
395 void as_uv_repeat_cb(uv_timer_t* timer);
416 if (uv_fileno((uv_handle_t*)&conn->socket, &fd) == 0) {
425 conn->last_used = cf_getns();
433 cmd->timer.data = cmd;
435 uv_timer_start(&cmd->timer, as_uv_timer_cb, timeout, 0);
444 cmd->timer.data = cmd;
446 uv_timer_start(&cmd->timer, as_uv_repeat_cb, repeat, repeat);
460 uv_timer_stop(&cmd->timer);
470 uv_read_stop((uv_stream_t*)conn);
476 uv_read_stop((uv_stream_t*)conn);
480 as_uv_timer_closed(uv_handle_t* handle);
487 uv_close((uv_handle_t*)&cmd->timer, as_uv_timer_closed);
498 #elif defined(AS_USE_LIBEVENT)
500 void as_libevent_timer_cb(evutil_socket_t sock,
short events,
void*
udata);
501 void as_libevent_repeat_cb(evutil_socket_t sock,
short events,
void*
udata);
531 conn->socket.last_used = cf_getns();
537 evtimer_assign(&cmd->timer, cmd->
event_loop->
loop, as_libevent_timer_cb, cmd);
539 tv.tv_sec = (uint32_t)timeout / 1000;
540 tv.tv_usec = ((uint32_t)timeout % 1000) * 1000;
541 evtimer_add(&cmd->timer, &tv);
548 event_assign(&cmd->timer, cmd->
event_loop->
loop, -1, EV_PERSIST, as_libevent_repeat_cb, cmd);
550 tv.tv_sec = (uint32_t)repeat / 1000;
551 tv.tv_usec = ((uint32_t)repeat % 1000) * 1000;
552 evtimer_add(&cmd->timer, &tv);
566 evtimer_del(&cmd->timer);
573 event_del(&conn->watcher);
685 cmd->
len =
sizeof(as_proto);
694 as_proto*
proto = (as_proto*)cmd->
buf;
700 cmd->
len = (uint32_t)proto->sz;
718 pool->
limit = max_size;
822 pthread_mutex_destroy(&event_loop->
lock);
as_event_loop * event_loop
as_event_parse_results_fn parse_results
bool as_event_command_parse_success_failure(as_event_command *cmd)
static void as_event_timer_stop(as_event_command *cmd)
void(* as_event_executable)(as_event_loop *event_loop, void *udata)
#define AS_ASYNC_FLAGS_USING_SOCKET_TIMER
static void as_event_command_destroy(as_event_command *cmd)
as_event_state * event_state
static void as_event_release_connection(as_event_connection *conn, as_async_conn_pool *pool)
as_event_executor_complete_fn complete_fn
bool(* as_event_parse_results_fn)(struct as_event_command *cmd)
#define AS_ASYNC_STATE_AUTH_READ_HEADER
as_pipe_listener listener
void as_event_command_write_start(as_event_command *cmd)
AS_EXTERN bool as_queue_init(as_queue *queue, uint32_t item_size, uint32_t capacity)
static bool as_event_socket_retry(as_event_command *cmd)
void as_event_command_free(as_event_command *cmd)
static bool as_event_set_auth_parse_header(as_event_command *cmd)
static void as_event_set_auth_read_header(as_event_command *cmd)
static void as_node_incr_error_count(as_node *node)
bool as_event_command_parse_header(as_event_command *cmd)
static void as_event_command_release(as_event_command *cmd)
void as_event_node_destroy(as_node *node)
void(* as_event_executor_complete_fn)(struct as_event_executor *executor)
void as_event_process_timer(as_event_command *cmd)
void as_event_notify_error(as_event_command *cmd, as_error *err)
static void as_event_release_async_connection(as_event_command *cmd)
AS_EXTERN bool as_queue_push_head(as_queue *queue, const void *ptr)
AS_EXTERN bool as_queue_push(as_queue *queue, const void *ptr)
void as_event_socket_error(as_event_command *cmd, as_error *err)
void as_event_query_complete(as_event_command *cmd)
static void as_event_timer_repeat(as_event_command *cmd, uint64_t repeat)
struct as_event_command ** commands
static bool as_async_conn_pool_push(as_async_conn_pool *pool, as_event_connection *conn)
static void as_event_set_auth_write(as_event_command *cmd, as_session *session)
bool as_event_create_loop(as_event_loop *event_loop)
static bool as_async_conn_pool_push_head(as_async_conn_pool *pool, as_event_connection *conn)
as_event_loop * event_loop
as_status as_event_command_execute(as_event_command *cmd, as_error *err)
as_async_conn_pool * async_conn_pools
int as_socket_validate_fd(as_socket_fd fd)
static void as_event_set_conn_last_used(as_event_connection *conn)
static bool as_event_conn_current_trim(as_event_connection *conn, uint64_t max_socket_idle_ns)
as_event_executable executable
bool as_event_proto_parse_auth(as_event_command *cmd, as_proto *proto)
bool as_event_command_retry(as_event_command *cmd, bool timeout)
void as_event_close_cluster(as_cluster *cluster)
void as_event_connector_success(as_event_command *cmd)
#define AS_ASYNC_STATE_AUTH_READ_BODY
static void as_event_stop_watcher(as_event_command *cmd, as_event_connection *conn)
void(* as_pipe_listener)(void *udata, as_event_loop *event_loop)
static void as_event_set_write(as_event_command *cmd)
void as_event_response_error(as_event_command *cmd, as_error *err)
static void as_event_decr_conn(as_event_command *cmd)
void as_event_execute_retry(as_event_command *cmd)
void as_event_total_timeout(as_event_command *cmd)
static void as_event_loop_destroy(as_event_loop *event_loop)
bool as_event_execute(as_event_loop *event_loop, as_event_executable executable, void *udata)
as_async_conn_pool * pipe_conn_pools
void as_event_connect(as_event_command *cmd, as_async_conn_pool *pool)
void as_event_register_external_loop(as_event_loop *event_loop)
void as_event_command_schedule(as_event_command *cmd)
static void as_event_timer_again(as_event_command *cmd)
static void as_event_close_connection(as_event_connection *conn)
bool as_event_command_parse_info(as_event_command *cmd)
static void as_event_connection_timeout(as_event_command *cmd, as_async_conn_pool *pool)
void as_event_socket_timeout(as_event_command *cmd)
static as_event_loop * as_event_loop_get()
void as_event_executor_cancel(as_event_executor *executor, uint32_t queued_count)
as_event_connection * conn
void as_event_executor_complete(as_event_executor *executor)
void as_event_executor_error(as_event_executor *executor, as_error *err, uint32_t command_count)
uint32_t as_authenticate_set(struct as_cluster_s *cluster, struct as_session_s *session, uint8_t *buffer)
static void as_node_release(as_node *node)
as_pipe_listener pipe_listener
void as_socket_close(as_socket *sock)
as_policy_replica replica
bool as_event_command_parse_result(as_event_command *cmd)
static bool as_socket_current_trim(uint64_t last_used, uint64_t max_socket_idle_ns)
struct as_event_command * cmd
static int as_event_conn_validate(as_event_connection *conn)
static as_event_loop * as_event_assign(as_event_loop *event_loop)
static bool as_event_conn_current_tran(as_event_connection *conn, uint64_t max_socket_idle_ns)
bool as_event_proto_parse(as_event_command *cmd, as_proto *proto)
static void as_event_stop_read(as_event_connection *conn)
void as_event_error_callback(as_event_command *cmd, as_error *err)
AS_EXTERN void as_queue_destroy(as_queue *queue)
static void as_event_timer_once(as_event_command *cmd, uint64_t timeout)
void as_event_parse_error(as_event_command *cmd, as_error *err)
static bool as_async_conn_pool_incr_total(as_async_conn_pool *pool)
void as_event_batch_complete(as_event_command *cmd)
static bool as_socket_current_tran(uint64_t last_used, uint64_t max_socket_idle_ns)
bool as_event_decompress(as_event_command *cmd)
void as_event_create_connections(as_node *node, as_async_conn_pool *pools)
#define AS_ASYNC_FLAGS_HAS_TIMER
static void as_async_conn_pool_init(as_async_conn_pool *pool, uint32_t min_size, uint32_t max_size)
uint32_t command_sent_counter
static void as_queue_decr_total(as_queue *queue)