logo

공유 메모리의 동시 병합 정렬

숫자 'n'과 n 숫자가 주어지면 다음을 사용하여 숫자를 정렬합니다. 경쟁 상대 병합 정렬. (힌트: shmget shmat 시스템 호출을 사용해 보십시오).
1부: 알고리즘(어떻게?)  
두 개의 하위 프로세스를 하나는 왼쪽 절반, 오른쪽 절반은 반복적으로 만듭니다. 프로세스 배열의 요소 수가 5개 미만인 경우 다음을 수행합니다. 삽입 정렬 . 그런 다음 두 자식의 부모는 결과를 병합하고 부모에게 다시 반환합니다. 그런데 어떻게 동시에 만들 수 있나요?
2부: 논리적(왜?)  
이 문제의 해결에서 중요한 부분은 알고리즘적인 부분이 아니라 운영체제와 커널의 개념을 설명하는 것입니다. 
동시 정렬을 달성하려면 두 프로세스가 동일한 배열에서 동시에 작동하도록 만드는 방법이 필요합니다. 작업을 더 쉽게 만들기 위해 Linux는 간단한 API 엔드포인트를 통해 많은 시스템 호출을 제공합니다. 그 중 두 개는 shmget() (공유 메모리 할당용) 및 shmat() (공유 메모리 작업용) 우리가 분기하는 하위 프로세스 사이에 공유 메모리 공간을 만듭니다. 각 세그먼트는 왼쪽과 오른쪽 하위로 분할되어 동시에 작업하므로 흥미로운 부분으로 정렬됩니다! shmget()은 커널에 할당을 요청합니다. 공유 페이지 두 프로세스 모두에 대해.
전통적인 fork()가 작동하지 않는 이유는 무엇입니까?  
대답은 fork()가 실제로 무엇을 하는지에 있습니다. 문서에서 'fork()는 호출 프로세스를 복제하여 새 프로세스를 생성합니다'. 하위 프로세스와 상위 프로세스는 별도의 메모리 공간에서 실행됩니다. fork() 시점에는 두 메모리 공간 모두 동일한 내용을 갖습니다. 프로세스 중 하나에서 수행된 메모리 쓰기 파일 설명자(fd) 변경 등은 다른 프로세스에 영향을 주지 않습니다. 따라서 공유 메모리 세그먼트가 필요합니다.
 

