1*4882a593Smuzhiyun // SPDX-License-Identifier: GPL-2.0-only
2*4882a593Smuzhiyun /******************************************************************************
3*4882a593Smuzhiyun *******************************************************************************
4*4882a593Smuzhiyun **
5*4882a593Smuzhiyun ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
6*4882a593Smuzhiyun ** Copyright (C) 2005-2008 Red Hat, Inc. All rights reserved.
7*4882a593Smuzhiyun **
8*4882a593Smuzhiyun **
9*4882a593Smuzhiyun *******************************************************************************
10*4882a593Smuzhiyun ******************************************************************************/
11*4882a593Smuzhiyun
12*4882a593Smuzhiyun #include "dlm_internal.h"
13*4882a593Smuzhiyun #include "lockspace.h"
14*4882a593Smuzhiyun #include "member.h"
15*4882a593Smuzhiyun #include "lowcomms.h"
16*4882a593Smuzhiyun #include "midcomms.h"
17*4882a593Smuzhiyun #include "rcom.h"
18*4882a593Smuzhiyun #include "recover.h"
19*4882a593Smuzhiyun #include "dir.h"
20*4882a593Smuzhiyun #include "config.h"
21*4882a593Smuzhiyun #include "memory.h"
22*4882a593Smuzhiyun #include "lock.h"
23*4882a593Smuzhiyun #include "util.h"
24*4882a593Smuzhiyun
rcom_response(struct dlm_ls * ls)25*4882a593Smuzhiyun static int rcom_response(struct dlm_ls *ls)
26*4882a593Smuzhiyun {
27*4882a593Smuzhiyun return test_bit(LSFL_RCOM_READY, &ls->ls_flags);
28*4882a593Smuzhiyun }
29*4882a593Smuzhiyun
create_rcom(struct dlm_ls * ls,int to_nodeid,int type,int len,struct dlm_rcom ** rc_ret,struct dlm_mhandle ** mh_ret)30*4882a593Smuzhiyun static int create_rcom(struct dlm_ls *ls, int to_nodeid, int type, int len,
31*4882a593Smuzhiyun struct dlm_rcom **rc_ret, struct dlm_mhandle **mh_ret)
32*4882a593Smuzhiyun {
33*4882a593Smuzhiyun struct dlm_rcom *rc;
34*4882a593Smuzhiyun struct dlm_mhandle *mh;
35*4882a593Smuzhiyun char *mb;
36*4882a593Smuzhiyun int mb_len = sizeof(struct dlm_rcom) + len;
37*4882a593Smuzhiyun
38*4882a593Smuzhiyun mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_NOFS, &mb);
39*4882a593Smuzhiyun if (!mh) {
40*4882a593Smuzhiyun log_print("create_rcom to %d type %d len %d ENOBUFS",
41*4882a593Smuzhiyun to_nodeid, type, len);
42*4882a593Smuzhiyun return -ENOBUFS;
43*4882a593Smuzhiyun }
44*4882a593Smuzhiyun memset(mb, 0, mb_len);
45*4882a593Smuzhiyun
46*4882a593Smuzhiyun rc = (struct dlm_rcom *) mb;
47*4882a593Smuzhiyun
48*4882a593Smuzhiyun rc->rc_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
49*4882a593Smuzhiyun rc->rc_header.h_lockspace = ls->ls_global_id;
50*4882a593Smuzhiyun rc->rc_header.h_nodeid = dlm_our_nodeid();
51*4882a593Smuzhiyun rc->rc_header.h_length = mb_len;
52*4882a593Smuzhiyun rc->rc_header.h_cmd = DLM_RCOM;
53*4882a593Smuzhiyun
54*4882a593Smuzhiyun rc->rc_type = type;
55*4882a593Smuzhiyun
56*4882a593Smuzhiyun spin_lock(&ls->ls_recover_lock);
57*4882a593Smuzhiyun rc->rc_seq = ls->ls_recover_seq;
58*4882a593Smuzhiyun spin_unlock(&ls->ls_recover_lock);
59*4882a593Smuzhiyun
60*4882a593Smuzhiyun *mh_ret = mh;
61*4882a593Smuzhiyun *rc_ret = rc;
62*4882a593Smuzhiyun return 0;
63*4882a593Smuzhiyun }
64*4882a593Smuzhiyun
/*
 * Hand a message built by create_rcom() to lowcomms: run dlm_rcom_out()
 * on it (presumably the conversion to wire format -- confirm in util.c),
 * then commit the buffer behind mh.  ls is not used.
 */
static void send_rcom(struct dlm_ls *ls, struct dlm_mhandle *mh,
		      struct dlm_rcom *rc)
{
	dlm_rcom_out(rc);

	dlm_lowcomms_commit_buffer(mh);
}
71*4882a593Smuzhiyun
/* Store flags, converted to little-endian, in the rcom_status payload.
   ls is not used. */
static void set_rcom_status(struct dlm_ls *ls, struct rcom_status *rs,
			    uint32_t flags)
{
	rs->rs_flags = cpu_to_le32(flags);
}
77*4882a593Smuzhiyun
/* When replying to a status request, a node also sends back its
   configuration values.  The requesting node then checks that the remote
   node is configured the same way as itself. */
