xref: /OK3568_Linux_fs/kernel/fs/dlm/rcom.c (revision 4882a59341e53eb6f0b4789bf948001014eff981)
1*4882a593Smuzhiyun // SPDX-License-Identifier: GPL-2.0-only
2*4882a593Smuzhiyun /******************************************************************************
3*4882a593Smuzhiyun *******************************************************************************
4*4882a593Smuzhiyun **
5*4882a593Smuzhiyun **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
6*4882a593Smuzhiyun **  Copyright (C) 2005-2008 Red Hat, Inc.  All rights reserved.
7*4882a593Smuzhiyun **
8*4882a593Smuzhiyun **
9*4882a593Smuzhiyun *******************************************************************************
10*4882a593Smuzhiyun ******************************************************************************/
11*4882a593Smuzhiyun 
12*4882a593Smuzhiyun #include "dlm_internal.h"
13*4882a593Smuzhiyun #include "lockspace.h"
14*4882a593Smuzhiyun #include "member.h"
15*4882a593Smuzhiyun #include "lowcomms.h"
16*4882a593Smuzhiyun #include "midcomms.h"
17*4882a593Smuzhiyun #include "rcom.h"
18*4882a593Smuzhiyun #include "recover.h"
19*4882a593Smuzhiyun #include "dir.h"
20*4882a593Smuzhiyun #include "config.h"
21*4882a593Smuzhiyun #include "memory.h"
22*4882a593Smuzhiyun #include "lock.h"
23*4882a593Smuzhiyun #include "util.h"
24*4882a593Smuzhiyun 
rcom_response(struct dlm_ls * ls)25*4882a593Smuzhiyun static int rcom_response(struct dlm_ls *ls)
26*4882a593Smuzhiyun {
27*4882a593Smuzhiyun 	return test_bit(LSFL_RCOM_READY, &ls->ls_flags);
28*4882a593Smuzhiyun }
29*4882a593Smuzhiyun 
create_rcom(struct dlm_ls * ls,int to_nodeid,int type,int len,struct dlm_rcom ** rc_ret,struct dlm_mhandle ** mh_ret)30*4882a593Smuzhiyun static int create_rcom(struct dlm_ls *ls, int to_nodeid, int type, int len,
31*4882a593Smuzhiyun 		       struct dlm_rcom **rc_ret, struct dlm_mhandle **mh_ret)
32*4882a593Smuzhiyun {
33*4882a593Smuzhiyun 	struct dlm_rcom *rc;
34*4882a593Smuzhiyun 	struct dlm_mhandle *mh;
35*4882a593Smuzhiyun 	char *mb;
36*4882a593Smuzhiyun 	int mb_len = sizeof(struct dlm_rcom) + len;
37*4882a593Smuzhiyun 
38*4882a593Smuzhiyun 	mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_NOFS, &mb);
39*4882a593Smuzhiyun 	if (!mh) {
40*4882a593Smuzhiyun 		log_print("create_rcom to %d type %d len %d ENOBUFS",
41*4882a593Smuzhiyun 			  to_nodeid, type, len);
42*4882a593Smuzhiyun 		return -ENOBUFS;
43*4882a593Smuzhiyun 	}
44*4882a593Smuzhiyun 	memset(mb, 0, mb_len);
45*4882a593Smuzhiyun 
46*4882a593Smuzhiyun 	rc = (struct dlm_rcom *) mb;
47*4882a593Smuzhiyun 
48*4882a593Smuzhiyun 	rc->rc_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
49*4882a593Smuzhiyun 	rc->rc_header.h_lockspace = ls->ls_global_id;
50*4882a593Smuzhiyun 	rc->rc_header.h_nodeid = dlm_our_nodeid();
51*4882a593Smuzhiyun 	rc->rc_header.h_length = mb_len;
52*4882a593Smuzhiyun 	rc->rc_header.h_cmd = DLM_RCOM;
53*4882a593Smuzhiyun 
54*4882a593Smuzhiyun 	rc->rc_type = type;
55*4882a593Smuzhiyun 
56*4882a593Smuzhiyun 	spin_lock(&ls->ls_recover_lock);
57*4882a593Smuzhiyun 	rc->rc_seq = ls->ls_recover_seq;
58*4882a593Smuzhiyun 	spin_unlock(&ls->ls_recover_lock);
59*4882a593Smuzhiyun 
60*4882a593Smuzhiyun 	*mh_ret = mh;
61*4882a593Smuzhiyun 	*rc_ret = rc;
62*4882a593Smuzhiyun 	return 0;
63*4882a593Smuzhiyun }
64*4882a593Smuzhiyun 
/* Finish a message built by create_rcom(): pass it through dlm_rcom_out()
   and commit its lowcomms buffer.  The ls argument is not used here. */
static void send_rcom(struct dlm_ls *ls, struct dlm_mhandle *mh,
		      struct dlm_rcom *rc)
{
	dlm_rcom_out(rc);

	dlm_lowcomms_commit_buffer(mh);
}
71*4882a593Smuzhiyun 
/* Fill in the payload of a status request: flags is stored little-endian
   in rs->rs_flags.  The ls argument is not used here. */
