pacemaker  2.0.4-2deceaa3ae
Scalable High-Availability cluster resource manager
cpg.c
Go to the documentation of this file.
1 /*
2  * Copyright 2004-2020 the Pacemaker project contributors
3  *
4  * The version control history for this file may have further details.
5  *
6  * This source code is licensed under the GNU Lesser General Public License
7  * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
8  */
9 
10 #include <crm_internal.h>
11 #include <bzlib.h>
12 #include <sys/socket.h>
13 #include <netinet/in.h>
14 #include <arpa/inet.h>
15 #include <netdb.h>
16 
17 #include <crm/common/ipc.h>
18 #include <crm/cluster/internal.h>
19 #include <crm/common/mainloop.h>
20 #include <sys/utsname.h>
21 
22 #include <qb/qbipcc.h>
23 #include <qb/qbutil.h>
24 
25 #include <corosync/corodefs.h>
26 #include <corosync/corotypes.h>
27 #include <corosync/hdb.h>
28 #include <corosync/cpg.h>
29 
30 #include <crm/msg_xml.h>
31 
32 #include <crm/common/ipc_internal.h> /* PCMK__SPECIAL_PID* */
33 
34 cpg_handle_t pcmk_cpg_handle = 0; /* TODO: Remove, use cluster.cpg_handle */
35 
36 static bool cpg_evicted = FALSE;
37 gboolean(*pcmk_cpg_dispatch_fn) (int kind, const char *from, const char *data) = NULL;
38 
/*
 * Run `code` (which must assign the caller's local `rc`) up to `max` times,
 * retrying only while corosync reports a transient condition
 * (CS_ERR_TRY_AGAIN or CS_ERR_QUEUE_FULL). Before each retry, `counter` is
 * incremented and the caller sleeps that many seconds (1s, 2s, ...).
 *
 * Once the final attempt has failed there is nothing left to retry, so the
 * macro stops immediately rather than logging "Retrying" and sleeping `max`
 * seconds for no purpose.
 *
 * Callers must reset `counter` before reusing it for another operation.
 */
#define cs_repeat(counter, max, code) do {                              \
        code;                                                           \
        if ((rc != CS_ERR_TRY_AGAIN) && (rc != CS_ERR_QUEUE_FULL)) {    \
            break;                                                      \
        }                                                               \
        (counter)++;                                                    \
        if ((counter) >= (max)) {                                       \
            break;                                                      \
        }                                                               \
        crm_debug("Retrying operation after %ds", (counter));           \
        sleep(counter);                                                 \
    } while ((counter) < (max))