81*4882a593Smuzhiyun
/*
 * Fill the rcom_config payload of a status reply from this lockspace:
 * lvb length, exflags, our slot, generation, and the caller-supplied
 * num_slots.  All fields are stored little-endian.
 */
static void set_rcom_config(struct dlm_ls *ls, struct rcom_config *rf,
			    uint32_t num_slots)
{
	/* slot bookkeeping */
	rf->rf_num_slots = cpu_to_le16(num_slots);
	rf->rf_our_slot = cpu_to_le16(ls->ls_slot);
	rf->rf_generation = cpu_to_le32(ls->ls_generation);

	/* settings the peer compares against its own */
	rf->rf_lsflags = cpu_to_le32(ls->ls_exflags);
	rf->rf_lvblen = cpu_to_le32(ls->ls_lvblen);
}
92*4882a593Smuzhiyun
check_rcom_config(struct dlm_ls * ls,struct dlm_rcom * rc,int nodeid)93*4882a593Smuzhiyun static int check_rcom_config(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
94*4882a593Smuzhiyun {
95*4882a593Smuzhiyun struct rcom_config *rf = (struct rcom_config *) rc->rc_buf;
96*4882a593Smuzhiyun
97*4882a593Smuzhiyun if ((rc->rc_header.h_version & 0xFFFF0000) != DLM_HEADER_MAJOR) {
98*4882a593Smuzhiyun log_error(ls, "version mismatch: %x nodeid %d: %x",
99*4882a593Smuzhiyun DLM_HEADER_MAJOR | DLM_HEADER_MINOR, nodeid,
100*4882a593Smuzhiyun rc->rc_header.h_version);
101*4882a593Smuzhiyun return -EPROTO;
102*4882a593Smuzhiyun }
103*4882a593Smuzhiyun
104*4882a593Smuzhiyun if (le32_to_cpu(rf->rf_lvblen) != ls->ls_lvblen ||
105*4882a593Smuzhiyun le32_to_cpu(rf->rf_lsflags) != ls->ls_exflags) {
106*4882a593Smuzhiyun log_error(ls, "config mismatch: %d,%x nodeid %d: %d,%x",
107*4882a593Smuzhiyun ls->ls_lvblen, ls->ls_exflags, nodeid,
108*4882a593Smuzhiyun le32_to_cpu(rf->rf_lvblen),
109*4882a593Smuzhiyun le32_to_cpu(rf->rf_lsflags));
110*4882a593Smuzhiyun return -EPROTO;
111*4882a593Smuzhiyun }
112*4882a593Smuzhiyun return 0;
113*4882a593Smuzhiyun }
114*4882a593Smuzhiyun
/*
 * Open the window for one synchronous reply: under ls_rcom_spin, advance
 * ls_rcom_seq, return the new value through new_seq and set
 * LSFL_RCOM_WAIT.  receive_sync_reply() only accepts a reply whose rc_id
 * equals ls_rcom_seq while that flag is set.
 */
static void allow_sync_reply(struct dlm_ls *ls, uint64_t *new_seq)
{
	spin_lock(&ls->ls_rcom_spin);

	ls->ls_rcom_seq++;
	*new_seq = ls->ls_rcom_seq;
	set_bit(LSFL_RCOM_WAIT, &ls->ls_flags);

	spin_unlock(&ls->ls_rcom_spin);
}
122*4882a593Smuzhiyun
disallow_sync_reply(struct dlm_ls * ls)123*4882a593Smuzhiyun static void disallow_sync_reply(struct dlm_ls *ls)
124*4882a593Smuzhiyun {
125*4882a593Smuzhiyun spin_lock(&ls->ls_rcom_spin);
126*4882a593Smuzhiyun clear_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
127*4882a593Smuzhiyun clear_bit(LSFL_RCOM_READY, &ls->ls_flags);
128*4882a593Smuzhiyun spin_unlock(&ls->ls_rcom_spin);
129*4882a593Smuzhiyun }
130*4882a593Smuzhiyun
/*
 * low nodeid gathers one slot value at a time from each node.
 * it sets need_slots=0, and saves rf_our_slot returned from each
 * rcom_config.
 *
 * other nodes gather all slot values at once from the low nodeid.
 * they set need_slots=1, and ignore the rf_our_slot returned from each
 * rcom_config.  they use the rf_num_slots returned from the low
 * node's rcom_config.
 */
141*4882a593Smuzhiyun
/*
 * Ask nodeid for its recovery status and leave the answer in
 * ls->ls_recover_buf; the caller looks at rc_result there for the remote
 * recovery status.
 *
 * For our own nodeid nothing is sent: rc_result is filled in from
 * dlm_recover_status() and 0 is returned.  Otherwise a DLM_RCOM_STATUS
 * message carrying status_flags is sent and the reply awaited; the
 * request is rebuilt and resent for as long as the wait returns
 * -ETIMEDOUT.
 *
 * Returns 0 on success, the error from create_rcom() or
 * dlm_wait_function(), or the result of check_rcom_config() on the reply.
 */
int dlm_rcom_status(struct dlm_ls *ls, int nodeid, uint32_t status_flags)
{
	struct dlm_rcom *rc;
	struct dlm_mhandle *mh;
	int error;

	ls->ls_recover_nodeid = nodeid;

	if (nodeid == dlm_our_nodeid()) {
		rc = ls->ls_recover_buf;
		rc->rc_result = dlm_recover_status(ls);
		return 0;
	}

	do {
		error = create_rcom(ls, nodeid, DLM_RCOM_STATUS,
				    sizeof(struct rcom_status), &rc, &mh);
		if (error)
			return error;

		set_rcom_status(ls, (struct rcom_status *)rc->rc_buf,
				status_flags);

		/* rc_id receives the sequence number the reply must echo */
		allow_sync_reply(ls, &rc->rc_id);
		memset(ls->ls_recover_buf, 0, dlm_config.ci_buffer_size);

		send_rcom(ls, mh, rc);

		error = dlm_wait_function(ls, &rcom_response);
		disallow_sync_reply(ls);
	} while (error == -ETIMEDOUT);

	if (error)
		return error;

	rc = ls->ls_recover_buf;

	if (rc->rc_result != -ESRCH)
		return check_rcom_config(ls, rc, nodeid);

	/* we pretend the remote lockspace exists with 0 status */
	log_debug(ls, "remote node %d not ready", nodeid);
	rc->rc_result = 0;
	return 0;
}
191*4882a593Smuzhiyun
receive_rcom_status(struct dlm_ls * ls,struct dlm_rcom * rc_in)192*4882a593Smuzhiyun static void receive_rcom_status(struct dlm_ls *ls, struct dlm_rcom *rc_in)
193*4882a593Smuzhiyun {
194*4882a593Smuzhiyun struct dlm_rcom *rc;
195*4882a593Smuzhiyun struct dlm_mhandle *mh;
196*4882a593Smuzhiyun struct rcom_status *rs;
197*4882a593Smuzhiyun uint32_t status;
198*4882a593Smuzhiyun int nodeid = rc_in->rc_header.h_nodeid;
199*4882a593Smuzhiyun int len = sizeof(struct rcom_config);
200*4882a593Smuzhiyun int num_slots = 0;
201*4882a593Smuzhiyun int error;
202*4882a593Smuzhiyun
203*4882a593Smuzhiyun if (!dlm_slots_version(&rc_in->rc_header)) {
204*4882a593Smuzhiyun status = dlm_recover_status(ls);
205*4882a593Smuzhiyun goto do_create;
206*4882a593Smuzhiyun }
207*4882a593Smuzhiyun
208*4882a593Smuzhiyun rs = (struct rcom_status *)rc_in->rc_buf;
209*4882a593Smuzhiyun
210*4882a593Smuzhiyun if (!(le32_to_cpu(rs->rs_flags) & DLM_RSF_NEED_SLOTS)) {
211*4882a593Smuzhiyun status = dlm_recover_status(ls);
212*4882a593Smuzhiyun goto do_create;
213*4882a593Smuzhiyun }
214*4882a593Smuzhiyun
215*4882a593Smuzhiyun spin_lock(&ls->ls_recover_lock);
216*4882a593Smuzhiyun status = ls->ls_recover_status;
217*4882a593Smuzhiyun num_slots = ls->ls_num_slots;
218*4882a593Smuzhiyun spin_unlock(&ls->ls_recover_lock);
219*4882a593Smuzhiyun len += num_slots * sizeof(struct rcom_slot);
220*4882a593Smuzhiyun
221*4882a593Smuzhiyun do_create:
222*4882a593Smuzhiyun error = create_rcom(ls, nodeid, DLM_RCOM_STATUS_REPLY,
223*4882a593Smuzhiyun len, &rc, &mh);
224*4882a593Smuzhiyun if (error)
225*4882a593Smuzhiyun return;
226*4882a593Smuzhiyun
227*4882a593Smuzhiyun rc->rc_id = rc_in->rc_id;
228*4882a593Smuzhiyun rc->rc_seq_reply = rc_in->rc_seq;
229*4882a593Smuzhiyun rc->rc_result = status;
230*4882a593Smuzhiyun
231*4882a593Smuzhiyun set_rcom_config(ls, (struct rcom_config *)rc->rc_buf, num_slots);
232*4882a593Smuzhiyun
233*4882a593Smuzhiyun if (!num_slots)
234*4882a593Smuzhiyun goto do_send;
235*4882a593Smuzhiyun
236*4882a593Smuzhiyun spin_lock(&ls->ls_recover_lock);
237*4882a593Smuzhiyun if (ls->ls_num_slots != num_slots) {
238*4882a593Smuzhiyun spin_unlock(&ls->ls_recover_lock);
239*4882a593Smuzhiyun log_debug(ls, "receive_rcom_status num_slots %d to %d",
240*4882a593Smuzhiyun num_slots, ls->ls_num_slots);
241*4882a593Smuzhiyun rc->rc_result = 0;
242*4882a593Smuzhiyun set_rcom_config(ls, (struct rcom_config *)rc->rc_buf, 0);
243*4882a593Smuzhiyun goto do_send;
244*4882a593Smuzhiyun }
245*4882a593Smuzhiyun
246*4882a593Smuzhiyun dlm_slots_copy_out(ls, rc);
247*4882a593Smuzhiyun spin_unlock(&ls->ls_recover_lock);
248*4882a593Smuzhiyun
249*4882a593Smuzhiyun do_send:
250*4882a593Smuzhiyun send_rcom(ls, mh, rc);
251*4882a593Smuzhiyun }
252*4882a593Smuzhiyun
receive_sync_reply(struct dlm_ls * ls,struct dlm_rcom * rc_in)253*4882a593Smuzhiyun static void receive_sync_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in)
254*4882a593Smuzhiyun {
255*4882a593Smuzhiyun spin_lock(&ls->ls_rcom_spin);
256*4882a593Smuzhiyun if (!test_bit(LSFL_RCOM_WAIT, &ls->ls_flags) ||
257*4882a593Smuzhiyun rc_in->rc_id != ls->ls_rcom_seq) {
258*4882a593Smuzhiyun log_debug(ls, "reject reply %d from %d seq %llx expect %llx",
259*4882a593Smuzhiyun rc_in->rc_type, rc_in->rc_header.h_nodeid,
260*4882a593Smuzhiyun (unsigned long long)rc_in->rc_id,
261*4882a593Smuzhiyun (unsigned long long)ls->ls_rcom_seq);
262*4882a593Smuzhiyun goto out;
263*4882a593Smuzhiyun }
264*4882a593Smuzhiyun memcpy(ls->ls_recover_buf, rc_in, rc_in->rc_header.h_length);
265*4882a593Smuzhiyun set_bit(LSFL_RCOM_READY, &ls->ls_flags);
266*4882a593Smuzhiyun clear_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
267*4882a593Smuzhiyun wake_up(&ls->ls_wait_general);
268*4882a593Smuzhiyun out:
269*4882a593Smuzhiyun spin_unlock(&ls->ls_rcom_spin);
270*4882a593Smuzhiyun }
271*4882a593Smuzhiyun
dlm_rcom_names(struct dlm_ls * ls,int nodeid,char * last_name,int last_len)272*4882a593Smuzhiyun int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name, int last_len)
273*4882a593Smuzhiyun {
274*4882a593Smuzhiyun struct dlm_rcom *rc;
275*4882a593Smuzhiyun struct dlm_mhandle *mh;
276*4882a593Smuzhiyun int error = 0;
277*4882a593Smuzhiyun
278*4882a593Smuzhiyun ls->ls_recover_nodeid = nodeid;
279*4882a593Smuzhiyun
280*4882a593Smuzhiyun retry:
281*4882a593Smuzhiyun error = create_rcom(ls, nodeid, DLM_RCOM_NAMES, last_len, &rc, &mh);
282*4882a593Smuzhiyun if (error)
283*4882a593Smuzhiyun goto out;
284*4882a593Smuzhiyun memcpy(rc->rc_buf, last_name, last_len);
285*4882a593Smuzhiyun
286*4882a593Smuzhiyun allow_sync_reply(ls, &rc->rc_id);
287*4882a593Smuzhiyun memset(ls->ls_recover_buf, 0, dlm_config.ci_buffer_size);
288*4882a593Smuzhiyun
289*4882a593Smuzhiyun send_rcom(ls, mh, rc);
290*4882a593Smuzhiyun
291*4882a593Smuzhiyun error = dlm_wait_function(ls, &rcom_response);
292*4882a593Smuzhiyun disallow_sync_reply(ls);
293*4882a593Smuzhiyun if (error == -ETIMEDOUT)
294*4882a593Smuzhiyun goto retry;
295*4882a593Smuzhiyun out:
296*4882a593Smuzhiyun return error;
297*4882a593Smuzhiyun }
298*4882a593Smuzhiyun
receive_rcom_names(struct dlm_ls * ls,struct dlm_rcom * rc_in)299*4882a593Smuzhiyun static void receive_rcom_names(struct dlm_ls *ls, struct dlm_rcom *rc_in)
300*4882a593Smuzhiyun {
301*4882a593Smuzhiyun struct dlm_rcom *rc;
302*4882a593Smuzhiyun struct dlm_mhandle *mh;
303*4882a593Smuzhiyun int error, inlen, outlen, nodeid;
304*4882a593Smuzhiyun
305*4882a593Smuzhiyun nodeid = rc_in->rc_header.h_nodeid;
306*4882a593Smuzhiyun inlen = rc_in->rc_header.h_length - sizeof(struct dlm_rcom);
307*4882a593Smuzhiyun outlen = dlm_config.ci_buffer_size - sizeof(struct dlm_rcom);
308*4882a593Smuzhiyun
309*4882a593Smuzhiyun error = create_rcom(ls, nodeid, DLM_RCOM_NAMES_REPLY, outlen, &rc, &mh);
310*4882a593Smuzhiyun if (error)
311*4882a593Smuzhiyun return;
312*4882a593Smuzhiyun rc->rc_id = rc_in->rc_id;
313*4882a593Smuzhiyun rc->rc_seq_reply = rc_in->rc_seq;
314*4882a593Smuzhiyun
315*4882a593Smuzhiyun dlm_copy_master_names(ls, rc_in->rc_buf, inlen, rc->rc_buf, outlen,
316*4882a593Smuzhiyun nodeid);
317*4882a593Smuzhiyun send_rcom(ls, mh, rc);
318*4882a593Smuzhiyun }
319*4882a593Smuzhiyun
dlm_send_rcom_lookup(struct dlm_rsb * r,int dir_nodeid)320*4882a593Smuzhiyun int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid)
321*4882a593Smuzhiyun {
322*4882a593Smuzhiyun struct dlm_rcom *rc;
323*4882a593Smuzhiyun struct dlm_mhandle *mh;
324*4882a593Smuzhiyun struct dlm_ls *ls = r->res_ls;
325*4882a593Smuzhiyun int error;
326*4882a593Smuzhiyun
327*4882a593Smuzhiyun error = create_rcom(ls, dir_nodeid, DLM_RCOM_LOOKUP, r->res_length,
328*4882a593Smuzhiyun &rc, &mh);
329*4882a593Smuzhiyun if (error)
330*4882a593Smuzhiyun goto out;
331*4882a593Smuzhiyun memcpy(rc->rc_buf, r->res_name, r->res_length);
332*4882a593Smuzhiyun rc->rc_id = (unsigned long) r->res_id;
333*4882a593Smuzhiyun
334*4882a593Smuzhiyun send_rcom(ls, mh, rc);
335*4882a593Smuzhiyun out:
336*4882a593Smuzhiyun return error;
337*4882a593Smuzhiyun }
338*4882a593Smuzhiyun
receive_rcom_lookup(struct dlm_ls * ls,struct dlm_rcom * rc_in)339*4882a593Smuzhiyun static void receive_rcom_lookup(struct dlm_ls *ls, struct dlm_rcom *rc_in)
340*4882a593Smuzhiyun {
341*4882a593Smuzhiyun struct dlm_rcom *rc;
342*4882a593Smuzhiyun struct dlm_mhandle *mh;
343*4882a593Smuzhiyun int error, ret_nodeid, nodeid = rc_in->rc_header.h_nodeid;
344*4882a593Smuzhiyun int len = rc_in->rc_header.h_length - sizeof(struct dlm_rcom);
345*4882a593Smuzhiyun
346*4882a593Smuzhiyun error = create_rcom(ls, nodeid, DLM_RCOM_LOOKUP_REPLY, 0, &rc, &mh);
347*4882a593Smuzhiyun if (error)
348*4882a593Smuzhiyun return;
349*4882a593Smuzhiyun
350*4882a593Smuzhiyun /* Old code would send this special id to trigger a debug dump. */
351*4882a593Smuzhiyun if (rc_in->rc_id == 0xFFFFFFFF) {
352*4882a593Smuzhiyun log_error(ls, "receive_rcom_lookup dump from %d", nodeid);
353*4882a593Smuzhiyun dlm_dump_rsb_name(ls, rc_in->rc_buf, len);
354*4882a593Smuzhiyun return;
355*4882a593Smuzhiyun }
356*4882a593Smuzhiyun
357*4882a593Smuzhiyun error = dlm_master_lookup(ls, nodeid, rc_in->rc_buf, len,
358*4882a593Smuzhiyun DLM_LU_RECOVER_MASTER, &ret_nodeid, NULL);
359*4882a593Smuzhiyun if (error)
360*4882a593Smuzhiyun ret_nodeid = error;
361*4882a593Smuzhiyun rc->rc_result = ret_nodeid;
362*4882a593Smuzhiyun rc->rc_id = rc_in->rc_id;
363*4882a593Smuzhiyun rc->rc_seq_reply = rc_in->rc_seq;
364*4882a593Smuzhiyun
365*4882a593Smuzhiyun send_rcom(ls, mh, rc);
366*4882a593Smuzhiyun }
367*4882a593Smuzhiyun
/* A DLM_RCOM_LOOKUP_REPLY is passed straight to
   dlm_recover_master_reply(). */
