Top K Elements Sort For DSP With Index Ordering

From Texas Instruments Wiki
Jump to: navigation, search

This implementation of the Quickselect with Index Ordering algorithm has been created as part of a short study on the optimization of sorting algorithms for Texas Instruments DSPs. If you have any suggestions for further optimizations please do not hesitate to give us feedback.

Refer to our study about Optimized Sort Algorithms For DSP for some benchmark results and comparison with other sort algorithms.

Top K Elements Sort with Index Ordering

The entry point for this code is the function: void fast_top_k(SORT_KEY_TYPE * restrict keys, SORT_INDEX_TYPE * restrict indexes, int size, int k);.

The top k elements of both keys and indexes arrays are sorted by decreasing values of the keys array. The remaining elements in both arrays are moved, and their order is most likely altered by the procedure.

This function can be used as a sort function when k = size.

This implementation supports the following features/optimizations compared to a basic Quicksort/Quickselect:

  • It supports "index ordering".
  • It does implement a mechanism to improve performance in case a bad pivot selection is detected (using median of median approach as described on http://en.wikipedia.org/wiki/Selection_algorithm#Linear_general_selection_algorithm_-_Median_of_Medians_algorithm). This prevents the algorithm from going into O(N^2) worst-case performance.
  • When doing recursive calls, if the size of the sub-array to be sorted is <= 4, then a simple inline sort function is used rather than further recursive calls. This change alone is responsible for significant improvements in performance vs simple quicksort implementation.
  • It does implement an optimization that take advantage of the fact several of the items being sorted could have the same value. This will dramatically reduce the performance if a large number of elements in the array to be sorted have the same value, and it will have no negative performance impacts in other cases.
  • It is faster on a DSP than a simple implementation of Quicksort because by using some scratch memory we are able to software pipepline efficiently the "partition" phase of the algorithm.

Source Code

#define SORT_INDEX_TYPE uint32_t
#define SORT_KEY_TYPE   int32_t
#define SORT_KEY_MIN INT32_MIN
 
inline int median_of_twenty_five(SORT_KEY_TYPE * restrict array, int left) {
	int i, j = left;
	SORT_KEY_TYPE i1, i2, i3, i4, i5;
	int idx1, idx2, idx3, idx4, idx5;
	int idx1_tmp, idx4_tmp, idx3_tmp;
	SORT_KEY_TYPE i1_tmp, i3_tmp, i4_tmp;
	SORT_KEY_TYPE medians[5];
	int medians_idx[5];
 
    /* This function is used to find in an array of 25 elements an element that is fair
       approximation of the median value of the array.  This algorithm guarantees that the
       median returned has at least 30% of the elements smaller and at least 30% higher.
       That is except of course when there are too many items with the same value in
       the array, in which case this algorithm loses its "efficiency".  */
	for (i = 0; i<5;i++) {
		i1 = array[j]; idx1 = j++;
		i2 = array[j]; idx2 = j++;
		i3 = array[j]; idx3 = j++;
		i4 = array[j]; idx4 = j++;
		i5 = array[j]; idx5 = j++;
		if (i1 > i2) { i1_tmp = i1; i1 = i2; i2 = i1_tmp; idx1_tmp = idx1; idx1 = idx2; idx2 = idx1_tmp; }
		if (i4 > i5) { i4_tmp = i4; i4 = i5; i5 = i4_tmp; idx4_tmp = idx4; idx4 = idx5; idx5 = idx4_tmp; }
		if (i2 > i3) { i3_tmp = i3; i3 = i2; i2 = i3_tmp; idx3_tmp = idx3; idx3 = idx2; idx2 = idx3_tmp; }
		if (i3 > i4) { i3_tmp = i3; i3 = i4; i4 = i3_tmp; idx3_tmp = idx3; idx3 = idx4; idx4 = idx3_tmp; }
		if (i2 > i3) { i3_tmp = i3; i3 = i2; i2 = i3_tmp; idx3_tmp = idx3; idx3 = idx2; idx2 = idx3_tmp; }
		medians[i] = i3;
		medians_idx[i] = idx3;
	}
	j = 0;
	i1 = medians[j]; idx1 = medians_idx[j++];
	i2 = medians[j]; idx2 = medians_idx[j++];
	i3 = medians[j]; idx3 = medians_idx[j++];
	i4 = medians[j]; idx4 = medians_idx[j++];
	i5 = medians[j]; idx5 = medians_idx[j++];
	if (i1 > i2) { i1_tmp = i1; i1 = i2; i2 = i1_tmp; idx1_tmp = idx1; idx1 = idx2; idx2 = idx1_tmp; }
	if (i4 > i5) { i4_tmp = i4; i4 = i5; i5 = i4_tmp; idx4_tmp = idx4; idx4 = idx5; idx5 = idx4_tmp; }
	if (i2 > i3) { i3_tmp = i3; i3 = i2; i2 = i3_tmp; idx3_tmp = idx3; idx3 = idx2; idx2 = idx3_tmp; }
	if (i3 > i4) { i3_tmp = i3; i3 = i4; i4 = i3_tmp; idx3_tmp = idx3; idx3 = idx4; idx4 = idx3_tmp; }
	if (i2 > i3) { i3_tmp = i3; i3 = i2; i2 = i3_tmp; idx3_tmp = idx3; idx3 = idx2; idx2 = idx3_tmp; }
	return idx3;
}
 
 
static uint32_t fast_partition(SORT_KEY_TYPE * restrict ping, SORT_KEY_TYPE * restrict pong, SORT_INDEX_TYPE * restrict iping, SORT_INDEX_TYPE * restrict ipong, int left, int right, int pivotIndex) {
	 SORT_KEY_TYPE pivotValue = ping[pivotIndex];
	 uint16_t storeIndex, i, rightIndex;
	 uint16_t equalCount = 0;
	 SORT_KEY_TYPE temp;
	 SORT_INDEX_TYPE itemp;
 
	 /* This function is the heart of the quicksort algorithm, as it performs the partitioning of
	    the array elements using the pivot it has been given.  */
     temp = ping[pivotIndex]; itemp = iping[pivotIndex];
     ping[pivotIndex] = ping[right]; // Move pivot to end
     iping[pivotIndex] = iping[right]; // Move pivot to end
     ping[right] = temp; iping[right] = itemp;
     storeIndex = left;
     rightIndex = right;
     #pragma MUST_ITERATE(1)
     for(i=left;i<right;i++) {
     	if (ping[i] > pivotValue) {
     		pong[storeIndex] = ping[i];
     		ipong[storeIndex++] = iping[i];
     	} else {
     		pong[rightIndex] = ping[i];
     		ipong[rightIndex--] = iping[i];
     	}
     	/* Here we keep track of how many items of value equal to the pivot
     	   are part of the data being partitioned.  Keeping track of this
     	   "equalCount" allows us to implement an optimization for the case
     	   several items have the same value.  */
     	if (ping[i] == pivotValue) equalCount++;
     }
     pong[storeIndex] = ping[right];
     ipong[storeIndex] = iping[right];
     /* Copy pivotValue back into the ping buffer.  */
     ping[storeIndex] = pong[storeIndex];
     iping[storeIndex] = ipong[storeIndex];
     /* The "equalCount" is packed in the return value along with the new pivotIndex.  */
     return ((uint32_t)storeIndex | (uint32_t)equalCount << 16);
}
 
inline void fast_small_sort(SORT_KEY_TYPE * restrict ping, SORT_KEY_TYPE * restrict pong, SORT_INDEX_TYPE * restrict iping, SORT_INDEX_TYPE * restrict ipong, int left, int right, int k, int depth) {
	SORT_KEY_TYPE * restrict target;
	SORT_INDEX_TYPE * restrict itarget;
	SORT_KEY_TYPE i1, i2, i3, i4, i_temp;
	SORT_INDEX_TYPE idx1, idx2, idx3, idx4, idx_temp;
 
	/* This function is called to sort small arrays of 4 or less elements.
	   Using this function rather than letting the quicksort code recurse
	   on arrays of size 4, 3 and 2 is leading to some noticeable
	   performance gains.  */
	if ((right == left) && (depth & 1)) {
		pong[right] = ping[right];
		ipong[right] = iping[right];
		return;
	}
	if (depth & 1) {
		target = pong; itarget = ipong;
	} else {
		target = ping; itarget = iping;
	}
	switch(right - left + 1) {
	case 2:
		i1 = ping[left]; i2 = ping[right];
		idx1 = iping[left]; idx2 = iping[right];
		if (i1 < i2) {
			i_temp = i1; idx_temp = idx1;
			i1 = i2; idx1 = idx2;
			i2 = i_temp; idx2 = idx_temp;
		}
		target[left] = i1; target[right] = i2;
		itarget[left] = idx1; itarget[right] = idx2;
		break;
	case 3:
		i1 = ping[left]; i2 = ping[left+1]; i3 = ping[right];
		idx1 = iping[left]; idx2 = iping[left+1]; idx3 = iping[right];
		/* Sort the three entries.  */
		if (i2 > i1) {
			i_temp = i1; idx_temp = idx1;
			i1 = i2; idx1 = idx2;
			i2 = i_temp; idx2 = idx_temp;
		}
		if (i3 > i2) {
			i_temp = i2; idx_temp = idx2;
			i2 = i3; idx2 = idx3;
			i3 = i_temp; idx3 = idx_temp;
		}
		if (i2 > i1) {
			i_temp = i1; idx_temp = idx1;
			i1 = i2; idx1 = idx2;
			i2 = i_temp; idx2 = idx_temp;
		}
		/* Write the output.  */
		target[left] = i1; target[left+1] = i2; target[right] = i3;
		itarget[left] = idx1; itarget[left+1] = idx2; itarget[right] = idx3;
		break;
	case 4:
		i1 = ping[left]; i2 = ping[left+1]; i3 = ping[left+2]; i4 = ping[right];
		idx1 = iping[left]; idx2 = iping[left+1]; idx3 = iping[left+2]; idx4 = iping[right];
		/* Sort the four entries.  */
		if (i2 > i1) {
			i_temp = i1; idx_temp = idx1;
			i1 = i2; idx1 = idx2;
			i2 = i_temp; idx2 = idx_temp;
		}
		if (i4 > i3) {
			i_temp = i4; idx_temp = idx4;
			i4 = i3; idx4 = idx3;
			i3 = i_temp; idx3 = idx_temp;
		}
		if (i3 > i2) {
			i_temp = i2; idx_temp = idx2;
			i2 = i3; idx2 = idx3;
			i3 = i_temp; idx3 = idx_temp;
		}
		if (i2 > i1) {
			i_temp = i1; idx_temp = idx1;
			i1 = i2; idx1 = idx2;
			i2 = i_temp; idx2 = idx_temp;
		}
		if (i4 > i3) {
			i_temp = i4; idx_temp = idx4;
			i4 = i3; idx4 = idx3;
			i3 = i_temp; idx3 = idx_temp;
		}
		if (i3 > i2) {
			i_temp = i2; idx_temp = idx2;
			i2 = i3; idx2 = idx3;
			i3 = i_temp; idx3 = idx_temp;
		}
		/* Write the output.  */
		target[left] = i1; target[left+1] = i2; target[left+2] = i3;  target[right] = i4;
		itarget[left] = idx1; itarget[left+1] = idx2; itarget[left+2] = idx3;  itarget[right] = idx4;
		break;
	}
}
 
static uint32_t partition_same_values(SORT_KEY_TYPE * restrict keys, SORT_INDEX_TYPE * restrict values, SORT_KEY_TYPE * restrict orig_keys, SORT_INDEX_TYPE * restrict orig_values, SORT_KEY_TYPE pivotValue, int size) {
	uint32_t storeIndex = 0, position;
	SORT_KEY_TYPE * restrict l_keys = keys;
	SORT_INDEX_TYPE * restrict l_values = values;
 
	/* This function is used whenever it has been detected that several items
	   have the same value.  */
	for(position = 0; position < size; position++) {
		if (keys[position] == pivotValue) {
			SORT_KEY_TYPE key_tmp = l_keys[storeIndex];
			SORT_INDEX_TYPE val_tmp = l_values[storeIndex];
			orig_keys[storeIndex] = keys[position];
			orig_values[storeIndex++] = values[position];
			keys[position] = key_tmp;
			values[position] = val_tmp;
		}
	}
	return storeIndex;
}
 
static void fast_top_k_aux(SORT_KEY_TYPE * restrict ping, SORT_KEY_TYPE * restrict pong, SORT_INDEX_TYPE * restrict iping, SORT_INDEX_TYPE * restrict ipong, int left, int right, int k, int depth, int alert) {
	uint32_t pivotIndex, pivotNewIndex;
	uint32_t equalCount;
	int i, new_alert = alert;
	SORT_KEY_TYPE pivotValue;
 
	/* Select a pivot index. */
	if ((right - left) > 25 && alert) {
		/* In case an alert has been previously raised we use a median-based
		   pivot selection algorithm rather than just take the middle point.
		   This is to avoid worst-case scenarios where bad pivots are selected
		   one after the other.	 */
		pivotIndex = median_of_twenty_five(ping, left);
	} else {
		/* If no alert has been raised, or if the number of items is now too small
		   we just pick the middle of the array as the pivot.  */ 
		pivotIndex = (left+right)/2;
	}
	pivotValue = ping[pivotIndex];
	pivotNewIndex = fast_partition(ping, pong, iping, ipong, left, right, pivotIndex);
	equalCount = (pivotNewIndex & 0xffff0000) >> 16;
	pivotNewIndex = (pivotNewIndex & 0x0000ffff);
	/* equalCount is the number of items that had the same value as the pivot during
	   the partitioning.  */
	if (equalCount > 1) {
		/* Take care of all the items that have the same value as the  pivot.  */
	 	if (depth & 1) {
			partition_same_values(pong+pivotNewIndex+1, ipong+pivotNewIndex+1,
										  pong+pivotNewIndex+1, ipong+pivotNewIndex+1,
										  pivotValue, right - pivotNewIndex);
	 	} else {
			partition_same_values(pong+pivotNewIndex+1, ipong+pivotNewIndex+1,
										  ping+pivotNewIndex+1, iping+pivotNewIndex+1,
										  pivotValue, right - pivotNewIndex);
	 	}
	} else if ((right - left>31)
		&& ((6 * (pivotNewIndex - left)) < (right - left)
		|| (6 * (right - pivotNewIndex)) < (right - left)) ) {
		new_alert = 1;
	}
	/* Do a real quicksort on the first k entries.  */
	if ((pivotNewIndex - left) <= 4) {
	    /* If there are 4 items or less to be sorted on the left of the new pivot then
	       use an optimized small sort function.  */
	 	fast_small_sort(pong, ping, ipong, iping, left, pivotNewIndex - 1, k, depth+1);
	} else {
	 	/* If there are more than 4 items to be sorted on the left of the new pivot then
	 	   we recurse.  */
	 	fast_top_k_aux(pong, ping, ipong, iping, left, pivotNewIndex - 1, k, depth+1, new_alert);
	}
	if (pivotNewIndex + equalCount > k) {
	 	/* Do nothing with the right-hand side if the new pivot is beyond the first k items.  */
	 	if (depth & 1) {
	     	for (i=pivotNewIndex+equalCount+1;i<=right;i++) {
	     		/* If needed we copy the sorted values from the scratch memory to the original memory.  */
				pong[i] = ping[i];
				ipong[i] = iping[i];
	     	}
	 	}
	} else {
	 	/* Only keep recursing on the right-hand side if the pivot is on the left of the k-th items.  */
	 	if ((right - pivotNewIndex - equalCount) <= 4) {
	        /* If there are 4 items or less to be sorted on the right of the new pivot then
	           use an optimized small sort function.  */
			fast_small_sort(pong, ping, ipong, iping, pivotNewIndex + equalCount + 1, right, k, depth+1);
	 	} else {
	  	 	/* If there are more than 4 items to be sorted on the right of the new pivot then
	  	 	   we recurse.  */
			fast_top_k_aux(pong, ping, ipong, iping, pivotNewIndex + equalCount + 1, right, k, depth+1, new_alert);
	 	}
	}
}
 
 
 
#pragma DATA_ALIGN(scratch,64);
static SORT_KEY_TYPE scratch[2048]; /* Scratch memory used by the algorithm, this is NOT re-entrant. */
void fast_top_k(SORT_KEY_TYPE * restrict keys, SORT_INDEX_TYPE * restrict indexes, int size, int k) {
 
	if (size > 4) {
		fast_top_k_aux(keys, scratch, indexes, (SORT_INDEX_TYPE *)(scratch+size), 0, size - 1, k, 0, 0 /* No alert for now.  */);
	} else {
		/* Use a simple fast sort algorithm for small arrays (size <= 4).  */
		fast_small_sort(keys, scratch, indexes, (SORT_INDEX_TYPE *)(scratch+size), 0, size - 1, k, 0);
	}
}

Optimized Maximum Search function

The function int find_max_position(SORT_KEY_TYPE * restrict array, int size); return the index of the maximum value in an array array of size size.

int find_max_position(SORT_KEY_TYPE * restrict array, int size) {
	int i;
	SORT_KEY_TYPE * restrict array1 = array;
	SORT_KEY_TYPE * restrict array2 = array + size/4;
	SORT_KEY_TYPE * restrict array3 = array + 2*(size/4);
	SORT_KEY_TYPE * restrict array4 = array + 3*(size/4);
	SORT_KEY_TYPE max1 = SORT_KEY_MIN, max2 = SORT_KEY_MIN, max3 = SORT_KEY_MIN, max4 = SORT_KEY_MIN;
	int max_idx = 0, max1_idx = 0, max2_idx = 0, max3_idx = 0, max4_idx = 0;
	/* Do 4 max searches in parallel.  */
	for(i=0;i<size/4;i++) {
		if (array1[i] > max1) {max1 = array1[i]; max1_idx = i;}
		if (array2[i] > max2) {max2 = array2[i]; max2_idx = i;}
		if (array3[i] > max3) {max3 = array3[i]; max3_idx = i;}
		if (array4[i] > max4) {max4 = array4[i]; max4_idx = i;}
	}
	/* These index calculations are moved out of the loop to keep the kernel small. */
	max2_idx += size/4;
	max3_idx += 2*(size/4);
	max4_idx += 3*(size/4);
	/* Find the max of the last 3-2-1 elements in the array.  */
	switch (size & 0x3) {
	case 3:
		if (array[size-3] > max3) {max3 = array[size-3]; max3_idx = size-3;}
	case 2:
		if (array[size-2] > max2) {max2 = array[size-2]; max2_idx = size-2;}
	case 1:
		if (array[size-1] > max1) {max1 = array[size-1]; max1_idx = size-1;}
	}
	if (max1 > max2) {max2 = max1; max2_idx = max1_idx;}
	if (max3 > max4) {max4 = max3; max4_idx = max3_idx;}
	if (max2 > max4) {
		max_idx = max2_idx;
	} else {
		max_idx = max4_idx;
	}
	return max_idx;
}