Multithreading (2)

Concurrent Systems

                               Graduate School of Systems and Information Engineering, Department of Computer Science / Institute of Information Sciences and Electronics
                               Yasushi Shinjo
                               <yas@is.tsukuba.ac.jp>

This page is available at the following URL:
http://www.cs.tsukuba.ac.jp/~yas/sie/csys-2007/2007-12-21
You can also reach it by following links from these pages:
http://www.cs.tsukuba.ac.jp/~yas/sie/
http://www.cs.tsukuba.ac.jp/~yas/

■Today's important topics

Textbook

■Master-slave

◆Basics

master()
{
	initialization;
	for( i=0 ; i<nthreads; i++ )
	{
	    pthread_create( &tid[i], NULL, slave, &params[i] );
	}
	the master's own work; /* if needed */
	for( i=0 ; i<nthreads; i++ )
	{
	    pthread_join( tid[i], NULL );
	}
	cleanup; /* if needed */
}

slave(void *params)
{
	extract i from params;
	do the work;
	return the result through a global variable or through params;
	return; /* pthread_exit() */
}

◆Master-slave example: matrix multiplication

C[N][M] = A[N][L] x B[L][M]

Figure: Matrix multiplication
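
Each element of the product is the inner product of a row of A and a column of B. In the code below, slave i computes these sums for its own block of rows of C:

    c[row][col] = a[row][0]*b[0][col] + a[row][1]*b[1][col] + ... + a[row][L-1]*b[L-1][col]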

[MatrixMultiplyMaster.java]
/*
 * MatrixMultiplyMaster.java -- The master of MatrixMultiplySlave threads.
 */

class MatrixMultiplyMaster {
    public static double matrix_multiply_trace( double a[][], double b[][],
                                                double c[][], int nthreads )
        throws java.lang.InterruptedException
    {
        int i;
        MatrixMultiplySlave slaves[] = new MatrixMultiplySlave[nthreads];
        for( i=0; i<nthreads; i++ )
        {
            slaves[i] = new MatrixMultiplySlave( a,b,c,i,nthreads );
            slaves[i].start();
        }
        double trace = 0.0;
        for( i=0; i<nthreads; i++ )
        {
            slaves[i].join();
            trace += slaves[i].get_trace();
        }
        return( trace );
    }
}
[MatrixMultiplySlave.java]
/*
 * MatrixMultiplySlave.java -- Slave threads for matrix multiplication.
 */

class MatrixMultiplySlave extends java.lang.Thread {
    double[][] a, b, c;
    int i,n ;
    double trace_ans = 0.0 ;
    public MatrixMultiplySlave( double a[][], double b[][], double c[][],
                                int i, int n )
    {
        this.a = a ; this.b = b ; this.c = c ;
        this.i = i ; this.n = n ;
    }
    public void run()
    {
        int c_nrows = Matrix.nrows(c), c_ncols = Matrix.ncols(c), 
            a_ncols = Matrix.ncols(a);
        /* Divide the rows of c evenly among the n slaves; the first
           rem slaves take one extra row each. */
        int quot = c_nrows / n;  int rem = c_nrows % n;
        int do_rows = quot + (i < rem ? 1 : 0);
        int first_row = quot * i + (i < rem ? i : rem);
        double trace = 0.0;

        int row, col, j;
        for( row = first_row; row < first_row + do_rows; row++ )
        {
            for( col=0; col < c_ncols; col++ )
            {
                double sum = 0.0;
                for( j = 0; j < a_ncols; j++ )
                {
                   sum += a[row][j] * b[j][col];
                }
                c[row][col] = sum;
                if( row == col ) 
                {
                    trace += sum;
                }
            }
        }
        trace_ans = trace;
    }
    public double get_trace()
    {
        return( trace_ans );
    }
}
[MatrixMultiplyDemo.java]
/*
 * MatrixMultiplyDemo.java -- run and measure execution time of MatrixMultiply. 
 */

class MatrixMultiplyDemo {
    public static void main(String argv[]) 
        throws java.lang.InterruptedException
    {
        if( argv.length != 3 )
        {
            System.err.println("Usage:% java MatrixMultiplyDemo matrix-size nthreads niters");
            System.err.println("Hints: matrix-size: 200..400, nthreads: 1..nCPU, niters: 1..100");
            System.exit( 1 );
        }
        int matrix_size = Integer.parseInt( argv[0] );
        int nthreads    = Integer.parseInt( argv[1] );
        int niters      = Integer.parseInt( argv[2] );
        run_matrix_multiply_trace( matrix_size, nthreads, niters );
        System.exit( 0 );
    }
    static void run_matrix_multiply_trace( int matrix_size, int nthreads, 
                                           int niters )
        throws java.lang.InterruptedException
    {
        // pthread_setconcurrency( nthreads );
        double a[][] = Matrix.alloc( matrix_size, matrix_size );
        double b[][] = Matrix.alloc( matrix_size, matrix_size );
        double c[][] = Matrix.alloc( matrix_size, matrix_size );
        java.util.Random r = new java.util.Random();
        Matrix.fill_rand( a,r );
        Matrix.fill_rand( b,r );
        Matrix.fill_rand( c,r );
        long start = System.currentTimeMillis();
            for( int i=0; i<niters; i++ )
            {
                MatrixMultiplyMaster.matrix_multiply_trace( a,b,c,nthreads );
            }
        long end = System.currentTimeMillis();
        double diff = (double)(end - start)/1000.0 ;
        int concurrency = 0; // pthread_getconcurrency();
        System.out.printf("matrix_size==%d, nthreads==%d, niters==%d, concurrency==%d\n",
               matrix_size, nthreads, niters, concurrency );
        System.out.printf("%6.3f [sec] / %d [iteration] == %6.3f [sec/iteration]\n",
               diff, niters, diff/niters );
        System.out.printf("%d [iteration] / %6.3f [sec] == %6.3f [iteration/sec]\n",
                niters, diff, niters/diff );
    }
}
[Matrix.java]
/*
 * Matrix.java -- Simple matrix with a two-dimensional array. 
 */

class Matrix {
    public static double[][] alloc( int rows, int cols )
    {
	double m[][];
	m = new double[rows][cols];
	return( m );
    }
    public static void fill_rand( double m[][], java.util.Random r )
    {
	int m_nrows = nrows( m );
	int m_ncols = ncols( m );
	for( int i=0; i<m_nrows; i++ )
	    for( int j=0; j<m_ncols; j++ )
		m[i][j] = r.nextDouble();
    }
    public static int nrows( double m[][] )
    {
	return( m.length );
    }
    public static int ncols( double m[][] )
    {
	return( m[0].length );
    }
}

◆Execution example

Execution example on an SMP (4 CPUs, SPARC, Solaris).
% wget http://www.cs.tsukuba.ac.jp/~yas/sie/csys-2007/2007-12-21/ex/MatrixMultiplyMaster.java [←]
% wget http://www.cs.tsukuba.ac.jp/~yas/sie/csys-2007/2007-12-21/ex/MatrixMultiplySlave.java [←]
% wget http://www.cs.tsukuba.ac.jp/~yas/sie/csys-2007/2007-12-21/ex/MatrixMultiplyDemo.java [←]
% wget http://www.cs.tsukuba.ac.jp/~yas/sie/csys-2007/2007-12-21/ex/Matrix.java [←]
% javac MatrixMultiplyMaster.java MatrixMultiplySlave.java MatrixMultiplyDemo.java Matrix.java [←]
% java -version [←]
java version "1.5.0_14"
Java(TM) 2 Runtime Environment, Standard Edition (build 1.5.0_14-b03)
Java HotSpot(TM) Server VM (build 1.5.0_14-b03, mixed mode)
% java MatrixMultiplyDemo [←]
Usage:% java MatrixMultiplyDemo matrix-size nthreads niters
Hints: matrix-size: 200..400, nthreads: 1..nCPU, niters: 1..100
% java MatrixMultiplyDemo 300 1 1 [←]
matrix_size==300, nthreads==1, niters==1, concurrency==0
 3.586 [sec] / 1 [iteration] ==  3.586 [sec/iteration]
1 [iteration] /  3.586 [sec] ==  0.279 [iteration/sec]
% java MatrixMultiplyDemo 300 2 1 [←]
matrix_size==300, nthreads==2, niters==1, concurrency==0
 1.819 [sec] / 1 [iteration] ==  1.819 [sec/iteration]
1 [iteration] /  1.819 [sec] ==  0.550 [iteration/sec]
% java MatrixMultiplyDemo 300 3 1 [←]
matrix_size==300, nthreads==3, niters==1, concurrency==0
 1.229 [sec] / 1 [iteration] ==  1.229 [sec/iteration]
1 [iteration] /  1.229 [sec] ==  0.814 [iteration/sec]
% java MatrixMultiplyDemo 300 4 1 [←]
matrix_size==300, nthreads==4, niters==1, concurrency==0
 0.936 [sec] / 1 [iteration] ==  0.936 [sec/iteration]
1 [iteration] /  0.936 [sec] ==  1.068 [iteration/sec]
% java MatrixMultiplyDemo 300 5 1 [←]
matrix_size==300, nthreads==5, niters==1, concurrency==0
 1.061 [sec] / 1 [iteration] ==  1.061 [sec/iteration]
1 [iteration] /  1.061 [sec] ==  0.943 [iteration/sec]
% []

◆Master-slave example: matrix multiplication (C version)

Execution example on an SMP (4 CPUs, SPARC, Solaris).
% wget http://www.cs.tsukuba.ac.jp/~yas/sie/csys-2007/2007-12-21/ex/ms_matrix-main.c [←]
% wget http://www.cs.tsukuba.ac.jp/~yas/sie/csys-2007/2007-12-21/ex/ms_matrix.c [←]
% wget http://www.cs.tsukuba.ac.jp/~yas/sie/csys-2007/2007-12-21/ex/timeval.c [←]
% wget http://www.cs.tsukuba.ac.jp/~yas/sie/csys-2007/2007-12-21/ex/Makefile [←]
% emacs Makefile (adjust CFLAGS) [←]
% make ms_matrix-main [←]
gcc -D_REENTRANT ms_matrix-main.o -lpthread -lposix4 -o ms_matrix-main
% ./ms_matrix-main 400 1 1 [←]
matrix_size==400, nthreads==1, niters==1, concurrency==1
 1.846 [sec] / 1 [iteration] ==  1.846 [sec/iteration]
1 [iteration] /  1.846 [sec] ==  0.542 [iteration/sec]
% ./ms_matrix-main 400 1 1 [←]
matrix_size==400, nthreads==1, niters==1, concurrency==1
 1.842 [sec] / 1 [iteration] ==  1.842 [sec/iteration]
1 [iteration] /  1.842 [sec] ==  0.543 [iteration/sec]
% ./ms_matrix-main 400 2 1 [←]
matrix_size==400, nthreads==2, niters==1, concurrency==2
 0.924 [sec] / 1 [iteration] ==  0.924 [sec/iteration]
1 [iteration] /  0.924 [sec] ==  1.082 [iteration/sec]
% ./ms_matrix-main 400 3 1 [←]
matrix_size==400, nthreads==3, niters==1, concurrency==3
 0.657 [sec] / 1 [iteration] ==  0.657 [sec/iteration]
1 [iteration] /  0.657 [sec] ==  1.523 [iteration/sec]
% ./ms_matrix-main 400 4 1 [←]
matrix_size==400, nthreads==4, niters==1, concurrency==4
 0.467 [sec] / 1 [iteration] ==  0.467 [sec/iteration]
1 [iteration] /  0.467 [sec] ==  2.144 [iteration/sec]
% ./ms_matrix-main 400 5 1 [←]
matrix_size==400, nthreads==5, niters==1, concurrency==5
 0.728 [sec] / 1 [iteration] ==  0.728 [sec/iteration]
1 [iteration] /  0.728 [sec] ==  1.374 [iteration/sec]
% []

◆How to measure speedup properly

References
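
Speedup on n CPUs is usually defined as S(n) = T(1)/T(n), and efficiency as E(n) = S(n)/n. The following is a minimal sketch (not part of the course materials; the class name is made up, and the embedded times are the sec/iteration figures from the MatrixMultiplyDemo run above) that computes both:

[SpeedupCalc.java]
/*
 * SpeedupCalc.java -- a sketch: compute speedup and efficiency
 * from measured execution times.
 */
class SpeedupCalc {
    public static void main(String argv[]) {
        /* Hypothetical input: sec/iteration measured with 1..4 threads. */
        double t[] = { 3.586, 1.819, 1.229, 0.936 };
        for( int n=1; n<=t.length; n++ ) {
            double speedup    = t[0] / t[n-1];  /* S(n) = T(1)/T(n) */
            double efficiency = speedup / n;    /* E(n) = S(n)/n    */
            System.out.printf("n==%d, speedup==%5.2f, efficiency==%5.2f\n",
                              n, speedup, efficiency );
        }
    }
}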

■Master-slave (with barriers)

◆Basics

master()
{
	initialization;
	for( i=0 ; i<nthreads; i++ )
	{
	    pthread_create( &tid[i], NULL, slave, &params[i] );
	}
	while( !termcond )
	{
	    the master's work A; /* if needed */
	    barrier_wait();
	    the master's work B; /* if needed */
	    barrier_wait();
	}
	for( i=0 ; i<nthreads; i++ )
	{
	    pthread_join( tid[i], NULL );
	}
	cleanup; /* if needed */
}

slave(void *params)
{
	paramsからiを取り出す;
	while( !termcond )
	{
	    /* wait for the master's work A to complete */
	    barrier_wait();
	    the slave's work B;
	    return the result through a global variable or through params;
	    barrier_wait();
	}
	return; /* pthread_exit() */
}
Sometimes the master has no work of its own inside the loop. In that case, the master does not participate in the barrier (see relax.c).
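
The following is a minimal sketch (not part of the course materials; class and variable names are hypothetical) of this simpler case in Java, using java.util.concurrent.CyclicBarrier, the same class that RelaxMaster below uses. The barrier action, which runs exactly once per barrier trip after all parties have arrived and before any is released, is a race-free place to decide termination:

[BarrierSketch.java]
/*
 * BarrierSketch.java -- a sketch of barrier-synchronized slaves;
 * the master only creates the slaves and waits for them to finish.
 */
import java.util.concurrent.CyclicBarrier;

class BarrierSketch {
    static final int NTHREADS = 4;
    static final int NROUNDS = 3;
    static volatile boolean termcond = false;
    static int round = 0;
    static final CyclicBarrier barrier =
        new CyclicBarrier( NTHREADS, new Runnable() {
            /* runs once per trip, with all slaves stopped at the barrier */
            public void run() { round++; termcond = (round >= NROUNDS); }
        });

    static class Slave extends Thread {
        public void run() {
            try {
                do {
                    /* this round's slave work goes here */
                    barrier.await();  /* wait until every slave is done */
                } while( !termcond );
            } catch( Exception e ) { return; }
        }
    }
    public static void main(String argv[]) throws InterruptedException {
        Slave slaves[] = new Slave[NTHREADS];
        for( int i=0; i<NTHREADS; i++ ) {
            slaves[i] = new Slave();
            slaves[i].start();
        }
        for( int i=0; i<NTHREADS; i++ ) slaves[i].join();
    }
}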

◆Barrier synchronization example: the relaxation method

Boundary and initial values

Figure: The relaxation method
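
Each interior point (a point where boundary[row][col] == 0) is repeatedly replaced by the average of its four neighbors, until no point changes by more than a small threshold (0.000001 in the code below):

    t_new[row][col] = ( t_old[row-1][col] + t_old[row+1][col]
                      + t_old[row][col-1] + t_old[row][col+1] ) / 4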

◆Relaxation method (Java version)

[RelaxMaster.java]
/*
 * RelaxMaster.java -- The master thread for the relax method.
 */

import java.util.concurrent.CyclicBarrier;

class RelaxMaster {
    RelaxSlave slaves[];
    int nthreads;
    public RelaxMaster( double boundary[][], double t[][], 
                             final int nthreads)
    {
        this.nthreads = nthreads;
        // The barrier action runs once each time all the slaves reach
        // the barrier; it decides whether every slave has converged.
        CyclicBarrier barrier = 
            new CyclicBarrier( nthreads,
                               new Runnable() {
                                   public synchronized void run() {
                                       all_done = n_done == nthreads;
                                       n_done = 0;
                                   }
                               }
                             );
        double t_temp[][] = Matrix.alloc( Matrix.nrows(t), Matrix.ncols(t) );
        slaves = new RelaxSlave[nthreads];
        int i;
        for( i=0; i<nthreads; i++ )
        {
            slaves[i] = new RelaxSlave( i,nthreads, boundary, t, t_temp,
                                        barrier, this );
        }
    }
    public void relax()
        throws java.lang.InterruptedException
    {
        int i;
        for( i=0; i<nthreads; i++ )
        {
            slaves[i].start();
        }
        for( i=0; i<nthreads; i++ )
        {
            slaves[i].join();
        }
    }
    int n_done = 0;
    boolean all_done = false;
    public synchronized void i_am_done()
    {
        n_done ++;
    }
    public synchronized boolean all_done()
    {
        return( all_done );
    }
}
[RelaxSlave.java]
/*
 * RelaxSlave.java -- Slave threads for the relax method.
 */

import java.util.concurrent.CyclicBarrier;

class RelaxSlave extends java.lang.Thread {
    double[][] boundary, t, t_temp;
    int i,n ;
    CyclicBarrier barrier;
    RelaxMaster master;
    public RelaxSlave( int i, int n, double boundary[][], double t[][], 
                       double t_temp[][], CyclicBarrier barrier,
                       RelaxMaster master)
    {
        this.i = i ; this.n = n ;
        this.boundary = boundary ; this.t = t ; this.t_temp = t_temp ;
        this.barrier = barrier;
        this.master = master;
    }
    public void run()
    {
        double t_old[][] = this.t;
        double t_new[][] = this.t_temp;
        int quot = Matrix.nrows(boundary) / n;
        int rem  = Matrix.nrows(boundary) % n;
        int do_rows = quot + (i < rem ? 1 : 0);
        int first_row = quot * i + (i < rem ? i : rem);
        double temp[][];

        int row, col;
        boolean converged;

        do {
            converged = true;
            for( row = first_row; row < first_row + do_rows; row++ )
            {
                for( col = 0; col < Matrix.ncols(boundary); col++ )
                {
                    if( boundary[row][col] == 0 )
                    {
                        double v = 
                            (t_old[row-1][col] + 
                             t_old[row+1][col] +
                             t_old[row  ][col-1] +
                             t_old[row  ][col+1]) / 4.0;
                        if( java.lang.Math.abs(v-t_old[row][col]) > 0.000001)
                            converged = false;
                        t_new[row][col] = v;
                    }
                    else
                    {
                        t_new[row][col] = t_old[row][col];
                    }
                }
            }
            try
            {
                if( converged && t_new == this.t )
                    master.i_am_done();
                barrier.await();
            }
            catch( java.lang.InterruptedException e)
            {
                return;
            }
            catch( java.util.concurrent.BrokenBarrierException e)
            {
                return;
            }
            temp = t_new; t_new = t_old; t_old = temp;
        } while( !master.all_done() );
    }
}
[RelaxDemo.java]
/*
 * RelaxDemo.java -- run and measure execution time of Relax. 
 */

class RelaxDemo {
    public static void main(String argv[]) 
        throws java.lang.InterruptedException
    {
        if( argv.length != 3 )
        {
            System.err.println("Usage:% java RelaxDemo matrix-size nthreads niters");
            System.err.println("Hints: matrix-size: 40..60, nthreads: 1..nCPU, niters: 1..100");
            System.exit( 1 );
        }
        int matrix_size = Integer.parseInt( argv[0] );
        int nthreads    = Integer.parseInt( argv[1] );
        int niters      = Integer.parseInt( argv[2] );
        run_relax( matrix_size, nthreads, niters );
        System.exit( 0 );
    }
    static void run_relax( int matrix_size, int nthreads, 
                           int niters )
        throws java.lang.InterruptedException
    {
        // pthread_setconcurrency( nthreads );
        double t[][]        = Matrix.alloc( matrix_size, matrix_size );
        double boundary[][] = Matrix.alloc( matrix_size, matrix_size );
        matrix_t_fill( t );
        matrix_boundary_fill( boundary );
        long start = System.currentTimeMillis();
            for( int i=0; i<niters; i++ )
            {
                RelaxMaster m = new RelaxMaster( boundary, t, nthreads );
                m.relax();
            }
        long end = System.currentTimeMillis();
        double diff = (double)(end - start)/1000.0 ;
        int concurrency = 0; // pthread_getconcurrency();
        System.out.printf("matrix_size==%d, nthreads==%d, niters==%d, concurrency==%d\n",
               matrix_size, nthreads, niters, concurrency );
        System.out.printf("%6.3f [sec] / %d [iteration] == %6.3f [sec/iteration]\n",
               diff, niters, diff/niters );
        System.out.printf("%d [iteration] / %6.3f [sec] == %6.3f [iteration/sec]\n",
                niters, diff, niters/diff );
    }
    static void matrix_t_fill( double t[][] )
    {
        int n_rows = Matrix.nrows( t );
        int n_cols = Matrix.ncols( t );
        int i,j;
        for( i=0 ; i<n_rows; i++ )
        {
            for( j=0; j<n_cols; j++ )
            {
                t[i][j] = 0.0 ;
            }
        }
        /* initial values: a cross of 1.0 through the middle */
        j = n_cols/2 ;
        for( i=0 ; i<n_rows; i++ )
        {
            t[i][j] = 1.0 ;
        }
        i = n_rows/2 ;
        for( j=0; j<n_cols; j++ )
        {
            t[i][j] = 1.0 ;
        }
    }
    static void matrix_boundary_fill( double m[][] )
    {
        int n_rows = Matrix.nrows( m );
        int n_cols = Matrix.ncols( m );
        int i,j ;
        for( i=0 ; i<n_rows; i++ )
        {
            for( j=0; j<n_cols; j++ )
            {
                m[i][j] = 0 ;
            }
        }
        /* the four edges are fixed boundary points */
        for( i=0 ; i<n_rows; i++ )
        {
            m[i][0] = 1 ;
            m[i][n_cols-1] = 1 ;
        }
        for( j=0; j<n_cols; j++ )
        {
            m[0][j] = 1 ;
            m[n_rows-1][j] = 1 ;
        }
    }
}

◆Execution example

Execution example on an SMP (4 CPUs, SPARC, Solaris).
% wget http://www.cs.tsukuba.ac.jp/~yas/sie/csys-2007/2007-12-21/ex/RelaxMaster.java [←]
% wget http://www.cs.tsukuba.ac.jp/~yas/sie/csys-2007/2007-12-21/ex/RelaxSlave.java [←]
% wget http://www.cs.tsukuba.ac.jp/~yas/sie/csys-2007/2007-12-21/ex/RelaxDemo.java [←]
% wget http://www.cs.tsukuba.ac.jp/~yas/sie/csys-2007/2007-12-21/ex/Matrix.java [←]
% javac RelaxMaster.java RelaxSlave.java RelaxDemo.java Matrix.java [←]
% java RelaxDemo  [←]
Usage:% java RelaxDemo matrix-size nthreads niters
Hints: matrix-size: 40..60, nthreads: 1..nCPU, niters: 1..100
% java RelaxDemo 60 1 1 [←]
matrix_size==60, nthreads==1, niters==1, concurrency==0
 4.026 [sec] / 1 [iteration] ==  4.026 [sec/iteration]
1 [iteration] /  4.026 [sec] ==  0.248 [iteration/sec]
% java RelaxDemo 60 2 1 [←]
matrix_size==60, nthreads==2, niters==1, concurrency==0
 2.751 [sec] / 1 [iteration] ==  2.751 [sec/iteration]
1 [iteration] /  2.751 [sec] ==  0.364 [iteration/sec]
% java RelaxDemo 60 3 1 [←]
matrix_size==60, nthreads==3, niters==1, concurrency==0
 2.415 [sec] / 1 [iteration] ==  2.415 [sec/iteration]
1 [iteration] /  2.415 [sec] ==  0.414 [iteration/sec]
% java RelaxDemo 60 4 1 [←]
matrix_size==60, nthreads==4, niters==1, concurrency==0
 2.357 [sec] / 1 [iteration] ==  2.357 [sec/iteration]
1 [iteration] /  2.357 [sec] ==  0.424 [iteration/sec]
% []

◆Barrier synchronization example: the relaxation method (C version)

% wget http://www.cs.tsukuba.ac.jp/~yas/sie/csys-2007/2007-12-21/ex/ms_relax.c [←]
% wget http://www.cs.tsukuba.ac.jp/~yas/sie/csys-2007/2007-12-21/ex/ms_relax-main.c [←]
% wget http://www.cs.tsukuba.ac.jp/~yas/sie/csys-2007/2007-12-21/ex/barrier.c [←]
% wget http://www.cs.tsukuba.ac.jp/~yas/sie/csys-2007/2007-12-21/ex/timeval.c [←]
% wget http://www.cs.tsukuba.ac.jp/~yas/sie/csys-2007/2007-12-21/ex/Makefile [←]
% emacs Makefile (adjust CFLAGS) [←]
% make ms_relax-main [←]
% ./ms_relax-main 50 1 1 [←]
matrix_size==50, nthreads==1, niters==1, concurrency==1
 1.575 [sec] / 1 [iteration] ==  1.575 [sec/iteration]
1 [iteration] /  1.575 [sec] ==  0.635 [iteration/sec]
% ./ms_relax-main 50 1 1 [←]
matrix_size==50, nthreads==1, niters==1, concurrency==1
 1.572 [sec] / 1 [iteration] ==  1.572 [sec/iteration]
1 [iteration] /  1.572 [sec] ==  0.636 [iteration/sec]
% ./ms_relax-main 50 2 1 [←]
matrix_size==50, nthreads==2, niters==1, concurrency==2
 0.857 [sec] / 1 [iteration] ==  0.857 [sec/iteration]
1 [iteration] /  0.857 [sec] ==  1.167 [iteration/sec]
% ./ms_relax-main 50 3 1 [←]
matrix_size==50, nthreads==3, niters==1, concurrency==3
 0.646 [sec] / 1 [iteration] ==  0.646 [sec/iteration]
1 [iteration] /  0.646 [sec] ==  1.547 [iteration/sec]
% ./ms_relax-main 50 4 1 [←]
matrix_size==50, nthreads==4, niters==1, concurrency==4
 0.938 [sec] / 1 [iteration] ==  0.938 [sec/iteration]
1 [iteration] /  0.938 [sec] ==  1.066 [iteration/sec]
% uptime [←]
  1:06am  up 116 day(s), 14:35,  7 users,  load average: 1.33, 1.14, 1.10
% []

■Task bag (workpile)

◆Basics

Create as many worker threads as there are CPUs.
worker_thread()
{
	while( (ptr=work_get(workpile))!=NULL )
	{
	    do the work that ptr points to;
	    if( new work was generated )
	    {
		work_put(workpile, the new work);
	    }
	}
}

The task bag, threads, and tasks

Figure: Task bag
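
The following is a minimal sketch (not part of the course materials; all names are hypothetical) of a task bag in Java, following the worker loop above: work_get() blocks while the bag is empty but some task is still running, and returns null once the bag is empty and every task has finished. The course's own Java example (QuickSort below) uses the Executor framework instead.

[TaskBagSketch.java]
/*
 * TaskBagSketch.java -- a sketch of a task bag (workpile) with
 * termination detection by counting unfinished tasks.
 */
class TaskBagSketch {
    private final java.util.LinkedList<Runnable> pile =
        new java.util.LinkedList<Runnable>();
    private int unfinished = 0;    /* tasks put but not yet completed */

    public synchronized void work_put( Runnable task ) {
        unfinished++;
        pile.addLast( task );
        notifyAll();
    }
    /* Returns null once the bag is empty and no task is still running. */
    public synchronized Runnable work_get() throws InterruptedException {
        while( pile.isEmpty() && unfinished > 0 )
            wait();
        return pile.isEmpty() ? null : pile.removeFirst();
    }
    public synchronized void work_done() {
        unfinished--;
        if( unfinished == 0 )
            notifyAll();   /* wake idle workers so they can return */
    }
}

class TaskBagWorker extends Thread {
    TaskBagSketch bag;
    TaskBagWorker( TaskBagSketch bag ) { this.bag = bag; }
    public void run() {
        try {
            Runnable ptr;
            while( (ptr = bag.work_get()) != null ) {
                ptr.run();         /* the task may call bag.work_put() */
                bag.work_done();
            }
        } catch( InterruptedException e ) { return; }
    }
}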

◆Applications

◆The Java Executor framework
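
java.util.concurrent (since Java 5) provides the Executor framework: a fixed pool of worker threads plays the role of the task bag's workers, and execute() plays the role of work_put(). A minimal sketch (not part of the course materials; the class name and task bodies are made up):

[ExecutorSketch.java]
/*
 * ExecutorSketch.java -- a sketch of running tasks on a fixed thread pool.
 */
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ExecutorSketch {
    public static void main(String argv[]) throws InterruptedException {
        ExecutorService exec = Executors.newFixedThreadPool( 4 );
        for( int i=0; i<10; i++ ) {
            final int task_no = i;
            exec.execute( new Runnable() {
                public void run() {
                    System.out.println("task " + task_no + " on " +
                                       Thread.currentThread().getName());
                }
            });
        }
        exec.shutdown();                               /* accept no new tasks */
        exec.awaitTermination( 60, TimeUnit.SECONDS ); /* wait for the rest  */
    }
}

QuickSort below submits new tasks from inside running tasks, so it cannot simply call shutdown() after submitting; instead it counts outstanding tasks itself (task_done()/work_wait()).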

◆Quicksort (Java)

[QuickSort.java]
/*
 * QuickSort.java -- Multithreaded quick sort with a task bag
 */

import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class QuickSortTask implements Runnable {
    double data[];
    int base;
    int n;
    QuickSort manager;
    public QuickSortTask( double data[], int base, int n,
                          QuickSort manager )
    {
        this.data    = data ;
        this.base    = base ;
        this.n       = n ;
        this.manager = manager ;
    }
    static final int SORT_DIRECT = 200;
    public void run() 
    {
        int i,j;
        if( n <= SORT_DIRECT )
        {
            /* small ranges are sorted directly by insertion sort */
            for( j=1; j<n; j++)
            {
                double key = data[base+j];
                for( i=j-1; i >= 0 && key < data[base+i]; i--)
                    data[base+i+1] = data[base+i];
                data[base+i+1] = key;
            }
            manager.task_done();
            return;
        }
        i = 0;
        j = n - 1;
        while( true )
        {
            while( data[base+i] < data[base+j] ) j--;
            if( i >= j ) break;
            {  double t = data[base+i]; data[base+i] = data[base+j]; 
               data[base+j] = t;  } /* swap */
            i++;
            while( data[base+i] < data[base+j] ) i++;
            if (i >= j) { i = j; break; }
            {  double t = data[base+i]; data[base+i] = data[base+j]; 
               data[base+j] = t;  } /* swap */
            j--;
        }
        /* put the two partitions into the task bag as new tasks */
        manager.add_task( data,base,    i     );
        manager.add_task( data,base+i+1,n-i-1 );
        manager.task_done();
    }
}

class QuickSort {
    int task_count;
    ExecutorService exec;
    public QuickSort(int n_threads)
    {
        task_count = 0;
        exec = Executors.newFixedThreadPool(n_threads);
    }
    public synchronized void add_task( double data[], int base, int n )
    {
        task_count ++;
        Runnable task = new QuickSortTask( data,base,n,this );
        exec.execute( task );
    }
    public synchronized void task_done()
    {
        task_count -- ;
        if( task_count <= 0 )
            notify();
    }
    public synchronized void work_wait()
        throws java.lang.InterruptedException
    {
        while( task_count > 0 )
        {
            wait();
        }
        exec.shutdown();
    }
}

[QuickSortDemo.java]
/*
 * QuickSortDemo.java -- run and measure execution time of QuickSort. 
 */

class QuickSortDemo {
    public static void main(String argv[]) 
        throws java.lang.InterruptedException
    {
        if( argv.length != 2 )
        {
            System.err.println("Usage:% java QuickSortDemo matrix-size nthreads");
            System.err.println("Hints: matrix-size: 1000000, nthreads: 1..nCPU");
            System.exit( 1 );
        }
        int matrix_size = Integer.parseInt( argv[0] );
        int nthreads    = Integer.parseInt( argv[1] );
        run_quicksort( matrix_size, nthreads );
        System.exit( 0 );
    }
    static void run_quicksort( int matrix_size, int nthreads )
        throws java.lang.InterruptedException
    {
        // pthread_setconcurrency( nthreads );
        double data[] = new double[matrix_size];
        data_fill( data,matrix_size );
        long start = System.currentTimeMillis();
            QuickSort qs = new QuickSort( nthreads );
            qs.add_task( data,0,matrix_size );
            qs.work_wait();
        long end = System.currentTimeMillis();
        double diff = (double)(end - start)/1000.0 ;
        int concurrency = 0; // pthread_getconcurrency();
        int niters = 1;
        System.out.printf("matrix_size==%d, nthreads==%d, niters==%d, concurrency==%d\n",
               matrix_size, nthreads, niters, concurrency );
        System.out.printf("%6.3f [sec] / %d [iteration] == %6.3f [sec/iteration]\n",
               diff, niters, diff/niters );
        System.out.printf("%d [iteration] / %6.3f [sec] == %6.3f [iteration/sec]\n",
                niters, diff, niters/diff );
    }
    static void data_fill( double data[], int size )
    {
        int i;
        java.util.Random r = new java.util.Random();
        for( i=0 ; i<size; i++ )
        {
            data[i] = r.nextDouble();
        }
    }
    static void data_print( double data[], int size )
    {
        int i;
        for( i=0 ; i<size; i++ )
        {
            System.out.printf("%4.2f ",data[i]);
        }
        System.out.printf("\n");
    }
}

◆Execution example

Execution example on an SMP (4 CPUs, SPARC, Solaris).
% wget http://www.cs.tsukuba.ac.jp/~yas/sie/csys-2007/2007-12-21/ex/QuickSort.java [←]
% wget http://www.cs.tsukuba.ac.jp/~yas/sie/csys-2007/2007-12-21/ex/QuickSortDemo.java [←]
% javac QuickSort.java QuickSortDemo.java [←]
% java QuickSortDemo  [←]
Usage:% java QuickSortDemo matrix-size nthreads
Hints: matrix-size: 1000000, nthreads: 1..nCPU
% java QuickSortDemo 1000000 1 [←]
matrix_size==1000000, nthreads==1, niters==1, concurrency==0
 3.830 [sec] / 1 [iteration] ==  3.830 [sec/iteration]
1 [iteration] /  3.830 [sec] ==  0.261 [iteration/sec]
% java QuickSortDemo 1000000 2 [←]
matrix_size==1000000, nthreads==2, niters==1, concurrency==0
 2.210 [sec] / 1 [iteration] ==  2.210 [sec/iteration]
1 [iteration] /  2.210 [sec] ==  0.452 [iteration/sec]
% java QuickSortDemo 1000000 3 [←]
matrix_size==1000000, nthreads==3, niters==1, concurrency==0
 1.768 [sec] / 1 [iteration] ==  1.768 [sec/iteration]
1 [iteration] /  1.768 [sec] ==  0.566 [iteration/sec]
% java QuickSortDemo 1000000 4 [←]
matrix_size==1000000, nthreads==4, niters==1, concurrency==0
 1.736 [sec] / 1 [iteration] ==  1.736 [sec/iteration]
1 [iteration] /  1.736 [sec] ==  0.576 [iteration/sec]
% []

◆C version: the workpile controller (workpile.c)

workpile_t work_init(int max_pile, work_proc_t worker_proc, int n_threads)
Initialize a workpile. max_pile is the maximum number of work items the pile can hold. worker_proc is the procedure that a worker thread runs to process one work item. n_threads is the number of worker threads.
void work_put(workpile_t wp, void *ptr)
Add a work item to the workpile.
void work_wait(workpile_t wp)
Wait until all the work is finished.

◆Quicksort (C version)

% wget http://www.cs.tsukuba.ac.jp/~yas/sie/csys-2007/2007-12-21/ex/quicksort-main.c [←]
% wget http://www.cs.tsukuba.ac.jp/~yas/sie/csys-2007/2007-12-21/ex/quicksort.c [←]
% wget http://www.cs.tsukuba.ac.jp/~yas/sie/csys-2007/2007-12-21/ex/workpile.c [←]
% wget http://www.cs.tsukuba.ac.jp/~yas/sie/csys-2007/2007-12-21/ex/timeval.c [←]
% wget http://www.cs.tsukuba.ac.jp/~yas/sie/csys-2007/2007-12-21/ex/Makefile [←]
% emacs Makefile (adjust CFLAGS) [←]
% make quicksort-main [←]
% ./quicksort-main 1000000 1 [←]
size==1000000, nthreads==1, concurrency==1
execution time:  0.766 [sec]
    throughput:  1.305 [/sec]
% ./quicksort-main 1000000 1 [←]
size==1000000, nthreads==1, concurrency==1
execution time:  0.761 [sec]
    throughput:  1.315 [/sec]
% ./quicksort-main 1000000 2 [←]
size==1000000, nthreads==2, concurrency==2
execution time:  0.439 [sec]
    throughput:  2.280 [/sec]
% ./quicksort-main 1000000 3 [←]
size==1000000, nthreads==3, concurrency==3
execution time:  0.334 [sec]
    throughput:  2.993 [/sec]
% ./quicksort-main 1000000 4 [←]
size==1000000, nthreads==4, concurrency==4
execution time:  0.318 [sec]
    throughput:  3.145 [/sec]
% ./quicksort-main 1000000 5 [←]
size==1000000, nthreads==5, concurrency==5
execution time:  0.276 [sec]
    throughput:  3.621 [/sec]
% []

■Performance

◆Important points for getting good performance

◆Guidelines for writing parallel programs

◆The gprof command

A tool for finding out where a program's algorithm is spending its time.

At compile time, add the -pg option.

Running the program produces a file named gmon.out. Pass this file, together with the executable, to the gprof command.
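
For example (a sketch only; myprog is a hypothetical program name):
% gcc -pg -o myprog myprog.c [←]
% ./myprog [←]
% gprof myprog gmon.out [←]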

■Exercises

★Exercise (11): Summing an array

Given an array, write parallel programs that compute values such as the following. Note: some of them are difficult to parallelize; you do not have to do all of them.

Use C or Java as the programming language.

★Exercise (12): Matrix multiplication by divide and conquer

Rewrite the matrix multiplication program using divide and conquer: split the matrices into submatrices, multiply the submatrices, and add the results. Pay attention to how the array elements are laid out in memory.

Use C or Java as the programming language.

★Exercise (13): Matrix multiplication with a task bag

Rewrite the matrix multiplication program using the task bag (workpile) technique. Create as many workers as there are CPUs; there may be more tasks than CPUs.

Use C or Java as the programming language.

★Exercise (14): Parallelizing a sequential program

Rewrite a sequential program of your own into a parallel program using Pthreads or Java.

Use C or Java as the programming language.


Last updated: 2007/12/21 05:25:55
Yasushi Shinjo / <yas@is.tsukuba.ac.jp>