NOTICE: The Processors Wiki will End-of-Life in December of 2020. It is recommended to download any files or other content you may need that are hosted on processors.wiki.ti.com. The site is now set to read only.

# Top K Elements Sort For DSP With Index Ordering

This implementation of the Quickselect with Index Ordering algorithm has been created as part of a short study on the optimization of sorting algorithms for Texas Instruments DSPs. If you have any suggestions for further optimizations please do not hesitate to give us feedback.

Refer to our study about Optimized Sort Algorithms For DSP for some benchmark results and comparison with other sort algorithms.

# Top K Elements Sort with Index Ordering

The entry point for this code is the function: `void fast_top_k(SORT_KEY_TYPE * restrict keys, SORT_INDEX_TYPE * restrict indexes, int size, int k);`.

The top `k` elements of both `keys` and `indexes` arrays are sorted by decreasing values of the `keys` array. The remaining elements in both arrays are moved, and their order is most likely altered by the procedure.

This function can be used as a sort function when `k = size`.

This implementation supports the following features/optimizations compared to a basic Quicksort/Quickselect:

• It supports "index ordering".
• It does implement a mechanism to improve performance in case a bad pivot selection is detected (using median of median approach as described on http://en.wikipedia.org/wiki/Selection_algorithm#Linear_general_selection_algorithm_-_Median_of_Medians_algorithm). This prevents the algorithm from going into O(N^2) worst-case performance.
• When doing recursive calls, if the size of the sub-array to be sorted is <= 4, then a simple inline sort function is used rather than further recursive calls. This change alone is responsible for significant improvements in performance vs simple quicksort implementation.
• It does implement an optimization that take advantage of the fact several of the items being sorted could have the same value. This will dramatically reduce the performance if a large number of elements in the array to be sorted have the same value, and it will have no negative performance impacts in other cases.
• It is faster on a DSP than a simple implementation of Quicksort because by using some scratch memory we are able to software pipepline efficiently the "partition" phase of the algorithm.

## Source Code

<syntaxhighlight lang='c'>

1. define SORT_INDEX_TYPE uint32_t
2. define SORT_KEY_TYPE int32_t
3. define SORT_KEY_MIN INT32_MIN

inline int median_of_twenty_five(SORT_KEY_TYPE * restrict array, int left) { int i, j = left; SORT_KEY_TYPE i1, i2, i3, i4, i5; int idx1, idx2, idx3, idx4, idx5; int idx1_tmp, idx4_tmp, idx3_tmp; SORT_KEY_TYPE i1_tmp, i3_tmp, i4_tmp; SORT_KEY_TYPE medians; int medians_idx;

```   /* This function is used to find in an array of 25 elements an element that is fair
approximation of the median value of the array.  This algorithm guarantees that the
median returned has at least 30% of the elements smaller and at least 30% higher.
That is except of course when there are too many items with the same value in
the array, in which case this algorithm loses its "efficiency".  */
```

for (i = 0; i<5;i++) { i1 = array[j]; idx1 = j++; i2 = array[j]; idx2 = j++; i3 = array[j]; idx3 = j++; i4 = array[j]; idx4 = j++; i5 = array[j]; idx5 = j++; if (i1 > i2) { i1_tmp = i1; i1 = i2; i2 = i1_tmp; idx1_tmp = idx1; idx1 = idx2; idx2 = idx1_tmp; } if (i4 > i5) { i4_tmp = i4; i4 = i5; i5 = i4_tmp; idx4_tmp = idx4; idx4 = idx5; idx5 = idx4_tmp; } if (i2 > i3) { i3_tmp = i3; i3 = i2; i2 = i3_tmp; idx3_tmp = idx3; idx3 = idx2; idx2 = idx3_tmp; } if (i3 > i4) { i3_tmp = i3; i3 = i4; i4 = i3_tmp; idx3_tmp = idx3; idx3 = idx4; idx4 = idx3_tmp; } if (i2 > i3) { i3_tmp = i3; i3 = i2; i2 = i3_tmp; idx3_tmp = idx3; idx3 = idx2; idx2 = idx3_tmp; } medians[i] = i3; medians_idx[i] = idx3; } j = 0; i1 = medians[j]; idx1 = medians_idx[j++]; i2 = medians[j]; idx2 = medians_idx[j++]; i3 = medians[j]; idx3 = medians_idx[j++]; i4 = medians[j]; idx4 = medians_idx[j++]; i5 = medians[j]; idx5 = medians_idx[j++]; if (i1 > i2) { i1_tmp = i1; i1 = i2; i2 = i1_tmp; idx1_tmp = idx1; idx1 = idx2; idx2 = idx1_tmp; } if (i4 > i5) { i4_tmp = i4; i4 = i5; i5 = i4_tmp; idx4_tmp = idx4; idx4 = idx5; idx5 = idx4_tmp; } if (i2 > i3) { i3_tmp = i3; i3 = i2; i2 = i3_tmp; idx3_tmp = idx3; idx3 = idx2; idx2 = idx3_tmp; } if (i3 > i4) { i3_tmp = i3; i3 = i4; i4 = i3_tmp; idx3_tmp = idx3; idx3 = idx4; idx4 = idx3_tmp; } if (i2 > i3) { i3_tmp = i3; i3 = i2; i2 = i3_tmp; idx3_tmp = idx3; idx3 = idx2; idx2 = idx3_tmp; } return idx3; }

static uint32_t fast_partition(SORT_KEY_TYPE * restrict ping, SORT_KEY_TYPE * restrict pong, SORT_INDEX_TYPE * restrict iping, SORT_INDEX_TYPE * restrict ipong, int left, int right, int pivotIndex) { SORT_KEY_TYPE pivotValue = ping[pivotIndex]; uint16_t storeIndex, i, rightIndex; uint16_t equalCount = 0; SORT_KEY_TYPE temp; SORT_INDEX_TYPE itemp;

/* This function is the heart of the quicksort algorithm, as it performs the partitioning of the array elements using the pivot it has been given. */

```    temp = ping[pivotIndex]; itemp = iping[pivotIndex];
ping[pivotIndex] = ping[right]; // Move pivot to end
iping[pivotIndex] = iping[right]; // Move pivot to end
ping[right] = temp; iping[right] = itemp;
storeIndex = left;
rightIndex = right;
#pragma MUST_ITERATE(1)
for(i=left;i<right;i++) {
if (ping[i] > pivotValue) {
pong[storeIndex] = ping[i];
ipong[storeIndex++] = iping[i];
} else {
pong[rightIndex] = ping[i];
ipong[rightIndex--] = iping[i];
}
/* Here we keep track of how many items of value equal to the pivot
are part of the data being partitioned.  Keeping track of this
"equalCount" allows us to implement an optimization for the case
several items have the same value.  */
if (ping[i] == pivotValue) equalCount++;
}
pong[storeIndex] = ping[right];
ipong[storeIndex] = iping[right];
/* Copy pivotValue back into the ping buffer.  */
ping[storeIndex] = pong[storeIndex];
iping[storeIndex] = ipong[storeIndex];
/* The "equalCount" is packed in the return value along with the new pivotIndex.  */
return ((uint32_t)storeIndex | (uint32_t)equalCount << 16);
```

}

inline void fast_small_sort(SORT_KEY_TYPE * restrict ping, SORT_KEY_TYPE * restrict pong, SORT_INDEX_TYPE * restrict iping, SORT_INDEX_TYPE * restrict ipong, int left, int right, int k, int depth) { SORT_KEY_TYPE * restrict target; SORT_INDEX_TYPE * restrict itarget; SORT_KEY_TYPE i1, i2, i3, i4, i_temp; SORT_INDEX_TYPE idx1, idx2, idx3, idx4, idx_temp;

/* This function is called to sort small arrays of 4 or less elements. Using this function rather than letting the quicksort code recurse on arrays of size 4, 3 and 2 is leading to some noticeable performance gains. */ if ((right == left) && (depth & 1)) { pong[right] = ping[right]; ipong[right] = iping[right]; return; } if (depth & 1) { target = pong; itarget = ipong; } else { target = ping; itarget = iping; } switch(right - left + 1) { case 2: i1 = ping[left]; i2 = ping[right]; idx1 = iping[left]; idx2 = iping[right]; if (i1 < i2) { i_temp = i1; idx_temp = idx1; i1 = i2; idx1 = idx2; i2 = i_temp; idx2 = idx_temp; } target[left] = i1; target[right] = i2; itarget[left] = idx1; itarget[right] = idx2; break; case 3: i1 = ping[left]; i2 = ping[left+1]; i3 = ping[right]; idx1 = iping[left]; idx2 = iping[left+1]; idx3 = iping[right]; /* Sort the three entries. */ if (i2 > i1) { i_temp = i1; idx_temp = idx1; i1 = i2; idx1 = idx2; i2 = i_temp; idx2 = idx_temp; } if (i3 > i2) { i_temp = i2; idx_temp = idx2; i2 = i3; idx2 = idx3; i3 = i_temp; idx3 = idx_temp; } if (i2 > i1) { i_temp = i1; idx_temp = idx1; i1 = i2; idx1 = idx2; i2 = i_temp; idx2 = idx_temp; } /* Write the output. */ target[left] = i1; target[left+1] = i2; target[right] = i3; itarget[left] = idx1; itarget[left+1] = idx2; itarget[right] = idx3; break; case 4: i1 = ping[left]; i2 = ping[left+1]; i3 = ping[left+2]; i4 = ping[right]; idx1 = iping[left]; idx2 = iping[left+1]; idx3 = iping[left+2]; idx4 = iping[right]; /* Sort the four entries. */ if (i2 > i1) { i_temp = i1; idx_temp = idx1; i1 = i2; idx1 = idx2; i2 = i_temp; idx2 = idx_temp; } if (i4 > i3) { i_temp = i4; idx_temp = idx4; i4 = i3; idx4 = idx3; i3 = i_temp; idx3 = idx_temp; } if (i3 > i2) { i_temp = i2; idx_temp = idx2; i2 = i3; idx2 = idx3; i3 = i_temp; idx3 = idx_temp; } if (i2 > i1) { i_temp = i1; idx_temp = idx1; i1 = i2; idx1 = idx2; i2 = i_temp; idx2 = idx_temp; } if (i4 > i3) { i_temp = i4; idx_temp = idx4; i4 = i3; idx4 = idx3; i3 = i_temp; idx3 = idx_temp; } if (i3 > i2) { i_temp = i2; idx_temp = idx2; i2 = i3; idx2 = idx3; i3 = i_temp; idx3 = idx_temp; } /* Write the output. */ target[left] = i1; target[left+1] = i2; target[left+2] = i3; target[right] = i4; itarget[left] = idx1; itarget[left+1] = idx2; itarget[left+2] = idx3; itarget[right] = idx4; break; } }

static uint32_t partition_same_values(SORT_KEY_TYPE * restrict keys, SORT_INDEX_TYPE * restrict values, SORT_KEY_TYPE * restrict orig_keys, SORT_INDEX_TYPE * restrict orig_values, SORT_KEY_TYPE pivotValue, int size) { uint32_t storeIndex = 0, position; SORT_KEY_TYPE * restrict l_keys = keys; SORT_INDEX_TYPE * restrict l_values = values;

/* This function is used whenever it has been detected that several items have the same value. */ for(position = 0; position < size; position++) { if (keys[position] == pivotValue) { SORT_KEY_TYPE key_tmp = l_keys[storeIndex]; SORT_INDEX_TYPE val_tmp = l_values[storeIndex]; orig_keys[storeIndex] = keys[position]; orig_values[storeIndex++] = values[position]; keys[position] = key_tmp; values[position] = val_tmp; } } return storeIndex; }

static void fast_top_k_aux(SORT_KEY_TYPE * restrict ping, SORT_KEY_TYPE * restrict pong, SORT_INDEX_TYPE * restrict iping, SORT_INDEX_TYPE * restrict ipong, int left, int right, int k, int depth, int alert) { uint32_t pivotIndex, pivotNewIndex; uint32_t equalCount; int i, new_alert = alert; SORT_KEY_TYPE pivotValue;

/* Select a pivot index. */ if ((right - left) > 25 && alert) { /* In case an alert has been previously raised we use a median-based pivot selection algorithm rather than just take the middle point. This is to avoid worst-case scenarios where bad pivots are selected one after the other. */ pivotIndex = median_of_twenty_five(ping, left); } else { /* If no alert has been raised, or if the number of items is now too small we just pick the middle of the array as the pivot. */ pivotIndex = (left+right)/2; } pivotValue = ping[pivotIndex]; pivotNewIndex = fast_partition(ping, pong, iping, ipong, left, right, pivotIndex); equalCount = (pivotNewIndex & 0xffff0000) >> 16; pivotNewIndex = (pivotNewIndex & 0x0000ffff); /* equalCount is the number of items that had the same value as the pivot during the partitioning. */ if (equalCount > 1) { /* Take care of all the items that have the same value as the pivot. */ if (depth & 1) { partition_same_values(pong+pivotNewIndex+1, ipong+pivotNewIndex+1, pong+pivotNewIndex+1, ipong+pivotNewIndex+1, pivotValue, right - pivotNewIndex); } else { partition_same_values(pong+pivotNewIndex+1, ipong+pivotNewIndex+1, ping+pivotNewIndex+1, iping+pivotNewIndex+1, pivotValue, right - pivotNewIndex); } } else if ((right - left>31) && ((6 * (pivotNewIndex - left)) < (right - left) || (6 * (right - pivotNewIndex)) < (right - left)) ) { new_alert = 1; } /* Do a real quicksort on the first k entries. */ if ((pivotNewIndex - left) <= 4) { /* If there are 4 items or less to be sorted on the left of the new pivot then use an optimized small sort function. */ fast_small_sort(pong, ping, ipong, iping, left, pivotNewIndex - 1, k, depth+1); } else { /* If there are more than 4 items to be sorted on the left of the new pivot then we recurse. */ fast_top_k_aux(pong, ping, ipong, iping, left, pivotNewIndex - 1, k, depth+1, new_alert); } if (pivotNewIndex + equalCount > k) { /* Do nothing with the right-hand side if the new pivot is beyond the first k items. */ if (depth & 1) { for (i=pivotNewIndex+equalCount+1;i<=right;i++) { /* If needed we copy the sorted values from the scratch memory to the original memory. */ pong[i] = ping[i]; ipong[i] = iping[i]; } } } else { /* Only keep recursing on the right-hand side if the pivot is on the left of the k-th items. */ if ((right - pivotNewIndex - equalCount) <= 4) { /* If there are 4 items or less to be sorted on the right of the new pivot then use an optimized small sort function. */ fast_small_sort(pong, ping, ipong, iping, pivotNewIndex + equalCount + 1, right, k, depth+1); } else { /* If there are more than 4 items to be sorted on the right of the new pivot then we recurse. */ fast_top_k_aux(pong, ping, ipong, iping, pivotNewIndex + equalCount + 1, right, k, depth+1, new_alert); } } }

1. pragma DATA_ALIGN(scratch,64);

static SORT_KEY_TYPE scratch; /* Scratch memory used by the algorithm, this is NOT re-entrant. */ void fast_top_k(SORT_KEY_TYPE * restrict keys, SORT_INDEX_TYPE * restrict indexes, int size, int k) {

if (size > 4) { fast_top_k_aux(keys, scratch, indexes, (SORT_INDEX_TYPE *)(scratch+size), 0, size - 1, k, 0, 0 /* No alert for now. */); } else { /* Use a simple fast sort algorithm for small arrays (size <= 4). */ fast_small_sort(keys, scratch, indexes, (SORT_INDEX_TYPE *)(scratch+size), 0, size - 1, k, 0); } } </syntaxhighlight>

# Optimized Maximum Search function

The function `int find_max_position(SORT_KEY_TYPE * restrict array, int size);` return the index of the maximum value in an array `array` of size `size`.

<syntaxhighlight lang='c'> int find_max_position(SORT_KEY_TYPE * restrict array, int size) { int i; SORT_KEY_TYPE * restrict array1 = array; SORT_KEY_TYPE * restrict array2 = array + size/4; SORT_KEY_TYPE * restrict array3 = array + 2*(size/4); SORT_KEY_TYPE * restrict array4 = array + 3*(size/4); SORT_KEY_TYPE max1 = SORT_KEY_MIN, max2 = SORT_KEY_MIN, max3 = SORT_KEY_MIN, max4 = SORT_KEY_MIN; int max_idx = 0, max1_idx = 0, max2_idx = 0, max3_idx = 0, max4_idx = 0; /* Do 4 max searches in parallel. */ for(i=0;i<size/4;i++) { if (array1[i] > max1) {max1 = array1[i]; max1_idx = i;} if (array2[i] > max2) {max2 = array2[i]; max2_idx = i;} if (array3[i] > max3) {max3 = array3[i]; max3_idx = i;} if (array4[i] > max4) {max4 = array4[i]; max4_idx = i;} } /* These index calculations are moved out of the loop to keep the kernel small. */ max2_idx += size/4; max3_idx += 2*(size/4); max4_idx += 3*(size/4); /* Find the max of the last 3-2-1 elements in the array. */ switch (size & 0x3) { case 3: if (array[size-3] > max3) {max3 = array[size-3]; max3_idx = size-3;} case 2: if (array[size-2] > max2) {max2 = array[size-2]; max2_idx = size-2;} case 1: if (array[size-1] > max1) {max1 = array[size-1]; max1_idx = size-1;} } if (max1 > max2) {max2 = max1; max2_idx = max1_idx;} if (max3 > max4) {max4 = max3; max4_idx = max3_idx;} if (max2 > max4) { max_idx = max2_idx; } else { max_idx = max4_idx; } return max_idx; }

</syntaxhighlight> {{ switchcategory:MultiCore= For technical support on MultiCore devices, please post your questions in the C6000 MultiCore Forum For questions related to the BIOS MultiCore SDK (MCSDK), please use the BIOS Forum Please post only comments related to the article Top K Elements Sort For DSP With Index Ordering here. Keystone= For technical support on MultiCore devices, please post your questions in the C6000 MultiCore Forum For questions related to the BIOS MultiCore SDK (MCSDK), please use the BIOS Forum Please post only comments related to the article Top K Elements Sort For DSP With Index Ordering here. C2000=For technical support on the C2000 please post your questions on The C2000 Forum. Please post only comments about the article Top K Elements Sort For DSP With Index Ordering here. DaVinci=For technical support on DaVincoplease post your questions on The DaVinci Forum. Please post only comments about the article Top K Elements Sort For DSP With Index Ordering here. MSP430=For technical support on MSP430 please post your questions on The MSP430 Forum. Please post only comments about the article Top K Elements Sort For DSP With Index Ordering here. OMAP35x=For technical support on OMAP please post your questions on The OMAP Forum. Please post only comments about the article Top K Elements Sort For DSP With Index Ordering here. OMAPL1=For technical support on OMAP please post your questions on The OMAP Forum. Please post only comments about the article Top K Elements Sort For DSP With Index Ordering here. MAVRK=For technical support on MAVRK please post your questions on The MAVRK Toolbox Forum. Please post only comments about the article Top K Elements Sort For DSP With Index Ordering here. For technical support please post your questions at http://e2e.ti.com. Please post only comments about the article Top K Elements Sort For DSP With Index Ordering here. }}