IV100 Distribuované výpočty Rasťo Královič Katedra informatiky, FMFI UK Bratislava kralovic@dcs.fmph.uniba.sk Gerard Tel: Introduction to Distributed Algorithms, Cambridge University Press, 2000, ISBN 0521794838 Nancy Lynch: Distributed Algorithms, Morgan Kaufmann Publishers, 1996, ISBN 1558603484 Frank Thomson Leighton: Introduction to Parallel Algorithms and Architectures: Arrays, Trees, Hypercubes, Morgan Kaufmann Publishers, 1991, ISBN 1558601171 sekvenčné programovacie jazyky rôzne paradigmy, syntax, ... rôzne použitie interpreter (simulácia) z pohľadu počítania sú všetky ”rovnako silné” jednoduchý abstraktný model (návrh algoritmov) RAM – jednoduchý ”abstraktný počítač” registre r0, r1, r2, r3, . . . vstup a1, a2, a3, . . . program = postupnosť inštrukcií pozícia v programe inštrukcie: priradenie rX := rY X, Y je konštanta alebo register rX := aY rX := c c je konštanta výpočet rX := rY rZ je +, −, ∗, / skok goto i podmienka if rX rY then goto i if rX c then goto i zdieľaná pamäť – thready dynamické vytváranie asynchrónne (join) riadenie prístupu (mutex/semafór/...) main() Krt(1) Krt(9) Krt(10) pthread_create() main() join() counter thready (POSIX) #include #include pthread_mutex_t mutex1 = PTHREAD_MUTEX_INITIALIZER; int counter = 0; void *Krt(void *p) { pthread_mutex_lock( &mutex1 ); counter += *(int *)p; pthread_mutex_unlock( &mutex1 ); } main() { pthread_t thread_id[NTHREADS]; int i, j, param[NTHREADS]; for(i=0; i < NTHREADS; i++) { param[i]=i; pthread_create( &thread_id[i], NULL, Krt, param + i ); } for(j=0; j < NTHREADS; j++) pthread_join( thread_id[j], NULL); } thready (POSIX) #include #include pthread_mutex_t mutex1 = PTHREAD_MUTEX_INITIALIZER; int counter = 0; void *Krt(void *p) { pthread_mutex_lock( &mutex1 ); counter += *(int *)p; pthread_mutex_unlock( &mutex1 ); } main() { pthread_t thread_id[NTHREADS]; int i, j, param[NTHREADS]; for(i=0; i < NTHREADS; i++) { param[i]=i; pthread_create( &thread_id[i], NULL, Krt, param + i ); } for(j=0; j < NTHREADS; j++) pthread_join( thread_id[j], NULL); } deadlock synchronizácia debugovanie PRAM MEM CPU MEM CPU MEM CPU MEM CPU MEM CPU MEM CPU SHARED MEMORY synchrónne procesory kedy je problém paralelizovateľný? global read( A(i), a ) global write( a, B(i) ) for h = 1 to log n do if (i ≤ n/2h) global read( B(2i − 1),x ) global read( B(2i),y ) z := x + y global write(z, B(i)) if i = 1 global write( z,S ) 5 1 3 2 7 4 5 8 131156 11 24 35 WTF – Work-Time Framework písať algoritmy pre ľubovoľný počet procesorov work – počet použitých operácií for l ≤ i ≤ u pardo statement for 1 ≤ i ≤ n pardo B(i) := A(i) for h = 1 to log n do for 1 ≤ i ≤ n/2h pardo B(i) := B(2i − 1) + B(2i) S := B(1) global read( A(i), a ) global write( a, B(i) ) for h = 1 to log n do if (i ≤ n/2h) global read( B(2i − 1),x ) global read( B(2i),y ) z := x + y global write(z, B(i)) if i = 1 global write( z,S ) WT algoritmus s T(n), W (n) sa dá simulovať na PRAMe s p procesormi v čase W (n) p + T(n) PRAM EREW / CREW / CRCW príklad: pozícia prvej jednotky na common CRCW PRAM for i = 1 to n pardo B(i) := 0 for i = 1 to n2 pardo if A(i) = 1 then B( i/n ) := 1 for i = 1 to n pardo for j = 1 to i − 1 pardo if B(j) = 1 then B(i) := 0 for i = 1 to n pardo if B(i) = 1 then D := i − 1 for i = 1 to n pardo for j = 1 to i − 1 pardo if A(D + j) = 1 then A(D + i) := 0 for i = 1 to n pardo if A(D + i) = 1 then X := D + i Prefix Sum Problem Dané pole A dĺžky n. Pre každý index i nájsť i j=0 ai for 1 ≤ i ≤ n/2 pardo xi := a2i−1 + a2i z :=prefix sums( X, n/2 ) for 1 ≤ i ≤ n/2 pardo ai :=    zi /2 ak i = 2k 1 ak i = 1 z(i−1)/2 + ai ak i = 2k + 3 práca = O(n) čas = O(log n) bez zdieľanej pamäte – procesy void do_child(int data_pipe[]) { int c,rc; close(data_pipe[1]); while ((rc = read(data_pipe[0], &c, 1)) > 0) putchar(c); exit(0); } void do_parent(int data_pipe[]) { int c,rc; close(data_pipe[0]); while ((c = getchar()) > 0) rc = write(data_pipe[1], &c, 1); close(data_pipe[1]); exit(0); } int main() { int data_pipe[2]; int pid,rc; rc = pipe(data_pipe); pid = fork(); switch (pid) { case 0: do_child(data_pipe); default: do_parent(data_pipe); } } v sieti: posielanie správ (sokety) server int main(int argc, char *argv[]) { int sockfd, newsockfd, portno; socklen_t clilen; char buffer[256]; struct sockaddr_in serv_addr, cli_addr; int n; sockfd = socket(AF_INET, SOCK_STREAM, 0); bzero((char *) &serv_addr, sizeof(serv_addr)); portno = atoi(argv[1]); serv_addr.sin_family = AF_INET; serv_addr.sin_addr.s_addr = INADDR_ANY; serv_addr.sin_port = htons(portno); bind(sockfd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)); listen(sockfd,5); clilen = sizeof(cli_addr); newsockfd = accept(sockfd, (struct sockaddr *) &cli_addr, &clilen); bzero(buffer,256); n = read(newsockfd,buffer,255); printf("Here is the message: %s\n",buffer); n = write(newsockfd,"I got your message",18); close(newsockfd); close(sockfd); } klient int main(int argc, char *argv[]) { int sockfd, portno, n; struct sockaddr_in serv_addr; struct hostent *server; char buffer[256]; portno = atoi(argv[2]); sockfd = socket(AF_INET, SOCK_STREAM, 0); server = gethostbyname(argv[1]); bzero((char *) &serv_addr, sizeof(serv_addr)); serv_addr.sin_family = AF_INET; bcopy((char *)server->h_addr, (char *)&serv_addr.sin_addr.s_addr, server->h_length); serv_addr.sin_port = htons(portno); connect(sockfd,(struct sockaddr *) &serv_addr, sizeof(serv_addr)); printf("Please enter the message: "); bzero(buffer,256); fgets(buffer,255,stdin); n = write(sockfd,buffer,strlen(buffer)); bzero(buffer,256); n = read(sockfd,buffer,255); printf("%s\n",buffer); close(sockfd); return 0; } v sieti: posielanie správ (MPI) int main(int argc, char *argv[]) { const int tag = 47; ... MPI_Init(&argc, &argv); MPI_Comm_size(MPI_COMM_WORLD, &ntasks); MPI_Comm_rank(MPI_COMM_WORLD, &id); if (id == 0) { MPI_Get_processor_name(msg, &length); printf("Hello World from process %d running on %s\n", id, msg); for (i=1; i