49 
50 void
52 {
53  pcmk_cpg_handle = 0;
54  if (cluster->cpg_handle) {
55  crm_trace("Disconnecting CPG");
56  cpg_leave(cluster->cpg_handle, &cluster->group);
57  cpg_finalize(cluster->cpg_handle);
58  cluster->cpg_handle = 0;
59 
60  } else {
61  crm_info("No CPG connection");
62  }
63 }
64 
65 uint32_t get_local_nodeid(cpg_handle_t handle)
66 {
67  cs_error_t rc = CS_OK;
68  int retries = 0;
69  static uint32_t local_nodeid = 0;
70  cpg_handle_t local_handle = handle;
71  cpg_callbacks_t cb = { };
72  int fd = -1;
73  uid_t found_uid = 0;
74  gid_t found_gid = 0;
75  pid_t found_pid = 0;
76  int rv;
77 
78  if(local_nodeid != 0) {
79  return local_nodeid;
80  }
81 
82  if(handle == 0) {
83  crm_trace("Creating connection");
84  cs_repeat(retries, 5, rc = cpg_initialize(&local_handle, &cb));
85  if (rc != CS_OK) {
86  crm_err("Could not connect to the CPG API: %s (%d)",
87  cs_strerror(rc), rc);
88  return 0;
89  }
90 
91  rc = cpg_fd_get(local_handle, &fd);
92  if (rc != CS_OK) {
93  crm_err("Could not obtain the CPG API connection: %s (%d)",
94  cs_strerror(rc), rc);
95  goto bail;
96  }
97 
98  /* CPG provider run as root (in given user namespace, anyway)? */
99  if (!(rv = crm_ipc_is_authentic_process(fd, (uid_t) 0,(gid_t) 0, &found_pid,
100  &found_uid, &found_gid))) {
101  crm_err("CPG provider is not authentic:"
102  " process %lld (uid: %lld, gid: %lld)",
103  (long long) PCMK__SPECIAL_PID_AS_0(found_pid),
104  (long long) found_uid, (long long) found_gid);
105  goto bail;
106  } else if (rv < 0) {
107  crm_err("Could not verify authenticity of CPG provider: %s (%d)",
108  strerror(-rv), -rv);
109  goto bail;
110  }
111  }
112 
113  if (rc == CS_OK) {
114  retries = 0;
115  crm_trace("Performing lookup");
116  cs_repeat(retries, 5, rc = cpg_local_get(local_handle, &local_nodeid));
117  }
118 
119  if (rc != CS_OK) {
120  crm_err("Could not get local node id from the CPG API: %s (%d)", ais_error2text(rc), rc);
121  }
122 
123 bail:
124  if(handle == 0) {
125  crm_trace("Closing connection");
126  cpg_finalize(local_handle);
127  }
128  crm_debug("Local nodeid is %u", local_nodeid);
129  return local_nodeid;
130 }
131 
132 
135 
136 static ssize_t crm_cs_flush(gpointer data);
137 
138 static gboolean
139 crm_cs_flush_cb(gpointer data)
140 {
141  cs_message_timer = 0;
142  crm_cs_flush(data);
143  return FALSE;
144 }
145 
146 #define CS_SEND_MAX 200
147 static ssize_t
148 crm_cs_flush(gpointer data)
149 {
150  int sent = 0;
151  ssize_t rc = 0;
152  int queue_len = 0;
153  static unsigned int last_sent = 0;
154  cpg_handle_t *handle = (cpg_handle_t *)data;
155 
156  if (*handle == 0) {
157  crm_trace("Connection is dead");
158  return pcmk_ok;
159  }
160 
161  queue_len = g_list_length(cs_message_queue);
162  if ((queue_len % 1000) == 0 && queue_len > 1) {
163  crm_err("CPG queue has grown to %d", queue_len);
164 
165  } else if (queue_len == CS_SEND_MAX) {
166  crm_warn("CPG queue has grown to %d", queue_len);
167  }
168 
169  if (cs_message_timer) {
170  /* There is already a timer, wait until it goes off */
171  crm_trace("Timer active %d", cs_message_timer);
172  return pcmk_ok;
173  }
174 
175  while (cs_message_queue && sent < CS_SEND_MAX) {
176  struct iovec *iov = cs_message_queue->data;
177 
178  errno = 0;
179  rc = cpg_mcast_joined(*handle, CPG_TYPE_AGREED, iov, 1);
180 
181  if (rc != CS_OK) {
182  break;
183  }
184 
185  sent++;
186  last_sent++;
187  crm_trace("CPG message sent, size=%llu",
188  (unsigned long long) iov->iov_len);
189 
190  cs_message_queue = g_list_remove(cs_message_queue, iov);
191  free(iov->iov_base);
192  free(iov);
193  }
194 
195  queue_len -= sent;
196  if (sent > 1 || cs_message_queue) {
197  crm_info("Sent %d CPG messages (%d remaining, last=%u): %s (%lld)",
198  sent, queue_len, last_sent, ais_error2text(rc),
199  (long long) rc);
200  } else {
201  crm_trace("Sent %d CPG messages (%d remaining, last=%u): %s (%lld)",
202  sent, queue_len, last_sent, ais_error2text(rc),
203  (long long) rc);
204  }
205 
206  if (cs_message_queue) {
207  uint32_t delay_ms = 100;
208  if(rc != CS_OK) {
209  /* Proportionally more if sending failed but cap at 1s */
210  delay_ms = QB_MIN(1000, CS_SEND_MAX + (10 * queue_len));
211  }
212  cs_message_timer = g_timeout_add(delay_ms, crm_cs_flush_cb, data);
213  }
214 
215  return rc;
216 }
217 
218 gboolean
219 send_cpg_iov(struct iovec * iov)
220 {
221  static unsigned int queued = 0;
222 
223  queued++;
224  crm_trace("Queueing CPG message %u (%llu bytes)",
225  queued, (unsigned long long) iov->iov_len);
226  cs_message_queue = g_list_append(cs_message_queue, iov);
227  crm_cs_flush(&pcmk_cpg_handle);
228  return TRUE;
229 }
230 
231 static int
232 pcmk_cpg_dispatch(gpointer user_data)
233 {
234  int rc = 0;
235  crm_cluster_t *cluster = (crm_cluster_t*) user_data;
236 
237  rc = cpg_dispatch(cluster->cpg_handle, CS_DISPATCH_ONE);
238  if (rc != CS_OK) {
239  crm_err("Connection to the CPG API failed: %s (%d)", ais_error2text(rc), rc);
240  cpg_finalize(cluster->cpg_handle);
241  cluster->cpg_handle = 0;
242  return -1;
243 
244  } else if(cpg_evicted) {
245  crm_err("Evicted from CPG membership");
246  return -1;
247  }
248  return 0;
249 }
250 
251 char *
252 pcmk_message_common_cs(cpg_handle_t handle, uint32_t nodeid, uint32_t pid, void *content,
253  uint32_t *kind, const char **from)
254 {
255  char *data = NULL;
256  AIS_Message *msg = (AIS_Message *) content;
257 
258  if(handle) {
259  // Do filtering and field massaging
260  uint32_t local_nodeid = get_local_nodeid(handle);
261  const char *local_name = get_local_node_name();
262 
263  if (msg->sender.id > 0 && msg->sender.id != nodeid) {
264  crm_err("Nodeid mismatch from %d.%d: claimed nodeid=%u", nodeid, pid, msg->sender.id);
265  return NULL;
266 
267  } else if (msg->host.id != 0 && (local_nodeid != msg->host.id)) {
268  /* Not for us */
269  crm_trace("Not for us: %u != %u", msg->host.id, local_nodeid);
270  return NULL;
271  } else if (msg->host.size != 0 && safe_str_neq(msg->host.uname, local_name)) {
272  /* Not for us */
273  crm_trace("Not for us: %s != %s", msg->host.uname, local_name);
274  return NULL;
275  }
276 
277  msg->sender.id = nodeid;
278  if (msg->sender.size == 0) {
279  crm_node_t *peer = crm_get_peer(nodeid, NULL);
280 
281  if (peer == NULL) {
282  crm_err("Peer with nodeid=%u is unknown", nodeid);
283 
284  } else if (peer->uname == NULL) {
285  crm_err("No uname for peer with nodeid=%u", nodeid);
286 
287  } else {
288  crm_notice("Fixing uname for peer with nodeid=%u", nodeid);
289  msg->sender.size = strlen(peer->uname);
290  memset(msg->sender.uname, 0, MAX_NAME);
291  memcpy(msg->sender.uname, peer->uname, msg->sender.size);
292  }
293  }
294  }
295 
296  crm_trace("Got new%s message (size=%d, %d, %d)",
297  msg->is_compressed ? " compressed" : "",
298  ais_data_len(msg), msg->size, msg->compressed_size);
299 
300  if (kind != NULL) {
301  *kind = msg->header.id;
302  }
303  if (from != NULL) {
304  *from = msg->sender.uname;
305  }
306 
307  if (msg->is_compressed && msg->size > 0) {
308  int rc = BZ_OK;
309  char *uncompressed = NULL;
310  unsigned int new_size = msg->size + 1;
311 
312  if (check_message_sanity(msg, NULL) == FALSE) {
313  goto badmsg;
314  }
315 
316  crm_trace("Decompressing message data");
317  uncompressed = calloc(1, new_size);
318  rc = BZ2_bzBuffToBuffDecompress(uncompressed, &new_size, msg->data, msg->compressed_size, 1, 0);
319 
320  if (rc != BZ_OK) {
321  crm_err("Decompression failed: %s " CRM_XS " bzerror=%d",
322  bz2_strerror(rc), rc);
323  free(uncompressed);
324  goto badmsg;
325  }
326 
327  CRM_ASSERT(rc == BZ_OK);
328  CRM_ASSERT(new_size == msg->size);
329 
330  data = uncompressed;
331 
332  } else if (check_message_sanity(msg, data) == FALSE) {
333  goto badmsg;
334 
335  } else if (safe_str_eq("identify", data)) {
336  char *pid_s = pcmk__getpid_s();
337 
338  send_cluster_text(crm_class_cluster, pid_s, TRUE, NULL, crm_msg_ais);
339  free(pid_s);
340  return NULL;
341 
342  } else {
343  data = strdup(msg->data);
344  }
345 
346  // Is this necessary?
347  crm_get_peer(msg->sender.id, msg->sender.uname);
348 
349  crm_trace("Payload: %.200s", data);
350  return data;
351 
352  badmsg:
353  crm_err("Invalid message (id=%d, dest=%s:%s, from=%s:%s.%d):"
354  " min=%d, total=%d, size=%d, bz2_size=%d",
355  msg->id, ais_dest(&(msg->host)), msg_type2text(msg->host.type),
356  ais_dest(&(msg->sender)), msg_type2text(msg->sender.type),
357  msg->sender.pid, (int)sizeof(AIS_Message),
358  msg->header.size, msg->size, msg->compressed_size);
359 
360  free(data);
361  return NULL;
362 }
363 
364 static int cmp_member_list_nodeid(const void *first,
365  const void *second)
366 {
367  const struct cpg_address *const a = *((const struct cpg_address **) first),
368  *const b = *((const struct cpg_address **) second);
369  if (a->nodeid < b->nodeid) {
370  return -1;
371  } else if (a->nodeid > b->nodeid) {
372  return 1;
373  }
374  /* don't bother with "reason" nor "pid" */
375  return 0;
376 }
377 
378 static const char *
379 cpgreason2str(cpg_reason_t reason)
380 {
381  switch (reason) {
382  case CPG_REASON_JOIN: return " via cpg_join";
383  case CPG_REASON_LEAVE: return " via cpg_leave";
384  case CPG_REASON_NODEDOWN: return " via cluster exit";
385  case CPG_REASON_NODEUP: return " via cluster join";
386  case CPG_REASON_PROCDOWN: return " for unknown reason";
387  default: break;
388  }
389  return "";
390 }
391 
392 static inline const char *
393 peer_name(crm_node_t *peer)
394 {
395  if (peer == NULL) {
396  return "unknown node";
397  } else if (peer->uname == NULL) {
398  return "peer node";
399  } else {
400  return peer->uname;
401  }
402 }
403 
404 void
405 pcmk_cpg_membership(cpg_handle_t handle,
406  const struct cpg_name *groupName,
407  const struct cpg_address *member_list, size_t member_list_entries,
408  const struct cpg_address *left_list, size_t left_list_entries,
409  const struct cpg_address *joined_list, size_t joined_list_entries)
410 {
411  int i;
412  gboolean found = FALSE;
413  static int counter = 0;
414  uint32_t local_nodeid = get_local_nodeid(handle);
415  const struct cpg_address *key, **sorted;
416 
417  sorted = malloc(member_list_entries * sizeof(const struct cpg_address *));
418  CRM_ASSERT(sorted != NULL);
419 
420  for (size_t iter = 0; iter < member_list_entries; iter++) {
421  sorted[iter] = member_list + iter;
422  }
423  /* so that the cross-matching multiply-subscribed nodes is then cheap */
424  qsort(sorted, member_list_entries, sizeof(const struct cpg_address *),
425  cmp_member_list_nodeid);
426 
427  for (i = 0; i < left_list_entries; i++) {
428  crm_node_t *peer = crm_find_peer(left_list[i].nodeid, NULL);
429  const struct cpg_address **rival = NULL;
430 
431  /* in CPG world, NODE:PROCESS-IN-MEMBERSHIP-OF-G is an 1:N relation
432  and not playing by this rule may go wild in case of multiple
433  residual instances of the same pacemaker daemon at the same node
434  -- we must ensure that the possible local rival(s) won't make us
435  cry out and bail (e.g. when they quit themselves), since all the
436  surrounding logic denies this simple fact that the full membership
437  is discriminated also per the PID of the process beside mere node
438  ID (and implicitly, group ID); practically, this will be sound in
439  terms of not preventing progress, since all the CPG joiners are
440  also API end-point carriers, and that's what matters locally
441  (who's the winner);
442  remotely, we will just compare leave_list and member_list and if
443  the left process has its node retained in member_list (under some
444  other PID, anyway) we will just ignore it as well
445  XXX: long-term fix is to establish in-out PID-aware tracking? */
446  if (peer) {
447  key = &left_list[i];
448  rival = bsearch(&key, sorted, member_list_entries,
449  sizeof(const struct cpg_address *),
450  cmp_member_list_nodeid);
451  }
452 
453  if (rival == NULL) {
454  crm_info("Group %s event %d: %s (node %u pid %u) left%s",
455  groupName->value, counter, peer_name(peer),
456  left_list[i].nodeid, left_list[i].pid,
457  cpgreason2str(left_list[i].reason));
458  if (peer) {
459  crm_update_peer_proc(__FUNCTION__, peer, crm_proc_cpg,
460  OFFLINESTATUS);
461  }
462  } else if (left_list[i].nodeid == local_nodeid) {
463  crm_warn("Group %s event %d: duplicate local pid %u left%s",
464  groupName->value, counter,
465  left_list[i].pid, cpgreason2str(left_list[i].reason));
466  } else {
467  crm_warn("Group %s event %d: "
468  "%s (node %u) duplicate pid %u left%s (%u remains)",
469  groupName->value, counter, peer_name(peer),
470  left_list[i].nodeid, left_list[i].pid,
471  cpgreason2str(left_list[i].reason), (*rival)->pid);
472  }
473  }
474  free(sorted);
475  sorted = NULL;
476 
477  for (i = 0; i < joined_list_entries; i++) {
478  crm_info("Group %s event %d: node %u pid %u joined%s",
479  groupName->value, counter, joined_list[i].nodeid,
480  joined_list[i].pid, cpgreason2str(joined_list[i].reason));
481  }
482 
483  for (i = 0; i < member_list_entries; i++) {
484  crm_node_t *peer = crm_get_peer(member_list[i].nodeid, NULL);
485 
486  if (member_list[i].nodeid == local_nodeid
487  && member_list[i].pid != getpid()) {
488  /* see the note above */
489  crm_warn("Group %s event %d: detected duplicate local pid %u",
490  groupName->value, counter, member_list[i].pid);
491  continue;
492  }
493  crm_info("Group %s event %d: %s (node %u pid %u) is member",
494  groupName->value, counter, peer_name(peer),
495  member_list[i].nodeid, member_list[i].pid);
496 
497  /* If the caller left auto-reaping enabled, this will also update the
498  * state to member.
499  */
500  peer = crm_update_peer_proc(__FUNCTION__, peer, crm_proc_cpg, ONLINESTATUS);
501 
502  if (peer && peer->state && strcmp(peer->state, CRM_NODE_MEMBER)) {
503  /* The node is a CPG member, but we currently think it's not a
504  * cluster member. This is possible only if auto-reaping was
505  * disabled. The node may be joining, and we happened to get the CPG
506  * notification before the quorum notification; or the node may have
507  * just died, and we are processing its final messages; or a bug
508  * has affected the peer cache.
509  */
510  time_t now = time(NULL);
511 
512  if (peer->when_lost == 0) {
513  // Track when we first got into this contradictory state
514  peer->when_lost = now;
515 
516  } else if (now > (peer->when_lost + 60)) {
517  // If it persists for more than a minute, update the state
518  crm_warn("Node %u is member of group %s but was believed offline",
519  member_list[i].nodeid, groupName->value);
520  crm_update_peer_state(__FUNCTION__, peer, CRM_NODE_MEMBER, 0);
521  }
522  }
523 
524  if (local_nodeid == member_list[i].nodeid) {
525  found = TRUE;
526  }
527  }
528 
529  if (!found) {
530  crm_err("Local node was evicted from group %s", groupName->value);
531  cpg_evicted = TRUE;
532  }
533 
534  counter++;
535 }
536 
537 gboolean
539 {
540  cs_error_t rc;
541  int fd = -1;
542  int retries = 0;
543  uint32_t id = 0;
544  crm_node_t *peer = NULL;
545  cpg_handle_t handle = 0;
546  const char *message_name = pcmk_message_name(crm_system_name);
547  uid_t found_uid = 0;
548  gid_t found_gid = 0;
549  pid_t found_pid = 0;
550  int rv;
551 
552  struct mainloop_fd_callbacks cpg_fd_callbacks = {
553  .dispatch = pcmk_cpg_dispatch,
554  .destroy = cluster->destroy,
555  };
556 
557  cpg_callbacks_t cpg_callbacks = {
558  .cpg_deliver_fn = cluster->cpg.cpg_deliver_fn,
559  .cpg_confchg_fn = cluster->cpg.cpg_confchg_fn,
560  /* .cpg_deliver_fn = pcmk_cpg_deliver, */
561  /* .cpg_confchg_fn = pcmk_cpg_membership, */
562  };
563 
564  cpg_evicted = FALSE;
565  cluster->group.length = 0;
566  cluster->group.value[0] = 0;
567 
568  /* group.value is char[128] */
569  strncpy(cluster->group.value, message_name, 127);
570  cluster->group.value[127] = 0;
571  cluster->group.length = 1 + QB_MIN(127, strlen(cluster->group.value));
572 
573  cs_repeat(retries, 30, rc = cpg_initialize(&handle, &cpg_callbacks));
574  if (rc != CS_OK) {
575  crm_err("Could not connect to the CPG API: %s (%d)",
576  cs_strerror(rc), rc);
577  goto bail;
578  }
579 
580  rc = cpg_fd_get(handle, &fd);
581  if (rc != CS_OK) {
582  crm_err("Could not obtain the CPG API connection: %s (%d)",
583  cs_strerror(rc), rc);
584  goto bail;
585  }
586 
587  /* CPG provider run as root (in given user namespace, anyway)? */
588  if (!(rv = crm_ipc_is_authentic_process(fd, (uid_t) 0,(gid_t) 0, &found_pid,
589  &found_uid, &found_gid))) {
590  crm_err("CPG provider is not authentic:"
591  " process %lld (uid: %lld, gid: %lld)",
592  (long long) PCMK__SPECIAL_PID_AS_0(found_pid),
593  (long long) found_uid, (long long) found_gid);
594  rc = CS_ERR_ACCESS;
595  goto bail;
596  } else if (rv < 0) {
597  crm_err("Could not verify authenticity of CPG provider: %s (%d)",
598  strerror(-rv), -rv);
599  rc = CS_ERR_ACCESS;
600  goto bail;
601  }
602 
603  id = get_local_nodeid(handle);
604  if (id == 0) {
605  crm_err("Could not get local node id from the CPG API");
606  goto bail;
607 
608  }
609  cluster->nodeid = id;
610 
611  retries = 0;
612  cs_repeat(retries, 30, rc = cpg_join(handle, &cluster->group));
613  if (rc != CS_OK) {
614  crm_err("Could not join the CPG group '%s': %d", message_name, rc);
615  goto bail;
616  }
617 
618  pcmk_cpg_handle = handle;
619  cluster->cpg_handle = handle;
620  mainloop_add_fd("corosync-cpg", G_PRIORITY_MEDIUM, fd, cluster, &cpg_fd_callbacks);
621 
622  bail:
623  if (rc != CS_OK) {
624  cpg_finalize(handle);
625  return FALSE;
626  }
627 
628  peer = crm_get_peer(id, NULL);
629  crm_update_peer_proc(__FUNCTION__, peer, crm_proc_cpg, ONLINESTATUS);
630  return TRUE;
631 }
632 
633 gboolean
634 send_cluster_message_cs(xmlNode * msg, gboolean local, crm_node_t * node, enum crm_ais_msg_types dest)
635 {
636  gboolean rc = TRUE;
637  char *data = NULL;
638 
639  data = dump_xml_unformatted(msg);
641  free(data);
642  return rc;
643 }
644 
645 gboolean
646 send_cluster_text(enum crm_ais_msg_class msg_class, const char *data,
647  gboolean local, crm_node_t *node, enum crm_ais_msg_types dest)
648 {
649  static int msg_id = 0;
650  static int local_pid = 0;
651  static int local_name_len = 0;
652  static const char *local_name = NULL;
653 
654  char *target = NULL;
655  struct iovec *iov;
656  AIS_Message *msg = NULL;
658 
659  switch (msg_class) {
660  case crm_class_cluster:
661  break;
662  default:
663  crm_err("Invalid message class: %d", msg_class);
664  return FALSE;
665  }
666 
667  CRM_CHECK(dest != crm_msg_ais, return FALSE);
668 
669  if(local_name == NULL) {
670  local_name = get_local_node_name();
671  }
672  if(local_name_len == 0 && local_name) {
673  local_name_len = strlen(local_name);
674  }
675 
676  if (data == NULL) {
677  data = "";
678  }
679 
680  if (local_pid == 0) {
681  local_pid = getpid();
682  }
683 
684  if (sender == crm_msg_none) {
685  sender = local_pid;
686  }
687 
688  msg = calloc(1, sizeof(AIS_Message));
689 
690  msg_id++;
691  msg->id = msg_id;
692  msg->header.id = msg_class;
693  msg->header.error = CS_OK;
694 
695  msg->host.type = dest;
696  msg->host.local = local;
697 
698  if (node) {
699  if (node->uname) {
700  target = strdup(node->uname);
701  msg->host.size = strlen(node->uname);
702  memset(msg->host.uname, 0, MAX_NAME);
703  memcpy(msg->host.uname, node->uname, msg->host.size);
704  } else {
705  target = crm_strdup_printf("%u", node->id);
706  }
707  msg->host.id = node->id;
708  } else {
709  target = strdup("all");
710  }
711 
712  msg->sender.id = 0;
713  msg->sender.type = sender;
714  msg->sender.pid = local_pid;
715  msg->sender.size = local_name_len;
716  memset(msg->sender.uname, 0, MAX_NAME);
717  if(local_name && msg->sender.size) {
718  memcpy(msg->sender.uname, local_name, msg->sender.size);
719  }
720 
721  msg->size = 1 + strlen(data);
722  msg->header.size = sizeof(AIS_Message) + msg->size;
723 
724  if (msg->size < CRM_BZ2_THRESHOLD) {
725  msg = realloc_safe(msg, msg->header.size);
726  memcpy(msg->data, data, msg->size);
727 
728  } else {
729  char *compressed = NULL;
730  unsigned int new_size = 0;
731  char *uncompressed = strdup(data);
732 
733  if (pcmk__compress(uncompressed, (unsigned int) msg->size, 0,
734  &compressed, &new_size) == pcmk_rc_ok) {
735 
736  msg->header.size = sizeof(AIS_Message) + new_size;
737  msg = realloc_safe(msg, msg->header.size);
738  memcpy(msg->data, compressed, new_size);
739 
740  msg->is_compressed = TRUE;
741  msg->compressed_size = new_size;
742 
743  } else {
744  // cppcheck seems not to understand the abort logic in realloc_safe
745  // cppcheck-suppress memleak
746  msg = realloc_safe(msg, msg->header.size);
747  memcpy(msg->data, data, msg->size);
748  }
749 
750  free(uncompressed);
751  free(compressed);
752  }
753 
754  iov = calloc(1, sizeof(struct iovec));
755  iov->iov_base = msg;
756  iov->iov_len = msg->header.size;
757 
758  if (msg->compressed_size) {
759  crm_trace("Queueing CPG message %u to %s (%llu bytes, %d bytes compressed payload): %.200s",
760  msg->id, target, (unsigned long long) iov->iov_len,
761  msg->compressed_size, data);
762  } else {
763  crm_trace("Queueing CPG message %u to %s (%llu bytes, %d bytes payload): %.200s",
764  msg->id, target, (unsigned long long) iov->iov_len,
765  msg->size, data);
766  }
767  free(target);
768 
769  send_cpg_iov(iov);
770 
771  return TRUE;
772 }
773 
775 text2msg_type(const char *text)
776 {
777  int type = crm_msg_none;
778 
779  CRM_CHECK(text != NULL, return type);
780  text = pcmk_message_name(text);
781  if (safe_str_eq(text, "ais")) {
782  type = crm_msg_ais;
783  } else if (safe_str_eq(text, CRM_SYSTEM_CIB)) {
784  type = crm_msg_cib;
785  } else if (safe_str_eq(text, CRM_SYSTEM_CRMD)
786  || safe_str_eq(text, CRM_SYSTEM_DC)) {
787  type = crm_msg_crmd;
788  } else if (safe_str_eq(text, CRM_SYSTEM_TENGINE)) {
789  type = crm_msg_te;
790  } else if (safe_str_eq(text, CRM_SYSTEM_PENGINE)) {
791  type = crm_msg_pe;
792  } else if (safe_str_eq(text, CRM_SYSTEM_LRMD)) {
793  type = crm_msg_lrmd;
794  } else if (safe_str_eq(text, CRM_SYSTEM_STONITHD)) {
796  } else if (safe_str_eq(text, "stonith-ng")) {
798  } else if (safe_str_eq(text, "attrd")) {
800 
801  } else {
802  /* This will normally be a transient client rather than
803  * a cluster daemon. Set the type to the pid of the client
804  */
805  int scan_rc = sscanf(text, "%d", &type);
806 
807  if (scan_rc != 1 || type <= crm_msg_stonith_ng) {
808  /* Ensure it's sane */
809  type = crm_msg_none;
810  }
811  }
812  return type;
813 }
crm_node_t * crm_update_peer_state(const char *source, crm_node_t *node, const char *state, uint64_t membership)
Update a node's state and membership information.
Definition: membership.c:965
crm_node_t * crm_update_peer_proc(const char *source, crm_node_t *peer, uint32_t flag, const char *status)
Definition: membership.c:785
AIS_Host sender
Definition: internal.h:5
#define ais_data_len(msg)
Definition: internal.h:125
enum crm_ais_msg_types type
Definition: internal.h:3
@ crm_proc_cpg
Definition: internal.h:48
crm_node_t * crm_find_peer(unsigned int id, const char *uname)
Definition: membership.c:522
struct crm_ais_msg_s AIS_Message
Definition: internal.h:16
gboolean check_message_sanity(const AIS_Message *msg, const char *data)
Definition: corosync.c:422
char data[0]
Definition: internal.h:10
uint32_t id
Definition: internal.h:0
gboolean local
Definition: internal.h:2
uint32_t pid
Definition: internal.h:1
crm_ais_msg_types
Definition: cluster.h:99
@ crm_msg_stonithd
Definition: cluster.h:106
@ crm_msg_none
Definition: cluster.h:100
@ crm_msg_cib
Definition: cluster.h:103
@ crm_msg_pe
Definition: cluster.h:108
@ crm_msg_attrd
Definition: cluster.h:105
@ crm_msg_ais
Definition: cluster.h:101
@ crm_msg_te
Definition: cluster.h:107
@ crm_msg_stonith_ng
Definition: cluster.h:109
@ crm_msg_crmd
Definition: cluster.h:104
@ crm_msg_lrmd
Definition: cluster.h:102
const char * get_local_node_name(void)
Definition: cluster.c:120
#define CRM_NODE_MEMBER
Definition: cluster.h:34
crm_node_t * crm_get_peer(unsigned int id, const char *uname)
Definition: membership.c:653
crm_ais_msg_class
Definition: cluster.h:95
@ crm_class_cluster
Definition: cluster.h:96
const char * pcmk_message_name(const char *name)
Get name to be used as identifier for cluster messages.
Definition: utils.c:517
int pcmk__compress(const char *data, unsigned int length, unsigned int max, char **result, unsigned int *result_len)
Definition: strings.c:541
#define ONLINESTATUS
Definition: util.h:37
char * crm_strdup_printf(char const *format,...) __attribute__((__format__(__printf__
#define OFFLINESTATUS
Definition: util.h:38
gboolean safe_str_neq(const char *a, const char *b)
Definition: strings.c:263
#define safe_str_eq(a, b)
Definition: util.h:65
gboolean send_cluster_message_cs(xmlNode *msg, gboolean local, crm_node_t *node, enum crm_ais_msg_types dest)
Definition: cpg.c:634
void pcmk_cpg_membership(cpg_handle_t handle, const struct cpg_name *groupName, const struct cpg_address *member_list, size_t member_list_entries, const struct cpg_address *left_list, size_t left_list_entries, const struct cpg_address *joined_list, size_t joined_list_entries)
Definition: cpg.c:405
void cluster_disconnect_cpg(crm_cluster_t *cluster)
Definition: cpg.c:51
#define CS_SEND_MAX
Definition: cpg.c:146
gboolean send_cluster_text(enum crm_ais_msg_class msg_class, const char *data, gboolean local, crm_node_t *node, enum crm_ais_msg_types dest)
Definition: cpg.c:646
int cs_message_timer
Definition: cpg.c:134
gboolean send_cpg_iov(struct iovec *iov)
Definition: cpg.c:219
cpg_handle_t pcmk_cpg_handle
Definition: cpg.c:34
uint32_t get_local_nodeid(cpg_handle_t handle)
Definition: cpg.c:65
GListPtr cs_message_queue
Definition: cpg.c:133
#define cs_repeat(counter, max, code)
Definition: cpg.c:39
char * pcmk_message_common_cs(cpg_handle_t handle, uint32_t nodeid, uint32_t pid, void *content, uint32_t *kind, const char **from)
Definition: cpg.c:252
enum crm_ais_msg_types text2msg_type(const char *text)
Definition: cpg.c:775
gboolean(* pcmk_cpg_dispatch_fn)(int kind, const char *from, const char *data)
Definition: cpg.c:37
gboolean cluster_connect_cpg(crm_cluster_t *cluster)
Definition: cpg.c:538
#define CRM_SYSTEM_CIB
Definition: crm.h:102
#define CRM_SYSTEM_CRMD
Definition: crm.h:103
#define CRM_SYSTEM_DC
Definition: crm.h:99
GList * GListPtr
Definition: crm.h:215
#define CRM_SYSTEM_STONITHD
Definition: crm.h:107
#define CRM_SYSTEM_LRMD
Definition: crm.h:104
#define CRM_SYSTEM_TENGINE
Definition: crm.h:106
#define MAX_NAME
Definition: crm.h:61
char * crm_system_name
Definition: utils.c:52
#define CRM_SYSTEM_PENGINE
Definition: crm.h:105
Wrappers for and extensions to libqb IPC.
int crm_ipc_is_authentic_process(int sock, uid_t refuid, gid_t refgid, pid_t *gotpid, uid_t *gotuid, gid_t *gotgid)
Check the authenticity of the IPC socket peer process.
Definition: ipc.c:1672
#define PCMK__SPECIAL_PID_AS_0(p)
Definition: ipc_internal.h:34
#define crm_info(fmt, args...)
Definition: logging.h:366
#define crm_warn(fmt, args...)
Definition: logging.h:364
#define CRM_XS
Definition: logging.h:54
#define crm_notice(fmt, args...)
Definition: logging.h:365
#define CRM_CHECK(expr, failure_action)
Definition: logging.h:233
#define crm_debug(fmt, args...)
Definition: logging.h:368
#define crm_err(fmt, args...)
Definition: logging.h:363
#define crm_trace(fmt, args...)
Definition: logging.h:369
Wrappers for and extensions to glib mainloop.
mainloop_io_t * mainloop_add_fd(const char *name, int priority, int fd, void *userdata, struct mainloop_fd_callbacks *callbacks)
Definition: mainloop.c:881
#define G_PRIORITY_MEDIUM
Definition: mainloop.h:152
int rc
Definition: pcmk_fence.c:34
const char * target
Definition: pcmk_fence.c:28
char * strerror(int errnum)
#define CRM_ASSERT(expr)
Definition: results.h:42
const char * bz2_strerror(int rc)
Definition: results.c:718
@ pcmk_rc_ok
Definition: results.h:141
#define pcmk_ok
Definition: results.h:67
char uname[MAX_NAME]
Definition: internal.h:24
gboolean local
Definition: internal.h:21
uint32_t pid
Definition: internal.h:20
enum crm_ais_msg_types type
Definition: internal.h:22
uint32_t id
Definition: internal.h:19
uint32_t size
Definition: internal.h:23
uint32_t size
Definition: internal.h:36
AIS_Host sender
Definition: internal.h:34
gboolean is_compressed
Definition: internal.h:31
AIS_Host host
Definition: internal.h:33
uint32_t compressed_size
Definition: internal.h:37
uint32_t id
Definition: internal.h:30
char data[0]
Definition: internal.h:39
uint32_t nodeid
Definition: cluster.h:79
void(* destroy)(gpointer)
Definition: cluster.h:81
char * uname
Definition: cluster.h:57
uint32_t id
Definition: cluster.h:65
char * state
Definition: cluster.h:59
time_t when_lost
Definition: cluster.h:66
int(* dispatch)(gpointer userdata)
Definition: mainloop.h:115
char * dump_xml_unformatted(xmlNode *msg)
Definition: xml.c:3321
#define CRM_BZ2_THRESHOLD
Definition: xml.h:47