All Data Structures Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
as_node.h
Go to the documentation of this file.
1/*
2 * Copyright 2008-2025 Aerospike, Inc.
3 *
4 * Portions may be licensed to Aerospike, Inc. under one or more contributor
5 * license agreements.
6 *
7 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
8 * use this file except in compliance with the License. You may obtain a copy of
9 * the License at http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14 * License for the specific language governing permissions and limitations under
15 * the License.
16 */
17#pragma once
18
19#include <aerospike/as_atomic.h>
20#include <aerospike/as_config.h>
22#include <aerospike/as_error.h>
23#include <aerospike/as_event.h>
25#include <aerospike/as_socket.h>
27#include <aerospike/as_queue.h>
28#include <aerospike/as_vector.h>
30
31#if !defined(_MSC_VER)
32#include <netinet/in.h>
33#include <sys/uio.h>
34#endif
35
36#ifdef __cplusplus
37extern "C" {
38#endif
39
40//---------------------------------
41// Macros
42//---------------------------------
43
44/**
45 * Maximum size (including NULL byte) of a hostname.
46 */
47#define AS_HOSTNAME_SIZE 256
48
49/**
50 * Maximum size of node name
51 */
52#define AS_NODE_NAME_SIZE 20
53
54// Leave this is in for backwards compatibility.
55#define AS_NODE_NAME_MAX_SIZE AS_NODE_NAME_SIZE
56
57#define AS_FEATURES_PARTITION_SCAN (1 << 0)
58#define AS_FEATURES_QUERY_SHOW (1 << 1)
59#define AS_FEATURES_BATCH_ANY (1 << 2)
60#define AS_FEATURES_PARTITION_QUERY (1 << 3)
61
62#define AS_ADDRESS4_MAX 4
63#define AS_ADDRESS6_MAX 8
64
65//---------------------------------
66// Types
67//---------------------------------
68
69/**
70 * Socket address information.
71 */
72typedef struct as_address_s {
73 /**
74 * Socket IP address.
75 */
76 struct sockaddr_storage addr;
77
78 /**
79 * Socket IP address string representation including port.
80 */
82
84
85/**
86 * @private
87 * Rack.
88 */
89typedef struct as_rack_s {
90 /**
91 * Namespace
92 */
94
95 /**
96 * Rack ID
97 */
99
100} as_rack;
101
102/**
103 * @private
104 * Racks.
105 */
106typedef struct as_racks_s {
107 /**
108 * Reference count of racks array.
109 */
110 uint32_t ref_count;
111
112 /**
113 * Rack ID when all namespaces use same rack.
114 */
116
117 /**
118 * Length of racks array.
119 */
120 uint32_t size;
121
122 /**
123 * Pad to 8 byte boundary.
124 */
125 uint32_t pad;
126
127 /**
128 * Racks array.
129 */
130 as_rack racks[];
131
132} as_racks;
133
134/**
135 * @private
136 * Session info.
137 */
138typedef struct as_session_s {
139 /**
140 * Reference count of session.
141 */
142 uint32_t ref_count;
143
144 /**
145 * Session token length.
146 */
147 uint32_t token_length;
148
149 /**
150 * Session expiration for this node.
151 */
152 uint64_t expiration;
153
154 /**
155 * Session token for this node.
156 */
157 uint8_t token[];
158
159} as_session;
160
161/**
162 * @private
163 * Async connection pool.
164 */
165typedef struct as_async_conn_pool_s {
166 /**
167 * Async connection queue.
168 */
170
171 /**
172 * Min connections allowed for this pool.
173 */
174 uint32_t min_size;
175
176 /**
177 * Max connections allowed for this pool.
178 */
179 uint32_t limit;
180
181 /**
182 * Total async connections opened.
183 */
184 uint32_t opened;
185
186 /**
187 * Total async connections closed.
188 */
189 uint32_t closed;
190
192
193/**
194 * Namespace metrics.
195 */
196typedef struct {
197 /**
198 * Namespace.
199 */
200 const char* ns;
201
202 /**
203 * Bytes received from the server.
204 */
205 uint64_t bytes_in;
206
207 /**
208 * Bytes sent to the server.
209 */
210 uint64_t bytes_out;
211
212 /**
213 * Command error count since node was initialized. If the error is retryable, multiple errors per
214 * command may occur.
215 */
216 uint64_t error_count;
217
218 /**
219 * Command timeout count since node was initialized. If the timeout is retryable (ie socketTimeout),
220 * multiple timeouts per command may occur.
221 */
223
224 /**
225 * Command key busy error count since node was initialized.
226 */
228
229 /**
230 * Latency histograms.
231 */
233
235
236struct as_cluster_s;
237
238/**
239 * Server node representation.
240 */
241typedef struct as_node_s {
242 /**
243 * Reference count of node.
244 */
245 uint32_t ref_count;
246
247 /**
248 * Reference count of node in partition maps.
249 */
251
252 /**
253 * Server's generation count for partition management.
254 */
256
257 /**
258 * Features supported by server. Stored in bitmap.
259 */
260 uint32_t features;
261
262 /**
263 * Node version.
264 */
266
267 /**
268 * TLS certificate name (needed for TLS only, NULL otherwise).
269 */
270 char* tls_name;
271
272 /**
273 * The name of the node.
274 */
276
277 /**
278 * Primary address index into addresses array.
279 */
281
282 /**
283 * Number of IPv4 addresses.
284 */
286
287 /**
288 * Number of IPv6 addresses.
289 */
291
292 /**
293 * Array of IP addresses. Not thread-safe.
294 */
296
297 /**
298 * Optional hostname. Not thread-safe.
299 */
300 char* hostname;
301
302 /**
303 * Cluster from which this node resides.
304 */
305 struct as_cluster_s* cluster;
306
307 /**
308 * Pools of current, cached sockets.
309 */
311
312 /**
313 * Array of connection pools used in async commands. There is one pool per node/event loop.
314 * Only used by event loop threads. Not thread-safe.
315 */
317
318 /**
319 * Pool of connections used in pipelined async commands. Also not thread-safe.
320 */
322
323 /**
324 * Authentication session.
325 */
327
328 /**
329 * Racks data.
330 */
332
333 /**
334 * Socket used exclusively for cluster tend thread info requests.
335 */
337
338 /**
339 * Connection queue iterator. Not atomic by design.
340 */
341 uint32_t conn_iter;
342
343 /**
344 * Total sync connections opened.
345 */
347
348 /**
349 * Total sync connections closed.
350 */
352
353 /**
354 * Error count for this node's error_rate_window.
355 */
356 uint32_t error_rate;
357
358 /**
359 * Max errors per node per error_rate_window.
360 */
362
363 /**
364 * Server's generation count for peers.
365 */
367
368 /**
369 * Number of peers returned by server node.
370 */
371 uint32_t peers_count;
372
373 /**
374 * Server's generation count for partition rebalancing.
375 */
377
378 /**
379 * Number of other nodes that consider this node a member of the cluster.
380 */
381 uint32_t friends;
382
383 /**
384 * Number of consecutive info request failures.
385 */
386 uint32_t failures;
387
388 /**
389 * Shared memory node array index.
390 */
391 uint32_t index;
392
393 /**
394 * Node/Namespace metrics.
395 */
397
398 /**
399 * Number of metrics namespace entries.
400 */
402
403 /**
404 * Should user login to avoid session expiration.
405 */
407
408 /**
409 * Is node currently active.
410 */
411 uint8_t active;
412
413 /**
414 * Did partition change in current cluster tend.
415 */
417
418 /**
419 * Did rebalance generation change in current cluster tend.
420 */
422
423 /**
424 * Should user-agent-set info command be retried.
425 */
427
428} as_node;
429
430/**
431 * @private
432 * Node discovery information.
433 */
434typedef struct as_node_info_s {
435 /**
436 * Node name.
437 */
439
440 /**
441 * Features supported by server. Stored in bitmap.
442 */
443 uint32_t features;
444
445 /**
446 * Host.
447 */
449
450 /**
451 * Validated socket.
452 */
454
455 /**
456 * Socket address.
457 */
458 struct sockaddr_storage addr;
459
460 /**
461 * Authentication session.
462 */
464
465 /**
466 * Node version.
467 */
469
471
472//---------------------------------
473// Functions
474//---------------------------------
475
476/**
477 * @private
478 * Create new cluster node.
479 */
480as_node*
481as_node_create(struct as_cluster_s* cluster, as_node_info* node_info);
482
483/**
484 * @private
485 * Close all connections in pool and free resources.
486 */
487AS_EXTERN void
489
490/**
491 * @private
492 * Create configured minimum number of connections.
493 */
494void
496
497/**
498 * @private
499 * Check if node is active from a command thread.
500 */
501static inline bool
503{
504 return (bool)as_load_uint8_acq(&node->active);
505}
506
507/**
508 * @private
509 * Set node to inactive.
510 */
511static inline void
513{
514 // Make volatile write so changes are reflected in other threads.
515 as_store_uint8_rls(&node->active, false);
516}
517
518/**
519 * @private
520 * Read volatile node.
521 */
522static inline as_node*
524{
525 return (as_node*)as_load_ptr((void* const*)node);
526}
527
528/**
529 * @private
530 * Reserve existing cluster node.
531 */
532static inline void
534{
536}
537
538/**
539 * @private
540 * Set volatile node.
541 */
542static inline void
544{
545 as_store_ptr_rls((void**)trg, src);
546}
547
548/**
549 * @private
550 * Release existing cluster node.
551 */
552static inline void
554{
555 if (as_aaf_uint32_rls(&node->ref_count, -1) == 0) {
556 as_fence_acq();
557 as_node_destroy(node);
558 }
559}
560
561/**
562 * @private
563 * Release node on next cluster tend iteration.
564 */
565void
567
568/**
569 * @private
570 * Add socket address to node addresses.
571 */
572void
573as_node_add_address(as_node* node, struct sockaddr* addr);
574
575/**
576 * @private
577 * Set hostname.
578 */
579void
580as_node_set_hostname(as_node* node, const char* hostname);
581
582/**
583 * Get primary socket address.
584 */
585static inline as_address*
587{
588 return &node->addresses[node->address_index];
589}
590
591/**
592 * Get socket address as a string.
593 */
594static inline const char*
596{
597 return node->addresses[node->address_index].name;
598}
599
600/**
601 * @private
602 * Attempt to authenticate given current cluster's user and password.
603 */
605as_node_authenticate_connection(struct as_cluster_s* cluster, uint64_t deadline_ms);
606
607/**
608 * @private
609 * Get a connection to the given node from pool and validate. Return 0 on success.
610 */
613 as_error* err, as_node* node, const char* ns, uint32_t socket_timeout, uint64_t deadline_ms,
614 as_socket* sock
615 );
616
617/**
618 * @private
619 * Close a node's connection and update node/pool statistics.
620 */
621static inline void
628
629/**
630 * @private
631 * Close a node's connection and update node statistics.
632 */
633static inline void
639
640/**
641 * @private
642 * Put connection back into pool.
643 */
644static inline void
646{
647 // Save pool.
648 as_conn_pool* pool = sock->pool;
649
650 // Update last used timestamp.
651 sock->last_used = cf_getns();
652
653 // Put into pool.
654 if (! as_conn_pool_push_head(pool, sock)) {
655 as_node_close_connection(node, sock, pool);
656 }
657}
658
659/**
660 * @private
661 * Balance sync connections.
662 */
663void
665
666/**
667 * @private
668 * Are hosts equal.
669 */
670static inline bool
672{
673 return strcmp(h1->name, h2->name) == 0 && h1->port == h2->port;
674}
675
676/**
677 * @private
678 * Destroy node_info contents.
679 */
680static inline void
682{
683 as_socket_close(&node_info->socket);
684 cf_free(node_info->session);
685}
686
687/**
688 * @private
689 * Tell tend thread to perform another node login.
690 */
691void
693
694/**
695 * @private
696 * Does node contain rack.
697 */
698bool
699as_node_has_rack(as_node* node, const char* ns, int rack_id);
700
701/**
702 * @private
703 * Return as_ns_metrics for specified node and namespace.
704 */
706as_node_prepare_metrics(as_node* node, const char* ns);
707
708/**
709 * @private
710 * Record latency of type latency_type for node
711 */
712void
713as_node_add_latency(as_ns_metrics* metrics, as_latency_type latency_type, uint64_t elapsed_nanos);
714
715struct as_metrics_policy_s;
716
717/**
718 * @private
719 * Enable metrics at the node level
720 */
721void
722as_node_enable_metrics(as_node* node, const struct as_metrics_policy_s* policy);
723
724/**
725 * Add bytes received metrics to node/namespace.
726 */
727static inline void
728as_node_add_bytes_in(as_ns_metrics* metrics, uint64_t bytes_in)
729{
730 as_add_uint64(&metrics->bytes_in, bytes_in);
731}
732
733/**
734 * Return bytes received from the server. The value is cumulative and not reset per metrics interval.
735 */
736static inline uint64_t
738{
739 return as_load_uint64(&metrics->bytes_in);
740}
741
742/**
743 * Add bytes sent metrics to node/namespace.
744 */
745static inline void
746as_node_add_bytes_out(as_ns_metrics* metrics, uint64_t bytes_out)
747{
748 as_add_uint64(&metrics->bytes_out, bytes_out);
749}
750
751/**
752 * Return bytes sent to the server. The value is cumulative and not reset per metrics interval.
753 */
754static inline uint64_t
756{
757 return as_load_uint64(&metrics->bytes_out);
758}
759
760/**
761 * Return command error count. The value is cumulative and not reset per metrics interval.
762 */
763static inline uint64_t
765{
766 return as_load_uint64(&metrics->error_count);
767}
768
769/**
770 * Increment command error count. If the error is retryable, multiple errors per
771 * command may occur.
772 */
773void
774as_node_add_error(as_node* node, const char* ns, as_ns_metrics* metrics);
775
776/**
777 * Return command timeout count. The value is cumulative and not reset per metrics interval.
778 */
779static inline uint64_t
781{
782 return as_load_uint64(&metrics->timeout_count);
783}
784
785/**
786 * Increment command timeout count. If the timeout is retryable (ie socketTimeout),
787 * multiple timeouts per command may occur.
788 */
789void
790as_node_add_timeout(as_node* node, const char* ns, as_ns_metrics* metrics);
791
792/**
793 * Return command key busy error count. The value is cumulative and not reset per metrics interval.
794 */
795static inline uint64_t
800
801/**
802 * Increment command key busy error count.
803 */
804void
805as_node_add_key_busy(as_node* node, const char* ns, as_ns_metrics* metrics);
806
807/**
808 * @private
809 * Validate node's error rate.
810 */
811bool
813
814/**
815 * @private
816 * Reset node's error count.
817 */
818void
820
821/**
822 * @private
823 * Get node's error count.
824 */
825static inline uint32_t
827{
828 return as_load_uint32(&node->error_rate);
829}
830
831/**
832 * @private
833 * Volatile read session pointer.
834 */
835static inline as_session*
837{
838 return (as_session*)as_load_ptr((void* const*)session);
839}
840
841/**
842 * @private
843 * Release existing session.
844 */
845static inline void
847{
848 if (as_aaf_uint32_rls(&session->ref_count, -1) == 0) {
849 cf_free(session);
850 }
851}
852
853#ifdef __cplusplus
854} // end extern "C"
855#endif
#define AS_IP_ADDRESS_SIZE
Definition as_address.h:32
#define as_fence_acq()
#define as_load_ptr(_target)
#define as_incr_uint32(_target)
#define as_store_ptr_rls(_target, _value)
#define as_store_uint8_rls(_target, _value)
#define as_add_uint64(_target, _value)
#define as_load_uint8_acq(_target)
#define as_load_uint64(_target)
#define as_aaf_uint32_rls(_target, _value)
#define as_load_uint32(_target)
static bool as_conn_pool_push_head(as_conn_pool *pool, as_socket *sock)
static void as_conn_pool_decr(as_conn_pool *pool)
#define AS_LATENCY_TYPE_MAX
Definition as_latency.h:38
uint8_t as_latency_type
Definition as_latency.h:30
AS_EXTERN void as_node_destroy(as_node *node)
static void as_node_deactivate(as_node *node)
Definition as_node.h:512
static void as_node_info_destroy(as_node_info *node_info)
Definition as_node.h:681
static uint64_t as_node_get_key_busy_count(as_ns_metrics *metrics)
Definition as_node.h:796
static uint64_t as_node_get_bytes_in(as_ns_metrics *metrics)
Definition as_node.h:737
void as_node_add_key_busy(as_node *node, const char *ns, as_ns_metrics *metrics)
void as_node_balance_connections(as_node *node)
static as_session * as_session_load(as_session **session)
Definition as_node.h:836
void as_node_add_latency(as_ns_metrics *metrics, as_latency_type latency_type, uint64_t elapsed_nanos)
bool as_node_valid_error_rate(as_node *node)
void as_node_create_min_connections(as_node *node)
static uint64_t as_node_get_bytes_out(as_ns_metrics *metrics)
Definition as_node.h:755
bool as_node_has_rack(as_node *node, const char *ns, int rack_id)
static void as_session_release(as_session *session)
Definition as_node.h:846
static as_address * as_node_get_address(as_node *node)
Definition as_node.h:586
static void as_node_close_connection(as_node *node, as_socket *sock, as_conn_pool *pool)
Definition as_node.h:622
static void as_node_add_bytes_in(as_ns_metrics *metrics, uint64_t bytes_in)
Definition as_node.h:728
as_status as_node_get_connection(as_error *err, as_node *node, const char *ns, uint32_t socket_timeout, uint64_t deadline_ms, as_socket *sock)
as_node * as_node_create(struct as_cluster_s *cluster, as_node_info *node_info)
static bool as_host_equals(as_host *h1, as_host *h2)
Definition as_node.h:671
static void as_node_store(as_node **trg, as_node *src)
Definition as_node.h:543
static bool as_node_is_active(const as_node *node)
Definition as_node.h:502
as_status as_node_authenticate_connection(struct as_cluster_s *cluster, uint64_t deadline_ms)
static uint32_t as_node_get_error_rate(as_node *node)
Definition as_node.h:826
static const char * as_node_get_address_string(as_node *node)
Definition as_node.h:595
void as_node_add_error(as_node *node, const char *ns, as_ns_metrics *metrics)
void as_node_add_address(as_node *node, struct sockaddr *addr)
static uint64_t as_node_get_timeout_count(as_ns_metrics *metrics)
Definition as_node.h:780
static void as_node_close_socket(as_node *node, as_socket *sock)
Definition as_node.h:634
static as_node * as_node_load(as_node **node)
Definition as_node.h:523
static void as_node_add_bytes_out(as_ns_metrics *metrics, uint64_t bytes_out)
Definition as_node.h:746
#define AS_NODE_NAME_SIZE
Definition as_node.h:52
static uint64_t as_node_get_error_count(as_ns_metrics *metrics)
Definition as_node.h:764
void as_node_add_timeout(as_node *node, const char *ns, as_ns_metrics *metrics)
void as_node_release_delayed(as_node *node)
void as_node_enable_metrics(as_node *node, const struct as_metrics_policy_s *policy)
as_ns_metrics * as_node_prepare_metrics(as_node *node, const char *ns)
void as_node_reset_error_rate(as_node *node)
static void as_node_put_connection(as_node *node, as_socket *sock)
Definition as_node.h:645
static void as_node_reserve(as_node *node)
Definition as_node.h:533
void as_node_set_hostname(as_node *node, const char *hostname)
void as_node_signal_login(as_node *node)
static void as_node_release(as_node *node)
Definition as_node.h:553
#define AS_MAX_NAMESPACE_SIZE
void as_socket_close(as_socket *sock)
as_status
Definition as_status.h:30
#define AS_EXTERN
Definition as_std.h:25
char name[AS_IP_ADDRESS_SIZE]
Definition as_node.h:81
uint32_t min_size
Definition as_node.h:174
uint32_t opened
Definition as_node.h:184
uint32_t closed
Definition as_node.h:189
char * name
Definition as_host.h:37
uint16_t port
Definition as_host.h:47
as_version version
Definition as_node.h:468
as_host host
Definition as_node.h:448
as_session * session
Definition as_node.h:463
as_socket socket
Definition as_node.h:453
uint32_t features
Definition as_node.h:443
as_session * session
Definition as_node.h:326
bool retry_user_agent
Definition as_node.h:426
uint32_t peers_count
Definition as_node.h:371
bool rebalance_changed
Definition as_node.h:421
as_conn_pool * sync_conn_pools
Definition as_node.h:310
uint32_t ref_count
Definition as_node.h:245
uint32_t address4_size
Definition as_node.h:285
struct as_cluster_s * cluster
Definition as_node.h:305
uint32_t partition_generation
Definition as_node.h:255
uint32_t failures
Definition as_node.h:386
uint32_t peers_generation
Definition as_node.h:366
uint32_t address_index
Definition as_node.h:280
as_address * addresses
Definition as_node.h:295
as_version version
Definition as_node.h:265
uint32_t friends
Definition as_node.h:381
as_async_conn_pool * pipe_conn_pools
Definition as_node.h:321
uint32_t max_error_rate
Definition as_node.h:361
uint32_t sync_conns_opened
Definition as_node.h:346
char * hostname
Definition as_node.h:300
uint32_t index
Definition as_node.h:391
uint32_t conn_iter
Definition as_node.h:341
uint32_t sync_conns_closed
Definition as_node.h:351
as_async_conn_pool * async_conn_pools
Definition as_node.h:316
char * tls_name
Definition as_node.h:270
uint8_t active
Definition as_node.h:411
uint8_t perform_login
Definition as_node.h:406
uint32_t address6_size
Definition as_node.h:290
as_ns_metrics ** metrics
Definition as_node.h:396
uint32_t features
Definition as_node.h:260
uint8_t metrics_size
Definition as_node.h:401
uint32_t error_rate
Definition as_node.h:356
as_racks * racks
Definition as_node.h:331
uint32_t partition_ref_count
Definition as_node.h:250
uint32_t rebalance_generation
Definition as_node.h:376
bool partition_changed
Definition as_node.h:416
as_socket info_socket
Definition as_node.h:336
uint64_t timeout_count
Definition as_node.h:222
uint64_t bytes_in
Definition as_node.h:205
uint64_t key_busy_count
Definition as_node.h:227
uint64_t error_count
Definition as_node.h:216
const char * ns
Definition as_node.h:200
uint64_t bytes_out
Definition as_node.h:210
int rack_id
Definition as_node.h:98
uint32_t ref_count
Definition as_node.h:110
uint32_t pad
Definition as_node.h:125
uint32_t size
Definition as_node.h:120
int rack_id
Definition as_node.h:115
uint32_t ref_count
Definition as_node.h:142
uint32_t token_length
Definition as_node.h:147
uint64_t expiration
Definition as_node.h:152
struct as_conn_pool_s * pool
Definition as_socket.h:87
uint64_t last_used
Definition as_socket.h:88