static void set_rcom_status(struct dlm_ls *ls, struct rcom_status *rs,
			    uint32_t flags)
{
	__le32 wire_flags = cpu_to_le32(flags);

	rs->rs_flags = wire_flags;
}
77*4882a593Smuzhiyun 
78*4882a593Smuzhiyun /* When replying to a status request, a node also sends back its
79*4882a593Smuzhiyun    configuration values.  The requesting node then checks that the remote
80*4882a593Smuzhiyun    node is configured the same way as itself. */
81*4882a593Smuzhiyun 
set_rcom_config(struct dlm_ls * ls,struct rcom_config * rf,uint32_t num_slots)82*4882a593Smuzhiyun static void set_rcom_config(struct dlm_ls *ls, struct rcom_config *rf,
83*4882a593Smuzhiyun 			    uint32_t num_slots)
84*4882a593Smuzhiyun {
85*4882a593Smuzhiyun 	rf->rf_lvblen = cpu_to_le32(ls->ls_lvblen);
86*4882a593Smuzhiyun 	rf->rf_lsflags = cpu_to_le32(ls->ls_exflags);
87*4882a593Smuzhiyun 
88*4882a593Smuzhiyun 	rf->rf_our_slot = cpu_to_le16(ls->ls_slot);
89*4882a593Smuzhiyun 	rf->rf_num_slots = cpu_to_le16(num_slots);
90*4882a593Smuzhiyun 	rf->rf_generation =  cpu_to_le32(ls->ls_generation);
91*4882a593Smuzhiyun }
92*4882a593Smuzhiyun 
check_rcom_config(struct dlm_ls * ls,struct dlm_rcom * rc,int nodeid)93*4882a593Smuzhiyun static int check_rcom_config(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
94*4882a593Smuzhiyun {
95*4882a593Smuzhiyun 	struct rcom_config *rf = (struct rcom_config *) rc->rc_buf;
96*4882a593Smuzhiyun 
97*4882a593Smuzhiyun 	if ((rc->rc_header.h_version & 0xFFFF0000) != DLM_HEADER_MAJOR) {
98*4882a593Smuzhiyun 		log_error(ls, "version mismatch: %x nodeid %d: %x",
99*4882a593Smuzhiyun 			  DLM_HEADER_MAJOR | DLM_HEADER_MINOR, nodeid,
100*4882a593Smuzhiyun 			  rc->rc_header.h_version);
101*4882a593Smuzhiyun 		return -EPROTO;
102*4882a593Smuzhiyun 	}
103*4882a593Smuzhiyun 
104*4882a593Smuzhiyun 	if (le32_to_cpu(rf->rf_lvblen) != ls->ls_lvblen ||
105*4882a593Smuzhiyun 	    le32_to_cpu(rf->rf_lsflags) != ls->ls_exflags) {
106*4882a593Smuzhiyun 		log_error(ls, "config mismatch: %d,%x nodeid %d: %d,%x",
107*4882a593Smuzhiyun 			  ls->ls_lvblen, ls->ls_exflags, nodeid,
108*4882a593Smuzhiyun 			  le32_to_cpu(rf->rf_lvblen),
109*4882a593Smuzhiyun 			  le32_to_cpu(rf->rf_lsflags));
110*4882a593Smuzhiyun 		return -EPROTO;
111*4882a593Smuzhiyun 	}
112*4882a593Smuzhiyun 	return 0;
113*4882a593Smuzhiyun }
114*4882a593Smuzhiyun 
/* Open the window for a synchronous reply: under ls_rcom_spin, advance
   ls_rcom_seq, hand the new value back through *new_seq and set
   LSFL_RCOM_WAIT.  receive_sync_reply() accepts only a reply whose rc_id
   equals that sequence while the WAIT bit is set. */
static void allow_sync_reply(struct dlm_ls *ls, uint64_t *new_seq)
{
	spin_lock(&ls->ls_rcom_spin);

	ls->ls_rcom_seq++;
	*new_seq = ls->ls_rcom_seq;
	set_bit(LSFL_RCOM_WAIT, &ls->ls_flags);

	spin_unlock(&ls->ls_rcom_spin);
}
122*4882a593Smuzhiyun 
disallow_sync_reply(struct dlm_ls * ls)123*4882a593Smuzhiyun static void disallow_sync_reply(struct dlm_ls *ls)
124*4882a593Smuzhiyun {
125*4882a593Smuzhiyun 	spin_lock(&ls->ls_rcom_spin);
126*4882a593Smuzhiyun 	clear_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
127*4882a593Smuzhiyun 	clear_bit(LSFL_RCOM_READY, &ls->ls_flags);
128*4882a593Smuzhiyun 	spin_unlock(&ls->ls_rcom_spin);
129*4882a593Smuzhiyun }
130*4882a593Smuzhiyun 
131*4882a593Smuzhiyun /*
132*4882a593Smuzhiyun  * low nodeid gathers one slot value at a time from each node.
133*4882a593Smuzhiyun  * it sets need_slots=0, and saves rf_our_slot returned from each
134*4882a593Smuzhiyun  * rcom_config.
135*4882a593Smuzhiyun  *
136*4882a593Smuzhiyun  * other nodes gather all slot values at once from the low nodeid.
137*4882a593Smuzhiyun  * they set need_slots=1, and ignore the rf_our_slot returned from each
138*4882a593Smuzhiyun  * rcom_config.  they use the rf_num_slots returned from the low
139*4882a593Smuzhiyun  * node's rcom_config.
140*4882a593Smuzhiyun  */
141*4882a593Smuzhiyun 
dlm_rcom_status(struct dlm_ls * ls,int nodeid,uint32_t status_flags)142*4882a593Smuzhiyun int dlm_rcom_status(struct dlm_ls *ls, int nodeid, uint32_t status_flags)
143*4882a593Smuzhiyun {
144*4882a593Smuzhiyun 	struct dlm_rcom *rc;
145*4882a593Smuzhiyun 	struct dlm_mhandle *mh;
146*4882a593Smuzhiyun 	int error = 0;
147*4882a593Smuzhiyun 
148*4882a593Smuzhiyun 	ls->ls_recover_nodeid = nodeid;
149*4882a593Smuzhiyun 
150*4882a593Smuzhiyun 	if (nodeid == dlm_our_nodeid()) {
151*4882a593Smuzhiyun 		rc = ls->ls_recover_buf;
152*4882a593Smuzhiyun 		rc->rc_result = dlm_recover_status(ls);
153*4882a593Smuzhiyun 		goto out;
154*4882a593Smuzhiyun 	}
155*4882a593Smuzhiyun 
156*4882a593Smuzhiyun retry:
157*4882a593Smuzhiyun 	error = create_rcom(ls, nodeid, DLM_RCOM_STATUS,
158*4882a593Smuzhiyun 			    sizeof(struct rcom_status), &rc, &mh);
159*4882a593Smuzhiyun 	if (error)
160*4882a593Smuzhiyun 		goto out;
161*4882a593Smuzhiyun 
162*4882a593Smuzhiyun 	set_rcom_status(ls, (struct rcom_status *)rc->rc_buf, status_flags);
163*4882a593Smuzhiyun 
164*4882a593Smuzhiyun 	allow_sync_reply(ls, &rc->rc_id);
165*4882a593Smuzhiyun 	memset(ls->ls_recover_buf, 0, dlm_config.ci_buffer_size);
166*4882a593Smuzhiyun 
167*4882a593Smuzhiyun 	send_rcom(ls, mh, rc);
168*4882a593Smuzhiyun 
169*4882a593Smuzhiyun 	error = dlm_wait_function(ls, &rcom_response);
170*4882a593Smuzhiyun 	disallow_sync_reply(ls);
171*4882a593Smuzhiyun 	if (error == -ETIMEDOUT)
172*4882a593Smuzhiyun 		goto retry;
173*4882a593Smuzhiyun 	if (error)
174*4882a593Smuzhiyun 		goto out;
175*4882a593Smuzhiyun 
176*4882a593Smuzhiyun 	rc = ls->ls_recover_buf;
177*4882a593Smuzhiyun 
178*4882a593Smuzhiyun 	if (rc->rc_result == -ESRCH) {
179*4882a593Smuzhiyun 		/* we pretend the remote lockspace exists with 0 status */
180*4882a593Smuzhiyun 		log_debug(ls, "remote node %d not ready", nodeid);
181*4882a593Smuzhiyun 		rc->rc_result = 0;
182*4882a593Smuzhiyun 		error = 0;
183*4882a593Smuzhiyun 	} else {
184*4882a593Smuzhiyun 		error = check_rcom_config(ls, rc, nodeid);
185*4882a593Smuzhiyun 	}
186*4882a593Smuzhiyun 
187*4882a593Smuzhiyun 	/* the caller looks at rc_result for the remote recovery status */
188*4882a593Smuzhiyun  out:
189*4882a593Smuzhiyun 	return error;
190*4882a593Smuzhiyun }
191*4882a593Smuzhiyun 
receive_rcom_status(struct dlm_ls * ls,struct dlm_rcom * rc_in)192*4882a593Smuzhiyun static void receive_rcom_status(struct dlm_ls *ls, struct dlm_rcom *rc_in)
193*4882a593Smuzhiyun {
194*4882a593Smuzhiyun 	struct dlm_rcom *rc;
195*4882a593Smuzhiyun 	struct dlm_mhandle *mh;
196*4882a593Smuzhiyun 	struct rcom_status *rs;
197*4882a593Smuzhiyun 	uint32_t status;
198*4882a593Smuzhiyun 	int nodeid = rc_in->rc_header.h_nodeid;
199*4882a593Smuzhiyun 	int len = sizeof(struct rcom_config);
200*4882a593Smuzhiyun 	int num_slots = 0;
201*4882a593Smuzhiyun 	int error;
202*4882a593Smuzhiyun 
203*4882a593Smuzhiyun 	if (!dlm_slots_version(&rc_in->rc_header)) {
204*4882a593Smuzhiyun 		status = dlm_recover_status(ls);
205*4882a593Smuzhiyun 		goto do_create;
206*4882a593Smuzhiyun 	}
207*4882a593Smuzhiyun 
208*4882a593Smuzhiyun 	rs = (struct rcom_status *)rc_in->rc_buf;
209*4882a593Smuzhiyun 
210*4882a593Smuzhiyun 	if (!(le32_to_cpu(rs->rs_flags) & DLM_RSF_NEED_SLOTS)) {
211*4882a593Smuzhiyun 		status = dlm_recover_status(ls);
212*4882a593Smuzhiyun 		goto do_create;
213*4882a593Smuzhiyun 	}
214*4882a593Smuzhiyun 
215*4882a593Smuzhiyun 	spin_lock(&ls->ls_recover_lock);
216*4882a593Smuzhiyun 	status = ls->ls_recover_status;
217*4882a593Smuzhiyun 	num_slots = ls->ls_num_slots;
218*4882a593Smuzhiyun 	spin_unlock(&ls->ls_recover_lock);
219*4882a593Smuzhiyun 	len += num_slots * sizeof(struct rcom_slot);
220*4882a593Smuzhiyun 
221*4882a593Smuzhiyun  do_create:
222*4882a593Smuzhiyun 	error = create_rcom(ls, nodeid, DLM_RCOM_STATUS_REPLY,
223*4882a593Smuzhiyun 			    len, &rc, &mh);
224*4882a593Smuzhiyun 	if (error)
225*4882a593Smuzhiyun 		return;
226*4882a593Smuzhiyun 
227*4882a593Smuzhiyun 	rc->rc_id = rc_in->rc_id;
228*4882a593Smuzhiyun 	rc->rc_seq_reply = rc_in->rc_seq;
229*4882a593Smuzhiyun 	rc->rc_result = status;
230*4882a593Smuzhiyun 
231*4882a593Smuzhiyun 	set_rcom_config(ls, (struct rcom_config *)rc->rc_buf, num_slots);
232*4882a593Smuzhiyun 
233*4882a593Smuzhiyun 	if (!num_slots)
234*4882a593Smuzhiyun 		goto do_send;
235*4882a593Smuzhiyun 
236*4882a593Smuzhiyun 	spin_lock(&ls->ls_recover_lock);
237*4882a593Smuzhiyun 	if (ls->ls_num_slots != num_slots) {
238*4882a593Smuzhiyun 		spin_unlock(&ls->ls_recover_lock);
239*4882a593Smuzhiyun 		log_debug(ls, "receive_rcom_status num_slots %d to %d",
240*4882a593Smuzhiyun 			  num_slots, ls->ls_num_slots);
241*4882a593Smuzhiyun 		rc->rc_result = 0;
242*4882a593Smuzhiyun 		set_rcom_config(ls, (struct rcom_config *)rc->rc_buf, 0);
243*4882a593Smuzhiyun 		goto do_send;
244*4882a593Smuzhiyun 	}
245*4882a593Smuzhiyun 
246*4882a593Smuzhiyun 	dlm_slots_copy_out(ls, rc);
247*4882a593Smuzhiyun 	spin_unlock(&ls->ls_recover_lock);
248*4882a593Smuzhiyun 
249*4882a593Smuzhiyun  do_send:
250*4882a593Smuzhiyun 	send_rcom(ls, mh, rc);
251*4882a593Smuzhiyun }
252*4882a593Smuzhiyun 
receive_sync_reply(struct dlm_ls * ls,struct dlm_rcom * rc_in)253*4882a593Smuzhiyun static void receive_sync_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in)
254*4882a593Smuzhiyun {
255*4882a593Smuzhiyun 	spin_lock(&ls->ls_rcom_spin);
256*4882a593Smuzhiyun 	if (!test_bit(LSFL_RCOM_WAIT, &ls->ls_flags) ||
257*4882a593Smuzhiyun 	    rc_in->rc_id != ls->ls_rcom_seq) {
258*4882a593Smuzhiyun 		log_debug(ls, "reject reply %d from %d seq %llx expect %llx",
259*4882a593Smuzhiyun 			  rc_in->rc_type, rc_in->rc_header.h_nodeid,
260*4882a593Smuzhiyun 			  (unsigned long long)rc_in->rc_id,
261*4882a593Smuzhiyun 			  (unsigned long long)ls->ls_rcom_seq);
262*4882a593Smuzhiyun 		goto out;
263*4882a593Smuzhiyun 	}
264*4882a593Smuzhiyun 	memcpy(ls->ls_recover_buf, rc_in, rc_in->rc_header.h_length);
265*4882a593Smuzhiyun 	set_bit(LSFL_RCOM_READY, &ls->ls_flags);
266*4882a593Smuzhiyun 	clear_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
267*4882a593Smuzhiyun 	wake_up(&ls->ls_wait_general);
268*4882a593Smuzhiyun  out:
269*4882a593Smuzhiyun 	spin_unlock(&ls->ls_rcom_spin);
270*4882a593Smuzhiyun }
271*4882a593Smuzhiyun 
dlm_rcom_names(struct dlm_ls * ls,int nodeid,char * last_name,int last_len)272*4882a593Smuzhiyun int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name, int last_len)
273*4882a593Smuzhiyun {
274*4882a593Smuzhiyun 	struct dlm_rcom *rc;
275*4882a593Smuzhiyun 	struct dlm_mhandle *mh;
276*4882a593Smuzhiyun 	int error = 0;
277*4882a593Smuzhiyun 
278*4882a593Smuzhiyun 	ls->ls_recover_nodeid = nodeid;
279*4882a593Smuzhiyun 
280*4882a593Smuzhiyun retry:
281*4882a593Smuzhiyun 	error = create_rcom(ls, nodeid, DLM_RCOM_NAMES, last_len, &rc, &mh);
282*4882a593Smuzhiyun 	if (error)
283*4882a593Smuzhiyun 		goto out;
284*4882a593Smuzhiyun 	memcpy(rc->rc_buf, last_name, last_len);
285*4882a593Smuzhiyun 
286*4882a593Smuzhiyun 	allow_sync_reply(ls, &rc->rc_id);
287*4882a593Smuzhiyun 	memset(ls->ls_recover_buf, 0, dlm_config.ci_buffer_size);
288*4882a593Smuzhiyun 
289*4882a593Smuzhiyun 	send_rcom(ls, mh, rc);
290*4882a593Smuzhiyun 
291*4882a593Smuzhiyun 	error = dlm_wait_function(ls, &rcom_response);
292*4882a593Smuzhiyun 	disallow_sync_reply(ls);
293*4882a593Smuzhiyun 	if (error == -ETIMEDOUT)
294*4882a593Smuzhiyun 		goto retry;
295*4882a593Smuzhiyun  out:
296*4882a593Smuzhiyun 	return error;
297*4882a593Smuzhiyun }
298*4882a593Smuzhiyun 
receive_rcom_names(struct dlm_ls * ls,struct dlm_rcom * rc_in)299*4882a593Smuzhiyun static void receive_rcom_names(struct dlm_ls *ls, struct dlm_rcom *rc_in)
300*4882a593Smuzhiyun {
301*4882a593Smuzhiyun 	struct dlm_rcom *rc;
302*4882a593Smuzhiyun 	struct dlm_mhandle *mh;
303*4882a593Smuzhiyun 	int error, inlen, outlen, nodeid;
304*4882a593Smuzhiyun 
305*4882a593Smuzhiyun 	nodeid = rc_in->rc_header.h_nodeid;
306*4882a593Smuzhiyun 	inlen = rc_in->rc_header.h_length - sizeof(struct dlm_rcom);
307*4882a593Smuzhiyun 	outlen = dlm_config.ci_buffer_size - sizeof(struct dlm_rcom);
308*4882a593Smuzhiyun 
309*4882a593Smuzhiyun 	error = create_rcom(ls, nodeid, DLM_RCOM_NAMES_REPLY, outlen, &rc, &mh);
310*4882a593Smuzhiyun 	if (error)
311*4882a593Smuzhiyun 		return;
312*4882a593Smuzhiyun 	rc->rc_id = rc_in->rc_id;
313*4882a593Smuzhiyun 	rc->rc_seq_reply = rc_in->rc_seq;
314*4882a593Smuzhiyun 
315*4882a593Smuzhiyun 	dlm_copy_master_names(ls, rc_in->rc_buf, inlen, rc->rc_buf, outlen,
316*4882a593Smuzhiyun 			      nodeid);
317*4882a593Smuzhiyun 	send_rcom(ls, mh, rc);
318*4882a593Smuzhiyun }
319*4882a593Smuzhiyun 
dlm_send_rcom_lookup(struct dlm_rsb * r,int dir_nodeid)320*4882a593Smuzhiyun int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid)
321*4882a593Smuzhiyun {
322*4882a593Smuzhiyun 	struct dlm_rcom *rc;
323*4882a593Smuzhiyun 	struct dlm_mhandle *mh;
324*4882a593Smuzhiyun 	struct dlm_ls *ls = r->res_ls;
325*4882a593Smuzhiyun 	int error;
326*4882a593Smuzhiyun 
327*4882a593Smuzhiyun 	error = create_rcom(ls, dir_nodeid, DLM_RCOM_LOOKUP, r->res_length,
328*4882a593Smuzhiyun 			    &rc, &mh);
329*4882a593Smuzhiyun 	if (error)
330*4882a593Smuzhiyun 		goto out;
331*4882a593Smuzhiyun 	memcpy(rc->rc_buf, r->res_name, r->res_length);
332*4882a593Smuzhiyun 	rc->rc_id = (unsigned long) r->res_id;
333*4882a593Smuzhiyun 
334*4882a593Smuzhiyun 	send_rcom(ls, mh, rc);
335*4882a593Smuzhiyun  out:
336*4882a593Smuzhiyun 	return error;
337*4882a593Smuzhiyun }
338*4882a593Smuzhiyun 
receive_rcom_lookup(struct dlm_ls * ls,struct dlm_rcom * rc_in)339*4882a593Smuzhiyun static void receive_rcom_lookup(struct dlm_ls *ls, struct dlm_rcom *rc_in)
340*4882a593Smuzhiyun {
341*4882a593Smuzhiyun 	struct dlm_rcom *rc;
342*4882a593Smuzhiyun 	struct dlm_mhandle *mh;
343*4882a593Smuzhiyun 	int error, ret_nodeid, nodeid = rc_in->rc_header.h_nodeid;
344*4882a593Smuzhiyun 	int len = rc_in->rc_header.h_length - sizeof(struct dlm_rcom);
345*4882a593Smuzhiyun 
346*4882a593Smuzhiyun 	error = create_rcom(ls, nodeid, DLM_RCOM_LOOKUP_REPLY, 0, &rc, &mh);
347*4882a593Smuzhiyun 	if (error)
348*4882a593Smuzhiyun 		return;
349*4882a593Smuzhiyun 
350*4882a593Smuzhiyun 	/* Old code would send this special id to trigger a debug dump. */
351*4882a593Smuzhiyun 	if (rc_in->rc_id == 0xFFFFFFFF) {
352*4882a593Smuzhiyun 		log_error(ls, "receive_rcom_lookup dump from %d", nodeid);
353*4882a593Smuzhiyun 		dlm_dump_rsb_name(ls, rc_in->rc_buf, len);
354*4882a593Smuzhiyun 		return;
355*4882a593Smuzhiyun 	}
356*4882a593Smuzhiyun 
357*4882a593Smuzhiyun 	error = dlm_master_lookup(ls, nodeid, rc_in->rc_buf, len,
358*4882a593Smuzhiyun 				  DLM_LU_RECOVER_MASTER, &ret_nodeid, NULL);
359*4882a593Smuzhiyun 	if (error)
360*4882a593Smuzhiyun 		ret_nodeid = error;
361*4882a593Smuzhiyun 	rc->rc_result = ret_nodeid;
362*4882a593Smuzhiyun 	rc->rc_id = rc_in->rc_id;
363*4882a593Smuzhiyun 	rc->rc_seq_reply = rc_in->rc_seq;
364*4882a593Smuzhiyun 
365*4882a593Smuzhiyun 	send_rcom(ls, mh, rc);
366*4882a593Smuzhiyun }
367*4882a593Smuzhiyun 
/* A DLM_RCOM_LOOKUP_REPLY is handled entirely by
   dlm_recover_master_reply(); this is only the dispatch hook. */