static void receive_rcom_lookup_reply(struct dlm_ls *ls,
				      struct dlm_rcom *rc_in)
{
	dlm_recover_master_reply(ls, rc_in);
}
372*4882a593Smuzhiyun
pack_rcom_lock(struct dlm_rsb * r,struct dlm_lkb * lkb,struct rcom_lock * rl)373*4882a593Smuzhiyun static void pack_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb,
374*4882a593Smuzhiyun struct rcom_lock *rl)
375*4882a593Smuzhiyun {
376*4882a593Smuzhiyun memset(rl, 0, sizeof(*rl));
377*4882a593Smuzhiyun
378*4882a593Smuzhiyun rl->rl_ownpid = cpu_to_le32(lkb->lkb_ownpid);
379*4882a593Smuzhiyun rl->rl_lkid = cpu_to_le32(lkb->lkb_id);
380*4882a593Smuzhiyun rl->rl_exflags = cpu_to_le32(lkb->lkb_exflags);
381*4882a593Smuzhiyun rl->rl_flags = cpu_to_le32(lkb->lkb_flags);
382*4882a593Smuzhiyun rl->rl_lvbseq = cpu_to_le32(lkb->lkb_lvbseq);
383*4882a593Smuzhiyun rl->rl_rqmode = lkb->lkb_rqmode;
384*4882a593Smuzhiyun rl->rl_grmode = lkb->lkb_grmode;
385*4882a593Smuzhiyun rl->rl_status = lkb->lkb_status;
386*4882a593Smuzhiyun rl->rl_wait_type = cpu_to_le16(lkb->lkb_wait_type);
387*4882a593Smuzhiyun
388*4882a593Smuzhiyun if (lkb->lkb_bastfn)
389*4882a593Smuzhiyun rl->rl_asts |= DLM_CB_BAST;
390*4882a593Smuzhiyun if (lkb->lkb_astfn)
391*4882a593Smuzhiyun rl->rl_asts |= DLM_CB_CAST;
392*4882a593Smuzhiyun
393*4882a593Smuzhiyun rl->rl_namelen = cpu_to_le16(r->res_length);
394*4882a593Smuzhiyun memcpy(rl->rl_name, r->res_name, r->res_length);
395*4882a593Smuzhiyun
396*4882a593Smuzhiyun /* FIXME: might we have an lvb without DLM_LKF_VALBLK set ?
397*4882a593Smuzhiyun If so, receive_rcom_lock_args() won't take this copy. */
398*4882a593Smuzhiyun
399*4882a593Smuzhiyun if (lkb->lkb_lvbptr)
400*4882a593Smuzhiyun memcpy(rl->rl_lvb, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
401*4882a593Smuzhiyun }
402*4882a593Smuzhiyun
dlm_send_rcom_lock(struct dlm_rsb * r,struct dlm_lkb * lkb)403*4882a593Smuzhiyun int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
404*4882a593Smuzhiyun {
405*4882a593Smuzhiyun struct dlm_ls *ls = r->res_ls;
406*4882a593Smuzhiyun struct dlm_rcom *rc;
407*4882a593Smuzhiyun struct dlm_mhandle *mh;
408*4882a593Smuzhiyun struct rcom_lock *rl;
409*4882a593Smuzhiyun int error, len = sizeof(struct rcom_lock);
410*4882a593Smuzhiyun
411*4882a593Smuzhiyun if (lkb->lkb_lvbptr)
412*4882a593Smuzhiyun len += ls->ls_lvblen;
413*4882a593Smuzhiyun
414*4882a593Smuzhiyun error = create_rcom(ls, r->res_nodeid, DLM_RCOM_LOCK, len, &rc, &mh);
415*4882a593Smuzhiyun if (error)
416*4882a593Smuzhiyun goto out;
417*4882a593Smuzhiyun
418*4882a593Smuzhiyun rl = (struct rcom_lock *) rc->rc_buf;
419*4882a593Smuzhiyun pack_rcom_lock(r, lkb, rl);
420*4882a593Smuzhiyun rc->rc_id = (unsigned long) r;
421*4882a593Smuzhiyun
422*4882a593Smuzhiyun send_rcom(ls, mh, rc);
423*4882a593Smuzhiyun out:
424*4882a593Smuzhiyun return error;
425*4882a593Smuzhiyun }
426*4882a593Smuzhiyun
427*4882a593Smuzhiyun /* needs at least dlm_rcom + rcom_lock */
receive_rcom_lock(struct dlm_ls * ls,struct dlm_rcom * rc_in)428*4882a593Smuzhiyun static void receive_rcom_lock(struct dlm_ls *ls, struct dlm_rcom *rc_in)
429*4882a593Smuzhiyun {
430*4882a593Smuzhiyun struct dlm_rcom *rc;
431*4882a593Smuzhiyun struct dlm_mhandle *mh;
432*4882a593Smuzhiyun int error, nodeid = rc_in->rc_header.h_nodeid;
433*4882a593Smuzhiyun
434*4882a593Smuzhiyun dlm_recover_master_copy(ls, rc_in);
435*4882a593Smuzhiyun
436*4882a593Smuzhiyun error = create_rcom(ls, nodeid, DLM_RCOM_LOCK_REPLY,
437*4882a593Smuzhiyun sizeof(struct rcom_lock), &rc, &mh);
438*4882a593Smuzhiyun if (error)
439*4882a593Smuzhiyun return;
440*4882a593Smuzhiyun
441*4882a593Smuzhiyun /* We send back the same rcom_lock struct we received, but
442*4882a593Smuzhiyun dlm_recover_master_copy() has filled in rl_remid and rl_result */
443*4882a593Smuzhiyun
444*4882a593Smuzhiyun memcpy(rc->rc_buf, rc_in->rc_buf, sizeof(struct rcom_lock));
445*4882a593Smuzhiyun rc->rc_id = rc_in->rc_id;
446*4882a593Smuzhiyun rc->rc_seq_reply = rc_in->rc_seq;
447*4882a593Smuzhiyun
448*4882a593Smuzhiyun send_rcom(ls, mh, rc);
449*4882a593Smuzhiyun }
450*4882a593Smuzhiyun
/* If the lockspace doesn't exist then still send a status message
   back; it's possible that it just doesn't have its global_id yet. */
453*4882a593Smuzhiyun
dlm_send_ls_not_ready(int nodeid,struct dlm_rcom * rc_in)454*4882a593Smuzhiyun int dlm_send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in)
455*4882a593Smuzhiyun {
456*4882a593Smuzhiyun struct dlm_rcom *rc;
457*4882a593Smuzhiyun struct rcom_config *rf;
458*4882a593Smuzhiyun struct dlm_mhandle *mh;
459*4882a593Smuzhiyun char *mb;
460*4882a593Smuzhiyun int mb_len = sizeof(struct dlm_rcom) + sizeof(struct rcom_config);
461*4882a593Smuzhiyun
462*4882a593Smuzhiyun mh = dlm_lowcomms_get_buffer(nodeid, mb_len, GFP_NOFS, &mb);
463*4882a593Smuzhiyun if (!mh)
464*4882a593Smuzhiyun return -ENOBUFS;
465*4882a593Smuzhiyun memset(mb, 0, mb_len);
466*4882a593Smuzhiyun
467*4882a593Smuzhiyun rc = (struct dlm_rcom *) mb;
468*4882a593Smuzhiyun
469*4882a593Smuzhiyun rc->rc_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
470*4882a593Smuzhiyun rc->rc_header.h_lockspace = rc_in->rc_header.h_lockspace;
471*4882a593Smuzhiyun rc->rc_header.h_nodeid = dlm_our_nodeid();
472*4882a593Smuzhiyun rc->rc_header.h_length = mb_len;
473*4882a593Smuzhiyun rc->rc_header.h_cmd = DLM_RCOM;
474*4882a593Smuzhiyun
475*4882a593Smuzhiyun rc->rc_type = DLM_RCOM_STATUS_REPLY;
476*4882a593Smuzhiyun rc->rc_id = rc_in->rc_id;
477*4882a593Smuzhiyun rc->rc_seq_reply = rc_in->rc_seq;
478*4882a593Smuzhiyun rc->rc_result = -ESRCH;
479*4882a593Smuzhiyun
480*4882a593Smuzhiyun rf = (struct rcom_config *) rc->rc_buf;
481*4882a593Smuzhiyun rf->rf_lvblen = cpu_to_le32(~0U);
482*4882a593Smuzhiyun
483*4882a593Smuzhiyun dlm_rcom_out(rc);
484*4882a593Smuzhiyun dlm_lowcomms_commit_buffer(mh);
485*4882a593Smuzhiyun
486*4882a593Smuzhiyun return 0;
487*4882a593Smuzhiyun }
488*4882a593Smuzhiyun
/*
 * Ignore messages for stage Y before we set
 * recover_status bit for stage X:
 *
 * recover_status = 0
 *
 * dlm_recover_members()
 * - send nothing
 * - recv nothing
 * - ignore NAMES, NAMES_REPLY
 * - ignore LOOKUP, LOOKUP_REPLY
 * - ignore LOCK, LOCK_REPLY
 *
 * recover_status |= NODES
 *
 * dlm_recover_members_wait()
 *
 * dlm_recover_directory()
 * - send NAMES
 * - recv NAMES_REPLY
 * - ignore LOOKUP, LOOKUP_REPLY
 * - ignore LOCK, LOCK_REPLY
 *
 * recover_status |= DIR
 *
 * dlm_recover_directory_wait()
 *
 * dlm_recover_masters()
 * - send LOOKUP
 * - recv LOOKUP_REPLY
 *
 * dlm_recover_locks()
 * - send LOCK
 * - recv LOCK_REPLY
 *
 * recover_status |= LOCKS
 *
 * dlm_recover_locks_wait()
 *
 * recover_status |= DONE
 */