CPP
#include    #include  #include  #include  #include  #include  #include  #include  void insertionSort(int arr[] int n); void merge(int a[] int l1 int h1 int h2); void mergeSort(int a[] int l int h) {  int i len = (h - l + 1);  // Using insertion sort for small sized array  if (len <= 5)  {  insertionSort(a + l len);  return;  }  pid_t lpid rpid;  lpid = fork();  if (lpid < 0)  {  // Lchild proc not created  perror('Left Child Proc. not createdn');  _exit(-1);  }  else if (lpid == 0)  {  mergeSort(a l l + len / 2 - 1);  _exit(0);  }  else  {  rpid = fork();  if (rpid < 0)  {  // Rchild proc not created  perror('Right Child Proc. not createdn');  _exit(-1);  }  else if (rpid == 0)  {  mergeSort(a l + len / 2 h);  _exit(0);  }  }  int status;  // Wait for child processes to finish  waitpid(lpid &status 0);  waitpid(rpid &status 0);  // Merge the sorted subarrays  merge(a l l + len / 2 - 1 h); } /* Function to sort an array using insertion sort*/ void insertionSort(int arr[] int n) {  int i key j;  for (i = 1; i < n; i++)  {  key = arr[i];  j = i - 1;  /* Move elements of arr[0..i-1] that are  greater than key to one position ahead  of their current position */  while (j >= 0 && arr[j] > key)  {  arr[j + 1] = arr[j];  j = j - 1;  }  arr[j + 1] = key;  } } // Method to merge sorted subarrays void merge(int a[] int l1 int h1 int h2) {  // We can directly copy the sorted elements  // in the final array no need for a temporary  // sorted array.  int count = h2 - l1 + 1;  int sorted[count];  int i = l1 k = h1 + 1 m = 0;  while (i <= h1 && k <= h2)  {  if (a[i] < a[k])  sorted[m++] = a[i++];  else if (a[k] < a[i])  sorted[m++] = a[k++];  else if (a[i] == a[k])  {  sorted[m++] = a[i++];  sorted[m++] = a[k++];  }  }  while (i <= h1)  sorted[m++] = a[i++];  while (k <= h2)  sorted[m++] = a[k++];  int arr_count = l1;  for (i = 0; i < count; i++ l1++)  a[l1] = sorted[i]; } // To check if array is actually sorted or not void isSorted(int arr[] int len) {  if (len == 1)  {  std::cout << 'Sorting Done Successfully' << std::endl;  return;  }  int i;  for (i = 1; i < len; i++)  {  if (arr[i] < arr[i - 1])  {  std::cout << 'Sorting Not Done' << std::endl;  return;  }  }  std::cout << 'Sorting Done Successfully' << std::endl;  return; } // To fill random values in array for testing // purpose void fillData(int a[] int len) {  // Create random arrays  int i;  for (i = 0; i < len; i++)  a[i] = rand();  return; } // Driver code int main() {  int shmid;  key_t key = IPC_PRIVATE;  int *shm_array;  int length = 128;  // Calculate segment length  size_t SHM_SIZE = sizeof(int) * length;  // Create the segment.  if ((shmid = shmget(key SHM_SIZE IPC_CREAT | 0666)) < 0)  {  perror('shmget');  _exit(1);  }  // Now we attach the segment to our data space.  if ((shm_array = (int *)shmat(shmid NULL 0)) == (int *)-1)  {  perror('shmat');  _exit(1);  }  // Create a random array of given length  srand(time(NULL));  fillData(shm_array length);  // Sort the created array  mergeSort(shm_array 0 length - 1);  // Check if array is sorted or not  isSorted(shm_array length);  /* Detach from the shared memory now that we are  done using it. */  if (shmdt(shm_array) == -1)  {  perror('shmdt');  _exit(1);  }  /* Delete the shared memory segment. */  if (shmctl(shmid IPC_RMID NULL) == -1)  {  perror('shmctl');  _exit(1);  }  return 0; } 
Java
import java.util.Arrays; import java.util.Random; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveAction; public class ConcurrentMergeSort {  // Method to merge sorted subarrays  private static void merge(int[] a int low int mid int high) {  int[] temp = new int[high - low + 1];  int i = low j = mid + 1 k = 0;  while (i <= mid && j <= high) {  if (a[i] <= a[j]) {  temp[k++] = a[i++];  } else {  temp[k++] = a[j++];  }  }  while (i <= mid) {  temp[k++] = a[i++];  }  while (j <= high) {  temp[k++] = a[j++];  }  System.arraycopy(temp 0 a low temp.length);  }  // RecursiveAction for fork/join framework  static class SortTask extends RecursiveAction {  private final int[] a;  private final int low high;  SortTask(int[] a int low int high) {  this.a = a;  this.low = low;  this.high = high;  }  @Override  protected void compute() {  if (high - low <= 5) {  Arrays.sort(a low high + 1);  } else {  int mid = low + (high - low) / 2;  invokeAll(new SortTask(a low mid) new SortTask(a mid + 1 high));  merge(a low mid high);  }  }  }  // Method to check if array is sorted  private static boolean isSorted(int[] a) {  for (int i = 0; i < a.length - 1; i++) {  if (a[i] > a[i + 1]) {  return false;  }  }  return true;  }  // Method to fill array with random numbers  private static void fillData(int[] a) {  Random rand = new Random();  for (int i = 0; i < a.length; i++) {  a[i] = rand.nextInt();  }  }  public static void main(String[] args) {  int length = 128;  int[] a = new int[length];  fillData(a);  ForkJoinPool pool = new ForkJoinPool();  pool.invoke(new SortTask(a 0 a.length - 1));  if (isSorted(a)) {  System.out.println('Sorting Done Successfully');  } else {  System.out.println('Sorting Not Done');  }  } } 
Python3
import numpy as np import multiprocessing as mp import time def insertion_sort(arr): n = len(arr) for i in range(1 n): key = arr[i] j = i - 1 while j >= 0 and arr[j] > key: arr[j + 1] = arr[j] j -= 1 arr[j + 1] = key def merge(arr l mid r): n1 = mid - l + 1 n2 = r - mid L = arr[l:l + n1].copy() R = arr[mid + 1:mid + 1 + n2].copy() i = j = 0 k = l while i < n1 and j < n2: if L[i] <= R[j]: arr[k] = L[i] i += 1 else: arr[k] = R[j] j += 1 k += 1 while i < n1: arr[k] = L[i] i += 1 k += 1 while j < n2: arr[k] = R[j] j += 1 k += 1 def merge_sort(arr l r): if l < r: if r - l + 1 <= 5: insertion_sort(arr) else: mid = (l + r) // 2 p1 = mp.Process(target=merge_sort args=(arr l mid)) p2 = mp.Process(target=merge_sort args=(arr mid + 1 r)) p1.start() p2.start() p1.join() p2.join() merge(arr l mid r) def is_sorted(arr): for i in range(1 len(arr)): if arr[i] < arr[i - 1]: return False return True def fill_data(arr): np.random.seed(0) arr[:] = np.random.randint(0 1000 size=len(arr)) if __name__ == '__main__': length = 128 shm_array = mp.Array('i' length) fill_data(shm_array) start_time = time.time() merge_sort(shm_array 0 length - 1) end_time = time.time() if is_sorted(shm_array): print('Sorting Done Successfully') else: print('Sorting Not Done') print('Time taken:' end_time - start_time) 
JavaScript
// Importing required modules const { Worker isMainThread parentPort workerData } = require('worker_threads'); // Function to merge sorted subarrays function merge(a low mid high) {  let temp = new Array(high - low + 1);  let i = low j = mid + 1 k = 0;  while (i <= mid && j <= high) {  if (a[i] <= a[j]) {  temp[k++] = a[i++];  } else {  temp[k++] = a[j++];  }  }  while (i <= mid) {  temp[k++] = a[i++];  }  while (j <= high) {  temp[k++] = a[j++];  }  for (let p = 0; p < temp.length; p++) {  a[low + p] = temp[p];  } } // Function to check if array is sorted function isSorted(a) {  for (let i = 0; i < a.length - 1; i++) {  if (a[i] > a[i + 1]) {  return false;  }  }  return true; } // Function to fill array with random numbers function fillData(a) {  for (let i = 0; i < a.length; i++) {  a[i] = Math.floor(Math.random() * 1000);  } } // Function to sort the array using merge sort function sortArray(a low high) {  if (high - low <= 5) {  a.sort((a b) => a - b);  } else {  let mid = low + Math.floor((high - low) / 2);  sortArray(a low mid);  sortArray(a mid + 1 high);  merge(a low mid high);  } } // Main function function main() {  let length = 128;  let a = new Array(length);  fillData(a);  sortArray(a 0 a.length - 1);  if (isSorted(a)) {  console.log('Sorting Done Successfully');  } else {  console.log('Sorting Not Done');  } } main(); 

산출: 
 



Sorting Done Successfully  

시간 복잡도 :O(N log N )

보조 공간:O(N)


성능 개선?  
코드의 시간을 측정하고 성능을 기존 순차 코드와 비교해 보세요. 순차 정렬 성능이 더 좋다는 사실을 알면 놀랄 것입니다! 
왼쪽 자식이 왼쪽 배열에 액세스하면 배열이 프로세서의 캐시에 로드됩니다. 이제 오른쪽 배열에 액세스하면(동시 액세스로 인해) 캐시가 왼쪽 세그먼트로 채워지고 오른쪽 세그먼트가 캐시 메모리에 복사되므로 캐시 누락이 발생합니다. 이 왕복 프로세스는 계속되며 순차 코드보다 성능이 떨어지는 수준으로 성능이 저하됩니다.
코드의 작업 흐름을 제어하여 캐시 누락을 줄이는 방법이 있습니다. 하지만 완전히 피할 수는 없습니다!