static void receive_rcom_lookup_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in)
{
	dlm_recover_master_reply(ls, rc_in);

	return;
}
372*4882a593Smuzhiyun 
pack_rcom_lock(struct dlm_rsb * r,struct dlm_lkb * lkb,struct rcom_lock * rl)373*4882a593Smuzhiyun static void pack_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb,
374*4882a593Smuzhiyun 			   struct rcom_lock *rl)
375*4882a593Smuzhiyun {
376*4882a593Smuzhiyun 	memset(rl, 0, sizeof(*rl));
377*4882a593Smuzhiyun 
378*4882a593Smuzhiyun 	rl->rl_ownpid = cpu_to_le32(lkb->lkb_ownpid);
379*4882a593Smuzhiyun 	rl->rl_lkid = cpu_to_le32(lkb->lkb_id);
380*4882a593Smuzhiyun 	rl->rl_exflags = cpu_to_le32(lkb->lkb_exflags);
381*4882a593Smuzhiyun 	rl->rl_flags = cpu_to_le32(lkb->lkb_flags);
382*4882a593Smuzhiyun 	rl->rl_lvbseq = cpu_to_le32(lkb->lkb_lvbseq);
383*4882a593Smuzhiyun 	rl->rl_rqmode = lkb->lkb_rqmode;
384*4882a593Smuzhiyun 	rl->rl_grmode = lkb->lkb_grmode;
385*4882a593Smuzhiyun 	rl->rl_status = lkb->lkb_status;
386*4882a593Smuzhiyun 	rl->rl_wait_type = cpu_to_le16(lkb->lkb_wait_type);
387*4882a593Smuzhiyun 
388*4882a593Smuzhiyun 	if (lkb->lkb_bastfn)
389*4882a593Smuzhiyun 		rl->rl_asts |= DLM_CB_BAST;
390*4882a593Smuzhiyun 	if (lkb->lkb_astfn)
391*4882a593Smuzhiyun 		rl->rl_asts |= DLM_CB_CAST;
392*4882a593Smuzhiyun 
393*4882a593Smuzhiyun 	rl->rl_namelen = cpu_to_le16(r->res_length);
394*4882a593Smuzhiyun 	memcpy(rl->rl_name, r->res_name, r->res_length);
395*4882a593Smuzhiyun 
396*4882a593Smuzhiyun 	/* FIXME: might we have an lvb without DLM_LKF_VALBLK set ?
397*4882a593Smuzhiyun 	   If so, receive_rcom_lock_args() won't take this copy. */
398*4882a593Smuzhiyun 
399*4882a593Smuzhiyun 	if (lkb->lkb_lvbptr)
400*4882a593Smuzhiyun 		memcpy(rl->rl_lvb, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
401*4882a593Smuzhiyun }
402*4882a593Smuzhiyun 
dlm_send_rcom_lock(struct dlm_rsb * r,struct dlm_lkb * lkb)403*4882a593Smuzhiyun int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
404*4882a593Smuzhiyun {
405*4882a593Smuzhiyun 	struct dlm_ls *ls = r->res_ls;
406*4882a593Smuzhiyun 	struct dlm_rcom *rc;
407*4882a593Smuzhiyun 	struct dlm_mhandle *mh;
408*4882a593Smuzhiyun 	struct rcom_lock *rl;
409*4882a593Smuzhiyun 	int error, len = sizeof(struct rcom_lock);
410*4882a593Smuzhiyun 
411*4882a593Smuzhiyun 	if (lkb->lkb_lvbptr)
412*4882a593Smuzhiyun 		len += ls->ls_lvblen;
413*4882a593Smuzhiyun 
414*4882a593Smuzhiyun 	error = create_rcom(ls, r->res_nodeid, DLM_RCOM_LOCK, len, &rc, &mh);
415*4882a593Smuzhiyun 	if (error)
416*4882a593Smuzhiyun 		goto out;
417*4882a593Smuzhiyun 
418*4882a593Smuzhiyun 	rl = (struct rcom_lock *) rc->rc_buf;
419*4882a593Smuzhiyun 	pack_rcom_lock(r, lkb, rl);
420*4882a593Smuzhiyun 	rc->rc_id = (unsigned long) r;
421*4882a593Smuzhiyun 
422*4882a593Smuzhiyun 	send_rcom(ls, mh, rc);
423*4882a593Smuzhiyun  out:
424*4882a593Smuzhiyun 	return error;
425*4882a593Smuzhiyun }
426*4882a593Smuzhiyun 
427*4882a593Smuzhiyun /* needs at least dlm_rcom + rcom_lock */
receive_rcom_lock(struct dlm_ls * ls,struct dlm_rcom * rc_in)428*4882a593Smuzhiyun static void receive_rcom_lock(struct dlm_ls *ls, struct dlm_rcom *rc_in)
429*4882a593Smuzhiyun {
430*4882a593Smuzhiyun 	struct dlm_rcom *rc;
431*4882a593Smuzhiyun 	struct dlm_mhandle *mh;
432*4882a593Smuzhiyun 	int error, nodeid = rc_in->rc_header.h_nodeid;
433*4882a593Smuzhiyun 
434*4882a593Smuzhiyun 	dlm_recover_master_copy(ls, rc_in);
435*4882a593Smuzhiyun 
436*4882a593Smuzhiyun 	error = create_rcom(ls, nodeid, DLM_RCOM_LOCK_REPLY,
437*4882a593Smuzhiyun 			    sizeof(struct rcom_lock), &rc, &mh);
438*4882a593Smuzhiyun 	if (error)
439*4882a593Smuzhiyun 		return;
440*4882a593Smuzhiyun 
441*4882a593Smuzhiyun 	/* We send back the same rcom_lock struct we received, but
442*4882a593Smuzhiyun 	   dlm_recover_master_copy() has filled in rl_remid and rl_result */
443*4882a593Smuzhiyun 
444*4882a593Smuzhiyun 	memcpy(rc->rc_buf, rc_in->rc_buf, sizeof(struct rcom_lock));
445*4882a593Smuzhiyun 	rc->rc_id = rc_in->rc_id;
446*4882a593Smuzhiyun 	rc->rc_seq_reply = rc_in->rc_seq;
447*4882a593Smuzhiyun 
448*4882a593Smuzhiyun 	send_rcom(ls, mh, rc);
449*4882a593Smuzhiyun }
450*4882a593Smuzhiyun 
451*4882a593Smuzhiyun /* If the lockspace doesn't exist then still send a status message
452*4882a593Smuzhiyun    back; it's possible that it just doesn't have its global_id yet. */
453*4882a593Smuzhiyun 
dlm_send_ls_not_ready(int nodeid,struct dlm_rcom * rc_in)454*4882a593Smuzhiyun int dlm_send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in)
455*4882a593Smuzhiyun {
456*4882a593Smuzhiyun 	struct dlm_rcom *rc;
457*4882a593Smuzhiyun 	struct rcom_config *rf;
458*4882a593Smuzhiyun 	struct dlm_mhandle *mh;
459*4882a593Smuzhiyun 	char *mb;
460*4882a593Smuzhiyun 	int mb_len = sizeof(struct dlm_rcom) + sizeof(struct rcom_config);
461*4882a593Smuzhiyun 
462*4882a593Smuzhiyun 	mh = dlm_lowcomms_get_buffer(nodeid, mb_len, GFP_NOFS, &mb);
463*4882a593Smuzhiyun 	if (!mh)
464*4882a593Smuzhiyun 		return -ENOBUFS;
465*4882a593Smuzhiyun 	memset(mb, 0, mb_len);
466*4882a593Smuzhiyun 
467*4882a593Smuzhiyun 	rc = (struct dlm_rcom *) mb;
468*4882a593Smuzhiyun 
469*4882a593Smuzhiyun 	rc->rc_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
470*4882a593Smuzhiyun 	rc->rc_header.h_lockspace = rc_in->rc_header.h_lockspace;
471*4882a593Smuzhiyun 	rc->rc_header.h_nodeid = dlm_our_nodeid();
472*4882a593Smuzhiyun 	rc->rc_header.h_length = mb_len;
473*4882a593Smuzhiyun 	rc->rc_header.h_cmd = DLM_RCOM;
474*4882a593Smuzhiyun 
475*4882a593Smuzhiyun 	rc->rc_type = DLM_RCOM_STATUS_REPLY;
476*4882a593Smuzhiyun 	rc->rc_id = rc_in->rc_id;
477*4882a593Smuzhiyun 	rc->rc_seq_reply = rc_in->rc_seq;
478*4882a593Smuzhiyun 	rc->rc_result = -ESRCH;
479*4882a593Smuzhiyun 
480*4882a593Smuzhiyun 	rf = (struct rcom_config *) rc->rc_buf;
481*4882a593Smuzhiyun 	rf->rf_lvblen = cpu_to_le32(~0U);
482*4882a593Smuzhiyun 
483*4882a593Smuzhiyun 	dlm_rcom_out(rc);
484*4882a593Smuzhiyun 	dlm_lowcomms_commit_buffer(mh);
485*4882a593Smuzhiyun 
486*4882a593Smuzhiyun 	return 0;
487*4882a593Smuzhiyun }
488*4882a593Smuzhiyun 
489*4882a593Smuzhiyun /*
490*4882a593Smuzhiyun  * Ignore messages for stage Y before we set
491*4882a593Smuzhiyun  * recover_status bit for stage X:
492*4882a593Smuzhiyun  *
493*4882a593Smuzhiyun  * recover_status = 0
494*4882a593Smuzhiyun  *
495*4882a593Smuzhiyun  * dlm_recover_members()
496*4882a593Smuzhiyun  * - send nothing
497*4882a593Smuzhiyun  * - recv nothing
498*4882a593Smuzhiyun  * - ignore NAMES, NAMES_REPLY
499*4882a593Smuzhiyun  * - ignore LOOKUP, LOOKUP_REPLY
500*4882a593Smuzhiyun  * - ignore LOCK, LOCK_REPLY
501*4882a593Smuzhiyun  *
502*4882a593Smuzhiyun  * recover_status |= NODES
503*4882a593Smuzhiyun  *
504*4882a593Smuzhiyun  * dlm_recover_members_wait()
505*4882a593Smuzhiyun  *
506*4882a593Smuzhiyun  * dlm_recover_directory()
507*4882a593Smuzhiyun  * - send NAMES
508*4882a593Smuzhiyun  * - recv NAMES_REPLY
509*4882a593Smuzhiyun  * - ignore LOOKUP, LOOKUP_REPLY
510*4882a593Smuzhiyun  * - ignore LOCK, LOCK_REPLY
511*4882a593Smuzhiyun  *
512*4882a593Smuzhiyun  * recover_status |= DIR
513*4882a593Smuzhiyun  *
514*4882a593Smuzhiyun  * dlm_recover_directory_wait()
515*4882a593Smuzhiyun  *
516*4882a593Smuzhiyun  * dlm_recover_masters()
517*4882a593Smuzhiyun  * - send LOOKUP
518*4882a593Smuzhiyun  * - recv LOOKUP_REPLY
519*4882a593Smuzhiyun  *
520*4882a593Smuzhiyun  * dlm_recover_locks()
 * - send LOCK
 * - recv LOCK_REPLY
523*4882a593Smuzhiyun  *
524*4882a593Smuzhiyun  * recover_status |= LOCKS
525*4882a593Smuzhiyun  *
526*4882a593Smuzhiyun  * dlm_recover_locks_wait()
527*4882a593Smuzhiyun  *
528*4882a593Smuzhiyun  * recover_status |= DONE
529*4882a593Smuzhiyun  */
530*4882a593Smuzhiyun 
531*4882a593Smuzhiyun /* Called by dlm_recv; corresponds to dlm_receive_message() but special
532*4882a593Smuzhiyun    recovery-only comms are sent through here. */
533*4882a593Smuzhiyun 
dlm_receive_rcom(struct dlm_ls * ls,struct dlm_rcom * rc,int nodeid)534*4882a593Smuzhiyun void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
535*4882a593Smuzhiyun {
536*4882a593Smuzhiyun 	int lock_size = sizeof(struct dlm_rcom) + sizeof(struct rcom_lock);
537*4882a593Smuzhiyun 	int stop, reply = 0, names = 0, lookup = 0, lock = 0;
538*4882a593Smuzhiyun 	uint32_t status;
539*4882a593Smuzhiyun 	uint64_t seq;
540*4882a593Smuzhiyun 
541*4882a593Smuzhiyun 	switch (rc->rc_type) {
542*4882a593Smuzhiyun 	case DLM_RCOM_STATUS_REPLY:
543*4882a593Smuzhiyun 		reply = 1;
544*4882a593Smuzhiyun 		break;
545*4882a593Smuzhiyun 	case DLM_RCOM_NAMES:
546*4882a593Smuzhiyun 		names = 1;
547*4882a593Smuzhiyun 		break;
548*4882a593Smuzhiyun 	case DLM_RCOM_NAMES_REPLY:
549*4882a593Smuzhiyun 		names = 1;
550*4882a593Smuzhiyun 		reply = 1;
551*4882a593Smuzhiyun 		break;
552*4882a593Smuzhiyun 	case DLM_RCOM_LOOKUP:
553*4882a593Smuzhiyun 		lookup = 1;
554*4882a593Smuzhiyun 		break;
555*4882a593Smuzhiyun 	case DLM_RCOM_LOOKUP_REPLY:
556*4882a593Smuzhiyun 		lookup = 1;
557*4882a593Smuzhiyun 		reply = 1;
558*4882a593Smuzhiyun 		break;
559*4882a593Smuzhiyun 	case DLM_RCOM_LOCK:
560*4882a593Smuzhiyun 		lock = 1;
561*4882a593Smuzhiyun 		break;
562*4882a593Smuzhiyun 	case DLM_RCOM_LOCK_REPLY:
563*4882a593Smuzhiyun 		lock = 1;
564*4882a593Smuzhiyun 		reply = 1;
565*4882a593Smuzhiyun 		break;
566*4882a593Smuzhiyun 	}
567*4882a593Smuzhiyun 
568*4882a593Smuzhiyun 	spin_lock(&ls->ls_recover_lock);
569*4882a593Smuzhiyun 	status = ls->ls_recover_status;
570*4882a593Smuzhiyun 	stop = test_bit(LSFL_RECOVER_STOP, &ls->ls_flags);
571*4882a593Smuzhiyun 	seq = ls->ls_recover_seq;
572*4882a593Smuzhiyun 	spin_unlock(&ls->ls_recover_lock);
573*4882a593Smuzhiyun 
574*4882a593Smuzhiyun 	if (stop && (rc->rc_type != DLM_RCOM_STATUS))
575*4882a593Smuzhiyun 		goto ignore;
576*4882a593Smuzhiyun 
577*4882a593Smuzhiyun 	if (reply && (rc->rc_seq_reply != seq))
578*4882a593Smuzhiyun 		goto ignore;
579*4882a593Smuzhiyun 
580*4882a593Smuzhiyun 	if (!(status & DLM_RS_NODES) && (names || lookup || lock))
581*4882a593Smuzhiyun 		goto ignore;
582*4882a593Smuzhiyun 
583*4882a593Smuzhiyun 	if (!(status & DLM_RS_DIR) && (lookup || lock))
584*4882a593Smuzhiyun 		goto ignore;
585*4882a593Smuzhiyun 
586*4882a593Smuzhiyun 	switch (rc->rc_type) {
587*4882a593Smuzhiyun 	case DLM_RCOM_STATUS:
588*4882a593Smuzhiyun 		receive_rcom_status(ls, rc);
589*4882a593Smuzhiyun 		break;
590*4882a593Smuzhiyun 
591*4882a593Smuzhiyun 	case DLM_RCOM_NAMES:
592*4882a593Smuzhiyun 		receive_rcom_names(ls, rc);
593*4882a593Smuzhiyun 		break;
594*4882a593Smuzhiyun 
595*4882a593Smuzhiyun 	case DLM_RCOM_LOOKUP:
596*4882a593Smuzhiyun 		receive_rcom_lookup(ls, rc);
597*4882a593Smuzhiyun 		break;
598*4882a593Smuzhiyun 
599*4882a593Smuzhiyun 	case DLM_RCOM_LOCK:
600*4882a593Smuzhiyun 		if (rc->rc_header.h_length < lock_size)
601*4882a593Smuzhiyun 			goto Eshort;
602*4882a593Smuzhiyun 		receive_rcom_lock(ls, rc);
603*4882a593Smuzhiyun 		break;
604*4882a593Smuzhiyun 
605*4882a593Smuzhiyun 	case DLM_RCOM_STATUS_REPLY:
606*4882a593Smuzhiyun 		receive_sync_reply(ls, rc);
607*4882a593Smuzhiyun 		break;
608*4882a593Smuzhiyun 
609*4882a593Smuzhiyun 	case DLM_RCOM_NAMES_REPLY:
610*4882a593Smuzhiyun 		receive_sync_reply(ls, rc);
611*4882a593Smuzhiyun 		break;
612*4882a593Smuzhiyun 
613*4882a593Smuzhiyun 	case DLM_RCOM_LOOKUP_REPLY:
614*4882a593Smuzhiyun 		receive_rcom_lookup_reply(ls, rc);
615*4882a593Smuzhiyun 		break;
616*4882a593Smuzhiyun 
617*4882a593Smuzhiyun 	case DLM_RCOM_LOCK_REPLY:
618*4882a593Smuzhiyun 		if (rc->rc_header.h_length < lock_size)
619*4882a593Smuzhiyun 			goto Eshort;
620*4882a593Smuzhiyun 		dlm_recover_process_copy(ls, rc);
621*4882a593Smuzhiyun 		break;
622*4882a593Smuzhiyun 
623*4882a593Smuzhiyun 	default:
624*4882a593Smuzhiyun 		log_error(ls, "receive_rcom bad type %d", rc->rc_type);
625*4882a593Smuzhiyun 	}
626*4882a593Smuzhiyun 	return;
627*4882a593Smuzhiyun 
628*4882a593Smuzhiyun ignore:
629*4882a593Smuzhiyun 	log_limit(ls, "dlm_receive_rcom ignore msg %d "
630*4882a593Smuzhiyun 		  "from %d %llu %llu recover seq %llu sts %x gen %u",
631*4882a593Smuzhiyun 		   rc->rc_type,
632*4882a593Smuzhiyun 		   nodeid,
633*4882a593Smuzhiyun 		   (unsigned long long)rc->rc_seq,
634*4882a593Smuzhiyun 		   (unsigned long long)rc->rc_seq_reply,
635*4882a593Smuzhiyun 		   (unsigned long long)seq,
636*4882a593Smuzhiyun 		   status, ls->ls_generation);
637*4882a593Smuzhiyun 	return;
638*4882a593Smuzhiyun Eshort:
639*4882a593Smuzhiyun 	log_error(ls, "recovery message %d from %d is too short",
640*4882a593Smuzhiyun 		  rc->rc_type, nodeid);
641*4882a593Smuzhiyun }
642*4882a593Smuzhiyun 
643