530*4882a593Smuzhiyun
/* Called by dlm_recv; corresponds to dlm_receive_message() but special
   recovery-only comms are sent through here. */
533*4882a593Smuzhiyun
dlm_receive_rcom(struct dlm_ls * ls,struct dlm_rcom * rc,int nodeid)534*4882a593Smuzhiyun void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
535*4882a593Smuzhiyun {
536*4882a593Smuzhiyun int lock_size = sizeof(struct dlm_rcom) + sizeof(struct rcom_lock);
537*4882a593Smuzhiyun int stop, reply = 0, names = 0, lookup = 0, lock = 0;
538*4882a593Smuzhiyun uint32_t status;
539*4882a593Smuzhiyun uint64_t seq;
540*4882a593Smuzhiyun
541*4882a593Smuzhiyun switch (rc->rc_type) {
542*4882a593Smuzhiyun case DLM_RCOM_STATUS_REPLY:
543*4882a593Smuzhiyun reply = 1;
544*4882a593Smuzhiyun break;
545*4882a593Smuzhiyun case DLM_RCOM_NAMES:
546*4882a593Smuzhiyun names = 1;
547*4882a593Smuzhiyun break;
548*4882a593Smuzhiyun case DLM_RCOM_NAMES_REPLY:
549*4882a593Smuzhiyun names = 1;
550*4882a593Smuzhiyun reply = 1;
551*4882a593Smuzhiyun break;
552*4882a593Smuzhiyun case DLM_RCOM_LOOKUP:
553*4882a593Smuzhiyun lookup = 1;
554*4882a593Smuzhiyun break;
555*4882a593Smuzhiyun case DLM_RCOM_LOOKUP_REPLY:
556*4882a593Smuzhiyun lookup = 1;
557*4882a593Smuzhiyun reply = 1;
558*4882a593Smuzhiyun break;
559*4882a593Smuzhiyun case DLM_RCOM_LOCK:
560*4882a593Smuzhiyun lock = 1;
561*4882a593Smuzhiyun break;
562*4882a593Smuzhiyun case DLM_RCOM_LOCK_REPLY:
563*4882a593Smuzhiyun lock = 1;
564*4882a593Smuzhiyun reply = 1;
565*4882a593Smuzhiyun break;
566*4882a593Smuzhiyun }
567*4882a593Smuzhiyun
568*4882a593Smuzhiyun spin_lock(&ls->ls_recover_lock);
569*4882a593Smuzhiyun status = ls->ls_recover_status;
570*4882a593Smuzhiyun stop = test_bit(LSFL_RECOVER_STOP, &ls->ls_flags);
571*4882a593Smuzhiyun seq = ls->ls_recover_seq;
572*4882a593Smuzhiyun spin_unlock(&ls->ls_recover_lock);
573*4882a593Smuzhiyun
574*4882a593Smuzhiyun if (stop && (rc->rc_type != DLM_RCOM_STATUS))
575*4882a593Smuzhiyun goto ignore;
576*4882a593Smuzhiyun
577*4882a593Smuzhiyun if (reply && (rc->rc_seq_reply != seq))
578*4882a593Smuzhiyun goto ignore;
579*4882a593Smuzhiyun
580*4882a593Smuzhiyun if (!(status & DLM_RS_NODES) && (names || lookup || lock))
581*4882a593Smuzhiyun goto ignore;
582*4882a593Smuzhiyun
583*4882a593Smuzhiyun if (!(status & DLM_RS_DIR) && (lookup || lock))
584*4882a593Smuzhiyun goto ignore;
585*4882a593Smuzhiyun
586*4882a593Smuzhiyun switch (rc->rc_type) {
587*4882a593Smuzhiyun case DLM_RCOM_STATUS:
588*4882a593Smuzhiyun receive_rcom_status(ls, rc);
589*4882a593Smuzhiyun break;
590*4882a593Smuzhiyun
591*4882a593Smuzhiyun case DLM_RCOM_NAMES:
592*4882a593Smuzhiyun receive_rcom_names(ls, rc);
593*4882a593Smuzhiyun break;
594*4882a593Smuzhiyun
595*4882a593Smuzhiyun case DLM_RCOM_LOOKUP:
596*4882a593Smuzhiyun receive_rcom_lookup(ls, rc);
597*4882a593Smuzhiyun break;
598*4882a593Smuzhiyun
599*4882a593Smuzhiyun case DLM_RCOM_LOCK:
600*4882a593Smuzhiyun if (rc->rc_header.h_length < lock_size)
601*4882a593Smuzhiyun goto Eshort;
602*4882a593Smuzhiyun receive_rcom_lock(ls, rc);
603*4882a593Smuzhiyun break;
604*4882a593Smuzhiyun
605*4882a593Smuzhiyun case DLM_RCOM_STATUS_REPLY:
606*4882a593Smuzhiyun receive_sync_reply(ls, rc);
607*4882a593Smuzhiyun break;
608*4882a593Smuzhiyun
609*4882a593Smuzhiyun case DLM_RCOM_NAMES_REPLY:
610*4882a593Smuzhiyun receive_sync_reply(ls, rc);
611*4882a593Smuzhiyun break;
612*4882a593Smuzhiyun
613*4882a593Smuzhiyun case DLM_RCOM_LOOKUP_REPLY:
614*4882a593Smuzhiyun receive_rcom_lookup_reply(ls, rc);
615*4882a593Smuzhiyun break;
616*4882a593Smuzhiyun
617*4882a593Smuzhiyun case DLM_RCOM_LOCK_REPLY:
618*4882a593Smuzhiyun if (rc->rc_header.h_length < lock_size)
619*4882a593Smuzhiyun goto Eshort;
620*4882a593Smuzhiyun dlm_recover_process_copy(ls, rc);
621*4882a593Smuzhiyun break;
622*4882a593Smuzhiyun
623*4882a593Smuzhiyun default:
624*4882a593Smuzhiyun log_error(ls, "receive_rcom bad type %d", rc->rc_type);
625*4882a593Smuzhiyun }
626*4882a593Smuzhiyun return;
627*4882a593Smuzhiyun
628*4882a593Smuzhiyun ignore:
629*4882a593Smuzhiyun log_limit(ls, "dlm_receive_rcom ignore msg %d "
630*4882a593Smuzhiyun "from %d %llu %llu recover seq %llu sts %x gen %u",
631*4882a593Smuzhiyun rc->rc_type,
632*4882a593Smuzhiyun nodeid,
633*4882a593Smuzhiyun (unsigned long long)rc->rc_seq,
634*4882a593Smuzhiyun (unsigned long long)rc->rc_seq_reply,
635*4882a593Smuzhiyun (unsigned long long)seq,
636*4882a593Smuzhiyun status, ls->ls_generation);
637*4882a593Smuzhiyun return;
638*4882a593Smuzhiyun Eshort:
639*4882a593Smuzhiyun log_error(ls, "recovery message %d from %d is too short",
640*4882a593Smuzhiyun rc->rc_type, nodeid);
641*4882a593Smuzhiyun }
642*4882a593Smuzhiyun
643