
Parallelism

Start from matrix multiplication

We know that in a single thread, the naive implementation of multiplying two matrices takes O(n^3) time, where n is the side length. There are algorithms that asymptotically approach the theoretical lower bound of $\Omega(n^2)$, running in $O(n^{\omega})$ time with $2 \leq \omega < 2.373$. One such algorithm is Strassen's; its running time is $O(n^{\log_2 7})\approx O(n^{2.81})$.
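For reference, the exponent comes from the divide-and-conquer recurrence: Strassen replaces the 8 block multiplications of the naive scheme with 7, at the cost of a constant number of extra $O(n^2)$ block additions, so by the master theorem

\[
T(n) = 7\,T\!\left(\frac{n}{2}\right) + \Theta(n^2)
\;\Longrightarrow\;
T(n) = \Theta\!\left(n^{\log_2 7}\right) \approx \Theta\!\left(n^{2.81}\right).
\]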

matrix multiplication in single thread

import math


class BaseMatrix:
	"""
	implement a matrix class that supports matrix-relevant operations
	"""

	def inv(self, A):
		"""
		find inverse matrix of A
		@input: input square matrix A
		@output: inverse of A
		"""
		n = len(A)
		A, P = self._LUPDecompose(A, n)
		IA = [[0.0] * n for i in range(n)]
		# forward/backward substitution to solve IA from L/U and P
		for j in range(n):
			for i in range(n):
				IA[i][j] = 1.0 if P[i] == j else 0.0

				for k in range(i):
					IA[i][j] -= A[i][k] * IA[k][j]

			for i in range(n-1, -1, -1):
				for k in range(i+1, n):
					IA[i][j] -= A[i][k] * IA[k][j]

				IA[i][j] /= A[i][i]

		return IA

	def det(self, A):
		"""
		find determinant of square matrix A
		@input: input square matrix of size n
		@output: determinant of A
		"""
		n = len(A)
		A, P = self._LUPDecompose(A, n)
		det = A[0][0]
		
		for i in range(1, n):
			det *= A[i][i]
		
		return det if (P[n] - n)%2 == 0 else -det

	def mult(self, A, B):
		"""
		matrix multiplication with divide and conquer approach (Strassen algorithm), recursive
		"""
		h, w = len(A), len(B[0])
		# pad both operands to the same 2^k x 2^k size so the recursive splits line up
		size = max(len(A), len(A[0]), len(B), len(B[0]))
		A, B = self._strassen_padding(A, size), self._strassen_padding(B, size)
		C_padded = self._strassen(A, B)
		C = [C_padded[i][:w] for i in range(h)]
		return C

	def add(self, A, B):
		"""matrix element-wise add"""
		if len(A) != len(B) or len(A[0]) != len(B[0]):
			raise ValueError("two matrixes should have same shape to perform element-wise oeprations")
		return [[A[i][j]+B[i][j] for j in range(len(A[0]))] for i in range(len(A))]

	def sub(self, A, B):
		"""matrix element-wise subtract"""
		if len(A) != len(B) or len(A[0]) != len(B[0]):
			raise ValueError("two matrixes should have same shape to perform element-wise oeprations")
		return [[A[i][j]-B[i][j] for j in range(len(A[0]))] for i in range(len(A))]

	def prod(self, A, B):
		"""matrix element-wise product"""
		if len(A) != len(B) or len(A[0]) != len(B[0]):
			raise ValueError("two matrixes should have same shape to perform element-wise oeprations")
		return [[A[i][j]*B[i][j] for j in range(len(A[0]))] for i in range(len(A))]

	def show(self, A):
		for i in A:
			print(i)

	def _strassen_base(self, A, B):
		"""
		base case of the Strassen recursion: the operands are 1x1 matrices (nested lists),
		so their product reduces to the element-wise matrix product
		"""
		return self.prod(A, B)

	def _strassen_padding(self, A, size):
		"""zero-pad A to the smallest 2^k x 2^k square with 2^k >= size"""
		w, h = len(A[0]), len(A)
		n = 2**math.ceil(math.log2(size))
		return [A[i] + [0]*(n - w) if i < h else [0]*n for i in range(n)]

	def _strassen(self, A, B):
		"""
		matrix multiplication by divide and conquer approach (Strassen), recursive
		"""
		# Base case when the matrices are 1x1
		if len(A) == 1:
			return self._strassen_base(A, B)

		# Split the matrices into quadrant blocks until the base case is reached
		a, b, c, d = self._split(A)
		e, f, g, h = self._split(B)

		# Computing the 7 block products recursively
		m1 = self._strassen(a, self.sub(f, h))
		m2 = self._strassen(self.add(a, b), h)
		m3 = self._strassen(self.add(c, d), e)
		m4 = self._strassen(d, self.sub(g, e))
		m5 = self._strassen(self.add(a, d), self.add(e, h))
		m6 = self._strassen(self.sub(b, d), self.add(g, h))
		m7 = self._strassen(self.sub(a, c), self.add(e, f))

		# Computing the values of the 4 quadrants of the final matrix C
		c11 = self.add(self.sub(self.add(m5, m4), m2), m6)
		c12 = self.add(m1, m2)
		c21 = self.add(m3, m4)
		c22 = self.sub(self.sub(self.add(m1, m5), m3), m7)

		# Combining the 4 quadrants into a single matrix by stacking horizontally and vertically
		C = self._vstack(self._hstack(c11, c12), self._hstack(c21, c22))

		return C

	def _split(self, mat):
		"""
		Splits a given matrix into 4 quadrant blocks
		"""
		h, w = len(mat), len(mat[0])
		h2, w2 = h//2, w//2
		return [mat[r][:w2] for r in range(h2)], \
			[mat[r][w2:] for r in range(h2)], \
			[mat[r][:w2] for r in range(h2, h)], \
			[mat[r][w2:] for r in range(h2, h)]

	def _hstack(self, M1, M2):
		"""satck two matrixes into one horizontally"""
		w1, w2 = len(M1[0]), len(M2[0])
		h1, h2 = len(M1), len(M2)
		if h1 != h2:
			raise ValueError("two matrixes should have same height to be stacked horizontally")
		M0 = [[0 for x in range(w1+w2)] for y in range(h1)]
		i = 0
		while i < h1:
			M0[i] = M1[i] + M2[i]
			i+=1
		return M0

	def _vstack(self, M1, M2):
		"""satck two matrixes into one vertically"""
		w1, w2 = len(M1[0]), len(M2[0])
		h1, h2 = len(M1), len(M2)
		if w1 != w2:
			raise ValueError("two matrixes should have same width to be stacked vertically")
		M0 = [[0 for x in range(w1)] for y in range(h1+h2)]
		for i in range(h1+h2):
			if i < h1:
				M0[i] = M1[i]
			else:
				M0[i] = M2[i-h1]
		return M0

	def _LUPDecompose(self, A, n):
		"""
		LUP decomposition of matrix A
		@input
		A: input square matrix of dimension n
		@output
		A: the returned matrix contains a copy of both matrices L-E and U combined as A=(L-E)+U, such that
			it is an LUP decomposition where P*A=L*U (the input matrix itself is left unmodified).
			The permutation matrix P is encoded as an integer list of size n+1 containing, for each row,
			the column index where P has a "1". The last element P[n]=s+n, where s is the number of row
			exchanges relative to the identity matrix E, kept for computing the determinant as det(P)=(-1)^s
		P: 1-d integer list of length n+1 representing the permutation matrix and the swap count
		"""
		
		if n != len(A[0]):
			raise ValueError("input is not a square matrix")

		A = [row[:] for row in A]  # work on a copy so the caller's matrix is not clobbered
		P = list(range(n + 1))  # P[0:n] starts as the identity permutation; P[n] is the swap counter, initialized to n

		for i in range(n):
			maxA = 0.0
			imax = i

			for k in range(i, n):  # determine index of max value per column in A
				absA = abs(A[k][i])
				if absA > maxA:
					maxA = absA
					imax = k

			if maxA == 0.0:  # no nonzero pivot in this column: the matrix is singular
				raise ValueError("matrix is degenerate (singular), cannot LUP-decompose")

			if imax != i:  # if max is not on the diagonal, swap
				# swap P
				j = P[i]
				P[i] = P[imax]
				P[imax] = j
				# swap rows of A
				row = A[i]
				A[i] = A[imax]
				A[imax] = row
				# count swaps
				P[n]+=1

			for j in range(i+1, n):  # forward substitution for A=(L-E)+U so that P*A=L*U
				A[j][i] /= A[i][i]

				for k in range(i+1, n):
					A[j][k] -= A[j][i] * A[i][k]

		return A, P
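
A quick usage sketch of the class above (values chosen arbitrarily; expected outputs shown in the comments):

m = BaseMatrix()
A = [[4.0, 3.0],
     [6.0, 3.0]]
B = [[1.0, 2.0],
     [3.0, 4.0]]
m.show(m.mult(A, B))   # prints [13.0, 20.0] then [15.0, 24.0]
print(m.det(A))        # -6.0  (= 4*3 - 3*6)
m.show(m.inv(A))       # prints [-0.5, 0.5] then [1.0, -0.666...]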

matrix multiplication on a distributed network

Preamble: Distributed Memory Network Topology

some definitions or properties of a network…

improve diameter:

some types of network… (assume p nodes)

| Network Type | Links | Diameter | Bisection |
| --- | --- | --- | --- |
| Linear | $p-1$ | $p-1$ | $1$ |
| 2-D Mesh | $2p$ | $2\sqrt{p}$ | $\sqrt{p}$ |
| Fully Connected | $p(p-1)/2$ | $1$ | $p^2/4$ |
| Binary Tree | $p$ | $\log p$ | $1$ |
| $d$-Dimensional Mesh | $dp$ | $\frac{1}{2}dp^{1/d}$ | $2p^{(d-1)/d}$ |
| Hypercube | $p\log p$ | $\log p$ | $p/2$ |
congestion concept

To map a logical network topology onto a physical one… consider congestion:

The lower bound of the congestion is C = (logical bisection width) / (physical bisection width). If you know the congestion, you know how much worse the cost of your algorithm will be on a physical network with a lower bisection capacity.
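
For example, using the table above: mapping a logically fully connected network of $p$ nodes (bisection width $p^2/4$) onto a physical 2-D mesh (bisection width $\sqrt{p}$) gives a congestion of at least

\[
C \;\geq\; \frac{p^2/4}{\sqrt{p}} \;=\; \frac{p^{3/2}}{4},
\]

so communication that crosses the bisection can be slowed by up to that factor on the mesh.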

Loomis and Whitney

In the matrix multiply A = BC: given any sub-blocks sA, sB, sC of the three matrices, the Loomis-Whitney inequality bounds the number of multiply-add operations that touch only those entries by $\sqrt{|s_A|\,|s_B|\,|s_C|}$. This is the key inequality behind communication lower bounds for distributed matrix multiply.

On a distributed network

1D network algorithm

Use block row distribution: each node gets n/P rows of each matrix. In AB = C, one of the operands, A or B, say B, has to be shifted around the ring of nodes, one block of rows per step. (figure: 1-D matmul)

Rearranging the code to overlap the communication latency with computation improves the run time by at most a factor of 2:

let A[1:n/P][1:n] = local part of A
	B, C = same for B, C

let B''[1:n/P][1:n] = temp storage
let rnext ← (RANK + 1) mod P
	rprev ← (RANK + P - 1) mod P

for L ← 0 to P-1 do
	sendAsync (B → rnext)
	recvAsync (B'' ← rprev)
	C[:][:] += A[:][...L] . B[...L][:]
	wait(*)
	swap(B, B'')
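
A minimal mpi4py + NumPy rendering of the same loop, as a hedged sketch (my own code, not the lecture's; it assumes n is divisible by P and that mpi4py and NumPy are available):

# 1-D block-row matmul with a ring shift of B, overlapping communication
# with the local multiply (hedged sketch; n and the random inputs are
# placeholders chosen only to make the example self-contained).
from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank, P = comm.Get_rank(), comm.Get_size()
n = 512                          # global matrix dimension (assumed divisible by P)
nb = n // P                      # rows owned by each rank

A_loc = np.random.rand(nb, n)    # my block of rows of A
B_loc = np.random.rand(nb, n)    # my block of rows of B (circulated around the ring)
C_loc = np.zeros((nb, n))
B_tmp = np.empty_like(B_loc)

rnext = (rank + 1) % P
rprev = (rank + P - 1) % P

for step in range(P):
    # post the ring shift first, then overlap it with the local multiply
    reqs = [comm.Isend(B_loc, dest=rnext), comm.Irecv(B_tmp, source=rprev)]
    owner = (rank - step) % P            # whose rows of B I currently hold
    C_loc += A_loc[:, owner*nb:(owner+1)*nb] @ B_loc
    MPI.Request.Waitall(reqs)            # wait(*)
    B_loc, B_tmp = B_tmp, B_loc          # swap(B, B'')

Posting the Isend/Irecv pair before the local multiply is exactly the overlap trick described above; in the best case it hides almost all of the communication and approaches the factor-of-2 improvement.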

2D network algorithm

Each node in a $\sqrt{P}\times\sqrt{P}$ mesh is responsible for its corresponding block of the result. The multiplication proceeds strip by strip: at each step, the current block column of A is broadcast along every mesh row and the current block row of B along every mesh column, and each node accumulates the product of the two blocks it received. (figure: 2-D network matmul algorithm)
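
A hedged mpi4py sketch of this 2-D (SUMMA-style) scheme. It assumes P is a perfect square, that n is divisible by $\sqrt{P}$, and that each rank owns one square block of A, B, and C; the concrete sizes are placeholders:

# SUMMA-style 2-D matmul sketch: broadcast a block column of A along mesh rows,
# a block row of B along mesh columns, and accumulate local products.
from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank, P = comm.Get_rank(), comm.Get_size()
q = int(round(P ** 0.5))             # mesh is q x q (P assumed to be a perfect square)
n = 240
nb = n // q                          # block size owned by each rank
row, col = divmod(rank, q)

A = np.random.rand(nb, nb)           # my block A[row][col]
B = np.random.rand(nb, nb)           # my block B[row][col]
C = np.zeros((nb, nb))

row_comm = comm.Split(color=row, key=col)   # all ranks in my mesh row
col_comm = comm.Split(color=col, key=row)   # all ranks in my mesh column

for k in range(q):
    Ak = A.copy() if col == k else np.empty((nb, nb))
    Bk = B.copy() if row == k else np.empty((nb, nb))
    row_comm.Bcast(Ak, root=k)       # k-th block column of A moves along rows
    col_comm.Bcast(Bk, root=k)       # k-th block row of B moves along columns
    C += Ak @ Bk                     # C[row][col] += A[row][k] @ B[k][col]

The Split calls build the per-row and per-column communicators that the broadcasts run over; each rank ends up holding its block of the product.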

beyond 2D - lower communication bound

$T(n; P) = \alpha\sqrt{P} + \beta\,\frac{n^2}{\sqrt{P}}$

More On Parallelism of Distributed Memory

Message Passing Interface (MPI) based parallelism

MPI defines a standard library for message-passing that can be used to develop portable message-passing programs using either C or Fortran. The MPI standard defines both the syntax as well as the semantics of a core set of library routines that are very useful in writing message-passing programs.

principles of message passing

The logical view of a machine supporting the message-passing paradigm consists of p processes, each with its own exclusive address space. Instances of such a view come naturally from clustered workstations and non-shared address space multicomputers. There are two immediate implications of a partitioned address space. First, each data element must belong to one of the partitions of the space; hence, data must be explicitly partitioned and placed. This adds complexity to programming, but encourages locality of access that is critical for achieving high performance on non-UMA architecture, since a processor can access its local data much faster than non-local data on such architectures. The second implication is that all interactions (read-only or read/write) require cooperation of two processes – the process that has the data and the process that wants to access the data. This requirement for cooperation adds a great deal of complexity for a number of reasons. The process that has the data must participate in the interaction even if it has no logical connection to the events at the requesting process. In certain circumstances, this requirement leads to unnatural programs. In particular, for dynamic and/or unstructured interactions the complexity of the code written for this type of paradigm can be very high for this reason. However, a primary advantage of explicit two-way interactions is that the programmer is fully aware of all the costs of non-local interactions, and is more likely to think about algorithms (and mappings) that minimize interactions. Another major advantage of this type of programming paradigm is that it can be efficiently implemented on a wide variety of architectures.

The message-passing programming paradigm requires that the parallelism is coded explicitly by the programmer. That is, the programmer is responsible for analyzing the underlying serial algorithm/application and identifying ways by which he or she can decompose the computations and extract concurrency. As a result, programming using the message-passing paradigm tends to be hard and intellectually demanding. However, on the other hand, properly written message-passing programs can often achieve very high performance and scale to a very large number of processes.

A Basic Model of Distributed Memory

Assumptions

Pipelined message delivery

If two nodes are P nodes apart in the network and directly connected nodes have a message delivery time of t per word, then transferring n words between these two nodes with pipelined delivery takes time \(\alpha + t(P-2) + tn\).

Point to Point primitives

Remember: every send must have a matching receive. sendAsync and recvAsync can get trapped in a deadlock, depending upon how the wait is implemented.

Collective operation runtime

A collective is an operation that must be executed by all processes.

reduce(A_local[1:n], root)
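
As a concrete, hedged illustration of a reduce collective using mpi4py's actual API (rather than the pseudocode signature above):

# Each process contributes a local vector; the element-wise sum lands at the root.
from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
n = 8
A_local = np.full(n, comm.Get_rank(), dtype='d')                  # placeholder local data
A_sum = np.empty(n, dtype='d') if comm.Get_rank() == 0 else None  # result buffer at the root only
comm.Reduce(A_local, A_sum, op=MPI.SUM, root=0)
if comm.Get_rank() == 0:
    print(A_sum)        # each entry equals 0 + 1 + ... + (P-1)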

shared-memory based parallelism (OpenMP)

Explicit parallel programming requires specification of parallel tasks along with their interactions. These interactions may be in the form of synchronization between concurrent tasks or communication of intermediate results. In shared address space architectures, communication is implicitly specified since some (or all) of the memory is accessible to all the processors. Consequently, **programming paradigms for shared address space machines focus on constructs for expressing concurrency and synchronization along with techniques for minimizing associated overheads**. In this section, we discuss shared-address-space programming paradigms along with their performance issues and related extensions to directive-based paradigms.

Shared address space programming paradigms can vary on mechanisms for data sharing, concurrency models, and support for synchronization. Process based models assume that all data associated with a process is private, by default, unless otherwise specified (using UNIX system calls such as shmget and shmat).

OpenMP: a Standard for Directive Based Parallel Programming

…APIs such as Pthreads are considered to be low-level primitives. Conventional wisdom indicates that a large class of applications can be efficiently supported by higher level constructs (or directives) which rid the programmer of the mechanics of manipulating threads. Such directive-based languages have existed for a long time, but only recently have standardization efforts succeeded in the form of OpenMP. OpenMP is an API that can be used with FORTRAN, C, and C++ for programming shared address space machines. OpenMP directives provide support for concurrency, synchronization, and data handling while obviating the need for explicitly setting up mutexes, condition variables, data scope, and initialization.

considerations for designing parallel algorithms

Automatic

compiler finds any possible parallelization points

Manual

programmer finds any possible parallelization points

  1. Determine if the problem is one that can be solved in parallel.
  2. Identify Program Hotspots
  3. Identify Bottlenecks
  4. Identify Inhibitors to Parallelism
  5. If possible look at other algorithms to see if they can be parallelized.
  6. Use parallel programs and libraries from third party vendors.
partitioning

Divide the work to be done into discrete chunks. There are two methods for partitioning:

  1. Domain Decomposition: the data is decomposed, and each task then works on a portion of the data (see the sketch after this list).

  2. Functional Decomposition: the problem is broken into smaller tasks, and each task does part of the work that needs to be done. In functional decomposition the emphasis is on the computation rather than the data.
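
A minimal sketch of domain decomposition of a 1-D array of n items across p tasks (a hypothetical helper, not from the source; the remainder is spread over the first few tasks):

# Split n data items across p tasks as evenly as possible.
def block_range(n, p, rank):
    base, extra = divmod(n, p)
    lo = rank * base + min(rank, extra)
    hi = lo + base + (1 if rank < extra else 0)
    return lo, hi

print([block_range(10, 3, r) for r in range(3)])   # [(0, 4), (4, 7), (7, 10)]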

Examples of Functional Decomposition

  1. Ecosystem modeling
  2. Signal processing
  3. Climate modeling
communications

Determine which tasks need to communicate with each other, and how. Communication between processes adds overhead that has to be designed for and accounted for.

synchronization

There are 3 kinds of synchronization: barrier, lock / semaphore, and synchronous communication operations.

load balancing

Keep the computational load even across all tasks, so that no single task becomes a bottleneck.

Two-Level Memory Model (slow-fast memory model)

key take-away: hardware-managed caches are fast, but they are not sufficient on their own; we need algorithms that explicitly organize their slow-fast memory transfers efficiently.

notation

A First Basic Model

To find a locality-aware algorithm we need a machine model; we will be using a variation on the von Neumann model.

von Neumann Model:

Rules:

  1. The processor can only work with data that is in the fast memory, known as the local data rule.
  2. When there is a transfer of data between the fast and slow memory, the data is transferred in blocks of size L, known as the block transfer rule. In this model you may need to consider data alignment.

Costs: The model has two costs associated with it:

  1. Work. W(n) == the # of computation operations. (How many operations will the processor have to perform?)
  2. Data transfers: Q(n; Z; L) == the # of L-sized slow-fast transfers (loads and stores). The number of transfers depends on the fast memory size Z and the block size L. This will be referred to as Q and called the I/O complexity.
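
As a small worked example (my own, for orientation): summing an array of $n$ words touches every element exactly once, so

\[
W(n) = n - 1 \ \text{additions}, \qquad Q(n;Z;L) = \left\lceil \frac{n}{L} \right\rceil \ \text{transfers},
\]

i.e. the best any schedule can do is be limited by the $n/L$ compulsory loads.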

Algorithm Design goals

performance

suppose

then

with refactoring

then

if we normalize this performance

we can see that $R$ is capped when $I=B$ which means

more..

The intuition from energy & time efficiency analysis and actual testing [1] is that, as intensity increases, …

IO Avoiding Algorithm

Continuing the analysis of $Q$ for several common algorithms:

Q (transfers) for merge sort

\(Q(n;Z;L) = \Omega(\frac{(n/L)\log(n/L)}{\log(Z/L)})\)

analysis:

Summing up:

If we do K-way merging with $K \approx Z/L$, so that each merge fully utilizes the fast memory of size $Z$, we can attain this lower bound.

For searching a sorted array, the corresponding lower bound is

\(Q(n;Z;L) = \Omega\!\left(\frac{\log n}{\log L}\right)\)

According to information theory, locating one item among $n$ requires learning about $\log n$ bits of information, and each $L$-word transfer can reveal at most about $\log L$ bits.

The van Emde Boas data layout achieves this lower bound. (DATA STRUCTURE MATTERS.)

Q for matrix multiplication

\(Q(n;Z;L) = \Omega{(n^3/(L\sqrt{Z}))}\)
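
A hedged sketch of how this bound is attained: block (tile) the computation so that three $b \times b$ tiles fit in fast memory, i.e. $b$ on the order of $\sqrt{Z/3}$. Each operand element is then reloaded only $O(n/b)$ times instead of $O(n)$ times, which gives $Q(n;Z;L) = O(n^3/(L\sqrt{Z}))$.

# Cache-blocked (tiled) matrix multiply; b is the tile size, assumed chosen so
# three b-by-b tiles fit in fast memory (roughly b = sqrt(Z/3)).
def blocked_matmul(A, B, b):
    n = len(A)
    C = [[0.0] * n for _ in range(n)]
    for i0 in range(0, n, b):
        for j0 in range(0, n, b):
            for k0 in range(0, n, b):
                # multiply one pair of b-by-b tiles into the corresponding C tile
                for i in range(i0, min(i0 + b, n)):
                    for k in range(k0, min(k0 + b, n)):
                        a_ik = A[i][k]
                        for j in range(j0, min(j0 + b, n)):
                            C[i][j] += a_ik * B[k][j]
    return C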

more about Cache oblivious..

The term cache-oblivious refers to an algorithm that does not need to know the cache parameters $Z$ and $L$ to achieve its $Q$. For example, summing an array of size $n$ takes $Q=O(n/L)$ with no tuning to $Z$. As a counterexample, matrix multiplication with explicit block transfers is cache-aware, because the block size must be chosen as a function of $Z$.

LRU-OPT Competitiveness Lemma

the lemma [2] says \(Q_{LRU}(n;Z;L)\leq 2\,Q_{OPT}(n;\tfrac{Z}{2};L)\). It means that the number of transfers for an LRU-managed cache is within a factor of two of the number of transfers for a cache with the optimal replacement policy but only half the size.

Corollary (Regularity Condition)

$Q_{OPT}(n;Z;L)=O(Q_{OPT}(n;2Z;L))$. For example, we saw previously that for matrix multiplication $Q(n;Z;L) = \Omega(n^3/(L\sqrt{Z}))$; when $Z$ doubles, there is only a constant-factor change in the optimal number of transfers.

As long as $Q$ obeys this condition, the lemma above applies, so LRU is asymptotically as good as the optimal policy.

tall-cache assumption

An interesting design assumption on the cache: its height (the number of cache lines, $Z/L$) should be at least its width (the number of words in a line, $L$), i.e. $Z \geq L^2$ up to a constant. This is helpful when an algorithm works with matrix blocks: for example, holding a $b \times b$ block requires $Z \geq b^2$.

Most actual caches, with the possible exception of the TLB, are indeed tall.

Work Span Model

The work-span model represents a computation as a DAG (directed acyclic graph) in which each node is a unit of work and the edges are the dependencies between them.

Assume:

  1. All processors run at same speed
  2. 1 operation work = 1 unit of time
  3. no edge cost

Denote: $W(n)$ = the total work, i.e. the number of nodes in the DAG; $D(n)$ = the span (or depth), i.e. the length of the longest path through the DAG; $T_p(n)$ = the execution time on $p$ processors.

Analysis

Brent’s Theorem

break the execution into $D$ phases, where phase $k$ consists of the nodes at depth $k$ of the DAG:

additionally denote: $w_k$ = the amount of work (number of nodes) in phase $k$, so that $\sum_{k=1}^{D} w_k = W$, and $t_k$ = the time to execute phase $k$ on $p$ processors.

then:
\(t_k=\left\lceil\frac{w_k}{p}\right\rceil \;\Rightarrow\; T_p=\sum_{k=1}^{D}\left\lceil\frac{w_k}{p}\right\rceil\leq\sum_{k=1}^{D}\left(\frac{w_k-1}{p}+1\right)=\frac{W-D}{p}+D\)

to sum up: \(\max\left\{D(n),\ \left\lceil\frac{W(n)}{p}\right\rceil\right\}\leq T_p\leq\frac{W-D}{p}+D\)
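
A quick worked instance (my own numbers): for a balanced reduction (summation tree) of $n$ values, $W(n)=n-1$ and $D(n)=\lceil\log_2 n\rceil$, so Brent's bounds give

\[
\max\left\{\lceil\log_2 n\rceil,\ \left\lceil\frac{n-1}{p}\right\rceil\right\}\ \leq\ T_p\ \leq\ \frac{(n-1)-\lceil\log_2 n\rceil}{p}+\lceil\log_2 n\rceil .
\]

With $n=2^{20}$ and $p=64$ this is roughly $16384 \leq T_p \leq 16404$, i.e. a greedy schedule is already within a fraction of a percent of optimal.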

Speedup

Consider the speedup of our DAG parallel algorithm, calculated as \(\frac{\text{best sequential time}}{\text{parallel time}}\), namely \(S_p(n)=\frac{T_*(n)}{T_p(n)}\). Substituting the previous inequality gives: \(S_p(n)\geq \frac{p}{\frac{W}{W_*}+\frac{p-1}{W_*/D}}\)

You can see that, for the speedup to scale linearly with $p$, two conditions must be met in this scaling inequality: the algorithm must be work-optimal, $W(n)=O(W_*(n))$, and there must be enough average parallelism, $\frac{W_*(n)}{D(n)}=\Omega(p)$.

basic concurrency primitives

(figure: basic concurrency primitives)
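
As a rough, hedged illustration (my own mapping, not the course's), spawn/sync-style primitives correspond to fork-join constructs such as Python's concurrent.futures; because of the GIL this shows the semantics rather than real speedup:

# "Spawn" one task per chunk, then "sync" by waiting on all of the futures.
from concurrent.futures import ThreadPoolExecutor

def partial_sum(chunk):
    return sum(chunk)

data = list(range(1_000_000))
nchunks = 8
chunks = [data[i::nchunks] for i in range(nchunks)]

with ThreadPoolExecutor() as pool:
    futures = [pool.submit(partial_sum, c) for c in chunks]   # spawn
    total = sum(f.result() for f in futures)                  # sync

print(total)   # 499999500000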

desiderata for work-span

motivation: $\frac{W}{D}=O\!\left(\frac{n}{\log^k n}\right)$ grows nearly linearly with $n$

matrix-vector multiply
![parallel BFS](/assets/images/par%20bfs.png)

The depth of a BFS is the "diameter" of the graph, i.e. the longest shortest-path distance between any two nodes (the number of waves it takes for the frontier to spread from one node to the farthest node).

Algorithm

pseudo code
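
The lecture's pseudo code is not reproduced here. As a rough, hedged stand-in, here is a plain serial level-synchronous BFS; each iteration of the while loop processes one frontier ("wave"), and it is exactly this per-frontier work that the bag-based algorithm processes in parallel:

# Level-synchronous BFS: the number of while-loop iterations is bounded by the
# graph diameter discussed above.
def bfs_levels(adj, source):
    """adj: dict mapping vertex -> list of neighbours."""
    dist = {source: 0}
    frontier = [source]
    level = 0
    while frontier:
        level += 1
        next_frontier = []
        for u in frontier:              # the parallel version scans the frontier concurrently
            for v in adj[u]:
                if v not in dist:
                    dist[v] = level
                    next_frontier.append(v)
        frontier = next_frontier
    return dist

# tiny example: a path 0-1-2-3 has diameter 3, so BFS from 0 takes 3 waves
print(bfs_levels({0: [1], 1: [0, 2], 2: [1, 3], 3: [2]}, 0))   # {0: 0, 1: 1, 2: 2, 3: 3}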

Data structure: Bag

bag property:

operations:

Pennant

A pennant is a tree with $2^k$ nodes: a unary root $x$ whose single child $x_s$ is the root of a complete binary tree with $2^k-1$ nodes. Since every pennant has a power-of-two size, this is essentially a binary number representation: a bag of any size can be represented as a collection of pennants, one per set bit.

For example, a bag of 23 nodes = a $2^4$-size pennant, a $2^2$-size pennant, a $2^1$-size pennant, and a $2^0$-size pennant ($23=10111_2$). Since it is essentially a binary number, inserting one element works like a binary increment (merging equal-size pennants plays the role of the carry), and taking the union of two bags works like binary addition.
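
A hedged Python sketch of the pennant structure just described (my own simplified rendition). The convention here: a pennant is identified by its root node, whose single child is stored in left and whose right pointer is unused at the root; two pennants of equal size $2^k$ then merge into one of size $2^{k+1}$ in O(1) time:

# Pennant node plus the constant-time union of two equal-size pennants.
class PennantNode:
    def __init__(self, value):
        self.value = value
        self.left = None    # single child of a pennant root / left child inside the tree
        self.right = None

def pennant_union(x, y):
    """Merge pennants rooted at x and y (equal size 2^k) into one of size 2^(k+1):
    y absorbs x's old subtree as its right child and becomes x's single child."""
    y.right = x.left
    x.left = y
    return x

A bag then keeps at most one pennant per power-of-two size, so inserting an element or merging two bags proceeds exactly like binary increment/addition, with pennant_union playing the role of the carry.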

  1. Jee Whan Choi et al., A roofline model of energy, 2012.

  2. Frigo et al., FOCS 1999.