1 // Copyright 2006 Google Inc. All Rights Reserved.
2 
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 
7 //      http://www.apache.org/licenses/LICENSE-2.0
8 
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 // worker.cc : individual tasks that can be run in combination to
16 // stress the system
17 
18 #include <errno.h>
19 #include <pthread.h>
20 #include <sched.h>
21 #include <signal.h>
22 #include <stdlib.h>
23 #include <stdio.h>
24 #include <stdint.h>
25 #include <string.h>
26 #include <time.h>
27 #include <unistd.h>
28 
29 #include <sys/select.h>
30 #include <sys/stat.h>
31 #include <sys/types.h>
32 #include <sys/times.h>
33 
34 // These are necessary, but on by default
35 // #define __USE_GNU
36 // #define __USE_LARGEFILE64
37 #include <fcntl.h>
38 #include <sys/socket.h>
39 #include <netdb.h>
40 #include <arpa/inet.h>
41 #include <linux/unistd.h>  // for gettid
42 
43 // For size of block device
44 #include <sys/ioctl.h>
45 #include <linux/fs.h>
46 // For asynchronous I/O
47 #ifdef HAVE_LIBAIO_H
48 #include <libaio.h>
49 #endif
50 
51 #include <sys/syscall.h>
52 
53 #include <set>
54 #include <string>
55 
56 // This file must work with autoconf on its public version,
57 // so these includes are correct.
58 #include "error_diag.h"  // NOLINT
59 #include "os.h"          // NOLINT
60 #include "pattern.h"     // NOLINT
61 #include "queue.h"       // NOLINT
62 #include "sat.h"         // NOLINT
63 #include "sattypes.h"    // NOLINT
64 #include "worker.h"      // NOLINT
65 
66 // Syscalls
67 // Why ubuntu, do you hate gettid so bad?
68 #if !defined(__NR_gettid)
69   #define __NR_gettid             224
70 #endif
71 
72 #define gettid() syscall(__NR_gettid)
73 #if !defined(CPU_SETSIZE)
74 _syscall3(int, sched_getaffinity, pid_t, pid,
75           unsigned int, len, cpu_set_t*, mask)
76 _syscall3(int, sched_setaffinity, pid_t, pid,
77           unsigned int, len, cpu_set_t*, mask)
78 #endif
79 
80 namespace {
81   // Work around the sad fact that there are two (gnu, xsi) incompatible
82   // versions of strerror_r floating around google. Awesome.
sat_strerror(int err,char * buf,int len)83   bool sat_strerror(int err, char *buf, int len) {
84     buf[0] = 0;
85     char *errmsg = reinterpret_cast<char*>(strerror_r(err, buf, len));
86     int retval = reinterpret_cast<int64>(errmsg);
87     if (retval == 0)
88       return true;
89     if (retval == -1)
90       return false;
91     if (errmsg != buf) {
92       strncpy(buf, errmsg, len);
93       buf[len - 1] = 0;
94     }
95     return true;
96   }
97 
98 
addr_to_tag(void * address)99   inline uint64 addr_to_tag(void *address) {
100     return reinterpret_cast<uint64>(address);
101   }
102 }  // namespace
103 
104 #if !defined(O_DIRECT)
105 // Sometimes this isn't available.
106 // Disregard if it's not defined.
107   #define O_DIRECT            0
108 #endif
109 
110 // A struct to hold captured errors, for later reporting.
111 struct ErrorRecord {
112   uint64 actual;  // This is the actual value read.
113   uint64 reread;  // This is the actual value, reread.
114   uint64 expected;  // This is what it should have been.
115   uint64 *vaddr;  // This is where it was (or wasn't).
116   char *vbyteaddr;  // This is byte specific where the data was (or wasn't).
117   uint64 paddr;  // This is the bus address, if available.
118   uint64 *tagvaddr;  // This holds the tag value if this data was tagged.
119   uint64 tagpaddr;  // This holds the physical address corresponding to the tag.
120   uint32 lastcpu;  // This holds the CPU recorded as probably writing this data.
121   const char *patternname;  // This holds the pattern name of the expected data.
122 };
123 
124 // This is a helper function to create new threads with pthreads.
ThreadSpawnerGeneric(void * ptr)125 static void *ThreadSpawnerGeneric(void *ptr) {
126   WorkerThread *worker = static_cast<WorkerThread*>(ptr);
127   worker->StartRoutine();
128   return NULL;
129 }
130 
Initialize()131 void WorkerStatus::Initialize() {
132   sat_assert(0 == pthread_mutex_init(&num_workers_mutex_, NULL));
133 
134   pthread_rwlockattr_t attrs;
135   sat_assert(0 == pthread_rwlockattr_init(&attrs));
136   // Avoid writer lock starvation.
137   sat_assert(0 == pthread_rwlockattr_setkind_np(
138                       &attrs, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP));
139   sat_assert(0 == pthread_rwlock_init(&status_rwlock_, &attrs));
140 
141 #ifdef HAVE_PTHREAD_BARRIERS
142   sat_assert(0 == pthread_barrier_init(&pause_barrier_, NULL,
143                                        num_workers_ + 1));
144   sat_assert(0 == pthread_rwlock_init(&pause_rwlock_, &attrs));
145 #endif
146 
147   sat_assert(0 == pthread_rwlockattr_destroy(&attrs));
148 }
149 
Destroy()150 void WorkerStatus::Destroy() {
151   sat_assert(0 == pthread_mutex_destroy(&num_workers_mutex_));
152   sat_assert(0 == pthread_rwlock_destroy(&status_rwlock_));
153 #ifdef HAVE_PTHREAD_BARRIERS
154   sat_assert(0 == pthread_barrier_destroy(&pause_barrier_));
155 #endif
156 }
157 
PauseWorkers()158 void WorkerStatus::PauseWorkers() {
159   if (SetStatus(PAUSE) != PAUSE)
160     WaitOnPauseBarrier();
161 }
162 
ResumeWorkers()163 void WorkerStatus::ResumeWorkers() {
164   if (SetStatus(RUN) == PAUSE)
165     WaitOnPauseBarrier();
166 }
167 
StopWorkers()168 void WorkerStatus::StopWorkers() {
169   if (SetStatus(STOP) == PAUSE)
170     WaitOnPauseBarrier();
171 }
172 
ContinueRunning(bool * paused)173 bool WorkerStatus::ContinueRunning(bool *paused) {
174   // This loop is an optimization.  We use it to immediately re-check the status
175   // after resuming from a pause, instead of returning and waiting for the next
176   // call to this function.
177   if (paused) {
178     *paused = false;
179   }
180   for (;;) {
181     switch (GetStatus()) {
182       case RUN:
183         return true;
184       case PAUSE:
185         // Wait for the other workers to call this function so that
186         // PauseWorkers() can return.
187         WaitOnPauseBarrier();
188         // Wait for ResumeWorkers() to be called.
189         WaitOnPauseBarrier();
190         // Indicate that a pause occurred.
191         if (paused) {
192           *paused = true;
193         }
194         break;
195       case STOP:
196         return false;
197     }
198   }
199 }
200 
ContinueRunningNoPause()201 bool WorkerStatus::ContinueRunningNoPause() {
202   return (GetStatus() != STOP);
203 }
204 
RemoveSelf()205 void WorkerStatus::RemoveSelf() {
206   // Acquire a read lock on status_rwlock_ while (status_ != PAUSE).
207   for (;;) {
208     AcquireStatusReadLock();
209     if (status_ != PAUSE)
210       break;
211     // We need to obey PauseWorkers() just like ContinueRunning() would, so that
212     // the other threads won't wait on pause_barrier_ forever.
213     ReleaseStatusLock();
214     // Wait for the other workers to call this function so that PauseWorkers()
215     // can return.
216     WaitOnPauseBarrier();
217     // Wait for ResumeWorkers() to be called.
218     WaitOnPauseBarrier();
219   }
220 
221   // This lock would be unnecessary if we held a write lock instead of a read
222   // lock on status_rwlock_, but that would also force all threads calling
223   // ContinueRunning() to wait on this one.  Using a separate lock avoids that.
224   AcquireNumWorkersLock();
225   // Decrement num_workers_ and reinitialize pause_barrier_, which we know isn't
226   // in use because (status != PAUSE).
227 #ifdef HAVE_PTHREAD_BARRIERS
228   sat_assert(0 == pthread_barrier_destroy(&pause_barrier_));
229   sat_assert(0 == pthread_barrier_init(&pause_barrier_, NULL, num_workers_));
230 #endif
231   --num_workers_;
232   ReleaseNumWorkersLock();
233 
234   // Release status_rwlock_.
235   ReleaseStatusLock();
236 }
237 
238 
239 // Parent thread class.
WorkerThread()240 WorkerThread::WorkerThread() {
241   status_ = false;
242   pages_copied_ = 0;
243   errorcount_ = 0;
244   runduration_usec_ = 1;
245   priority_ = Normal;
246   worker_status_ = NULL;
247   thread_spawner_ = &ThreadSpawnerGeneric;
248   tag_mode_ = false;
249 }
250 
~WorkerThread()251 WorkerThread::~WorkerThread() {}
252 
253 // Constructors. Just init some default values.
FillThread()254 FillThread::FillThread() {
255   num_pages_to_fill_ = 0;
256 }
257 
258 // Initialize file name to empty.
FileThread()259 FileThread::FileThread() {
260   filename_ = "";
261   devicename_ = "";
262   pass_ = 0;
263   page_io_ = true;
264   crc_page_ = -1;
265   local_page_ = NULL;
266 }
267 
268 // If file thread used bounce buffer in memory, account for the extra
269 // copy for memory bandwidth calculation.
GetMemoryCopiedData()270 float FileThread::GetMemoryCopiedData() {
271   if (!os_->normal_mem())
272     return GetCopiedData();
273   else
274     return 0;
275 }
276 
277 // Initialize target hostname to be invalid.
NetworkThread()278 NetworkThread::NetworkThread() {
279   snprintf(ipaddr_, sizeof(ipaddr_), "Unknown");
280   sock_ = 0;
281 }
282 
283 // Initialize?
NetworkSlaveThread()284 NetworkSlaveThread::NetworkSlaveThread() {
285 }
286 
287 // Initialize?
NetworkListenThread()288 NetworkListenThread::NetworkListenThread() {
289 }
290 
291 // Init member variables.
InitThread(int thread_num_init,class Sat * sat_init,class OsLayer * os_init,class PatternList * patternlist_init,WorkerStatus * worker_status)292 void WorkerThread::InitThread(int thread_num_init,
293                               class Sat *sat_init,
294                               class OsLayer *os_init,
295                               class PatternList *patternlist_init,
296                               WorkerStatus *worker_status) {
297   sat_assert(worker_status);
298   worker_status->AddWorkers(1);
299 
300   thread_num_ = thread_num_init;
301   sat_ = sat_init;
302   os_ = os_init;
303   patternlist_ = patternlist_init;
304   worker_status_ = worker_status;
305 
306   AvailableCpus(&cpu_mask_);
307   tag_ = 0xffffffff;
308 
309   tag_mode_ = sat_->tag_mode();
310 }
311 
312 
313 // Use pthreads to prioritize a system thread.
InitPriority()314 bool WorkerThread::InitPriority() {
315   // This doesn't affect performance that much, and may not be too safe.
316 
317   bool ret = BindToCpus(&cpu_mask_);
318   if (!ret)
319     logprintf(11, "Log: Bind to %s failed.\n",
320               cpuset_format(&cpu_mask_).c_str());
321 
322   logprintf(11, "Log: Thread %d running on core ID %d mask %s (%s).\n",
323             thread_num_, sched_getcpu(),
324             CurrentCpusFormat().c_str(),
325             cpuset_format(&cpu_mask_).c_str());
326 #if 0
327   if (priority_ == High) {
328     sched_param param;
329     param.sched_priority = 1;
330     // Set the priority; others are unchanged.
331     logprintf(0, "Log: Changing priority to SCHED_FIFO %d\n",
332               param.sched_priority);
333     if (sched_setscheduler(0, SCHED_FIFO, &param)) {
334       char buf[256];
335       sat_strerror(errno, buf, sizeof(buf));
336       logprintf(0, "Process Error: sched_setscheduler "
337                    "failed - error %d %s\n",
338                 errno, buf);
339     }
340   }
341 #endif
342   return true;
343 }
344 
345 // Use pthreads to create a system thread.
SpawnThread()346 int WorkerThread::SpawnThread() {
347   // Create the new thread.
348   int result = pthread_create(&thread_, NULL, thread_spawner_, this);
349   if (result) {
350     char buf[256];
351     sat_strerror(result, buf, sizeof(buf));
352     logprintf(0, "Process Error: pthread_create "
353                   "failed - error %d %s\n", result,
354               buf);
355     status_ = false;
356     return false;
357   }
358 
359   // 0 is pthreads success.
360   return true;
361 }
362 
363 // Kill the worker thread with SIGINT.
KillThread()364 bool WorkerThread::KillThread() {
365   return (pthread_kill(thread_, SIGINT) == 0);
366 }
367 
368 // Block until thread has exited.
JoinThread()369 bool WorkerThread::JoinThread() {
370   int result = pthread_join(thread_, NULL);
371 
372   if (result) {
373     logprintf(0, "Process Error: pthread_join failed - error %d\n", result);
374     status_ = false;
375   }
376 
377   // 0 is pthreads success.
378   return (!result);
379 }
380 
381 
StartRoutine()382 void WorkerThread::StartRoutine() {
383   InitPriority();
384   StartThreadTimer();
385   Work();
386   StopThreadTimer();
387   worker_status_->RemoveSelf();
388 }
389 
390 
391 // Thread work loop. Execute until marked finished.
Work()392 bool WorkerThread::Work() {
393   do {
394     logprintf(9, "Log: ...\n");
395     // Sleep for 1 second.
396     sat_sleep(1);
397   } while (IsReadyToRun());
398 
399   return false;
400 }
401 
402 
403 // Returns CPU mask of CPUs available to this process,
404 // Conceptually, each bit represents a logical CPU, ie:
405 //   mask = 3  (11b):   cpu0, 1
406 //   mask = 13 (1101b): cpu0, 2, 3
AvailableCpus(cpu_set_t * cpuset)407 bool WorkerThread::AvailableCpus(cpu_set_t *cpuset) {
408   CPU_ZERO(cpuset);
409 #ifdef HAVE_SCHED_GETAFFINITY
410   return sched_getaffinity(getppid(), sizeof(*cpuset), cpuset) == 0;
411 #else
412   return 0;
413 #endif
414 }
415 
416 
417 // Returns CPU mask of CPUs this thread is bound to,
418 // Conceptually, each bit represents a logical CPU, ie:
419 //   mask = 3  (11b):   cpu0, 1
420 //   mask = 13 (1101b): cpu0, 2, 3
CurrentCpus(cpu_set_t * cpuset)421 bool WorkerThread::CurrentCpus(cpu_set_t *cpuset) {
422   CPU_ZERO(cpuset);
423 #ifdef HAVE_SCHED_GETAFFINITY
424   return sched_getaffinity(0, sizeof(*cpuset), cpuset) == 0;
425 #else
426   return 0;
427 #endif
428 }
429 
430 
431 // Bind worker thread to specified CPU(s)
432 //   Args:
433 //     thread_mask: cpu_set_t representing CPUs, ie
434 //                  mask = 1  (01b):   cpu0
435 //                  mask = 3  (11b):   cpu0, 1
436 //                  mask = 13 (1101b): cpu0, 2, 3
437 //
438 //   Returns true on success, false otherwise.
BindToCpus(const cpu_set_t * thread_mask)439 bool WorkerThread::BindToCpus(const cpu_set_t *thread_mask) {
440   cpu_set_t process_mask;
441   AvailableCpus(&process_mask);
442   if (cpuset_isequal(thread_mask, &process_mask))
443     return true;
444 
445   logprintf(11, "Log: available CPU mask - %s\n",
446             cpuset_format(&process_mask).c_str());
447   if (!cpuset_issubset(thread_mask, &process_mask)) {
448     // Invalid cpu_mask, ie cpu not allocated to this process or doesn't exist.
449     logprintf(0, "Log: requested CPUs %s not a subset of available %s\n",
450               cpuset_format(thread_mask).c_str(),
451               cpuset_format(&process_mask).c_str());
452     return false;
453   }
454 #ifdef HAVE_SCHED_GETAFFINITY
455   return (sched_setaffinity(gettid(), sizeof(*thread_mask), thread_mask) == 0);
456 #else
457   return 0;
458 #endif
459 }
460 
461 
462 // A worker thread can yield itself to give up CPU until it's scheduled again.
463 //   Returns true on success, false on error.
YieldSelf()464 bool WorkerThread::YieldSelf() {
465   return (sched_yield() == 0);
466 }
467 
468 
469 // Fill this page with its pattern.
FillPage(struct page_entry * pe)470 bool WorkerThread::FillPage(struct page_entry *pe) {
471   // Error check arguments.
472   if (pe == 0) {
473     logprintf(0, "Process Error: Fill Page entry null\n");
474     return 0;
475   }
476 
477   // Tag this page as written from the current CPU.
478   pe->lastcpu = sched_getcpu();
479 
480   // Mask is the bitmask of indexes used by the pattern.
481   // It is the pattern size -1. Size is always a power of 2.
482   uint64 *memwords = static_cast<uint64*>(pe->addr);
483   int length = sat_->page_length();
484 
485   if (tag_mode_) {
486     // Select tag or data as appropriate.
487     for (int i = 0; i < length / wordsize_; i++) {
488       datacast_t data;
489 
490       if ((i & 0x7) == 0) {
491         data.l64 = addr_to_tag(&memwords[i]);
492       } else {
493         data.l32.l = pe->pattern->pattern(i << 1);
494         data.l32.h = pe->pattern->pattern((i << 1) + 1);
495       }
496       memwords[i] = data.l64;
497     }
498   } else {
499     // Just fill in untagged data directly.
500     for (int i = 0; i < length / wordsize_; i++) {
501       datacast_t data;
502 
503       data.l32.l = pe->pattern->pattern(i << 1);
504       data.l32.h = pe->pattern->pattern((i << 1) + 1);
505       memwords[i] = data.l64;
506     }
507   }
508 
509   return 1;
510 }
511 
512 
513 // Tell the thread how many pages to fill.
SetFillPages(int64 num_pages_to_fill_init)514 void FillThread::SetFillPages(int64 num_pages_to_fill_init) {
515   num_pages_to_fill_ = num_pages_to_fill_init;
516 }
517 
518 // Fill this page with a random pattern.
FillPageRandom(struct page_entry * pe)519 bool FillThread::FillPageRandom(struct page_entry *pe) {
520   // Error check arguments.
521   if (pe == 0) {
522     logprintf(0, "Process Error: Fill Page entry null\n");
523     return 0;
524   }
525   if ((patternlist_ == 0) || (patternlist_->Size() == 0)) {
526     logprintf(0, "Process Error: No data patterns available\n");
527     return 0;
528   }
529 
530   // Choose a random pattern for this block.
531   pe->pattern = patternlist_->GetRandomPattern();
532   pe->lastcpu = sched_getcpu();
533 
534   if (pe->pattern == 0) {
535     logprintf(0, "Process Error: Null data pattern\n");
536     return 0;
537   }
538 
539   // Actually fill the page.
540   return FillPage(pe);
541 }
542 
543 
544 // Memory fill work loop. Execute until alloted pages filled.
Work()545 bool FillThread::Work() {
546   bool result = true;
547 
548   logprintf(9, "Log: Starting fill thread %d\n", thread_num_);
549 
550   // We want to fill num_pages_to_fill pages, and
551   // stop when we've filled that many.
552   // We also want to capture early break
553   struct page_entry pe;
554   int64 loops = 0;
555   while (IsReadyToRun() && (loops < num_pages_to_fill_)) {
556     result = result && sat_->GetEmpty(&pe);
557     if (!result) {
558       logprintf(0, "Process Error: fill_thread failed to pop pages, "
559                 "bailing\n");
560       break;
561     }
562 
563     // Fill the page with pattern
564     result = result && FillPageRandom(&pe);
565     if (!result) break;
566 
567     // Put the page back on the queue.
568     result = result && sat_->PutValid(&pe);
569     if (!result) {
570       logprintf(0, "Process Error: fill_thread failed to push pages, "
571                 "bailing\n");
572       break;
573     }
574     loops++;
575   }
576 
577   // Fill in thread status.
578   pages_copied_ = loops;
579   status_ = result;
580   logprintf(9, "Log: Completed %d: Fill thread. Status %d, %d pages filled\n",
581             thread_num_, status_, pages_copied_);
582   return result;
583 }
584 
585 
586 // Print error information about a data miscompare.
ProcessError(struct ErrorRecord * error,int priority,const char * message)587 void WorkerThread::ProcessError(struct ErrorRecord *error,
588                                 int priority,
589                                 const char *message) {
590   char dimm_string[256] = "";
591 
592   int core_id = sched_getcpu();
593 
594   // Determine if this is a write or read error.
595   os_->Flush(error->vaddr);
596   error->reread = *(error->vaddr);
597 
598   char *good = reinterpret_cast<char*>(&(error->expected));
599   char *bad = reinterpret_cast<char*>(&(error->actual));
600 
601   sat_assert(error->expected != error->actual);
602   unsigned int offset = 0;
603   for (offset = 0; offset < (sizeof(error->expected) - 1); offset++) {
604     if (good[offset] != bad[offset])
605       break;
606   }
607 
608   error->vbyteaddr = reinterpret_cast<char*>(error->vaddr) + offset;
609 
610   // Find physical address if possible.
611   error->paddr = os_->VirtualToPhysical(error->vbyteaddr);
612 
613   // Pretty print DIMM mapping if available.
614   os_->FindDimm(error->paddr, dimm_string, sizeof(dimm_string));
615 
616   // Report parseable error.
617   if (priority < 5) {
618     // Run miscompare error through diagnoser for logging and reporting.
619     os_->error_diagnoser_->AddMiscompareError(dimm_string,
620                                               reinterpret_cast<uint64>
621                                               (error->vaddr), 1);
622 
623     logprintf(priority,
624               "%s: miscompare on CPU %d(<-%d) at %p(0x%llx:%s): "
625               "read:0x%016llx, reread:0x%016llx expected:0x%016llx. '%s'%s.\n",
626               message,
627               core_id,
628               error->lastcpu,
629               error->vaddr,
630               error->paddr,
631               dimm_string,
632               error->actual,
633               error->reread,
634               error->expected,
635               (error->patternname) ? error->patternname : "None",
636               (error->reread == error->expected) ? " read error" : "");
637   }
638 
639 
640   // Overwrite incorrect data with correct data to prevent
641   // future miscompares when this data is reused.
642   *(error->vaddr) = error->expected;
643   os_->Flush(error->vaddr);
644 }
645 
646 
647 
648 // Print error information about a data miscompare.
ProcessError(struct ErrorRecord * error,int priority,const char * message)649 void FileThread::ProcessError(struct ErrorRecord *error,
650                               int priority,
651                               const char *message) {
652   char dimm_string[256] = "";
653 
654   // Determine if this is a write or read error.
655   os_->Flush(error->vaddr);
656   error->reread = *(error->vaddr);
657 
658   char *good = reinterpret_cast<char*>(&(error->expected));
659   char *bad = reinterpret_cast<char*>(&(error->actual));
660 
661   sat_assert(error->expected != error->actual);
662   unsigned int offset = 0;
663   for (offset = 0; offset < (sizeof(error->expected) - 1); offset++) {
664     if (good[offset] != bad[offset])
665       break;
666   }
667 
668   error->vbyteaddr = reinterpret_cast<char*>(error->vaddr) + offset;
669 
670   // Find physical address if possible.
671   error->paddr = os_->VirtualToPhysical(error->vbyteaddr);
672 
673   // Pretty print DIMM mapping if available.
674   os_->FindDimm(error->paddr, dimm_string, sizeof(dimm_string));
675 
676   // If crc_page_ is valid, ie checking content read back from file,
677   // track src/dst memory addresses. Otherwise catagorize as general
678   // mememory miscompare for CRC checking everywhere else.
679   if (crc_page_ != -1) {
680     int miscompare_byteoffset = static_cast<char*>(error->vbyteaddr) -
681                                 static_cast<char*>(page_recs_[crc_page_].dst);
682     os_->error_diagnoser_->AddHDDMiscompareError(devicename_,
683                                                  crc_page_,
684                                                  miscompare_byteoffset,
685                                                  page_recs_[crc_page_].src,
686                                                  page_recs_[crc_page_].dst);
687   } else {
688     os_->error_diagnoser_->AddMiscompareError(dimm_string,
689                                               reinterpret_cast<uint64>
690                                               (error->vaddr), 1);
691   }
692 
693   logprintf(priority,
694             "%s: miscompare on %s at %p(0x%llx:%s): read:0x%016llx, "
695             "reread:0x%016llx expected:0x%016llx\n",
696             message,
697             devicename_.c_str(),
698             error->vaddr,
699             error->paddr,
700             dimm_string,
701             error->actual,
702             error->reread,
703             error->expected,
704             (error->patternname) ? error->patternname : "None");
705 
706   // Overwrite incorrect data with correct data to prevent
707   // future miscompares when this data is reused.
708   *(error->vaddr) = error->expected;
709   os_->Flush(error->vaddr);
710 }
711 
712 
713 // Do a word by word result check of a region.
714 // Print errors on mismatches.
CheckRegion(void * addr,class Pattern * pattern,uint32 lastcpu,int64 length,int offset,int64 pattern_offset)715 int WorkerThread::CheckRegion(void *addr,
716                               class Pattern *pattern,
717                               uint32 lastcpu,
718                               int64 length,
719                               int offset,
720                               int64 pattern_offset) {
721   uint64 *memblock = static_cast<uint64*>(addr);
722   const int kErrorLimit = 128;
723   int errors = 0;
724   int overflowerrors = 0;  // Count of overflowed errors.
725   bool page_error = false;
726   string errormessage("Hardware Error");
727   struct ErrorRecord
728     recorded[kErrorLimit];  // Queued errors for later printing.
729 
730   // For each word in the data region.
731   for (int i = 0; i < length / wordsize_; i++) {
732     uint64 actual = memblock[i];
733     uint64 expected;
734 
735     // Determine the value that should be there.
736     datacast_t data;
737     int index = 2 * i + pattern_offset;
738     data.l32.l = pattern->pattern(index);
739     data.l32.h = pattern->pattern(index + 1);
740     expected = data.l64;
741     // Check tags if necessary.
742     if (tag_mode_ && ((reinterpret_cast<uint64>(&memblock[i]) & 0x3f) == 0)) {
743       expected = addr_to_tag(&memblock[i]);
744     }
745 
746 
747     // If the value is incorrect, save an error record for later printing.
748     if (actual != expected) {
749       if (errors < kErrorLimit) {
750         recorded[errors].actual = actual;
751         recorded[errors].expected = expected;
752         recorded[errors].vaddr = &memblock[i];
753         recorded[errors].patternname = pattern->name();
754         recorded[errors].lastcpu = lastcpu;
755         errors++;
756       } else {
757         page_error = true;
758         // If we have overflowed the error queue, just print the errors now.
759         logprintf(10, "Log: Error record overflow, too many miscompares!\n");
760         errormessage = "Page Error";
761         break;
762       }
763     }
764   }
765 
766   // Find if this is a whole block corruption.
767   if (page_error && !tag_mode_) {
768     int patsize = patternlist_->Size();
769     for (int pat = 0; pat < patsize; pat++) {
770       class Pattern *altpattern = patternlist_->GetPattern(pat);
771       const int kGood = 0;
772       const int kBad = 1;
773       const int kGoodAgain = 2;
774       const int kNoMatch = 3;
775       int state = kGood;
776       unsigned int badstart = 0;
777       unsigned int badend = 0;
778 
779       // Don't match against ourself!
780       if (pattern == altpattern)
781         continue;
782 
783       for (int i = 0; i < length / wordsize_; i++) {
784         uint64 actual = memblock[i];
785         datacast_t expected;
786         datacast_t possible;
787 
788         // Determine the value that should be there.
789         int index = 2 * i + pattern_offset;
790 
791         expected.l32.l = pattern->pattern(index);
792         expected.l32.h = pattern->pattern(index + 1);
793 
794         possible.l32.l = pattern->pattern(index);
795         possible.l32.h = pattern->pattern(index + 1);
796 
797         if (state == kGood) {
798           if (actual == expected.l64) {
799             continue;
800           } else if (actual == possible.l64) {
801             badstart = i;
802             badend = i;
803             state = kBad;
804             continue;
805           } else {
806             state = kNoMatch;
807             break;
808           }
809         } else if (state == kBad) {
810           if (actual == possible.l64) {
811             badend = i;
812             continue;
813           } else if (actual == expected.l64) {
814             state = kGoodAgain;
815             continue;
816           } else {
817             state = kNoMatch;
818             break;
819           }
820         } else if (state == kGoodAgain) {
821           if (actual == expected.l64) {
822             continue;
823           } else {
824             state = kNoMatch;
825             break;
826           }
827         }
828       }
829 
830       if ((state == kGoodAgain) || (state == kBad)) {
831         unsigned int blockerrors = badend - badstart + 1;
832         errormessage = "Block Error";
833         // It's okay for the 1st entry to be corrected multiple times,
834         // it will simply be reported twice. Once here and once below
835         // when processing the error queue.
836         ProcessError(&recorded[0], 0, errormessage.c_str());
837         logprintf(0, "Block Error: (%p) pattern %s instead of %s, "
838                   "%d bytes from offset 0x%x to 0x%x\n",
839                   &memblock[badstart],
840                   altpattern->name(), pattern->name(),
841                   blockerrors * wordsize_,
842                   offset + badstart * wordsize_,
843                   offset + badend * wordsize_);
844       }
845     }
846   }
847 
848 
849   // Process error queue after all errors have been recorded.
850   for (int err = 0; err < errors; err++) {
851     int priority = 5;
852     if (errorcount_ + err < 30)
853       priority = 0;  // Bump up the priority for the first few errors.
854     ProcessError(&recorded[err], priority, errormessage.c_str());
855   }
856 
857   if (page_error) {
858     // For each word in the data region.
859     for (int i = 0; i < length / wordsize_; i++) {
860       uint64 actual = memblock[i];
861       uint64 expected;
862       datacast_t data;
863       // Determine the value that should be there.
864       int index = 2 * i + pattern_offset;
865 
866       data.l32.l = pattern->pattern(index);
867       data.l32.h = pattern->pattern(index + 1);
868       expected = data.l64;
869 
870       // Check tags if necessary.
871       if (tag_mode_ && ((reinterpret_cast<uint64>(&memblock[i]) & 0x3f) == 0)) {
872         expected = addr_to_tag(&memblock[i]);
873       }
874 
875       // If the value is incorrect, save an error record for later printing.
876       if (actual != expected) {
877         // If we have overflowed the error queue, print the errors now.
878         struct ErrorRecord er;
879         er.actual = actual;
880         er.expected = expected;
881         er.vaddr = &memblock[i];
882 
883         // Do the error printout. This will take a long time and
884         // likely change the machine state.
885         ProcessError(&er, 12, errormessage.c_str());
886         overflowerrors++;
887       }
888     }
889   }
890 
891   // Keep track of observed errors.
892   errorcount_ += errors + overflowerrors;
893   return errors + overflowerrors;
894 }
895 
GetCopiedData()896 float WorkerThread::GetCopiedData() {
897   return pages_copied_ * sat_->page_length() / kMegabyte;
898 }
899 
900 // Calculate the CRC of a region.
901 // Result check if the CRC mismatches.
CrcCheckPage(struct page_entry * srcpe)902 int WorkerThread::CrcCheckPage(struct page_entry *srcpe) {
903   const int blocksize = 4096;
904   const int blockwords = blocksize / wordsize_;
905   int errors = 0;
906 
907   const AdlerChecksum *expectedcrc = srcpe->pattern->crc();
908   uint64 *memblock = static_cast<uint64*>(srcpe->addr);
909   int blocks = sat_->page_length() / blocksize;
910   for (int currentblock = 0; currentblock < blocks; currentblock++) {
911     uint64 *memslice = memblock + currentblock * blockwords;
912 
913     AdlerChecksum crc;
914     if (tag_mode_) {
915       AdlerAddrCrcC(memslice, blocksize, &crc, srcpe);
916     } else {
917       CalculateAdlerChecksum(memslice, blocksize, &crc);
918     }
919 
920     // If the CRC does not match, we'd better look closer.
921     if (!crc.Equals(*expectedcrc)) {
922       logprintf(11, "Log: CrcCheckPage Falling through to slow compare, "
923                 "CRC mismatch %s != %s\n",
924                 crc.ToHexString().c_str(),
925                 expectedcrc->ToHexString().c_str());
926       int errorcount = CheckRegion(memslice,
927                                    srcpe->pattern,
928                                    srcpe->lastcpu,
929                                    blocksize,
930                                    currentblock * blocksize, 0);
931       if (errorcount == 0) {
932         logprintf(0, "Log: CrcCheckPage CRC mismatch %s != %s, "
933                      "but no miscompares found.\n",
934                   crc.ToHexString().c_str(),
935                   expectedcrc->ToHexString().c_str());
936       }
937       errors += errorcount;
938     }
939   }
940 
941   // For odd length transfers, we should never hit this.
942   int leftovers = sat_->page_length() % blocksize;
943   if (leftovers) {
944     uint64 *memslice = memblock + blocks * blockwords;
945     errors += CheckRegion(memslice,
946                           srcpe->pattern,
947                           srcpe->lastcpu,
948                           leftovers,
949                           blocks * blocksize, 0);
950   }
951   return errors;
952 }
953 
954 
955 // Print error information about a data miscompare.
ProcessTagError(struct ErrorRecord * error,int priority,const char * message)956 void WorkerThread::ProcessTagError(struct ErrorRecord *error,
957                                    int priority,
958                                    const char *message) {
959   char dimm_string[256] = "";
960   char tag_dimm_string[256] = "";
961   bool read_error = false;
962 
963   int core_id = sched_getcpu();
964 
965   // Determine if this is a write or read error.
966   os_->Flush(error->vaddr);
967   error->reread = *(error->vaddr);
968 
969   // Distinguish read and write errors.
970   if (error->actual != error->reread) {
971     read_error = true;
972   }
973 
974   sat_assert(error->expected != error->actual);
975 
976   error->vbyteaddr = reinterpret_cast<char*>(error->vaddr);
977 
978   // Find physical address if possible.
979   error->paddr = os_->VirtualToPhysical(error->vbyteaddr);
980   error->tagpaddr = os_->VirtualToPhysical(error->tagvaddr);
981 
982   // Pretty print DIMM mapping if available.
983   os_->FindDimm(error->paddr, dimm_string, sizeof(dimm_string));
984   // Pretty print DIMM mapping if available.
985   os_->FindDimm(error->tagpaddr, tag_dimm_string, sizeof(tag_dimm_string));
986 
987   // Report parseable error.
988   if (priority < 5) {
989     logprintf(priority,
990               "%s: Tag from %p(0x%llx:%s) (%s) "
991               "miscompare on CPU %d(0x%s) at %p(0x%llx:%s): "
992               "read:0x%016llx, reread:0x%016llx expected:0x%016llx\n",
993               message,
994               error->tagvaddr, error->tagpaddr,
995               tag_dimm_string,
996               read_error ? "read error" : "write error",
997               core_id,
998               CurrentCpusFormat().c_str(),
999               error->vaddr,
1000               error->paddr,
1001               dimm_string,
1002               error->actual,
1003               error->reread,
1004               error->expected);
1005   }
1006 
1007   errorcount_ += 1;
1008 
1009   // Overwrite incorrect data with correct data to prevent
1010   // future miscompares when this data is reused.
1011   *(error->vaddr) = error->expected;
1012   os_->Flush(error->vaddr);
1013 }
1014 
1015 
1016 // Print out and log a tag error.
ReportTagError(uint64 * mem64,uint64 actual,uint64 tag)1017 bool WorkerThread::ReportTagError(
1018     uint64 *mem64,
1019     uint64 actual,
1020     uint64 tag) {
1021   struct ErrorRecord er;
1022   er.actual = actual;
1023 
1024   er.expected = tag;
1025   er.vaddr = mem64;
1026 
1027   // Generate vaddr from tag.
1028   er.tagvaddr = reinterpret_cast<uint64*>(actual);
1029 
1030   ProcessTagError(&er, 0, "Hardware Error");
1031   return true;
1032 }
1033 
1034 // C implementation of Adler memory copy, with memory tagging.
AdlerAddrMemcpyC(uint64 * dstmem64,uint64 * srcmem64,unsigned int size_in_bytes,AdlerChecksum * checksum,struct page_entry * pe)1035 bool WorkerThread::AdlerAddrMemcpyC(uint64 *dstmem64,
1036                                     uint64 *srcmem64,
1037                                     unsigned int size_in_bytes,
1038                                     AdlerChecksum *checksum,
1039                                     struct page_entry *pe) {
1040   // Use this data wrapper to access memory with 64bit read/write.
1041   datacast_t data;
1042   datacast_t dstdata;
1043   unsigned int count = size_in_bytes / sizeof(data);
1044 
1045   if (count > ((1U) << 19)) {
1046     // Size is too large, must be strictly less than 512 KB.
1047     return false;
1048   }
1049 
1050   uint64 a1 = 1;
1051   uint64 a2 = 1;
1052   uint64 b1 = 0;
1053   uint64 b2 = 0;
1054 
1055   class Pattern *pattern = pe->pattern;
1056 
1057   unsigned int i = 0;
1058   while (i < count) {
1059     // Process 64 bits at a time.
1060     if ((i & 0x7) == 0) {
1061       data.l64 = srcmem64[i];
1062       dstdata.l64 = dstmem64[i];
1063       uint64 src_tag = addr_to_tag(&srcmem64[i]);
1064       uint64 dst_tag = addr_to_tag(&dstmem64[i]);
1065       // Detect if tags have been corrupted.
1066       if (data.l64 != src_tag)
1067         ReportTagError(&srcmem64[i], data.l64, src_tag);
1068       if (dstdata.l64 != dst_tag)
1069         ReportTagError(&dstmem64[i], dstdata.l64, dst_tag);
1070 
1071       data.l32.l = pattern->pattern(i << 1);
1072       data.l32.h = pattern->pattern((i << 1) + 1);
1073       a1 = a1 + data.l32.l;
1074       b1 = b1 + a1;
1075       a1 = a1 + data.l32.h;
1076       b1 = b1 + a1;
1077 
1078       data.l64  = dst_tag;
1079       dstmem64[i] = data.l64;
1080 
1081     } else {
1082       data.l64 = srcmem64[i];
1083       a1 = a1 + data.l32.l;
1084       b1 = b1 + a1;
1085       a1 = a1 + data.l32.h;
1086       b1 = b1 + a1;
1087       dstmem64[i] = data.l64;
1088     }
1089     i++;
1090 
1091     data.l64 = srcmem64[i];
1092     a2 = a2 + data.l32.l;
1093     b2 = b2 + a2;
1094     a2 = a2 + data.l32.h;
1095     b2 = b2 + a2;
1096     dstmem64[i] = data.l64;
1097     i++;
1098   }
1099   checksum->Set(a1, a2, b1, b2);
1100   return true;
1101 }
1102 
1103 // x86_64 SSE2 assembly implementation of Adler memory copy, with address
1104 // tagging added as a second step. This is useful for debugging failures
1105 // that only occur when SSE / nontemporal writes are used.
AdlerAddrMemcpyWarm(uint64 * dstmem64,uint64 * srcmem64,unsigned int size_in_bytes,AdlerChecksum * checksum,struct page_entry * pe)1106 bool WorkerThread::AdlerAddrMemcpyWarm(uint64 *dstmem64,
1107                                        uint64 *srcmem64,
1108                                        unsigned int size_in_bytes,
1109                                        AdlerChecksum *checksum,
1110                                        struct page_entry *pe) {
1111   // Do ASM copy, ignore checksum.
1112   AdlerChecksum ignored_checksum;
1113   os_->AdlerMemcpyWarm(dstmem64, srcmem64, size_in_bytes, &ignored_checksum);
1114 
1115   // Force cache flush of both the source and destination addresses.
1116   //  length - length of block to flush in cachelines.
1117   //  mem_increment - number of dstmem/srcmem values per cacheline.
1118   int length = size_in_bytes / kCacheLineSize;
1119   int mem_increment = kCacheLineSize / sizeof(*dstmem64);
1120   OsLayer::FastFlushSync();
1121   for (int i = 0; i < length; ++i) {
1122     OsLayer::FastFlushHint(dstmem64 + (i * mem_increment));
1123     OsLayer::FastFlushHint(srcmem64 + (i * mem_increment));
1124   }
1125   OsLayer::FastFlushSync();
1126 
1127   // Check results.
1128   AdlerAddrCrcC(srcmem64, size_in_bytes, checksum, pe);
1129   // Patch up address tags.
1130   TagAddrC(dstmem64, size_in_bytes);
1131   return true;
1132 }
1133 
1134 // Retag pages..
TagAddrC(uint64 * memwords,unsigned int size_in_bytes)1135 bool WorkerThread::TagAddrC(uint64 *memwords,
1136                             unsigned int size_in_bytes) {
1137   // Mask is the bitmask of indexes used by the pattern.
1138   // It is the pattern size -1. Size is always a power of 2.
1139 
1140   // Select tag or data as appropriate.
1141   int length = size_in_bytes / wordsize_;
1142   for (int i = 0; i < length; i += 8) {
1143     datacast_t data;
1144     data.l64 = addr_to_tag(&memwords[i]);
1145     memwords[i] = data.l64;
1146   }
1147   return true;
1148 }
1149 
1150 // C implementation of Adler memory crc.
AdlerAddrCrcC(uint64 * srcmem64,unsigned int size_in_bytes,AdlerChecksum * checksum,struct page_entry * pe)1151 bool WorkerThread::AdlerAddrCrcC(uint64 *srcmem64,
1152                                  unsigned int size_in_bytes,
1153                                  AdlerChecksum *checksum,
1154                                  struct page_entry *pe) {
1155   // Use this data wrapper to access memory with 64bit read/write.
1156   datacast_t data;
1157   unsigned int count = size_in_bytes / sizeof(data);
1158 
1159   if (count > ((1U) << 19)) {
1160     // Size is too large, must be strictly less than 512 KB.
1161     return false;
1162   }
1163 
1164   uint64 a1 = 1;
1165   uint64 a2 = 1;
1166   uint64 b1 = 0;
1167   uint64 b2 = 0;
1168 
1169   class Pattern *pattern = pe->pattern;
1170 
1171   unsigned int i = 0;
1172   while (i < count) {
1173     // Process 64 bits at a time.
1174     if ((i & 0x7) == 0) {
1175       data.l64 = srcmem64[i];
1176       uint64 src_tag = addr_to_tag(&srcmem64[i]);
1177       // Check that tags match expected.
1178       if (data.l64 != src_tag)
1179         ReportTagError(&srcmem64[i], data.l64, src_tag);
1180 
1181       data.l32.l = pattern->pattern(i << 1);
1182       data.l32.h = pattern->pattern((i << 1) + 1);
1183       a1 = a1 + data.l32.l;
1184       b1 = b1 + a1;
1185       a1 = a1 + data.l32.h;
1186       b1 = b1 + a1;
1187     } else {
1188       data.l64 = srcmem64[i];
1189       a1 = a1 + data.l32.l;
1190       b1 = b1 + a1;
1191       a1 = a1 + data.l32.h;
1192       b1 = b1 + a1;
1193     }
1194     i++;
1195 
1196     data.l64 = srcmem64[i];
1197     a2 = a2 + data.l32.l;
1198     b2 = b2 + a2;
1199     a2 = a2 + data.l32.h;
1200     b2 = b2 + a2;
1201     i++;
1202   }
1203   checksum->Set(a1, a2, b1, b2);
1204   return true;
1205 }
1206 
1207 // Copy a block of memory quickly, while keeping a CRC of the data.
1208 // Result check if the CRC mismatches.
CrcCopyPage(struct page_entry * dstpe,struct page_entry * srcpe)1209 int WorkerThread::CrcCopyPage(struct page_entry *dstpe,
1210                               struct page_entry *srcpe) {
1211   int errors = 0;
1212   const int blocksize = 4096;
1213   const int blockwords = blocksize / wordsize_;
1214   int blocks = sat_->page_length() / blocksize;
1215 
1216   // Base addresses for memory copy
1217   uint64 *targetmembase = static_cast<uint64*>(dstpe->addr);
1218   uint64 *sourcemembase = static_cast<uint64*>(srcpe->addr);
1219   // Remember the expected CRC
1220   const AdlerChecksum *expectedcrc = srcpe->pattern->crc();
1221 
1222   for (int currentblock = 0; currentblock < blocks; currentblock++) {
1223     uint64 *targetmem = targetmembase + currentblock * blockwords;
1224     uint64 *sourcemem = sourcemembase + currentblock * blockwords;
1225 
1226     AdlerChecksum crc;
1227     if (tag_mode_) {
1228       AdlerAddrMemcpyC(targetmem, sourcemem, blocksize, &crc, srcpe);
1229     } else {
1230       AdlerMemcpyC(targetmem, sourcemem, blocksize, &crc);
1231     }
1232 
1233     // Investigate miscompares.
1234     if (!crc.Equals(*expectedcrc)) {
1235       logprintf(11, "Log: CrcCopyPage Falling through to slow compare, "
1236                 "CRC mismatch %s != %s\n", crc.ToHexString().c_str(),
1237                 expectedcrc->ToHexString().c_str());
1238       int errorcount = CheckRegion(sourcemem,
1239                                    srcpe->pattern,
1240                                    srcpe->lastcpu,
1241                                    blocksize,
1242                                    currentblock * blocksize, 0);
1243       if (errorcount == 0) {
1244         logprintf(0, "Log: CrcCopyPage CRC mismatch %s != %s, "
1245                      "but no miscompares found. Retrying with fresh data.\n",
1246                   crc.ToHexString().c_str(),
1247                   expectedcrc->ToHexString().c_str());
1248         if (!tag_mode_) {
1249           // Copy the data originally read from this region back again.
1250           // This data should have any corruption read originally while
1251           // calculating the CRC.
1252           memcpy(sourcemem, targetmem, blocksize);
1253           errorcount = CheckRegion(sourcemem,
1254                                    srcpe->pattern,
1255                                    srcpe->lastcpu,
1256                                    blocksize,
1257                                    currentblock * blocksize, 0);
1258           if (errorcount == 0) {
1259             int core_id = sched_getcpu();
1260             logprintf(0, "Process Error: CPU %d(0x%s) CrcCopyPage "
1261                          "CRC mismatch %s != %s, "
1262                          "but no miscompares found on second pass.\n",
1263                       core_id, CurrentCpusFormat().c_str(),
1264                       crc.ToHexString().c_str(),
1265                       expectedcrc->ToHexString().c_str());
1266             struct ErrorRecord er;
1267             er.actual = sourcemem[0];
1268             er.expected = 0xbad00000ull << 32;
1269             er.vaddr = sourcemem;
1270             er.lastcpu = srcpe->lastcpu;
1271             logprintf(0, "Process Error: lastCPU %d\n", srcpe->lastcpu);
1272             er.patternname = srcpe->pattern->name();
1273             ProcessError(&er, 0, "Hardware Error");
1274             errors += 1;
1275             errorcount_ ++;
1276           }
1277         }
1278       }
1279       errors += errorcount;
1280     }
1281   }
1282 
1283   // For odd length transfers, we should never hit this.
1284   int leftovers = sat_->page_length() % blocksize;
1285   if (leftovers) {
1286     uint64 *targetmem = targetmembase + blocks * blockwords;
1287     uint64 *sourcemem = sourcemembase + blocks * blockwords;
1288 
1289     errors += CheckRegion(sourcemem,
1290                           srcpe->pattern,
1291                           srcpe->lastcpu,
1292                           leftovers,
1293                           blocks * blocksize, 0);
1294     int leftoverwords = leftovers / wordsize_;
1295     for (int i = 0; i < leftoverwords; i++) {
1296       targetmem[i] = sourcemem[i];
1297     }
1298   }
1299 
1300   // Update pattern reference to reflect new contents.
1301   dstpe->pattern = srcpe->pattern;
1302   dstpe->lastcpu = sched_getcpu();
1303 
1304   // Clean clean clean the errors away.
1305   if (errors) {
1306     // TODO(nsanders): Maybe we should patch rather than fill? Filling may
1307     // cause bad data to be propogated across the page.
1308     FillPage(dstpe);
1309   }
1310   return errors;
1311 }
1312 
1313 
1314 
1315 // Invert a block of memory quickly, traversing downwards.
InvertPageDown(struct page_entry * srcpe)1316 int InvertThread::InvertPageDown(struct page_entry *srcpe) {
1317   const int blocksize = 4096;
1318   const int blockwords = blocksize / wordsize_;
1319   int blocks = sat_->page_length() / blocksize;
1320 
1321   // Base addresses for memory copy
1322   unsigned int *sourcemembase = static_cast<unsigned int *>(srcpe->addr);
1323 
1324   for (int currentblock = blocks-1; currentblock >= 0; currentblock--) {
1325     unsigned int *sourcemem = sourcemembase + currentblock * blockwords;
1326     for (int i = blockwords - 32; i >= 0; i -= 32) {
1327       for (int index = i + 31; index >= i; --index) {
1328         unsigned int actual = sourcemem[index];
1329         sourcemem[index] = ~actual;
1330       }
1331       OsLayer::FastFlush(&sourcemem[i]);
1332     }
1333   }
1334 
1335   srcpe->lastcpu = sched_getcpu();
1336   return 0;
1337 }
1338 
1339 // Invert a block of memory, traversing upwards.
InvertPageUp(struct page_entry * srcpe)1340 int InvertThread::InvertPageUp(struct page_entry *srcpe) {
1341   const int blocksize = 4096;
1342   const int blockwords = blocksize / wordsize_;
1343   int blocks = sat_->page_length() / blocksize;
1344 
1345   // Base addresses for memory copy
1346   unsigned int *sourcemembase = static_cast<unsigned int *>(srcpe->addr);
1347 
1348   for (int currentblock = 0; currentblock < blocks; currentblock++) {
1349     unsigned int *sourcemem = sourcemembase + currentblock * blockwords;
1350     for (int i = 0; i < blockwords; i += 32) {
1351       for (int index = i; index <= i + 31; ++index) {
1352         unsigned int actual = sourcemem[index];
1353         sourcemem[index] = ~actual;
1354       }
1355       OsLayer::FastFlush(&sourcemem[i]);
1356     }
1357   }
1358 
1359   srcpe->lastcpu = sched_getcpu();
1360   return 0;
1361 }
1362 
1363 // Copy a block of memory quickly, while keeping a CRC of the data.
1364 // Result check if the CRC mismatches. Warm the CPU while running
CrcWarmCopyPage(struct page_entry * dstpe,struct page_entry * srcpe)1365 int WorkerThread::CrcWarmCopyPage(struct page_entry *dstpe,
1366                                   struct page_entry *srcpe) {
1367   int errors = 0;
1368   const int blocksize = 4096;
1369   const int blockwords = blocksize / wordsize_;
1370   int blocks = sat_->page_length() / blocksize;
1371 
1372   // Base addresses for memory copy
1373   uint64 *targetmembase = static_cast<uint64*>(dstpe->addr);
1374   uint64 *sourcemembase = static_cast<uint64*>(srcpe->addr);
1375   // Remember the expected CRC
1376   const AdlerChecksum *expectedcrc = srcpe->pattern->crc();
1377 
1378   for (int currentblock = 0; currentblock < blocks; currentblock++) {
1379     uint64 *targetmem = targetmembase + currentblock * blockwords;
1380     uint64 *sourcemem = sourcemembase + currentblock * blockwords;
1381 
1382     AdlerChecksum crc;
1383     if (tag_mode_) {
1384       AdlerAddrMemcpyWarm(targetmem, sourcemem, blocksize, &crc, srcpe);
1385     } else {
1386       os_->AdlerMemcpyWarm(targetmem, sourcemem, blocksize, &crc);
1387     }
1388 
1389     // Investigate miscompares.
1390     if (!crc.Equals(*expectedcrc)) {
1391       logprintf(11, "Log: CrcWarmCopyPage Falling through to slow compare, "
1392                 "CRC mismatch %s != %s\n", crc.ToHexString().c_str(),
1393                 expectedcrc->ToHexString().c_str());
1394       int errorcount = CheckRegion(sourcemem,
1395                                    srcpe->pattern,
1396                                    srcpe->lastcpu,
1397                                    blocksize,
1398                                    currentblock * blocksize, 0);
1399       if (errorcount == 0) {
1400         logprintf(0, "Log: CrcWarmCopyPage CRC mismatch expected: %s != actual: %s, "
1401                      "but no miscompares found. Retrying with fresh data.\n",
1402                   expectedcrc->ToHexString().c_str(),
1403                   crc.ToHexString().c_str() );
1404         if (!tag_mode_) {
1405           // Copy the data originally read from this region back again.
1406           // This data should have any corruption read originally while
1407           // calculating the CRC.
1408           memcpy(sourcemem, targetmem, blocksize);
1409           errorcount = CheckRegion(sourcemem,
1410                                    srcpe->pattern,
1411                                    srcpe->lastcpu,
1412                                    blocksize,
1413                                    currentblock * blocksize, 0);
1414           if (errorcount == 0) {
1415             int core_id = sched_getcpu();
1416             logprintf(0, "Process Error: CPU %d(0x%s) CrciWarmCopyPage "
1417                          "CRC mismatch %s != %s, "
1418                          "but no miscompares found on second pass.\n",
1419                       core_id, CurrentCpusFormat().c_str(),
1420                       crc.ToHexString().c_str(),
1421                       expectedcrc->ToHexString().c_str());
1422             struct ErrorRecord er;
1423             er.actual = sourcemem[0];
1424             er.expected = 0xbad;
1425             er.vaddr = sourcemem;
1426             er.lastcpu = srcpe->lastcpu;
1427             er.patternname = srcpe->pattern->name();
1428             ProcessError(&er, 0, "Hardware Error");
1429             errors ++;
1430             errorcount_ ++;
1431           }
1432         }
1433       }
1434       errors += errorcount;
1435     }
1436   }
1437 
1438   // For odd length transfers, we should never hit this.
1439   int leftovers = sat_->page_length() % blocksize;
1440   if (leftovers) {
1441     uint64 *targetmem = targetmembase + blocks * blockwords;
1442     uint64 *sourcemem = sourcemembase + blocks * blockwords;
1443 
1444     errors += CheckRegion(sourcemem,
1445                           srcpe->pattern,
1446                           srcpe->lastcpu,
1447                           leftovers,
1448                           blocks * blocksize, 0);
1449     int leftoverwords = leftovers / wordsize_;
1450     for (int i = 0; i < leftoverwords; i++) {
1451       targetmem[i] = sourcemem[i];
1452     }
1453   }
1454 
1455   // Update pattern reference to reflect new contents.
1456   dstpe->pattern = srcpe->pattern;
1457   dstpe->lastcpu = sched_getcpu();
1458 
1459 
1460   // Clean clean clean the errors away.
1461   if (errors) {
1462     // TODO(nsanders): Maybe we should patch rather than fill? Filling may
1463     // cause bad data to be propogated across the page.
1464     FillPage(dstpe);
1465   }
1466   return errors;
1467 }
1468 
1469 
1470 
1471 // Memory check work loop. Execute until done, then exhaust pages.
Work()1472 bool CheckThread::Work() {
1473   struct page_entry pe;
1474   bool result = true;
1475   int64 loops = 0;
1476 
1477   logprintf(9, "Log: Starting Check thread %d\n", thread_num_);
1478 
1479   // We want to check all the pages, and
1480   // stop when there aren't any left.
1481   while (true) {
1482     result = result && sat_->GetValid(&pe);
1483     if (!result) {
1484       if (IsReadyToRunNoPause())
1485         logprintf(0, "Process Error: check_thread failed to pop pages, "
1486                   "bailing\n");
1487       else
1488         result = true;
1489       break;
1490     }
1491 
1492     // Do the result check.
1493     CrcCheckPage(&pe);
1494 
1495     // Push pages back on the valid queue if we are still going,
1496     // throw them out otherwise.
1497     if (IsReadyToRunNoPause())
1498       result = result && sat_->PutValid(&pe);
1499     else
1500       result = result && sat_->PutEmpty(&pe);
1501     if (!result) {
1502       logprintf(0, "Process Error: check_thread failed to push pages, "
1503                 "bailing\n");
1504       break;
1505     }
1506     loops++;
1507   }
1508 
1509   pages_copied_ = loops;
1510   status_ = result;
1511   logprintf(9, "Log: Completed %d: Check thread. Status %d, %d pages checked\n",
1512             thread_num_, status_, pages_copied_);
1513   return result;
1514 }
1515 
1516 
1517 // Memory copy work loop. Execute until marked done.
Work()1518 bool CopyThread::Work() {
1519   struct page_entry src;
1520   struct page_entry dst;
1521   bool result = true;
1522   int64 loops = 0;
1523 
1524   logprintf(9, "Log: Starting copy thread %d: cpu %s, mem %x\n",
1525             thread_num_, cpuset_format(&cpu_mask_).c_str(), tag_);
1526 
1527   while (IsReadyToRun()) {
1528     // Pop the needed pages.
1529     result = result && sat_->GetValid(&src, tag_);
1530     result = result && sat_->GetEmpty(&dst, tag_);
1531     if (!result) {
1532       logprintf(0, "Process Error: copy_thread failed to pop pages, "
1533                 "bailing\n");
1534       break;
1535     }
1536 
1537     // Force errors for unittests.
1538     if (sat_->error_injection()) {
1539       if ((random() % 50000) == 8) {
1540         char *addr = reinterpret_cast<char*>(src.addr);
1541         int offset = random() % sat_->page_length();
1542         addr[offset] = 0xba;
1543       }
1544     }
1545 
1546     // We can use memcpy, or CRC check while we copy.
1547     if (sat_->warm()) {
1548       CrcWarmCopyPage(&dst, &src);
1549     } else if (sat_->strict()) {
1550       CrcCopyPage(&dst, &src);
1551     } else {
1552       memcpy(dst.addr, src.addr, sat_->page_length());
1553       dst.pattern = src.pattern;
1554       dst.lastcpu = sched_getcpu();
1555     }
1556 
1557     result = result && sat_->PutValid(&dst);
1558     result = result && sat_->PutEmpty(&src);
1559 
1560     // Copy worker-threads yield themselves at the end of each copy loop,
1561     // to avoid threads from preempting each other in the middle of the inner
1562     // copy-loop. Cooperations between Copy worker-threads results in less
1563     // unnecessary cache thrashing (which happens when context-switching in the
1564     // middle of the inner copy-loop).
1565     YieldSelf();
1566 
1567     if (!result) {
1568       logprintf(0, "Process Error: copy_thread failed to push pages, "
1569                 "bailing\n");
1570       break;
1571     }
1572     loops++;
1573   }
1574 
1575   pages_copied_ = loops;
1576   status_ = result;
1577   logprintf(9, "Log: Completed %d: Copy thread. Status %d, %d pages copied\n",
1578             thread_num_, status_, pages_copied_);
1579   return result;
1580 }
1581 
1582 // Memory invert work loop. Execute until marked done.
Work()1583 bool InvertThread::Work() {
1584   struct page_entry src;
1585   bool result = true;
1586   int64 loops = 0;
1587 
1588   logprintf(9, "Log: Starting invert thread %d\n", thread_num_);
1589 
1590   while (IsReadyToRun()) {
1591     // Pop the needed pages.
1592     result = result && sat_->GetValid(&src);
1593     if (!result) {
1594       logprintf(0, "Process Error: invert_thread failed to pop pages, "
1595                 "bailing\n");
1596       break;
1597     }
1598 
1599     if (sat_->strict())
1600       CrcCheckPage(&src);
1601 
1602     // For the same reason CopyThread yields itself (see YieldSelf comment
1603     // in CopyThread::Work(), InvertThread yields itself after each invert
1604     // operation to improve cooperation between different worker threads
1605     // stressing the memory/cache.
1606     InvertPageUp(&src);
1607     YieldSelf();
1608     InvertPageDown(&src);
1609     YieldSelf();
1610     InvertPageDown(&src);
1611     YieldSelf();
1612     InvertPageUp(&src);
1613     YieldSelf();
1614 
1615     if (sat_->strict())
1616       CrcCheckPage(&src);
1617 
1618     result = result && sat_->PutValid(&src);
1619     if (!result) {
1620       logprintf(0, "Process Error: invert_thread failed to push pages, "
1621                 "bailing\n");
1622       break;
1623     }
1624     loops++;
1625   }
1626 
1627   pages_copied_ = loops * 2;
1628   status_ = result;
1629   logprintf(9, "Log: Completed %d: Copy thread. Status %d, %d pages copied\n",
1630             thread_num_, status_, pages_copied_);
1631   return result;
1632 }
1633 
1634 
1635 // Set file name to use for File IO.
SetFile(const char * filename_init)1636 void FileThread::SetFile(const char *filename_init) {
1637   filename_ = filename_init;
1638   devicename_ = os_->FindFileDevice(filename_);
1639 }
1640 
1641 // Open the file for access.
OpenFile(int * pfile)1642 bool FileThread::OpenFile(int *pfile) {
1643   int flags = O_RDWR | O_CREAT | O_SYNC;
1644   int fd = open(filename_.c_str(), flags | O_DIRECT, 0644);
1645   if (O_DIRECT != 0 && fd < 0 && errno == EINVAL) {
1646     fd = open(filename_.c_str(), flags, 0644);  // Try without O_DIRECT
1647     os_->ActivateFlushPageCache();  // Not using O_DIRECT fixed EINVAL
1648   }
1649   if (fd < 0) {
1650     logprintf(0, "Process Error: Failed to create file %s!!\n",
1651               filename_.c_str());
1652     pages_copied_ = 0;
1653     return false;
1654   }
1655   *pfile = fd;
1656   return true;
1657 }
1658 
1659 // Close the file.
CloseFile(int fd)1660 bool FileThread::CloseFile(int fd) {
1661   close(fd);
1662   return true;
1663 }
1664 
1665 // Check sector tagging.
SectorTagPage(struct page_entry * src,int block)1666 bool FileThread::SectorTagPage(struct page_entry *src, int block) {
1667   int page_length = sat_->page_length();
1668   struct FileThread::SectorTag *tag =
1669     (struct FileThread::SectorTag *)(src->addr);
1670 
1671   // Tag each sector.
1672   unsigned char magic = ((0xba + thread_num_) & 0xff);
1673   for (int sec = 0; sec < page_length / 512; sec++) {
1674     tag[sec].magic = magic;
1675     tag[sec].block = block & 0xff;
1676     tag[sec].sector = sec & 0xff;
1677     tag[sec].pass = pass_ & 0xff;
1678   }
1679   return true;
1680 }
1681 
WritePageToFile(int fd,struct page_entry * src)1682 bool FileThread::WritePageToFile(int fd, struct page_entry *src) {
1683   int page_length = sat_->page_length();
1684   // Fill the file with our data.
1685   int64 size = write(fd, src->addr, page_length);
1686 
1687   if (size != page_length) {
1688     os_->ErrorReport(devicename_.c_str(), "write-error", 1);
1689     errorcount_++;
1690     logprintf(0, "Block Error: file_thread failed to write, "
1691               "bailing\n");
1692     return false;
1693   }
1694   return true;
1695 }
1696 
1697 // Write the data to the file.
WritePages(int fd)1698 bool FileThread::WritePages(int fd) {
1699   int strict = sat_->strict();
1700 
1701   // Start fresh at beginning of file for each batch of pages.
1702   lseek64(fd, 0, SEEK_SET);
1703   for (int i = 0; i < sat_->disk_pages(); i++) {
1704     struct page_entry src;
1705     if (!GetValidPage(&src))
1706       return false;
1707     // Save expected pattern.
1708     page_recs_[i].pattern = src.pattern;
1709     page_recs_[i].src = src.addr;
1710 
1711     // Check data correctness.
1712     if (strict)
1713       CrcCheckPage(&src);
1714 
1715     SectorTagPage(&src, i);
1716 
1717     bool result = WritePageToFile(fd, &src);
1718 
1719     if (!PutEmptyPage(&src))
1720       return false;
1721 
1722     if (!result)
1723       return false;
1724   }
1725   return os_->FlushPageCache();  // If O_DIRECT worked, this will be a NOP.
1726 }
1727 
1728 // Copy data from file into memory block.
ReadPageFromFile(int fd,struct page_entry * dst)1729 bool FileThread::ReadPageFromFile(int fd, struct page_entry *dst) {
1730   int page_length = sat_->page_length();
1731 
1732   // Do the actual read.
1733   int64 size = read(fd, dst->addr, page_length);
1734   if (size != page_length) {
1735     os_->ErrorReport(devicename_.c_str(), "read-error", 1);
1736     logprintf(0, "Block Error: file_thread failed to read, "
1737               "bailing\n");
1738     errorcount_++;
1739     return false;
1740   }
1741   return true;
1742 }
1743 
1744 // Check sector tagging.
SectorValidatePage(const struct PageRec & page,struct page_entry * dst,int block)1745 bool FileThread::SectorValidatePage(const struct PageRec &page,
1746                                     struct page_entry *dst, int block) {
1747   // Error injection.
1748   static int calls = 0;
1749   calls++;
1750 
1751   // Do sector tag compare.
1752   int firstsector = -1;
1753   int lastsector = -1;
1754   bool badsector = false;
1755   int page_length = sat_->page_length();
1756 
1757   // Cast data block into an array of tagged sectors.
1758   struct FileThread::SectorTag *tag =
1759   (struct FileThread::SectorTag *)(dst->addr);
1760 
1761   sat_assert(sizeof(*tag) == 512);
1762 
1763   // Error injection.
1764   if (sat_->error_injection()) {
1765     if (calls == 2) {
1766       for (int badsec = 8; badsec < 17; badsec++)
1767         tag[badsec].pass = 27;
1768     }
1769     if (calls == 18) {
1770       (static_cast<int32*>(dst->addr))[27] = 0xbadda7a;
1771     }
1772   }
1773 
1774   // Check each sector for the correct tag we added earlier,
1775   // then revert the tag to the to normal data pattern.
1776   unsigned char magic = ((0xba + thread_num_) & 0xff);
1777   for (int sec = 0; sec < page_length / 512; sec++) {
1778     // Check magic tag.
1779     if ((tag[sec].magic != magic) ||
1780         (tag[sec].block != (block & 0xff)) ||
1781         (tag[sec].sector != (sec & 0xff)) ||
1782         (tag[sec].pass != (pass_ & 0xff))) {
1783       // Offset calculation for tag location.
1784       int offset = sec * sizeof(SectorTag);
1785       if (tag[sec].block != (block & 0xff))
1786         offset += 1 * sizeof(uint8);
1787       else if (tag[sec].sector != (sec & 0xff))
1788         offset += 2 * sizeof(uint8);
1789       else if (tag[sec].pass != (pass_ & 0xff))
1790         offset += 3 * sizeof(uint8);
1791 
1792       // Run sector tag error through diagnoser for logging and reporting.
1793       errorcount_ += 1;
1794       os_->error_diagnoser_->AddHDDSectorTagError(devicename_, tag[sec].block,
1795                                                   offset,
1796                                                   tag[sec].sector,
1797                                                   page.src, page.dst);
1798 
1799       errorcount_ += 1;
1800       logprintf(5, "Sector Error: Sector tag @ 0x%x, pass %d/%d. "
1801                 "sec %x/%x, block %d/%d, magic %x/%x, File: %s \n",
1802                 block * page_length + 512 * sec,
1803                 (pass_ & 0xff), (unsigned int)tag[sec].pass,
1804                 sec, (unsigned int)tag[sec].sector,
1805                 block, (unsigned int)tag[sec].block,
1806                 magic, (unsigned int)tag[sec].magic,
1807                 filename_.c_str());
1808 
1809       // Keep track of first and last bad sector.
1810       if (firstsector == -1)
1811         firstsector = (block * page_length / 512) + sec;
1812       lastsector = (block * page_length / 512) + sec;
1813       badsector = true;
1814     }
1815     // Patch tag back to proper pattern.
1816     unsigned int *addr = (unsigned int *)(&tag[sec]);
1817     *addr = dst->pattern->pattern(512 * sec / sizeof(*addr));
1818   }
1819 
1820   // If we found sector errors:
1821   if (badsector == true) {
1822     logprintf(5, "Log: file sector miscompare at offset %x-%x. File: %s\n",
1823               firstsector * 512,
1824               ((lastsector + 1) * 512) - 1,
1825               filename_.c_str());
1826 
1827     // Either exit immediately, or patch the data up and continue.
1828     if (sat_->stop_on_error()) {
1829       exit(1);
1830     } else {
1831       // Patch up bad pages.
1832       for (int block = (firstsector * 512) / page_length;
1833           block <= (lastsector * 512) / page_length;
1834           block++) {
1835         unsigned int *memblock = static_cast<unsigned int *>(dst->addr);
1836         int length = page_length / wordsize_;
1837         for (int i = 0; i < length; i++) {
1838           memblock[i] = dst->pattern->pattern(i);
1839         }
1840       }
1841     }
1842   }
1843   return true;
1844 }
1845 
1846 // Get memory for an incoming data transfer..
PagePrepare()1847 bool FileThread::PagePrepare() {
1848   // We can only do direct IO to SAT pages if it is normal mem.
1849   page_io_ = os_->normal_mem();
1850 
1851   // Init a local buffer if we need it.
1852   if (!page_io_) {
1853 #ifdef HAVE_POSIX_MEMALIGN
1854     int result = posix_memalign(&local_page_, 512, sat_->page_length());
1855 #else
1856     local_page_ = memalign(512, sat_->page_length());
1857     int result = (local_page_ == 0);
1858 #endif
1859     if (result) {
1860       logprintf(0, "Process Error: disk thread posix_memalign "
1861                    "returned %d (fail)\n",
1862                 result);
1863       status_ = false;
1864       return false;
1865     }
1866   }
1867   return true;
1868 }
1869 
1870 
1871 // Remove memory allocated for data transfer.
PageTeardown()1872 bool FileThread::PageTeardown() {
1873   // Free a local buffer if we need to.
1874   if (!page_io_) {
1875     free(local_page_);
1876   }
1877   return true;
1878 }
1879 
1880 
1881 
1882 // Get memory for an incoming data transfer..
GetEmptyPage(struct page_entry * dst)1883 bool FileThread::GetEmptyPage(struct page_entry *dst) {
1884   if (page_io_) {
1885     if (!sat_->GetEmpty(dst))
1886       return false;
1887   } else {
1888     dst->addr = local_page_;
1889     dst->offset = 0;
1890     dst->pattern = 0;
1891     dst->lastcpu = 0;
1892   }
1893   return true;
1894 }
1895 
1896 // Get memory for an outgoing data transfer..
GetValidPage(struct page_entry * src)1897 bool FileThread::GetValidPage(struct page_entry *src) {
1898   struct page_entry tmp;
1899   if (!sat_->GetValid(&tmp))
1900     return false;
1901   if (page_io_) {
1902     *src = tmp;
1903     return true;
1904   } else {
1905     src->addr = local_page_;
1906     src->offset = 0;
1907     CrcCopyPage(src, &tmp);
1908     if (!sat_->PutValid(&tmp))
1909       return false;
1910   }
1911   return true;
1912 }
1913 
1914 
1915 // Throw out a used empty page.
PutEmptyPage(struct page_entry * src)1916 bool FileThread::PutEmptyPage(struct page_entry *src) {
1917   if (page_io_) {
1918     if (!sat_->PutEmpty(src))
1919       return false;
1920   }
1921   return true;
1922 }
1923 
1924 // Throw out a used, filled page.
PutValidPage(struct page_entry * src)1925 bool FileThread::PutValidPage(struct page_entry *src) {
1926   if (page_io_) {
1927     if (!sat_->PutValid(src))
1928       return false;
1929   }
1930   return true;
1931 }
1932 
1933 // Copy data from file into memory blocks.
ReadPages(int fd)1934 bool FileThread::ReadPages(int fd) {
1935   int page_length = sat_->page_length();
1936   int strict = sat_->strict();
1937   bool result = true;
1938 
1939   // Read our data back out of the file, into it's new location.
1940   lseek64(fd, 0, SEEK_SET);
1941   for (int i = 0; i < sat_->disk_pages(); i++) {
1942     struct page_entry dst;
1943     if (!GetEmptyPage(&dst))
1944       return false;
1945     // Retrieve expected pattern.
1946     dst.pattern = page_recs_[i].pattern;
1947     dst.lastcpu = sched_getcpu();
1948     // Update page recordpage record.
1949     page_recs_[i].dst = dst.addr;
1950 
1951     // Read from the file into destination page.
1952     if (!ReadPageFromFile(fd, &dst)) {
1953         PutEmptyPage(&dst);
1954         return false;
1955     }
1956 
1957     SectorValidatePage(page_recs_[i], &dst, i);
1958 
1959     // Ensure that the transfer ended up with correct data.
1960     if (strict) {
1961       // Record page index currently CRC checked.
1962       crc_page_ = i;
1963       int errors = CrcCheckPage(&dst);
1964       if (errors) {
1965         logprintf(5, "Log: file miscompare at block %d, "
1966                   "offset %x-%x. File: %s\n",
1967                   i, i * page_length, ((i + 1) * page_length) - 1,
1968                   filename_.c_str());
1969         result = false;
1970       }
1971       crc_page_ = -1;
1972       errorcount_ += errors;
1973     }
1974     if (!PutValidPage(&dst))
1975       return false;
1976   }
1977   return result;
1978 }
1979 
1980 // File IO work loop. Execute until marked done.
Work()1981 bool FileThread::Work() {
1982   bool result = true;
1983   int64 loops = 0;
1984 
1985   logprintf(9, "Log: Starting file thread %d, file %s, device %s\n",
1986             thread_num_,
1987             filename_.c_str(),
1988             devicename_.c_str());
1989 
1990   if (!PagePrepare()) {
1991     status_ = false;
1992     return false;
1993   }
1994 
1995   // Open the data IO file.
1996   int fd = 0;
1997   if (!OpenFile(&fd)) {
1998     status_ = false;
1999     return false;
2000   }
2001 
2002   pass_ = 0;
2003 
2004   // Load patterns into page records.
2005   page_recs_ = new struct PageRec[sat_->disk_pages()];
2006   for (int i = 0; i < sat_->disk_pages(); i++) {
2007     page_recs_[i].pattern = new class Pattern();
2008   }
2009 
2010   // Loop until done.
2011   while (IsReadyToRun()) {
2012     // Do the file write.
2013     if (!(result = result && WritePages(fd)))
2014       break;
2015 
2016     // Do the file read.
2017     if (!(result = result && ReadPages(fd)))
2018       break;
2019 
2020     loops++;
2021     pass_ = loops;
2022   }
2023 
2024   pages_copied_ = loops * sat_->disk_pages();
2025 
2026   // Clean up.
2027   CloseFile(fd);
2028   PageTeardown();
2029 
2030   logprintf(9, "Log: Completed %d: file thread status %d, %d pages copied\n",
2031             thread_num_, status_, pages_copied_);
2032   // Failure to read from device indicates hardware,
2033   // rather than procedural SW error.
2034   status_ = true;
2035   return true;
2036 }
2037 
IsNetworkStopSet()2038 bool NetworkThread::IsNetworkStopSet() {
2039   return !IsReadyToRunNoPause();
2040 }
2041 
IsNetworkStopSet()2042 bool NetworkSlaveThread::IsNetworkStopSet() {
2043   // This thread has no completion status.
2044   // It finishes whever there is no more data to be
2045   // passed back.
2046   return true;
2047 }
2048 
2049 // Set ip name to use for Network IO.
SetIP(const char * ipaddr_init)2050 void NetworkThread::SetIP(const char *ipaddr_init) {
2051   strncpy(ipaddr_, ipaddr_init, 256);
2052 }
2053 
2054 // Create a socket.
2055 // Return 0 on error.
CreateSocket(int * psocket)2056 bool NetworkThread::CreateSocket(int *psocket) {
2057   int sock = socket(AF_INET, SOCK_STREAM, 0);
2058   if (sock == -1) {
2059     logprintf(0, "Process Error: Cannot open socket\n");
2060     pages_copied_ = 0;
2061     status_ = false;
2062     return false;
2063   }
2064   *psocket = sock;
2065   return true;
2066 }
2067 
2068 // Close the socket.
CloseSocket(int sock)2069 bool NetworkThread::CloseSocket(int sock) {
2070   close(sock);
2071   return true;
2072 }
2073 
2074 // Initiate the tcp connection.
Connect(int sock)2075 bool NetworkThread::Connect(int sock) {
2076   struct sockaddr_in dest_addr;
2077   dest_addr.sin_family = AF_INET;
2078   dest_addr.sin_port = htons(kNetworkPort);
2079   memset(&(dest_addr.sin_zero), '\0', sizeof(dest_addr.sin_zero));
2080 
2081   // Translate dot notation to u32.
2082   if (inet_aton(ipaddr_, &dest_addr.sin_addr) == 0) {
2083     logprintf(0, "Process Error: Cannot resolve %s\n", ipaddr_);
2084     pages_copied_ = 0;
2085     status_ = false;
2086     return false;
2087   }
2088 
2089   if (-1 == connect(sock, reinterpret_cast<struct sockaddr *>(&dest_addr),
2090                     sizeof(struct sockaddr))) {
2091     logprintf(0, "Process Error: Cannot connect %s\n", ipaddr_);
2092     pages_copied_ = 0;
2093     status_ = false;
2094     return false;
2095   }
2096   return true;
2097 }
2098 
2099 // Initiate the tcp connection.
Listen()2100 bool NetworkListenThread::Listen() {
2101   struct sockaddr_in sa;
2102 
2103   memset(&(sa.sin_zero), '\0', sizeof(sa.sin_zero));
2104 
2105   sa.sin_family = AF_INET;
2106   sa.sin_addr.s_addr = INADDR_ANY;
2107   sa.sin_port = htons(kNetworkPort);
2108 
2109   if (-1 == ::bind(sock_, (struct sockaddr*)&sa, sizeof(struct sockaddr))) {
2110     char buf[256];
2111     sat_strerror(errno, buf, sizeof(buf));
2112     logprintf(0, "Process Error: Cannot bind socket: %s\n", buf);
2113     pages_copied_ = 0;
2114     status_ = false;
2115     return false;
2116   }
2117   listen(sock_, 3);
2118   return true;
2119 }
2120 
2121 // Wait for a connection from a network traffic generation thread.
Wait()2122 bool NetworkListenThread::Wait() {
2123     fd_set rfds;
2124     struct timeval tv;
2125     int retval;
2126 
2127     // Watch sock_ to see when it has input.
2128     FD_ZERO(&rfds);
2129     FD_SET(sock_, &rfds);
2130     // Wait up to five seconds.
2131     tv.tv_sec = 5;
2132     tv.tv_usec = 0;
2133 
2134     retval = select(sock_ + 1, &rfds, NULL, NULL, &tv);
2135 
2136     return (retval > 0);
2137 }
2138 
2139 // Wait for a connection from a network traffic generation thread.
GetConnection(int * pnewsock)2140 bool NetworkListenThread::GetConnection(int *pnewsock) {
2141   struct sockaddr_in sa;
2142   socklen_t size = sizeof(struct sockaddr_in);
2143 
2144   int newsock = accept(sock_, reinterpret_cast<struct sockaddr *>(&sa), &size);
2145   if (newsock < 0)  {
2146     logprintf(0, "Process Error: Did not receive connection\n");
2147     pages_copied_ = 0;
2148     status_ = false;
2149     return false;
2150   }
2151   *pnewsock = newsock;
2152   return true;
2153 }
2154 
2155 // Send a page, return false if a page was not sent.
SendPage(int sock,struct page_entry * src)2156 bool NetworkThread::SendPage(int sock, struct page_entry *src) {
2157   int page_length = sat_->page_length();
2158   char *address = static_cast<char*>(src->addr);
2159 
2160   // Send our data over the network.
2161   int size = page_length;
2162   while (size) {
2163     int transferred = send(sock, address + (page_length - size), size, 0);
2164     if ((transferred == 0) || (transferred == -1)) {
2165       if (!IsNetworkStopSet()) {
2166         char buf[256] = "";
2167         sat_strerror(errno, buf, sizeof(buf));
2168         logprintf(0, "Process Error: Thread %d, "
2169                      "Network write failed, bailing. (%s)\n",
2170                   thread_num_, buf);
2171         status_ = false;
2172       }
2173       return false;
2174     }
2175     size = size - transferred;
2176   }
2177   return true;
2178 }
2179 
2180 // Receive a page. Return false if a page was not received.
ReceivePage(int sock,struct page_entry * dst)2181 bool NetworkThread::ReceivePage(int sock, struct page_entry *dst) {
2182   int page_length = sat_->page_length();
2183   char *address = static_cast<char*>(dst->addr);
2184 
2185   // Maybe we will get our data back again, maybe not.
2186   int size = page_length;
2187   while (size) {
2188     int transferred = recv(sock, address + (page_length - size), size, 0);
2189     if ((transferred == 0) || (transferred == -1)) {
2190       // Typically network slave thread should exit as network master
2191       // thread stops sending data.
2192       if (IsNetworkStopSet()) {
2193         int err = errno;
2194         if (transferred == 0 && err == 0) {
2195           // Two system setups will not sync exactly,
2196           // allow early exit, but log it.
2197           logprintf(0, "Log: Net thread did not receive any data, exiting.\n");
2198         } else {
2199           char buf[256] = "";
2200           sat_strerror(err, buf, sizeof(buf));
2201           // Print why we failed.
2202           logprintf(0, "Process Error: Thread %d, "
2203                        "Network read failed, bailing (%s).\n",
2204                     thread_num_, buf);
2205           status_ = false;
2206           // Print arguments and results.
2207           logprintf(0, "Log: recv(%d, address %x, size %x, 0) == %x, err %d\n",
2208                     sock, address + (page_length - size),
2209                     size, transferred, err);
2210           if ((transferred == 0) &&
2211               (page_length - size < 512) &&
2212               (page_length - size > 0)) {
2213             // Print null terminated data received, to see who's been
2214             // sending us supicious unwanted data.
2215             address[page_length - size] = 0;
2216             logprintf(0, "Log: received  %d bytes: '%s'\n",
2217                       page_length - size, address);
2218           }
2219         }
2220       }
2221       return false;
2222     }
2223     size = size - transferred;
2224   }
2225   return true;
2226 }
2227 
2228 // Network IO work loop. Execute until marked done.
2229 // Return true if the thread ran as expected.
Work()2230 bool NetworkThread::Work() {
2231   logprintf(9, "Log: Starting network thread %d, ip %s\n",
2232             thread_num_,
2233             ipaddr_);
2234 
2235   // Make a socket.
2236   int sock = 0;
2237   if (!CreateSocket(&sock))
2238     return false;
2239 
2240   // Network IO loop requires network slave thread to have already initialized.
2241   // We will sleep here for awhile to ensure that the slave thread will be
2242   // listening by the time we connect.
2243   // Sleep for 15 seconds.
2244   sat_sleep(15);
2245   logprintf(9, "Log: Starting execution of network thread %d, ip %s\n",
2246             thread_num_,
2247             ipaddr_);
2248 
2249 
2250   // Connect to a slave thread.
2251   if (!Connect(sock))
2252     return false;
2253 
2254   // Loop until done.
2255   bool result = true;
2256   int strict = sat_->strict();
2257   int64 loops = 0;
2258   while (IsReadyToRun()) {
2259     struct page_entry src;
2260     struct page_entry dst;
2261     result = result && sat_->GetValid(&src);
2262     result = result && sat_->GetEmpty(&dst);
2263     if (!result) {
2264       logprintf(0, "Process Error: net_thread failed to pop pages, "
2265                 "bailing\n");
2266       break;
2267     }
2268 
2269     // Check data correctness.
2270     if (strict)
2271       CrcCheckPage(&src);
2272 
2273     // Do the network write.
2274     if (!(result = result && SendPage(sock, &src)))
2275       break;
2276 
2277     // Update pattern reference to reflect new contents.
2278     dst.pattern = src.pattern;
2279     dst.lastcpu = sched_getcpu();
2280 
2281     // Do the network read.
2282     if (!(result = result && ReceivePage(sock, &dst)))
2283       break;
2284 
2285     // Ensure that the transfer ended up with correct data.
2286     if (strict)
2287       CrcCheckPage(&dst);
2288 
2289     // Return all of our pages to the queue.
2290     result = result && sat_->PutValid(&dst);
2291     result = result && sat_->PutEmpty(&src);
2292     if (!result) {
2293       logprintf(0, "Process Error: net_thread failed to push pages, "
2294                 "bailing\n");
2295       break;
2296     }
2297     loops++;
2298   }
2299 
2300   pages_copied_ = loops;
2301   status_ = result;
2302 
2303   // Clean up.
2304   CloseSocket(sock);
2305 
2306   logprintf(9, "Log: Completed %d: network thread status %d, "
2307                "%d pages copied\n",
2308             thread_num_, status_, pages_copied_);
2309   return result;
2310 }
2311 
2312 // Spawn slave threads for incoming connections.
SpawnSlave(int newsock,int threadid)2313 bool NetworkListenThread::SpawnSlave(int newsock, int threadid) {
2314   logprintf(12, "Log: Listen thread spawning slave\n");
2315 
2316   // Spawn slave thread, to reflect network traffic back to sender.
2317   ChildWorker *child_worker = new ChildWorker;
2318   child_worker->thread.SetSock(newsock);
2319   child_worker->thread.InitThread(threadid, sat_, os_, patternlist_,
2320                                   &child_worker->status);
2321   child_worker->status.Initialize();
2322   child_worker->thread.SpawnThread();
2323   child_workers_.push_back(child_worker);
2324 
2325   return true;
2326 }
2327 
2328 // Reap slave threads.
ReapSlaves()2329 bool NetworkListenThread::ReapSlaves() {
2330   bool result = true;
2331   // Gather status and reap threads.
2332   logprintf(12, "Log: Joining all outstanding threads\n");
2333 
2334   for (size_t i = 0; i < child_workers_.size(); i++) {
2335     NetworkSlaveThread& child_thread = child_workers_[i]->thread;
2336     logprintf(12, "Log: Joining slave thread %d\n", i);
2337     child_thread.JoinThread();
2338     if (child_thread.GetStatus() != 1) {
2339       logprintf(0, "Process Error: Slave Thread %d failed with status %d\n", i,
2340                 child_thread.GetStatus());
2341       result = false;
2342     }
2343     errorcount_ += child_thread.GetErrorCount();
2344     logprintf(9, "Log: Slave Thread %d found %lld miscompares\n", i,
2345               child_thread.GetErrorCount());
2346     pages_copied_ += child_thread.GetPageCount();
2347   }
2348 
2349   return result;
2350 }
2351 
2352 // Network listener IO work loop. Execute until marked done.
2353 // Return false on fatal software error.
Work()2354 bool NetworkListenThread::Work() {
2355   logprintf(9, "Log: Starting network listen thread %d\n",
2356             thread_num_);
2357 
2358   // Make a socket.
2359   sock_ = 0;
2360   if (!CreateSocket(&sock_)) {
2361     status_ = false;
2362     return false;
2363   }
2364   logprintf(9, "Log: Listen thread created sock\n");
2365 
2366   // Allows incoming connections to be queued up by socket library.
2367   int newsock = 0;
2368   Listen();
2369   logprintf(12, "Log: Listen thread waiting for incoming connections\n");
2370 
2371   // Wait on incoming connections, and spawn worker threads for them.
2372   int threadcount = 0;
2373   while (IsReadyToRun()) {
2374     // Poll for connections that we can accept().
2375     if (Wait()) {
2376       // Accept those connections.
2377       logprintf(12, "Log: Listen thread found incoming connection\n");
2378       if (GetConnection(&newsock)) {
2379         SpawnSlave(newsock, threadcount);
2380         threadcount++;
2381       }
2382     }
2383   }
2384 
2385   // Gather status and join spawned threads.
2386   ReapSlaves();
2387 
2388   // Delete the child workers.
2389   for (ChildVector::iterator it = child_workers_.begin();
2390        it != child_workers_.end(); ++it) {
2391     (*it)->status.Destroy();
2392     delete *it;
2393   }
2394   child_workers_.clear();
2395 
2396   CloseSocket(sock_);
2397 
2398   status_ = true;
2399   logprintf(9,
2400             "Log: Completed %d: network listen thread status %d, "
2401             "%d pages copied\n",
2402             thread_num_, status_, pages_copied_);
2403   return true;
2404 }
2405 
2406 // Set network reflector socket struct.
SetSock(int sock)2407 void NetworkSlaveThread::SetSock(int sock) {
2408   sock_ = sock;
2409 }
2410 
2411 // Network reflector IO work loop. Execute until marked done.
2412 // Return false on fatal software error.
Work()2413 bool NetworkSlaveThread::Work() {
2414   logprintf(9, "Log: Starting network slave thread %d\n",
2415             thread_num_);
2416 
2417   // Verify that we have a socket.
2418   int sock = sock_;
2419   if (!sock) {
2420     status_ = false;
2421     return false;
2422   }
2423 
2424   // Loop until done.
2425   int64 loops = 0;
2426   // Init a local buffer for storing data.
2427   void *local_page = NULL;
2428 #ifdef HAVE_POSIX_MEMALIGN
2429   int result = posix_memalign(&local_page, 512, sat_->page_length());
2430 #else
2431   local_page = memalign(512, sat_->page_length());
2432   int result = (local_page == 0);
2433 #endif
2434   if (result) {
2435     logprintf(0, "Process Error: net slave posix_memalign "
2436                  "returned %d (fail)\n",
2437               result);
2438     status_ = false;
2439     return false;
2440   }
2441 
2442   struct page_entry page;
2443   page.addr = local_page;
2444 
2445   // This thread will continue to run as long as the thread on the other end of
2446   // the socket is still sending and receiving data.
2447   while (1) {
2448     // Do the network read.
2449     if (!ReceivePage(sock, &page))
2450       break;
2451 
2452     // Do the network write.
2453     if (!SendPage(sock, &page))
2454       break;
2455 
2456     loops++;
2457   }
2458 
2459   pages_copied_ = loops;
2460   // No results provided from this type of thread.
2461   status_ = true;
2462 
2463   // Clean up.
2464   CloseSocket(sock);
2465 
2466   logprintf(9,
2467             "Log: Completed %d: network slave thread status %d, "
2468             "%d pages copied\n",
2469             thread_num_, status_, pages_copied_);
2470   return true;
2471 }
2472 
2473 // Thread work loop. Execute until marked finished.
Work()2474 bool ErrorPollThread::Work() {
2475   logprintf(9, "Log: Starting system error poll thread %d\n", thread_num_);
2476 
2477   // This calls a generic error polling function in the Os abstraction layer.
2478   do {
2479     errorcount_ += os_->ErrorPoll();
2480     os_->ErrorWait();
2481   } while (IsReadyToRun());
2482 
2483   logprintf(9, "Log: Finished system error poll thread %d: %d errors\n",
2484             thread_num_, errorcount_);
2485   status_ = true;
2486   return true;
2487 }
2488 
2489 // Worker thread to heat up CPU.
2490 // This thread does not evaluate pass/fail or software error.
Work()2491 bool CpuStressThread::Work() {
2492   logprintf(9, "Log: Starting CPU stress thread %d\n", thread_num_);
2493 
2494   do {
2495     // Run ludloff's platform/CPU-specific assembly workload.
2496     os_->CpuStressWorkload();
2497     YieldSelf();
2498   } while (IsReadyToRun());
2499 
2500   logprintf(9, "Log: Finished CPU stress thread %d:\n",
2501             thread_num_);
2502   status_ = true;
2503   return true;
2504 }
2505 
CpuCacheCoherencyThread(cc_cacheline_data * data,int cacheline_count,int thread_num,int thread_count,int inc_count)2506 CpuCacheCoherencyThread::CpuCacheCoherencyThread(cc_cacheline_data *data,
2507                                                  int cacheline_count,
2508                                                  int thread_num,
2509                                                  int thread_count,
2510                                                  int inc_count) {
2511   cc_cacheline_data_ = data;
2512   cc_cacheline_count_ = cacheline_count;
2513   cc_thread_num_ = thread_num;
2514   cc_thread_count_ = thread_count;
2515   cc_inc_count_ = inc_count;
2516 }
2517 
2518 // A very simple psuedorandom generator.  Since the random number is based
2519 // on only a few simple logic operations, it can be done quickly in registers
2520 // and the compiler can inline it.
SimpleRandom(uint64 seed)2521 uint64 CpuCacheCoherencyThread::SimpleRandom(uint64 seed) {
2522   return (seed >> 1) ^ (-(seed & 1) & kRandomPolynomial);
2523 }
2524 
2525 // Worked thread to test the cache coherency of the CPUs
2526 // Return false on fatal sw error.
Work()2527 bool CpuCacheCoherencyThread::Work() {
2528   logprintf(9, "Log: Starting the Cache Coherency thread %d\n",
2529             cc_thread_num_);
2530   int64 time_start, time_end;
2531 
2532   // Use a slightly more robust random number for the initial
2533   // value, so the random sequences from the simple generator will
2534   // be more divergent.
2535 #ifdef HAVE_RAND_R
2536   unsigned int seed = static_cast<unsigned int>(gettid());
2537   uint64 r = static_cast<uint64>(rand_r(&seed));
2538   r |= static_cast<uint64>(rand_r(&seed)) << 32;
2539 #else
2540   srand(time(NULL));
2541   uint64 r = static_cast<uint64>(rand());  // NOLINT
2542   r |= static_cast<uint64>(rand()) << 32;  // NOLINT
2543 #endif
2544 
2545   time_start = sat_get_time_us();
2546 
2547   uint64 total_inc = 0;  // Total increments done by the thread.
2548   while (IsReadyToRun()) {
2549     for (int i = 0; i < cc_inc_count_; i++) {
2550       // Choose a datastructure in random and increment the appropriate
2551       // member in that according to the offset (which is the same as the
2552       // thread number.
2553       r = SimpleRandom(r);
2554       int cline_num = r % cc_cacheline_count_;
2555       int offset;
2556       // Reverse the order for odd numbered threads in odd numbered cache
2557       // lines.  This is designed for massively multi-core systems where the
2558       // number of cores exceeds the bytes in a cache line, so "distant" cores
2559       // get a chance to exercize cache coherency between them.
2560       if (cline_num & cc_thread_num_ & 1)
2561         offset = (cc_thread_count_ & ~1) - cc_thread_num_;
2562       else
2563         offset = cc_thread_num_;
2564       // Increment the member of the randomely selected structure.
2565       (cc_cacheline_data_[cline_num].num[offset])++;
2566     }
2567 
2568     total_inc += cc_inc_count_;
2569 
2570     // Calculate if the local counter matches with the global value
2571     // in all the cache line structures for this particular thread.
2572     int cc_global_num = 0;
2573     for (int cline_num = 0; cline_num < cc_cacheline_count_; cline_num++) {
2574       int offset;
2575       // Perform the same offset calculation from above.
2576       if (cline_num & cc_thread_num_ & 1)
2577         offset = (cc_thread_count_ & ~1) - cc_thread_num_;
2578       else
2579         offset = cc_thread_num_;
2580       cc_global_num += cc_cacheline_data_[cline_num].num[offset];
2581       // Reset the cachline member's value for the next run.
2582       cc_cacheline_data_[cline_num].num[offset] = 0;
2583     }
2584     if (sat_->error_injection())
2585       cc_global_num = -1;
2586 
2587     // Since the count is only stored in a byte, to squeeze more into a
2588     // single cache line, only compare it as a byte.  In the event that there
2589     // is something detected, the chance that it would be missed by a single
2590     // thread is 1 in 256.  If it affects all cores, that makes the chance
2591     // of it being missed terribly minute.  It seems unlikely any failure
2592     // case would be off by more than a small number.
2593     if ((cc_global_num & 0xff) != (cc_inc_count_ & 0xff)) {
2594       errorcount_++;
2595       logprintf(0, "Hardware Error: global(%d) and local(%d) do not match\n",
2596                 cc_global_num, cc_inc_count_);
2597     }
2598   }
2599   time_end = sat_get_time_us();
2600 
2601   int64 us_elapsed = time_end - time_start;
2602   // inc_rate is the no. of increments per second.
2603   double inc_rate = total_inc * 1e6 / us_elapsed;
2604 
2605   logprintf(4, "Stats: CC Thread(%d): Time=%llu us,"
2606             " Increments=%llu, Increments/sec = %.6lf\n",
2607             cc_thread_num_, us_elapsed, total_inc, inc_rate);
2608   logprintf(9, "Log: Finished CPU Cache Coherency thread %d:\n",
2609             cc_thread_num_);
2610   status_ = true;
2611   return true;
2612 }
2613 
DiskThread(DiskBlockTable * block_table)2614 DiskThread::DiskThread(DiskBlockTable *block_table) {
2615   read_block_size_ = kSectorSize;   // default 1 sector (512 bytes)
2616   write_block_size_ = kSectorSize;  // this assumes read and write block size
2617                                     // are the same
2618   segment_size_ = -1;               // use the entire disk as one segment
2619   cache_size_ = 16 * 1024 * 1024;   // assume 16MiB cache by default
2620   // Use a queue such that 3/2 times as much data as the cache can hold
2621   // is written before it is read so that there is little chance the read
2622   // data is in the cache.
2623   queue_size_ = ((cache_size_ / write_block_size_) * 3) / 2;
2624   blocks_per_segment_ = 32;
2625 
2626   read_threshold_ = 100000;         // 100ms is a reasonable limit for
2627   write_threshold_ = 100000;        // reading/writing a sector
2628 
2629   read_timeout_ = 5000000;          // 5 seconds should be long enough for a
2630   write_timeout_ = 5000000;         // timout for reading/writing
2631 
2632   device_sectors_ = 0;
2633   non_destructive_ = 0;
2634 
2635 #ifdef HAVE_LIBAIO_H
2636   aio_ctx_ = 0;
2637 #endif
2638   block_table_ = block_table;
2639   update_block_table_ = 1;
2640 
2641   block_buffer_ = NULL;
2642 
2643   blocks_written_ = 0;
2644   blocks_read_ = 0;
2645 }
2646 
~DiskThread()2647 DiskThread::~DiskThread() {
2648   if (block_buffer_)
2649     free(block_buffer_);
2650 }
2651 
2652 // Set filename for device file (in /dev).
SetDevice(const char * device_name)2653 void DiskThread::SetDevice(const char *device_name) {
2654   device_name_ = device_name;
2655 }
2656 
2657 // Set various parameters that control the behaviour of the test.
2658 // -1 is used as a sentinel value on each parameter (except non_destructive)
2659 // to indicate that the parameter not be set.
SetParameters(int read_block_size,int write_block_size,int64 segment_size,int64 cache_size,int blocks_per_segment,int64 read_threshold,int64 write_threshold,int non_destructive)2660 bool DiskThread::SetParameters(int read_block_size,
2661                                int write_block_size,
2662                                int64 segment_size,
2663                                int64 cache_size,
2664                                int blocks_per_segment,
2665                                int64 read_threshold,
2666                                int64 write_threshold,
2667                                int non_destructive) {
2668   if (read_block_size != -1) {
2669     // Blocks must be aligned to the disk's sector size.
2670     if (read_block_size % kSectorSize != 0) {
2671       logprintf(0, "Process Error: Block size must be a multiple of %d "
2672                 "(thread %d).\n", kSectorSize, thread_num_);
2673       return false;
2674     }
2675 
2676     read_block_size_ = read_block_size;
2677   }
2678 
2679   if (write_block_size != -1) {
2680     // Write blocks must be aligned to the disk's sector size and to the
2681     // block size.
2682     if (write_block_size % kSectorSize != 0) {
2683       logprintf(0, "Process Error: Write block size must be a multiple "
2684                 "of %d (thread %d).\n", kSectorSize, thread_num_);
2685       return false;
2686     }
2687     if (write_block_size % read_block_size_ != 0) {
2688       logprintf(0, "Process Error: Write block size must be a multiple "
2689                 "of the read block size, which is %d (thread %d).\n",
2690                 read_block_size_, thread_num_);
2691       return false;
2692     }
2693 
2694     write_block_size_ = write_block_size;
2695 
2696   } else {
2697     // Make sure write_block_size_ is still valid.
2698     if (read_block_size_ > write_block_size_) {
2699       logprintf(5, "Log: Assuming write block size equal to read block size, "
2700                 "which is %d (thread %d).\n", read_block_size_,
2701                 thread_num_);
2702       write_block_size_ = read_block_size_;
2703     } else {
2704       if (write_block_size_ % read_block_size_ != 0) {
2705         logprintf(0, "Process Error: Write block size (defined as %d) must "
2706                   "be a multiple of the read block size, which is %d "
2707                   "(thread %d).\n", write_block_size_, read_block_size_,
2708                   thread_num_);
2709         return false;
2710       }
2711     }
2712   }
2713 
2714   if (cache_size != -1) {
2715     cache_size_ = cache_size;
2716   }
2717 
2718   if (blocks_per_segment != -1) {
2719     if (blocks_per_segment <= 0) {
2720       logprintf(0, "Process Error: Blocks per segment must be greater than "
2721                    "zero.\n (thread %d)", thread_num_);
2722       return false;
2723     }
2724 
2725     blocks_per_segment_ = blocks_per_segment;
2726   }
2727 
2728   if (read_threshold != -1) {
2729     if (read_threshold <= 0) {
2730       logprintf(0, "Process Error: Read threshold must be greater than "
2731                    "zero (thread %d).\n", thread_num_);
2732       return false;
2733     }
2734 
2735     read_threshold_ = read_threshold;
2736   }
2737 
2738   if (write_threshold != -1) {
2739     if (write_threshold <= 0) {
2740       logprintf(0, "Process Error: Write threshold must be greater than "
2741                    "zero (thread %d).\n", thread_num_);
2742       return false;
2743     }
2744 
2745     write_threshold_ = write_threshold;
2746   }
2747 
2748   if (segment_size != -1) {
2749     // Segments must be aligned to the disk's sector size.
2750     if (segment_size % kSectorSize != 0) {
2751       logprintf(0, "Process Error: Segment size must be a multiple of %d"
2752                 " (thread %d).\n", kSectorSize, thread_num_);
2753       return false;
2754     }
2755 
2756     segment_size_ = segment_size / kSectorSize;
2757   }
2758 
2759   non_destructive_ = non_destructive;
2760 
2761   // Having a queue of 150% of blocks that will fit in the disk's cache
2762   // should be enough to force out the oldest block before it is read and hence,
2763   // making sure the data comes form the disk and not the cache.
2764   queue_size_ = ((cache_size_ / write_block_size_) * 3) / 2;
2765   // Updating DiskBlockTable parameters
2766   if (update_block_table_) {
2767     block_table_->SetParameters(kSectorSize, write_block_size_,
2768                                 device_sectors_, segment_size_,
2769                                 device_name_);
2770   }
2771   return true;
2772 }
2773 
2774 // Open a device, return false on failure.
OpenDevice(int * pfile)2775 bool DiskThread::OpenDevice(int *pfile) {
2776   int flags = O_RDWR | O_SYNC | O_LARGEFILE;
2777   int fd = open(device_name_.c_str(), flags | O_DIRECT, 0);
2778   if (O_DIRECT != 0 && fd < 0 && errno == EINVAL) {
2779     fd = open(device_name_.c_str(), flags, 0);  // Try without O_DIRECT
2780     os_->ActivateFlushPageCache();
2781   }
2782   if (fd < 0) {
2783     logprintf(0, "Process Error: Failed to open device %s (thread %d)!!\n",
2784               device_name_.c_str(), thread_num_);
2785     return false;
2786   }
2787   *pfile = fd;
2788 
2789   return GetDiskSize(fd);
2790 }
2791 
2792 // Retrieves the size (in bytes) of the disk/file.
2793 // Return false on failure.
GetDiskSize(int fd)2794 bool DiskThread::GetDiskSize(int fd) {
2795   struct stat device_stat;
2796   if (fstat(fd, &device_stat) == -1) {
2797     logprintf(0, "Process Error: Unable to fstat disk %s (thread %d).\n",
2798               device_name_.c_str(), thread_num_);
2799     return false;
2800   }
2801 
2802   // For a block device, an ioctl is needed to get the size since the size
2803   // of the device file (i.e. /dev/sdb) is 0.
2804   if (S_ISBLK(device_stat.st_mode)) {
2805     uint64 block_size = 0;
2806 
2807     if (ioctl(fd, BLKGETSIZE64, &block_size) == -1) {
2808       logprintf(0, "Process Error: Unable to ioctl disk %s (thread %d).\n",
2809                 device_name_.c_str(), thread_num_);
2810       return false;
2811     }
2812 
2813     // Zero size indicates nonworking device..
2814     if (block_size == 0) {
2815       os_->ErrorReport(device_name_.c_str(), "device-size-zero", 1);
2816       ++errorcount_;
2817       status_ = true;  // Avoid a procedural error.
2818       return false;
2819     }
2820 
2821     device_sectors_ = block_size / kSectorSize;
2822 
2823   } else if (S_ISREG(device_stat.st_mode)) {
2824     device_sectors_ = device_stat.st_size / kSectorSize;
2825 
2826   } else {
2827     logprintf(0, "Process Error: %s is not a regular file or block "
2828               "device (thread %d).\n", device_name_.c_str(),
2829               thread_num_);
2830     return false;
2831   }
2832 
2833   logprintf(12, "Log: Device sectors: %lld on disk %s (thread %d).\n",
2834             device_sectors_, device_name_.c_str(), thread_num_);
2835 
2836   if (update_block_table_) {
2837     block_table_->SetParameters(kSectorSize, write_block_size_,
2838                                 device_sectors_, segment_size_,
2839                                 device_name_);
2840   }
2841 
2842   return true;
2843 }
2844 
CloseDevice(int fd)2845 bool DiskThread::CloseDevice(int fd) {
2846   close(fd);
2847   return true;
2848 }
2849 
2850 // Return the time in microseconds.
GetTime()2851 int64 DiskThread::GetTime() {
2852   return sat_get_time_us();
2853 }
2854 
2855 // Do randomized reads and (possibly) writes on a device.
2856 // Return false on fatal SW error, true on SW success,
2857 // regardless of whether HW failed.
DoWork(int fd)2858 bool DiskThread::DoWork(int fd) {
2859   int64 block_num = 0;
2860   int64 num_segments;
2861 
2862   if (segment_size_ == -1) {
2863     num_segments = 1;
2864   } else {
2865     num_segments = device_sectors_ / segment_size_;
2866     if (device_sectors_ % segment_size_ != 0)
2867       num_segments++;
2868   }
2869 
2870   // Disk size should be at least 3x cache size.  See comment later for
2871   // details.
2872   sat_assert(device_sectors_ * kSectorSize > 3 * cache_size_);
2873 
2874   // This disk test works by writing blocks with a certain pattern to
2875   // disk, then reading them back and verifying it against the pattern
2876   // at a later time.  A failure happens when either the block cannot
2877   // be written/read or when the read block is different than what was
2878   // written.  If a block takes too long to write/read, then a warning
2879   // is given instead of an error since taking too long is not
2880   // necessarily an error.
2881   //
2882   // To prevent the read blocks from coming from the disk cache,
2883   // enough blocks are written before read such that a block would
2884   // be ejected from the disk cache by the time it is read.
2885   //
2886   // TODO(amistry): Implement some sort of read/write throttling.  The
2887   //                flood of asynchronous I/O requests when a drive is
2888   //                unplugged is causing the application and kernel to
2889   //                become unresponsive.
2890 
2891   while (IsReadyToRun()) {
2892     // Write blocks to disk.
2893     logprintf(16, "Log: Write phase %sfor disk %s (thread %d).\n",
2894               non_destructive_ ? "(disabled) " : "",
2895               device_name_.c_str(), thread_num_);
2896     while (IsReadyToRunNoPause() &&
2897            in_flight_sectors_.size() <
2898                static_cast<size_t>(queue_size_ + 1)) {
2899       // Confine testing to a particular segment of the disk.
2900       int64 segment = (block_num / blocks_per_segment_) % num_segments;
2901       if (!non_destructive_ &&
2902           (block_num % blocks_per_segment_ == 0)) {
2903         logprintf(20, "Log: Starting to write segment %lld out of "
2904                   "%lld on disk %s (thread %d).\n",
2905                   segment, num_segments, device_name_.c_str(),
2906                   thread_num_);
2907       }
2908       block_num++;
2909 
2910       BlockData *block = block_table_->GetUnusedBlock(segment);
2911 
2912       // If an unused sequence of sectors could not be found, skip to the
2913       // next block to process.  Soon, a new segment will come and new
2914       // sectors will be able to be allocated.  This effectively puts a
2915       // minumim on the disk size at 3x the stated cache size, or 48MiB
2916       // if a cache size is not given (since the cache is set as 16MiB
2917       // by default).  Given that todays caches are at the low MiB range
2918       // and drive sizes at the mid GB, this shouldn't pose a problem.
2919       // The 3x minimum comes from the following:
2920       //   1. In order to allocate 'y' blocks from a segment, the
2921       //      segment must contain at least 2y blocks or else an
2922       //      allocation may not succeed.
2923       //   2. Assume the entire disk is one segment.
2924       //   3. A full write phase consists of writing blocks corresponding to
2925       //      3/2 cache size.
2926       //   4. Therefore, the one segment must have 2 * 3/2 * cache
2927       //      size worth of blocks = 3 * cache size worth of blocks
2928       //      to complete.
2929       // In non-destructive mode, don't write anything to disk.
2930       if (!non_destructive_) {
2931         if (!WriteBlockToDisk(fd, block)) {
2932           block_table_->RemoveBlock(block);
2933           return true;
2934         }
2935         blocks_written_++;
2936       }
2937 
2938       // Block is either initialized by writing, or in nondestructive case,
2939       // initialized by being added into the datastructure for later reading.
2940       block->initialized();
2941 
2942       in_flight_sectors_.push(block);
2943     }
2944     if (!os_->FlushPageCache())  // If O_DIRECT worked, this will be a NOP.
2945       return false;
2946 
2947     // Verify blocks on disk.
2948     logprintf(20, "Log: Read phase for disk %s (thread %d).\n",
2949               device_name_.c_str(), thread_num_);
2950     while (IsReadyToRunNoPause() && !in_flight_sectors_.empty()) {
2951       BlockData *block = in_flight_sectors_.front();
2952       in_flight_sectors_.pop();
2953       if (!ValidateBlockOnDisk(fd, block))
2954         return true;
2955       block_table_->RemoveBlock(block);
2956       blocks_read_++;
2957     }
2958   }
2959 
2960   pages_copied_ = blocks_written_ + blocks_read_;
2961   return true;
2962 }
2963 
2964 // Do an asynchronous disk I/O operation.
2965 // Return false if the IO is not set up.
AsyncDiskIO(IoOp op,int fd,void * buf,int64 size,int64 offset,int64 timeout)2966 bool DiskThread::AsyncDiskIO(IoOp op, int fd, void *buf, int64 size,
2967                             int64 offset, int64 timeout) {
2968 #ifdef HAVE_LIBAIO_H
2969   // Use the Linux native asynchronous I/O interface for reading/writing.
2970   // A read/write consists of three basic steps:
2971   //    1. create an io context.
2972   //    2. prepare and submit an io request to the context
2973   //    3. wait for an event on the context.
2974 
2975   struct {
2976     const int opcode;
2977     const char *op_str;
2978     const char *error_str;
2979   } operations[2] = {
2980     { IO_CMD_PREAD, "read", "disk-read-error" },
2981     { IO_CMD_PWRITE, "write", "disk-write-error" }
2982   };
2983 
2984   struct iocb cb;
2985   memset(&cb, 0, sizeof(cb));
2986 
2987   cb.aio_fildes = fd;
2988   cb.aio_lio_opcode = operations[op].opcode;
2989   cb.u.c.buf = buf;
2990   cb.u.c.nbytes = size;
2991   cb.u.c.offset = offset;
2992 
2993   struct iocb *cbs[] = { &cb };
2994   if (io_submit(aio_ctx_, 1, cbs) != 1) {
2995     int error = errno;
2996     char buf[256];
2997     sat_strerror(error, buf, sizeof(buf));
2998     logprintf(0, "Process Error: Unable to submit async %s "
2999                  "on disk %s (thread %d). Error %d, %s\n",
3000               operations[op].op_str, device_name_.c_str(),
3001               thread_num_, error, buf);
3002     return false;
3003   }
3004 
3005   struct io_event event;
3006   memset(&event, 0, sizeof(event));
3007   struct timespec tv;
3008   tv.tv_sec = timeout / 1000000;
3009   tv.tv_nsec = (timeout % 1000000) * 1000;
3010   if (io_getevents(aio_ctx_, 1, 1, &event, &tv) != 1) {
3011     // A ctrl-c from the keyboard will cause io_getevents to fail with an
3012     // EINTR error code.  This is not an error and so don't treat it as such,
3013     // but still log it.
3014     int error = errno;
3015     if (error == EINTR) {
3016       logprintf(5, "Log: %s interrupted on disk %s (thread %d).\n",
3017                 operations[op].op_str, device_name_.c_str(),
3018                 thread_num_);
3019     } else {
3020       os_->ErrorReport(device_name_.c_str(), operations[op].error_str, 1);
3021       errorcount_ += 1;
3022       logprintf(0, "Hardware Error: Timeout doing async %s to sectors "
3023                    "starting at %lld on disk %s (thread %d).\n",
3024                 operations[op].op_str, offset / kSectorSize,
3025                 device_name_.c_str(), thread_num_);
3026     }
3027 
3028     // Don't bother checking return codes since io_cancel seems to always fail.
3029     // Since io_cancel is always failing, destroying and recreating an I/O
3030     // context is a workaround for canceling an in-progress I/O operation.
3031     // TODO(amistry): Find out why io_cancel isn't working and make it work.
3032     io_cancel(aio_ctx_, &cb, &event);
3033     io_destroy(aio_ctx_);
3034     aio_ctx_ = 0;
3035     if (io_setup(5, &aio_ctx_)) {
3036       int error = errno;
3037       char buf[256];
3038       sat_strerror(error, buf, sizeof(buf));
3039       logprintf(0, "Process Error: Unable to create aio context on disk %s"
3040                 " (thread %d) Error %d, %s\n",
3041                 device_name_.c_str(), thread_num_, error, buf);
3042     }
3043 
3044     return false;
3045   }
3046 
3047   // event.res contains the number of bytes written/read or
3048   // error if < 0, I think.
3049   if (event.res != static_cast<uint64>(size)) {
3050     errorcount_++;
3051     os_->ErrorReport(device_name_.c_str(), operations[op].error_str, 1);
3052 
3053     int64 result = static_cast<int64>(event.res);
3054     if (result < 0) {
3055       switch (result) {
3056         case -EIO:
3057           logprintf(0, "Hardware Error: Low-level I/O error while doing %s to "
3058                        "sectors starting at %lld on disk %s (thread %d).\n",
3059                     operations[op].op_str, offset / kSectorSize,
3060                     device_name_.c_str(), thread_num_);
3061           break;
3062         default:
3063           logprintf(0, "Hardware Error: Unknown error while doing %s to "
3064                        "sectors starting at %lld on disk %s (thread %d).\n",
3065                     operations[op].op_str, offset / kSectorSize,
3066                     device_name_.c_str(), thread_num_);
3067       }
3068     } else {
3069       logprintf(0, "Hardware Error: Unable to %s to sectors starting at "
3070                    "%lld on disk %s (thread %d).\n",
3071                 operations[op].op_str, offset / kSectorSize,
3072                 device_name_.c_str(), thread_num_);
3073     }
3074     return false;
3075   }
3076 
3077   return true;
3078 #else  // !HAVE_LIBAIO_H
3079   return false;
3080 #endif
3081 }
3082 
3083 // Write a block to disk.
3084 // Return false if the block is not written.
WriteBlockToDisk(int fd,BlockData * block)3085 bool DiskThread::WriteBlockToDisk(int fd, BlockData *block) {
3086   memset(block_buffer_, 0, block->size());
3087 
3088   // Fill block buffer with a pattern
3089   struct page_entry pe;
3090   if (!sat_->GetValid(&pe)) {
3091     // Even though a valid page could not be obatined, it is not an error
3092     // since we can always fill in a pattern directly, albeit slower.
3093     unsigned int *memblock = static_cast<unsigned int *>(block_buffer_);
3094     block->set_pattern(patternlist_->GetRandomPattern());
3095 
3096     logprintf(11, "Log: Warning, using pattern fill fallback in "
3097                   "DiskThread::WriteBlockToDisk on disk %s (thread %d).\n",
3098               device_name_.c_str(), thread_num_);
3099 
3100     for (unsigned int i = 0; i < block->size()/wordsize_; i++) {
3101       memblock[i] = block->pattern()->pattern(i);
3102     }
3103   } else {
3104     memcpy(block_buffer_, pe.addr, block->size());
3105     block->set_pattern(pe.pattern);
3106     sat_->PutValid(&pe);
3107   }
3108 
3109   logprintf(12, "Log: Writing %lld sectors starting at %lld on disk %s"
3110             " (thread %d).\n",
3111             block->size()/kSectorSize, block->address(),
3112             device_name_.c_str(), thread_num_);
3113 
3114   int64 start_time = GetTime();
3115 
3116   if (!AsyncDiskIO(ASYNC_IO_WRITE, fd, block_buffer_, block->size(),
3117                    block->address() * kSectorSize, write_timeout_)) {
3118     return false;
3119   }
3120 
3121   int64 end_time = GetTime();
3122   logprintf(12, "Log: Writing time: %lld us (thread %d).\n",
3123             end_time - start_time, thread_num_);
3124   if (end_time - start_time > write_threshold_) {
3125     logprintf(5, "Log: Write took %lld us which is longer than threshold "
3126                  "%lld us on disk %s (thread %d).\n",
3127               end_time - start_time, write_threshold_, device_name_.c_str(),
3128               thread_num_);
3129   }
3130 
3131   return true;
3132 }
3133 
3134 // Verify a block on disk.
3135 // Return true if the block was read, also increment errorcount
3136 // if the block had data errors or performance problems.
ValidateBlockOnDisk(int fd,BlockData * block)3137 bool DiskThread::ValidateBlockOnDisk(int fd, BlockData *block) {
3138   int64 blocks = block->size() / read_block_size_;
3139   int64 bytes_read = 0;
3140   int64 current_blocks;
3141   int64 current_bytes;
3142   uint64 address = block->address();
3143 
3144   logprintf(20, "Log: Reading sectors starting at %lld on disk %s "
3145             "(thread %d).\n",
3146             address, device_name_.c_str(), thread_num_);
3147 
3148   // Read block from disk and time the read.  If it takes longer than the
3149   // threshold, complain.
3150   if (lseek64(fd, address * kSectorSize, SEEK_SET) == -1) {
3151     logprintf(0, "Process Error: Unable to seek to sector %lld in "
3152               "DiskThread::ValidateSectorsOnDisk on disk %s "
3153               "(thread %d).\n", address, device_name_.c_str(), thread_num_);
3154     return false;
3155   }
3156   int64 start_time = GetTime();
3157 
3158   // Split a large write-sized block into small read-sized blocks and
3159   // read them in groups of randomly-sized multiples of read block size.
3160   // This assures all data written on disk by this particular block
3161   // will be tested using a random reading pattern.
3162   while (blocks != 0) {
3163     // Test all read blocks in a written block.
3164     current_blocks = (random() % blocks) + 1;
3165     current_bytes = current_blocks * read_block_size_;
3166 
3167     memset(block_buffer_, 0, current_bytes);
3168 
3169     logprintf(20, "Log: Reading %lld sectors starting at sector %lld on "
3170               "disk %s (thread %d)\n",
3171               current_bytes / kSectorSize,
3172               (address * kSectorSize + bytes_read) / kSectorSize,
3173               device_name_.c_str(), thread_num_);
3174 
3175     if (!AsyncDiskIO(ASYNC_IO_READ, fd, block_buffer_, current_bytes,
3176                      address * kSectorSize + bytes_read,
3177                      write_timeout_)) {
3178       return false;
3179     }
3180 
3181     int64 end_time = GetTime();
3182     logprintf(20, "Log: Reading time: %lld us (thread %d).\n",
3183               end_time - start_time, thread_num_);
3184     if (end_time - start_time > read_threshold_) {
3185       logprintf(5, "Log: Read took %lld us which is longer than threshold "
3186                 "%lld us on disk %s (thread %d).\n",
3187                 end_time - start_time, read_threshold_,
3188                 device_name_.c_str(), thread_num_);
3189     }
3190 
3191     // In non-destructive mode, don't compare the block to the pattern since
3192     // the block was never written to disk in the first place.
3193     if (!non_destructive_) {
3194       if (CheckRegion(block_buffer_, block->pattern(), 0, current_bytes,
3195                       0, bytes_read)) {
3196         os_->ErrorReport(device_name_.c_str(), "disk-pattern-error", 1);
3197         errorcount_ += 1;
3198         logprintf(0, "Hardware Error: Pattern mismatch in block starting at "
3199                   "sector %lld in DiskThread::ValidateSectorsOnDisk on "
3200                   "disk %s (thread %d).\n",
3201                   address, device_name_.c_str(), thread_num_);
3202       }
3203     }
3204 
3205     bytes_read += current_blocks * read_block_size_;
3206     blocks -= current_blocks;
3207   }
3208 
3209   return true;
3210 }
3211 
3212 // Direct device access thread.
3213 // Return false on software error.
Work()3214 bool DiskThread::Work() {
3215   int fd;
3216 
3217   logprintf(9, "Log: Starting disk thread %d, disk %s\n",
3218             thread_num_, device_name_.c_str());
3219 
3220   srandom(time(NULL));
3221 
3222   if (!OpenDevice(&fd)) {
3223     status_ = false;
3224     return false;
3225   }
3226 
3227   // Allocate a block buffer aligned to 512 bytes since the kernel requires it
3228   // when using direct IO.
3229 #ifdef HAVE_POSIX_MEMALIGN
3230   int memalign_result = posix_memalign(&block_buffer_, kBufferAlignment,
3231                                        sat_->page_length());
3232 #else
3233   block_buffer_ = memalign(kBufferAlignment, sat_->page_length());
3234   int memalign_result = (block_buffer_ == 0);
3235 #endif
3236   if (memalign_result) {
3237     CloseDevice(fd);
3238     logprintf(0, "Process Error: Unable to allocate memory for buffers "
3239                  "for disk %s (thread %d) posix memalign returned %d.\n",
3240               device_name_.c_str(), thread_num_, memalign_result);
3241     status_ = false;
3242     return false;
3243   }
3244 
3245 #ifdef HAVE_LIBAIO_H
3246   if (io_setup(5, &aio_ctx_)) {
3247     CloseDevice(fd);
3248     logprintf(0, "Process Error: Unable to create aio context for disk %s"
3249               " (thread %d).\n",
3250               device_name_.c_str(), thread_num_);
3251     status_ = false;
3252     return false;
3253   }
3254 #endif
3255 
3256   bool result = DoWork(fd);
3257 
3258   status_ = result;
3259 
3260 #ifdef HAVE_LIBAIO_H
3261   io_destroy(aio_ctx_);
3262 #endif
3263   CloseDevice(fd);
3264 
3265   logprintf(9, "Log: Completed %d (disk %s): disk thread status %d, "
3266                "%d pages copied\n",
3267             thread_num_, device_name_.c_str(), status_, pages_copied_);
3268   return result;
3269 }
3270 
RandomDiskThread(DiskBlockTable * block_table)3271 RandomDiskThread::RandomDiskThread(DiskBlockTable *block_table)
3272     : DiskThread(block_table) {
3273   update_block_table_ = 0;
3274 }
3275 
~RandomDiskThread()3276 RandomDiskThread::~RandomDiskThread() {
3277 }
3278 
3279 // Workload for random disk thread.
DoWork(int fd)3280 bool RandomDiskThread::DoWork(int fd) {
3281   logprintf(11, "Log: Random phase for disk %s (thread %d).\n",
3282             device_name_.c_str(), thread_num_);
3283   while (IsReadyToRun()) {
3284     BlockData *block = block_table_->GetRandomBlock();
3285     if (block == NULL) {
3286       logprintf(12, "Log: No block available for device %s (thread %d).\n",
3287                 device_name_.c_str(), thread_num_);
3288     } else {
3289       ValidateBlockOnDisk(fd, block);
3290       block_table_->ReleaseBlock(block);
3291       blocks_read_++;
3292     }
3293   }
3294   pages_copied_ = blocks_read_;
3295   return true;
3296 }
3297 
MemoryRegionThread()3298 MemoryRegionThread::MemoryRegionThread() {
3299   error_injection_ = false;
3300   pages_ = NULL;
3301 }
3302 
~MemoryRegionThread()3303 MemoryRegionThread::~MemoryRegionThread() {
3304   if (pages_ != NULL)
3305     delete pages_;
3306 }
3307 
3308 // Set a region of memory or MMIO to be tested.
3309 // Return false if region could not be mapped.
SetRegion(void * region,int64 size)3310 bool MemoryRegionThread::SetRegion(void *region, int64 size) {
3311   int plength = sat_->page_length();
3312   int npages = size / plength;
3313   if (size % plength) {
3314     logprintf(0, "Process Error: region size is not a multiple of SAT "
3315               "page length\n");
3316     return false;
3317   } else {
3318     if (pages_ != NULL)
3319       delete pages_;
3320     pages_ = new PageEntryQueue(npages);
3321     char *base_addr = reinterpret_cast<char*>(region);
3322     region_ = base_addr;
3323     for (int i = 0; i < npages; i++) {
3324       struct page_entry pe;
3325       init_pe(&pe);
3326       pe.addr = reinterpret_cast<void*>(base_addr + i * plength);
3327       pe.offset = i * plength;
3328 
3329       pages_->Push(&pe);
3330     }
3331     return true;
3332   }
3333 }
3334 
3335 // More detailed error printout for hardware errors in memory or MMIO
3336 // regions.
ProcessError(struct ErrorRecord * error,int priority,const char * message)3337 void MemoryRegionThread::ProcessError(struct ErrorRecord *error,
3338                                       int priority,
3339                                       const char *message) {
3340   uint32 buffer_offset;
3341   if (phase_ == kPhaseCopy) {
3342     // If the error occurred on the Copy Phase, it means that
3343     // the source data (i.e., the main memory) is wrong. so
3344     // just pass it to the original ProcessError to call a
3345     // bad-dimm error
3346     WorkerThread::ProcessError(error, priority, message);
3347   } else if (phase_ == kPhaseCheck) {
3348     // A error on the Check Phase means that the memory region tested
3349     // has an error. Gathering more information and then reporting
3350     // the error.
3351     // Determine if this is a write or read error.
3352     os_->Flush(error->vaddr);
3353     error->reread = *(error->vaddr);
3354     char *good = reinterpret_cast<char*>(&(error->expected));
3355     char *bad = reinterpret_cast<char*>(&(error->actual));
3356     sat_assert(error->expected != error->actual);
3357     unsigned int offset = 0;
3358     for (offset = 0; offset < (sizeof(error->expected) - 1); offset++) {
3359       if (good[offset] != bad[offset])
3360         break;
3361     }
3362 
3363     error->vbyteaddr = reinterpret_cast<char*>(error->vaddr) + offset;
3364 
3365     buffer_offset = error->vbyteaddr - region_;
3366 
3367     // Find physical address if possible.
3368     error->paddr = os_->VirtualToPhysical(error->vbyteaddr);
3369     logprintf(priority,
3370               "%s: miscompare on %s, CRC check at %p(0x%llx), "
3371               "offset %llx: read:0x%016llx, reread:0x%016llx "
3372               "expected:0x%016llx\n",
3373               message,
3374               identifier_.c_str(),
3375               error->vaddr,
3376               error->paddr,
3377               buffer_offset,
3378               error->actual,
3379               error->reread,
3380               error->expected);
3381   } else {
3382     logprintf(0, "Process Error: memory region thread raised an "
3383               "unexpected error.");
3384   }
3385 }
3386 
3387 // Workload for testion memory or MMIO regions.
3388 // Return false on software error.
Work()3389 bool MemoryRegionThread::Work() {
3390   struct page_entry source_pe;
3391   struct page_entry memregion_pe;
3392   bool result = true;
3393   int64 loops = 0;
3394   const uint64 error_constant = 0x00ba00000000ba00LL;
3395 
3396   // For error injection.
3397   int64 *addr = 0x0;
3398   int offset = 0;
3399   int64 data = 0;
3400 
3401   logprintf(9, "Log: Starting Memory Region thread %d\n", thread_num_);
3402 
3403   while (IsReadyToRun()) {
3404     // Getting pages from SAT and queue.
3405     phase_ = kPhaseNoPhase;
3406     result = result && sat_->GetValid(&source_pe);
3407     if (!result) {
3408       logprintf(0, "Process Error: memory region thread failed to pop "
3409                 "pages from SAT, bailing\n");
3410       break;
3411     }
3412 
3413     result = result && pages_->PopRandom(&memregion_pe);
3414     if (!result) {
3415       logprintf(0, "Process Error: memory region thread failed to pop "
3416                 "pages from queue, bailing\n");
3417       break;
3418     }
3419 
3420     // Error injection for CRC copy.
3421     if ((sat_->error_injection() || error_injection_) && loops == 1) {
3422       addr = reinterpret_cast<int64*>(source_pe.addr);
3423       offset = random() % (sat_->page_length() / wordsize_);
3424       data = addr[offset];
3425       addr[offset] = error_constant;
3426     }
3427 
3428     // Copying SAT page into memory region.
3429     phase_ = kPhaseCopy;
3430     CrcCopyPage(&memregion_pe, &source_pe);
3431     memregion_pe.pattern = source_pe.pattern;
3432     memregion_pe.lastcpu = sched_getcpu();
3433 
3434     // Error injection for CRC Check.
3435     if ((sat_->error_injection() || error_injection_) && loops == 2) {
3436       addr = reinterpret_cast<int64*>(memregion_pe.addr);
3437       offset = random() % (sat_->page_length() / wordsize_);
3438       data = addr[offset];
3439       addr[offset] = error_constant;
3440     }
3441 
3442     // Checking page content in memory region.
3443     phase_ = kPhaseCheck;
3444     CrcCheckPage(&memregion_pe);
3445 
3446     phase_ = kPhaseNoPhase;
3447     // Storing pages on their proper queues.
3448     result = result && sat_->PutValid(&source_pe);
3449     if (!result) {
3450       logprintf(0, "Process Error: memory region thread failed to push "
3451                 "pages into SAT, bailing\n");
3452       break;
3453     }
3454     result = result && pages_->Push(&memregion_pe);
3455     if (!result) {
3456       logprintf(0, "Process Error: memory region thread failed to push "
3457                 "pages into queue, bailing\n");
3458       break;
3459     }
3460 
3461     if ((sat_->error_injection() || error_injection_) &&
3462         loops >= 1 && loops <= 2) {
3463       addr[offset] = data;
3464     }
3465 
3466     loops++;
3467     YieldSelf();
3468   }
3469 
3470   pages_copied_ = loops;
3471   status_ = result;
3472   logprintf(9, "Log: Completed %d: Memory Region thread. Status %d, %d "
3473             "pages checked\n", thread_num_, status_, pages_copied_);
3474   return result;
3475 }
3476 
3477 // The list of MSRs to read from each cpu.
3478 const CpuFreqThread::CpuRegisterType CpuFreqThread::kCpuRegisters[] = {
3479   { kMsrTscAddr, "TSC" },
3480   { kMsrAperfAddr, "APERF" },
3481   { kMsrMperfAddr, "MPERF" },
3482 };
3483 
CpuFreqThread(int num_cpus,int freq_threshold,int round)3484 CpuFreqThread::CpuFreqThread(int num_cpus, int freq_threshold, int round)
3485   : num_cpus_(num_cpus),
3486     freq_threshold_(freq_threshold),
3487     round_(round) {
3488   sat_assert(round >= 0);
3489   if (round == 0) {
3490     // If rounding is off, force rounding to the nearest MHz.
3491     round_ = 1;
3492     round_value_ = 0.5;
3493   } else {
3494     round_value_ = round/2.0;
3495   }
3496 }
3497 
~CpuFreqThread()3498 CpuFreqThread::~CpuFreqThread() {
3499 }
3500 
3501 // Compute the difference between the currently read MSR values and the
3502 // previously read values and store the results in delta. If any of the
3503 // values did not increase, or the TSC value is too small, returns false.
3504 // Otherwise, returns true.
ComputeDelta(CpuDataType * current,CpuDataType * previous,CpuDataType * delta)3505 bool CpuFreqThread::ComputeDelta(CpuDataType *current, CpuDataType *previous,
3506                                  CpuDataType *delta) {
3507   // Loop through the msrs.
3508   for (int msr = 0; msr < kMsrLast; msr++) {
3509     if (previous->msrs[msr] > current->msrs[msr]) {
3510       logprintf(0, "Log: Register %s went backwards 0x%llx to 0x%llx "
3511                 "skipping interval\n", kCpuRegisters[msr], previous->msrs[msr],
3512                 current->msrs[msr]);
3513       return false;
3514     } else {
3515       delta->msrs[msr] = current->msrs[msr] - previous->msrs[msr];
3516     }
3517   }
3518 
3519   // Check for TSC < 1 Mcycles over interval.
3520   if (delta->msrs[kMsrTsc] < (1000 * 1000)) {
3521     logprintf(0, "Log: Insanely slow TSC rate, TSC stops in idle?\n");
3522     return false;
3523   }
3524   timersub(&current->tv, &previous->tv, &delta->tv);
3525 
3526   return true;
3527 }
3528 
3529 // Compute the change in values of the MSRs between current and previous,
3530 // set the frequency in MHz of the cpu. If there is an error computing
3531 // the delta, return false. Othewise, return true.
ComputeFrequency(CpuDataType * current,CpuDataType * previous,int * freq)3532 bool CpuFreqThread::ComputeFrequency(CpuDataType *current,
3533                                      CpuDataType *previous, int *freq) {
3534   CpuDataType delta;
3535   if (!ComputeDelta(current, previous, &delta)) {
3536     return false;
3537   }
3538 
3539   double interval = delta.tv.tv_sec + delta.tv.tv_usec / 1000000.0;
3540   double frequency = 1.0 * delta.msrs[kMsrTsc] / 1000000
3541                      * delta.msrs[kMsrAperf] / delta.msrs[kMsrMperf] / interval;
3542 
3543   // Use the rounding value to round up properly.
3544   int computed = static_cast<int>(frequency + round_value_);
3545   *freq = computed - (computed % round_);
3546   return true;
3547 }
3548 
3549 // This is the task function that the thread executes.
Work()3550 bool CpuFreqThread::Work() {
3551   cpu_set_t cpuset;
3552   if (!AvailableCpus(&cpuset)) {
3553     logprintf(0, "Process Error: Cannot get information about the cpus.\n");
3554     return false;
3555   }
3556 
3557   // Start off indicating the test is passing.
3558   status_ = true;
3559 
3560   int curr = 0;
3561   int prev = 1;
3562   uint32 num_intervals = 0;
3563   bool paused = false;
3564   bool valid;
3565   bool pass = true;
3566 
3567   vector<CpuDataType> data[2];
3568   data[0].resize(num_cpus_);
3569   data[1].resize(num_cpus_);
3570   while (IsReadyToRun(&paused)) {
3571     if (paused) {
3572       // Reset the intervals and restart logic after the pause.
3573       num_intervals = 0;
3574     }
3575     if (num_intervals == 0) {
3576       // If this is the first interval, then always wait a bit before
3577       // starting to collect data.
3578       sat_sleep(kStartupDelay);
3579     }
3580 
3581     // Get the per cpu counters.
3582     valid = true;
3583     for (int cpu = 0; cpu < num_cpus_; cpu++) {
3584       if (CPU_ISSET(cpu, &cpuset)) {
3585         if (!GetMsrs(cpu, &data[curr][cpu])) {
3586           logprintf(0, "Failed to get msrs on cpu %d.\n", cpu);
3587           valid = false;
3588           break;
3589         }
3590       }
3591     }
3592     if (!valid) {
3593       // Reset the number of collected intervals since something bad happened.
3594       num_intervals = 0;
3595       continue;
3596     }
3597 
3598     num_intervals++;
3599 
3600     // Only compute a delta when we have at least two intervals worth of data.
3601     if (num_intervals > 2) {
3602       for (int cpu = 0; cpu < num_cpus_; cpu++) {
3603         if (CPU_ISSET(cpu, &cpuset)) {
3604           int freq;
3605           if (!ComputeFrequency(&data[curr][cpu], &data[prev][cpu],
3606                                 &freq)) {
3607             // Reset the number of collected intervals since an unknown
3608             // error occurred.
3609             logprintf(0, "Log: Cannot get frequency of cpu %d.\n", cpu);
3610             num_intervals = 0;
3611             break;
3612           }
3613           logprintf(15, "Cpu %d Freq %d\n", cpu, freq);
3614           if (freq < freq_threshold_) {
3615             errorcount_++;
3616             pass = false;
3617             logprintf(0, "Log: Cpu %d frequency is too low, frequency %d MHz "
3618                       "threshold %d MHz.\n", cpu, freq, freq_threshold_);
3619           }
3620         }
3621       }
3622     }
3623 
3624     sat_sleep(kIntervalPause);
3625 
3626     // Swap the values in curr and prev (these values flip between 0 and 1).
3627     curr ^= 1;
3628     prev ^= 1;
3629   }
3630 
3631   return pass;
3632 }
3633 
3634 
3635 // Get the MSR values for this particular cpu and save them in data. If
3636 // any error is encountered, returns false. Otherwise, returns true.
GetMsrs(int cpu,CpuDataType * data)3637 bool CpuFreqThread::GetMsrs(int cpu, CpuDataType *data) {
3638   for (int msr = 0; msr < kMsrLast; msr++) {
3639     if (!os_->ReadMSR(cpu, kCpuRegisters[msr].msr, &data->msrs[msr])) {
3640       return false;
3641     }
3642   }
3643   // Save the time at which we acquired these values.
3644   gettimeofday(&data->tv, NULL);
3645 
3646   return true;
3647 }
3648 
3649 // Returns true if this test can run on the current machine. Otherwise,
3650 // returns false.
CanRun()3651 bool CpuFreqThread::CanRun() {
3652 #if defined(STRESSAPPTEST_CPU_X86_64) || defined(STRESSAPPTEST_CPU_I686)
3653   unsigned int eax, ebx, ecx, edx;
3654 
3655   // Check that the TSC feature is supported.
3656   // This check is valid for both Intel and AMD.
3657   eax = 1;
3658   cpuid(&eax, &ebx, &ecx, &edx);
3659   if (!(edx & (1 << 5))) {
3660     logprintf(0, "Process Error: No TSC support.\n");
3661     return false;
3662   }
3663 
3664   // Check the highest extended function level supported.
3665   // This check is valid for both Intel and AMD.
3666   eax = 0x80000000;
3667   cpuid(&eax, &ebx, &ecx, &edx);
3668   if (eax < 0x80000007) {
3669     logprintf(0, "Process Error: No invariant TSC support.\n");
3670     return false;
3671   }
3672 
3673   // Non-Stop TSC is advertised by CPUID.EAX=0x80000007: EDX.bit8
3674   // This check is valid for both Intel and AMD.
3675   eax = 0x80000007;
3676   cpuid(&eax, &ebx, &ecx, &edx);
3677   if ((edx & (1 << 8)) == 0) {
3678     logprintf(0, "Process Error: No non-stop TSC support.\n");
3679     return false;
3680   }
3681 
3682   // APERF/MPERF is advertised by CPUID.EAX=0x6: ECX.bit0
3683   // This check is valid for both Intel and AMD.
3684   eax = 0x6;
3685   cpuid(&eax, &ebx, &ecx, &edx);
3686   if ((ecx & 1) == 0) {
3687     logprintf(0, "Process Error: No APERF MSR support.\n");
3688     return false;
3689   }
3690   return true;
3691 #else
3692   logprintf(0, "Process Error: "
3693                "cpu_freq_test is only supported on X86 processors.\n");
3694   return false;
3695 #endif
3696 }
3697