1 // Copyright 2006 Google Inc. All Rights Reserved.
2 
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 
7 //      http://www.apache.org/licenses/LICENSE-2.0
8 
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 // worker.h : worker thread interface
16 
// This file contains the Worker Thread class interface
// for the SAT test. Worker Threads implement a repetitive
// task used to test or stress the system.
20 
21 #ifndef STRESSAPPTEST_WORKER_H_
22 #define STRESSAPPTEST_WORKER_H_
23 
24 #include <pthread.h>
25 
26 #include <sys/time.h>
27 #include <sys/types.h>
28 
29 #ifdef HAVE_LIBAIO_H
30 #include <libaio.h>
31 #endif
32 
33 #include <queue>
34 #include <set>
35 #include <string>
36 #include <vector>
37 
38 // This file must work with autoconf on its public version,
39 // so these includes are correct.
40 #include "disk_blocks.h"
41 #include "queue.h"
42 #include "sattypes.h"
43 
44 
// Global data structure shared by the Cache Coherency Worker Threads.
struct cc_cacheline_data {
  // Byte pointer owned by whoever sets up the cache coherency test.
  // NOTE(review): presumably indexed per thread (see cc_thread_num_ in
  // CpuCacheCoherencyThread) -- confirm against the implementation.
  char *num;
};
49 
50 // Typical usage:
51 // (Other workflows may be possible, see function comments for details.)
52 // - Control thread creates object.
53 // - Control thread calls AddWorkers(1) for each worker thread.
54 // - Control thread calls Initialize().
55 // - Control thread launches worker threads.
56 // - Every worker thread frequently calls ContinueRunning().
57 // - Control thread periodically calls PauseWorkers(), effectively sleeps, and
58 //     then calls ResumeWorkers().
59 // - Some worker threads may exit early, before StopWorkers() is called.  They
60 //     call RemoveSelf() after their last call to ContinueRunning().
61 // - Control thread eventually calls StopWorkers().
62 // - Worker threads exit.
63 // - Control thread joins worker threads.
64 // - Control thread calls Destroy().
65 // - Control thread destroys object.
66 //
67 // Threadsafety:
68 // - ContinueRunning() may be called concurrently by different workers, but not
69 //     by a single worker.
70 // - No other methods may ever be called concurrently, with themselves or
71 //     eachother.
72 // - This object may be used by multiple threads only between Initialize() and
73 //     Destroy().
74 //
75 // TODO(matthewb): Move this class and its unittest to their own files.
76 class WorkerStatus {
77  public:
78   //--------------------------------
79   // Methods for the control thread.
80   //--------------------------------
81 
WorkerStatus()82   WorkerStatus() : num_workers_(0), status_(RUN) {}
83 
84   // Called by the control thread to increase the worker count.  Must be called
85   // before Initialize().  The worker count is 0 upon object initialization.
AddWorkers(int num_new_workers)86   void AddWorkers(int num_new_workers) {
87     // No need to lock num_workers_mutex_ because this is before Initialize().
88     num_workers_ += num_new_workers;
89   }
90 
91   // Called by the control thread.  May not be called multiple times.  If
92   // called, Destroy() must be called before destruction.
93   void Initialize();
94 
95   // Called by the control thread after joining all worker threads.  Must be
96   // called iff Initialize() was called.  No methods may be called after calling
97   // this.
98   void Destroy();
99 
100   // Called by the control thread to tell the workers to pause.  Does not return
101   // until all workers have called ContinueRunning() or RemoveSelf().  May only
102   // be called between Initialize() and Stop().  Must not be called multiple
103   // times without ResumeWorkers() having been called inbetween.
104   void PauseWorkers();
105 
106   // Called by the control thread to tell the workers to resume from a pause.
107   // May only be called between Initialize() and Stop().  May only be called
108   // directly after PauseWorkers().
109   void ResumeWorkers();
110 
111   // Called by the control thread to tell the workers to stop.  May only be
112   // called between Initialize() and Destroy().  May only be called once.
113   void StopWorkers();
114 
115   //--------------------------------
116   // Methods for the worker threads.
117   //--------------------------------
118 
119   // Called by worker threads to decrease the worker count by one.  May only be
120   // called between Initialize() and Destroy().  May wait for ResumeWorkers()
121   // when called after PauseWorkers().
122   void RemoveSelf();
123 
124   // Called by worker threads between Initialize() and Destroy().  May be called
125   // any number of times.  Return value is whether or not the worker should
126   // continue running.  When called after PauseWorkers(), does not return until
127   // ResumeWorkers() or StopWorkers() has been called.  Number of distinct
128   // calling threads must match the worker count (see AddWorkers() and
129   // RemoveSelf()).
130   bool ContinueRunning(bool *paused);
131 
132   // This is a hack!  It's like ContinueRunning(), except it won't pause.  If
133   // any worker threads use this exclusively in place of ContinueRunning() then
134   // PauseWorkers() should never be used!
135   bool ContinueRunningNoPause();
136 
137  private:
138   enum Status { RUN, PAUSE, STOP };
139 
WaitOnPauseBarrier()140   void WaitOnPauseBarrier() {
141 #ifdef HAVE_PTHREAD_BARRIERS
142     pthread_rwlock_rdlock(&pause_rwlock_);
143     int error = pthread_barrier_wait(&pause_barrier_);
144     if (error != PTHREAD_BARRIER_SERIAL_THREAD)
145       sat_assert(error == 0);
146     pthread_rwlock_unlock(&pause_rwlock_);
147 #endif
148   }
149 
AcquireNumWorkersLock()150   void AcquireNumWorkersLock() {
151     sat_assert(0 == pthread_mutex_lock(&num_workers_mutex_));
152   }
153 
ReleaseNumWorkersLock()154   void ReleaseNumWorkersLock() {
155     sat_assert(0 == pthread_mutex_unlock(&num_workers_mutex_));
156   }
157 
AcquireStatusReadLock()158   void AcquireStatusReadLock() {
159     sat_assert(0 == pthread_rwlock_rdlock(&status_rwlock_));
160   }
161 
AcquireStatusWriteLock()162   void AcquireStatusWriteLock() {
163     sat_assert(0 == pthread_rwlock_wrlock(&status_rwlock_));
164   }
165 
ReleaseStatusLock()166   void ReleaseStatusLock() {
167     sat_assert(0 == pthread_rwlock_unlock(&status_rwlock_));
168   }
169 
GetStatus()170   Status GetStatus() {
171     AcquireStatusReadLock();
172     Status status = status_;
173     ReleaseStatusLock();
174     return status;
175   }
176 
177   // Returns the previous status.
SetStatus(Status status)178   Status SetStatus(Status status) {
179     AcquireStatusWriteLock();
180     Status prev_status = status_;
181     status_ = status;
182     ReleaseStatusLock();
183     return prev_status;
184   }
185 
186   pthread_mutex_t num_workers_mutex_;
187   int num_workers_;
188 
189   pthread_rwlock_t status_rwlock_;
190   Status status_;
191 
192 #ifdef HAVE_PTHREAD_BARRIERS
193   pthread_barrier_t pause_barrier_;
194   pthread_rwlock_t pause_rwlock_;  // Guards pause_barrier_
195 #endif
196 
197   DISALLOW_COPY_AND_ASSIGN(WorkerStatus);
198 };
199 
200 
201 // This is a base class for worker threads.
202 // Each thread repeats a specific
203 // task on various blocks of memory.
204 class WorkerThread {
205  public:
206   // Enum to mark a thread as low/med/high priority.
207   enum Priority {
208     Low,
209     Normal,
210     High,
211   };
212   WorkerThread();
213   virtual ~WorkerThread();
214 
215   // Initialize values and thread ID number.
216   virtual void InitThread(int thread_num_init,
217                           class Sat *sat_init,
218                           class OsLayer *os_init,
219                           class PatternList *patternlist_init,
220                           WorkerStatus *worker_status);
221 
222   // This function is DEPRECATED, it does nothing.
SetPriority(Priority priority)223   void SetPriority(Priority priority) { priority_ = priority; }
224   // Spawn the worker thread, by running Work().
225   int SpawnThread();
226   // Only for ThreadSpawnerGeneric().
227   void StartRoutine();
228   bool InitPriority();
229 
230   // Wait for the thread to complete its cleanup.
231   virtual bool JoinThread();
232   // Kill worker thread with SIGINT.
233   virtual bool KillThread();
234 
235   // This is the task function that the thread executes.
236   // This is implemented per subclass.
237   virtual bool Work();
238 
239   // Starts per-WorkerThread timer.
StartThreadTimer()240   void StartThreadTimer() {start_time_ = sat_get_time_us();}
241   // Reads current timer value and returns run duration without recording it.
ReadThreadTimer()242   int64 ReadThreadTimer() {
243     int64 end_time_ = sat_get_time_us();
244     return end_time_ - start_time_;
245   }
246   // Stops per-WorkerThread timer and records thread run duration.
247   // Start/Stop ThreadTimer repetitively has cumulative effect, ie the timer
248   // is effectively paused and restarted, so runduration_usec accumulates on.
StopThreadTimer()249   void StopThreadTimer() {
250     runduration_usec_ += ReadThreadTimer();
251   }
252 
253   // Acccess member variables.
GetStatus()254   bool GetStatus() {return status_;}
GetErrorCount()255   int64 GetErrorCount() {return errorcount_;}
GetPageCount()256   int64 GetPageCount() {return pages_copied_;}
GetRunDurationUSec()257   int64 GetRunDurationUSec() {return runduration_usec_;}
258 
259   // Returns bandwidth defined as pages_copied / thread_run_durations.
260   virtual float GetCopiedData();
261   // Calculate worker thread specific copied data.
GetMemoryCopiedData()262   virtual float GetMemoryCopiedData() {return 0;}
GetDeviceCopiedData()263   virtual float GetDeviceCopiedData() {return 0;}
264   // Calculate worker thread specific bandwidth.
GetMemoryBandwidth()265   virtual float GetMemoryBandwidth()
266     {return GetMemoryCopiedData() / (
267         runduration_usec_ * 1.0 / 1000000.);}
GetDeviceBandwidth()268   virtual float GetDeviceBandwidth()
269     {return GetDeviceCopiedData() / (
270         runduration_usec_ * 1.0 / 1000000.);}
271 
set_cpu_mask(cpu_set_t * mask)272   void set_cpu_mask(cpu_set_t *mask) {
273     memcpy(&cpu_mask_, mask, sizeof(*mask));
274   }
275 
set_cpu_mask_to_cpu(int cpu_num)276   void set_cpu_mask_to_cpu(int cpu_num) {
277     cpuset_set_ab(&cpu_mask_, cpu_num, cpu_num + 1);
278   }
279 
set_tag(int32 tag)280   void set_tag(int32 tag) {tag_ = tag;}
281 
282   // Returns CPU mask, where each bit represents a logical cpu.
283   bool AvailableCpus(cpu_set_t *cpuset);
284   // Returns CPU mask of CPUs this thread is bound to,
285   bool CurrentCpus(cpu_set_t *cpuset);
286   // Returns Current Cpus mask as string.
CurrentCpusFormat()287   string CurrentCpusFormat() {
288     cpu_set_t current_cpus;
289     CurrentCpus(&current_cpus);
290     return cpuset_format(&current_cpus);
291   }
292 
ThreadID()293   int ThreadID() {return thread_num_;}
294 
295   // Bind worker thread to specified CPU(s)
296   bool BindToCpus(const cpu_set_t *cpuset);
297 
298  protected:
299   // This function dictates whether the main work loop
300   // continues, waits, or terminates.
301   // All work loops should be of the form:
302   //   do {
303   //     // work.
304   //   } while (IsReadyToRun());
305   virtual bool IsReadyToRun(bool *paused = NULL) {
306     return worker_status_->ContinueRunning(paused);
307   }
308 
309   // Like IsReadyToRun(), except it won't pause.
IsReadyToRunNoPause()310   virtual bool IsReadyToRunNoPause() {
311     return worker_status_->ContinueRunningNoPause();
312   }
313 
314   // These are functions used by the various work loops.
315   // Pretty print and log a data miscompare.
316   virtual void ProcessError(struct ErrorRecord *er,
317                             int priority,
318                             const char *message);
319 
320   // Compare a region of memory with a known data patter, and report errors.
321   virtual int CheckRegion(void *addr,
322                           class Pattern *pat,
323                           uint32 lastcpu,
324                           int64 length,
325                           int offset,
326                           int64 patternoffset);
327 
328   // Fast compare a block of memory.
329   virtual int CrcCheckPage(struct page_entry *srcpe);
330 
331   // Fast copy a block of memory, while verifying correctness.
332   virtual int CrcCopyPage(struct page_entry *dstpe,
333                           struct page_entry *srcpe);
334 
335   // Fast copy a block of memory, while verifying correctness, and heating CPU.
336   virtual int CrcWarmCopyPage(struct page_entry *dstpe,
337                               struct page_entry *srcpe);
338 
339   // Fill a page with its specified pattern.
340   virtual bool FillPage(struct page_entry *pe);
341 
342   // Copy with address tagging.
343   virtual bool AdlerAddrMemcpyC(uint64 *dstmem64,
344                                 uint64 *srcmem64,
345                                 unsigned int size_in_bytes,
346                                 AdlerChecksum *checksum,
347                                 struct page_entry *pe);
348   // SSE copy with address tagging.
349   virtual bool AdlerAddrMemcpyWarm(uint64 *dstmem64,
350                                    uint64 *srcmem64,
351                                    unsigned int size_in_bytes,
352                                    AdlerChecksum *checksum,
353                                    struct page_entry *pe);
354   // Crc data with address tagging.
355   virtual bool AdlerAddrCrcC(uint64 *srcmem64,
356                              unsigned int size_in_bytes,
357                              AdlerChecksum *checksum,
358                              struct page_entry *pe);
359   // Setup tagging on an existing page.
360   virtual bool TagAddrC(uint64 *memwords,
361                         unsigned int size_in_bytes);
362   // Report a mistagged cacheline.
363   virtual bool ReportTagError(uint64 *mem64,
364                       uint64 actual,
365                       uint64 tag);
366   // Print out the error record of the tag mismatch.
367   virtual void ProcessTagError(struct ErrorRecord *error,
368                        int priority,
369                        const char *message);
370 
371   // A worker thread can yield itself to give up CPU until it's scheduled again
372   bool YieldSelf();
373 
374  protected:
375   // General state variables that all subclasses need.
376   int thread_num_;                  // Thread ID.
377   volatile bool status_;            // Error status.
378   volatile int64 pages_copied_;     // Recorded for memory bandwidth calc.
379   volatile int64 errorcount_;       // Miscompares seen by this thread.
380 
381   cpu_set_t cpu_mask_;              // Cores this thread is allowed to run on.
382   volatile uint32 tag_;             // Tag hint for memory this thread can use.
383 
384   bool tag_mode_;                   // Tag cachelines with vaddr.
385 
386   // Thread timing variables.
387   int64 start_time_;                 // Worker thread start time.
388   volatile int64 runduration_usec_;  // Worker run duration in u-seconds.
389 
390   // Function passed to pthread_create.
391   void *(*thread_spawner_)(void *args);
392   pthread_t thread_;                // Pthread thread ID.
393   Priority priority_;               // Worker thread priority.
394   class Sat *sat_;                  // Reference to parent stest object.
395   class OsLayer *os_;               // Os abstraction: put hacks here.
396   class PatternList *patternlist_;  // Reference to data patterns.
397 
398   // Work around style guide ban on sizeof(int).
399   static const uint64 iamint_ = 0;
400   static const int wordsize_ = sizeof(iamint_);
401 
402  private:
403   WorkerStatus *worker_status_;
404 
405   DISALLOW_COPY_AND_ASSIGN(WorkerThread);
406 };
407 
408 // Worker thread to perform File IO.
409 class FileThread : public WorkerThread {
410  public:
411   FileThread();
412   // Set filename to use for file IO.
413   virtual void SetFile(const char *filename_init);
414   virtual bool Work();
415 
416   // Calculate worker thread specific bandwidth.
GetDeviceCopiedData()417   virtual float GetDeviceCopiedData()
418     {return GetCopiedData()*2;}
419   virtual float GetMemoryCopiedData();
420 
421  protected:
422   // Record of where these pages were sourced from, and what
423   // potentially broken components they passed through.
424   struct PageRec {
425      class Pattern *pattern;  // This is the data it should contain.
426      void *src;  // This is the memory location the data was sourced from.
427      void *dst;  // This is where it ended up.
428   };
429 
430   // These are functions used by the various work loops.
431   // Pretty print and log a data miscompare. Disks require
432   // slightly different error handling.
433   virtual void ProcessError(struct ErrorRecord *er,
434                             int priority,
435                             const char *message);
436 
437   virtual bool OpenFile(int *pfile);
438   virtual bool CloseFile(int fd);
439 
440   // Read and write whole file to disk.
441   virtual bool WritePages(int fd);
442   virtual bool ReadPages(int fd);
443 
444   // Read and write pages to disk.
445   virtual bool WritePageToFile(int fd, struct page_entry *src);
446   virtual bool ReadPageFromFile(int fd, struct page_entry *dst);
447 
448   // Sector tagging support.
449   virtual bool SectorTagPage(struct page_entry *src, int block);
450   virtual bool SectorValidatePage(const struct PageRec &page,
451                                   struct page_entry *dst,
452                                   int block);
453 
454   // Get memory for an incoming data transfer..
455   virtual bool PagePrepare();
456   // Remove memory allocated for data transfer.
457   virtual bool PageTeardown();
458 
459   // Get memory for an incoming data transfer..
460   virtual bool GetEmptyPage(struct page_entry *dst);
461   // Get memory for an outgoing data transfer..
462   virtual bool GetValidPage(struct page_entry *dst);
463   // Throw out a used empty page.
464   virtual bool PutEmptyPage(struct page_entry *src);
465   // Throw out a used, filled page.
466   virtual bool PutValidPage(struct page_entry *src);
467 
468 
469   struct PageRec *page_recs_;          // Array of page records.
470   int crc_page_;                        // Page currently being CRC checked.
471   string filename_;                     // Name of file to access.
472   string devicename_;                   // Name of device file is on.
473 
474   bool page_io_;                        // Use page pool for IO.
475   void *local_page_;                   // malloc'd page fon non-pool IO.
476   int pass_;                            // Number of writes to the file so far.
477 
478   // Tag to detect file corruption.
479   struct SectorTag {
480     volatile uint8 magic;
481     volatile uint8 block;
482     volatile uint8 sector;
483     volatile uint8 pass;
484     char pad[512-4];
485   };
486 
487   DISALLOW_COPY_AND_ASSIGN(FileThread);
488 };
489 
490 
491 // Worker thread to perform Network IO.
492 class NetworkThread : public WorkerThread {
493  public:
494   NetworkThread();
495   // Set hostname to use for net IO.
496   virtual void SetIP(const char *ipaddr_init);
497   virtual bool Work();
498 
499   // Calculate worker thread specific bandwidth.
GetDeviceCopiedData()500   virtual float GetDeviceCopiedData()
501     {return GetCopiedData()*2;}
502 
503  protected:
504   // IsReadyToRunNoPause() wrapper, for NetworkSlaveThread to override.
505   virtual bool IsNetworkStopSet();
506   virtual bool CreateSocket(int *psocket);
507   virtual bool CloseSocket(int sock);
508   virtual bool Connect(int sock);
509   virtual bool SendPage(int sock, struct page_entry *src);
510   virtual bool ReceivePage(int sock, struct page_entry *dst);
511   char ipaddr_[256];
512   int sock_;
513 
514  private:
515   DISALLOW_COPY_AND_ASSIGN(NetworkThread);
516 };
517 
518 // Worker thread to reflect Network IO.
519 class NetworkSlaveThread : public NetworkThread {
520  public:
521   NetworkSlaveThread();
522   // Set socket for IO.
523   virtual void SetSock(int sock);
524   virtual bool Work();
525 
526  protected:
527   virtual bool IsNetworkStopSet();
528 
529  private:
530   DISALLOW_COPY_AND_ASSIGN(NetworkSlaveThread);
531 };
532 
533 // Worker thread to detect incoming Network IO.
534 class NetworkListenThread : public NetworkThread {
535  public:
536   NetworkListenThread();
537   virtual bool Work();
538 
539  private:
540   virtual bool Listen();
541   virtual bool Wait();
542   virtual bool GetConnection(int *pnewsock);
543   virtual bool SpawnSlave(int newsock, int threadid);
544   virtual bool ReapSlaves();
545 
546   // For serviced incoming connections.
547   struct ChildWorker {
548     WorkerStatus status;
549     NetworkSlaveThread thread;
550   };
551   typedef vector<ChildWorker*> ChildVector;
552   ChildVector child_workers_;
553 
554   DISALLOW_COPY_AND_ASSIGN(NetworkListenThread);
555 };
556 
557 // Worker thread to perform Memory Copy.
558 class CopyThread : public WorkerThread {
559  public:
CopyThread()560   CopyThread() {}
561   virtual bool Work();
562   // Calculate worker thread specific bandwidth.
GetMemoryCopiedData()563   virtual float GetMemoryCopiedData()
564     {return GetCopiedData()*2;}
565 
566  private:
567   DISALLOW_COPY_AND_ASSIGN(CopyThread);
568 };
569 
570 // Worker thread to perform Memory Invert.
571 class InvertThread : public WorkerThread {
572  public:
InvertThread()573   InvertThread() {}
574   virtual bool Work();
575   // Calculate worker thread specific bandwidth.
GetMemoryCopiedData()576   virtual float GetMemoryCopiedData()
577     {return GetCopiedData()*4;}
578 
579  private:
580   virtual int InvertPageUp(struct page_entry *srcpe);
581   virtual int InvertPageDown(struct page_entry *srcpe);
582   DISALLOW_COPY_AND_ASSIGN(InvertThread);
583 };
584 
585 // Worker thread to fill blank pages on startup.
586 class FillThread : public WorkerThread {
587  public:
588   FillThread();
589   // Set how many pages this thread should fill before exiting.
590   virtual void SetFillPages(int64 num_pages_to_fill_init);
591   virtual bool Work();
592 
593  private:
594   // Fill a page with the data pattern in pe->pattern.
595   virtual bool FillPageRandom(struct page_entry *pe);
596   int64 num_pages_to_fill_;
597   DISALLOW_COPY_AND_ASSIGN(FillThread);
598 };
599 
600 // Worker thread to verify page data matches pattern data.
601 // Thread will check and replace pages until "done" flag is set,
602 // then it will check and discard pages until no more remain.
603 class CheckThread : public WorkerThread {
604  public:
CheckThread()605   CheckThread() {}
606   virtual bool Work();
607   // Calculate worker thread specific bandwidth.
GetMemoryCopiedData()608   virtual float GetMemoryCopiedData()
609     {return GetCopiedData();}
610 
611  private:
612   DISALLOW_COPY_AND_ASSIGN(CheckThread);
613 };
614 
615 
616 // Worker thread to poll for system error messages.
617 // Thread will check for messages until "done" flag is set.
618 class ErrorPollThread : public WorkerThread {
619  public:
ErrorPollThread()620   ErrorPollThread() {}
621   virtual bool Work();
622 
623  private:
624   DISALLOW_COPY_AND_ASSIGN(ErrorPollThread);
625 };
626 
627 // Computation intensive worker thread to stress CPU.
628 class CpuStressThread : public WorkerThread {
629  public:
CpuStressThread()630   CpuStressThread() {}
631   virtual bool Work();
632 
633  private:
634   DISALLOW_COPY_AND_ASSIGN(CpuStressThread);
635 };
636 
637 // Worker thread that tests the correctness of the
638 // CPU Cache Coherency Protocol.
639 class CpuCacheCoherencyThread : public WorkerThread {
640  public:
641   CpuCacheCoherencyThread(cc_cacheline_data *cc_data,
642                           int cc_cacheline_count_,
643                           int cc_thread_num_,
644                           int cc_thread_count_,
645                           int cc_inc_count_);
646   virtual bool Work();
647 
648  protected:
649   // Used by the simple random number generator as a shift feedback;
650   // this polynomial (x^64 + x^63 + x^61 + x^60 + 1) will produce a
651   // psuedorandom cycle of period 2^64-1.
652   static const uint64 kRandomPolynomial = 0xD800000000000000ULL;
653   // A very simple psuedorandom generator that can be inlined and use
654   // registers, to keep the CC test loop tight and focused.
655   static uint64 SimpleRandom(uint64 seed);
656 
657   cc_cacheline_data *cc_cacheline_data_;  // Datstructure for each cacheline.
658   int cc_local_num_;        // Local counter for each thread.
659   int cc_cacheline_count_;  // Number of cache lines to operate on.
660   int cc_thread_num_;       // The integer id of the thread which is
661                             // used as an index into the integer array
662                             // of the cacheline datastructure.
663   int cc_thread_count_;     // Total number of threads being run, for
664                             // calculations mixing up cache line access.
665   int cc_inc_count_;        // Number of times to increment the counter.
666 
667  private:
668   DISALLOW_COPY_AND_ASSIGN(CpuCacheCoherencyThread);
669 };
670 
671 // Worker thread to perform disk test.
672 class DiskThread : public WorkerThread {
673  public:
674   explicit DiskThread(DiskBlockTable *block_table);
675   virtual ~DiskThread();
676   // Calculate disk thread specific bandwidth.
GetDeviceCopiedData()677   virtual float GetDeviceCopiedData() {
678     return (blocks_written_ * write_block_size_ +
679             blocks_read_ * read_block_size_) / kMegabyte;}
680 
681   // Set filename for device file (in /dev).
682   virtual void SetDevice(const char *device_name);
683   // Set various parameters that control the behaviour of the test.
684   virtual bool SetParameters(int read_block_size,
685                              int write_block_size,
686                              int64 segment_size,
687                              int64 cache_size,
688                              int blocks_per_segment,
689                              int64 read_threshold,
690                              int64 write_threshold,
691                              int non_destructive);
692 
693   virtual bool Work();
694 
GetMemoryCopiedData()695   virtual float GetMemoryCopiedData() {return 0;}
696 
697  protected:
698   static const int kSectorSize = 512;       // Size of sector on disk.
699   static const int kBufferAlignment = 512;  // Buffer alignment required by the
700                                             // kernel.
701   static const int kBlockRetry = 100;       // Number of retries to allocate
702                                             // sectors.
703 
704   enum IoOp {
705     ASYNC_IO_READ   = 0,
706     ASYNC_IO_WRITE  = 1
707   };
708 
709   virtual bool OpenDevice(int *pfile);
710   virtual bool CloseDevice(int fd);
711 
712   // Retrieves the size (in bytes) of the disk/file.
713   virtual bool GetDiskSize(int fd);
714 
715   // Retrieves the current time in microseconds.
716   virtual int64 GetTime();
717 
718   // Do an asynchronous disk I/O operation.
719   virtual bool AsyncDiskIO(IoOp op, int fd, void *buf, int64 size,
720                            int64 offset, int64 timeout);
721 
722   // Write a block to disk.
723   virtual bool WriteBlockToDisk(int fd, BlockData *block);
724 
725   // Verify a block on disk.
726   virtual bool ValidateBlockOnDisk(int fd, BlockData *block);
727 
728   // Main work loop.
729   virtual bool DoWork(int fd);
730 
731   int read_block_size_;       // Size of blocks read from disk, in bytes.
732   int write_block_size_;      // Size of blocks written to disk, in bytes.
733   int64 blocks_read_;         // Number of blocks read in work loop.
734   int64 blocks_written_;      // Number of blocks written in work loop.
735   int64 segment_size_;        // Size of disk segments (in bytes) that the disk
736                               // will be split into where testing can be
737                               // confined to a particular segment.
738                               // Allows for control of how evenly the disk will
739                               // be tested.  Smaller segments imply more even
740                               // testing (less random).
741   int blocks_per_segment_;    // Number of blocks that will be tested per
742                               // segment.
743   int cache_size_;            // Size of disk cache, in bytes.
744   int queue_size_;            // Length of in-flight-blocks queue, in blocks.
745   int non_destructive_;       // Use non-destructive mode or not.
746   int update_block_table_;    // If true, assume this is the thread
747                               // responsible for writing the data in the disk
748                               // for this block device and, therefore,
749                               // update the block table. If false, just use
750                               // the block table to get data.
751 
752   // read/write times threshold for reporting a problem
753   int64 read_threshold_;      // Maximum time a read should take (in us) before
754                               // a warning is given.
755   int64 write_threshold_;     // Maximum time a write should take (in us) before
756                               // a warning is given.
757   int64 read_timeout_;        // Maximum time a read can take before a timeout
758                               // and the aborting of the read operation.
759   int64 write_timeout_;       // Maximum time a write can take before a timeout
760                               // and the aborting of the write operation.
761 
762   string device_name_;        // Name of device file to access.
763   int64 device_sectors_;      // Number of sectors on the device.
764 
765   std::queue<BlockData*> in_flight_sectors_;   // Queue of sectors written but
766                                                 // not verified.
767   void *block_buffer_;        // Pointer to aligned block buffer.
768 
769 #ifdef HAVE_LIBAIO_H
770   io_context_t aio_ctx_;     // Asynchronous I/O context for Linux native AIO.
771 #endif
772 
773   DiskBlockTable *block_table_;  // Disk Block Table, shared by all disk
774                                  // threads that read / write at the same
775                                  // device
776 
777   DISALLOW_COPY_AND_ASSIGN(DiskThread);
778 };
779 
780 class RandomDiskThread : public DiskThread {
781  public:
782   explicit RandomDiskThread(DiskBlockTable *block_table);
783   virtual ~RandomDiskThread();
784   // Main work loop.
785   virtual bool DoWork(int fd);
786  protected:
787   DISALLOW_COPY_AND_ASSIGN(RandomDiskThread);
788 };
789 
790 // Worker thread to perform checks in a specific memory region.
791 class MemoryRegionThread : public WorkerThread {
792  public:
793   MemoryRegionThread();
794   ~MemoryRegionThread();
795   virtual bool Work();
796   void ProcessError(struct ErrorRecord *error, int priority,
797                     const char *message);
798   bool SetRegion(void *region, int64 size);
799   // Calculate worker thread specific bandwidth.
GetMemoryCopiedData()800   virtual float GetMemoryCopiedData()
801     {return GetCopiedData();}
GetDeviceCopiedData()802   virtual float GetDeviceCopiedData()
803     {return GetCopiedData() * 2;}
SetIdentifier(string identifier)804   void SetIdentifier(string identifier) {
805     identifier_ = identifier;
806   }
807 
808  protected:
809   // Page queue for this particular memory region.
810   char *region_;
811   PageEntryQueue *pages_;
812   bool error_injection_;
813   int phase_;
814   string identifier_;
815   static const int kPhaseNoPhase = 0;
816   static const int kPhaseCopy = 1;
817   static const int kPhaseCheck = 2;
818 
819  private:
820   DISALLOW_COPY_AND_ASSIGN(MemoryRegionThread);
821 };
822 
823 // Worker thread to check that the frequency of every cpu does not go below a
824 // certain threshold.
825 class CpuFreqThread : public WorkerThread {
826  public:
827   CpuFreqThread(int num_cpus, int freq_threshold, int round);
828   ~CpuFreqThread();
829 
830   // This is the task function that the thread executes.
831   virtual bool Work();
832 
833   // Returns true if this test can run on the current machine. Otherwise,
834   // returns false.
835   static bool CanRun();
836 
837  private:
838   static const int kIntervalPause = 10;   // The number of seconds to pause
839                                           // between acquiring the MSR data.
840   static const int kStartupDelay = 5;     // The number of seconds to wait
841                                           // before acquiring MSR data.
842   static const int kMsrTscAddr = 0x10;    // The address of the TSC MSR.
843   static const int kMsrAperfAddr = 0xE8;  // The address of the APERF MSR.
844   static const int kMsrMperfAddr = 0xE7;  // The address of the MPERF MSR.
845 
846   // The index values into the CpuDataType.msr[] array.
847   enum MsrValues {
848     kMsrTsc = 0,           // MSR index 0 = TSC.
849     kMsrAperf = 1,         // MSR index 1 = APERF.
850     kMsrMperf = 2,         // MSR index 2 = MPERF.
851     kMsrLast,              // Last MSR index.
852   };
853 
854   typedef struct {
855     uint32 msr;         // The address of the MSR.
856     const char *name;   // A human readable string for the MSR.
857   } CpuRegisterType;
858 
859   typedef struct {
860     uint64 msrs[kMsrLast];  // The values of the MSRs.
861     struct timeval tv;      // The time at which the MSRs were read.
862   } CpuDataType;
863 
864   // The set of MSR addresses and register names.
865   static const CpuRegisterType kCpuRegisters[kMsrLast];
866 
867   // Compute the change in values of the MSRs between current and previous,
868   // set the frequency in MHz of the cpu. If there is an error computing
869   // the delta, return false. Othewise, return true.
870   bool ComputeFrequency(CpuDataType *current, CpuDataType *previous,
871                         int *frequency);
872 
873   // Get the MSR values for this particular cpu and save them in data. If
874   // any error is encountered, returns false. Otherwise, returns true.
875   bool GetMsrs(int cpu, CpuDataType *data);
876 
877   // Compute the difference between the currently read MSR values and the
878   // previously read values and store the results in delta. If any of the
879   // values did not increase, or the TSC value is too small, returns false.
880   // Otherwise, returns true.
881   bool ComputeDelta(CpuDataType *current, CpuDataType *previous,
882                     CpuDataType *delta);
883 
884   // The total number of cpus on the system.
885   int num_cpus_;
886 
887   // The minimum frequency that each cpu must operate at (in MHz).
888   int freq_threshold_;
889 
890   // The value to round the computed frequency to.
891   int round_;
892 
893   // Precomputed value to add to the frequency to do the rounding.
894   double round_value_;
895 
896   DISALLOW_COPY_AND_ASSIGN(CpuFreqThread);
897 };
898 
899 #endif  // STRESSAPPTEST_WORKER_H_
900