Проблема с потоками во встроенной системе

У меня 4 ядра Rocket RISC-V и каждое подключено к 4 одинаковым ускорителям. Я пытаюсь использовать многопоточность для одновременного запуска всех или некоторых ускорителей и сравнения результатов производительности между ними. Мне нужно измерить время между тем, когда первый запущенный поток начинает свою работу, и до того, как последний запущенный поток завершит свою задачу, используя счетчики csr. Задача для всех потоков одинакова и они не зависят от других.

Однако я изо всех сил пытаюсь измерить правильное время и вижу, что потоки запускаются один за другим, а не одновременно (я не ожидаю, что они начнутся точно в одно и то же время, но мне нужно увидеть некоторое перекрытие).

Что я вижу например для двух потоков на одном процессоре за 10 тестов:

Thread 0 started at 12612628151 and ended at 12621280072
Thread 1 started at 16418324070 and ended at 16426815938

или два потока на двух процессорах:

Thread 0 started at 12599644918 and ended at 12608560166
Thread 1 started at 15271446547 and ended at 15280014314
// expecting Thread 1 starts at 12599900000 for example

Как правильно измерить время? Вот мой код:

#define _GNU_SOURCE

#include <pthread.h>
#include <sched.h>
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <errno.h>
#include <math.h>
#include <limits.h>
#include <stdbool.h>
#include <time.h>


#include "encoding.h"
#include "rocc.h"
#include "data/tests.h"

//----------------------------------------------------------------\\
//------------------  UTILITY-------------------------------------\\
//----------------------------------------------------------------\\

uint64_t freq = 25000000; // 25 MHz
volatile uint64_t total_tasks = 0;
volatile double total_time = 0.0;
uint64_t total_clk_cycles = 0;
uint64_t total_insts = 0;
double inst_per_cycles = 0.0;


void print_measurments(double total_time) {
    double tasks_per_second = total_tasks / total_time;
    printf("\nTotal number of instructions: %lu \n", total_insts);
    printf("Total number of cycles: %lu \n", total_clk_cycles);
    printf("Total number of tasks finished: %lu \n", total_tasks);
    printf("Tasks per second with gettime(): %.5f\n", tasks_per_second);
    printf("Tasks per second with csr_cycles(): %f\n", (double)total_tasks / ((double)total_clk_cycles / (double)freq));
}


uint64_t concatenate_ints(unsigned int x, unsigned int y) {
    return ((uint64_t)x << 32) | y;
}

uint64_t concatenate_3ints(uint64_t x, uint64_t y, uint64_t z) {
    return ((uint64_t)x << 48) | (((uint64_t)y << 32) | z);
}

uint64_t concatenate_arrays(int64_t* x, int64_t* y) {
    return ((uint64_t)x << 32) | (uint64_t)y;
}

//################################################################\\

//----------------------------------------------------------------\\
//------------------  THREAD -------------------------------------\\
//----------------------------------------------------------------\\

#define handle_error_en(en, msg) \
       do { errno = en; perror(msg); exit(EXIT_FAILURE); } while (0)

#define handle_error(msg) \
       do { perror(msg); exit(EXIT_FAILURE); } while (0)

pthread_mutex_t timing_mutex = PTHREAD_MUTEX_INITIALIZER;

struct timespec start_time, end_time;

struct thread_info {
    pthread_t thread_id;
    int thread_num;
    int res;
    int cpumask;
    int accelid;
    test_struct* tests;
    size_t num_tests;
    uint64_t start_time;
    uint64_t end_time;
};

int run(struct thread_info *tinfo);
static void *thread_start(void *arg) {
    struct thread_info *tinfo = arg;
    tinfo->res = run(tinfo);
    return (void *) &(tinfo->res);
}

void initialize_threads(struct thread_info *tinfo, int nthreads, char *argv[], test_struct* tests, size_t num_tests) {
    pthread_attr_t attr;
    int s;

    s = pthread_attr_init(&attr);
    if (s != 0) {
        handle_error_en(s, "pthread_attr_init");
    }

    size_t size = PTHREAD_STACK_MIN + 0x1000000;
    s = pthread_attr_setstacksize(&attr, size);
    if (s != 0) {
        handle_error_en(s, "pthread_attr_setstacksize");
    }

    for (int tnum = 0; tnum < nthreads; tnum++) {
        tinfo[tnum].thread_num = tnum;
        tinfo[tnum].cpumask = atoi(argv[tnum + 2]);
        tinfo[tnum].accelid = tnum; // Match accelid with thread_num
        tinfo[tnum].tests = tests;
        tinfo[tnum].num_tests = num_tests;
        s = pthread_create(&tinfo[tnum].thread_id, &attr, &thread_start, &tinfo[tnum]);
        if (s != 0) {
            handle_error_en(s, "pthread_create");
        }
    }

    s = pthread_attr_destroy(&attr);
    if (s != 0) {
        handle_error_en(s, "pthread_attr_destroy");
    }
}

void join_threads(struct thread_info *tinfo, int nthreads) {
    int s;
    void *tres;

    for (int tnum = 0; tnum < nthreads; tnum++) {
        s = pthread_join(tinfo[tnum].thread_id, &tres);
        if (s != 0) {
            handle_error_en(s, "pthread_join");
        }
    }
}

void set_thread_affinity(pthread_t thread, int cpumask) {
    cpu_set_t cpuset;
    CPU_ZERO(&cpuset);

    for (int i = 0; i < 3; ++i) {
        if (cpumask & (1 << i)) {
            CPU_SET(i, &cpuset);
        }
    }

    int result = pthread_setaffinity_np(thread, sizeof(cpu_set_t), &cpuset);
    if (result != 0) {
        perror("pthread_setaffinity_np");
    }
}

void record_start_time(struct thread_info *tinfo) {
    tinfo->start_time = rdcycle();
}

void record_end_time(struct thread_info *tinfo) {
    tinfo->end_time = rdcycle();
    pthread_mutex_lock(&timing_mutex);
    //total_clk_cycles += (tinfo->end_time - tinfo->start_time);
    pthread_mutex_unlock(&timing_mutex);
}

uint64_t th0_start_cycle = 0, th0_end_cycle = 0;
uint64_t th1_start_cycle = 0, th1_end_cycle = 0;
uint64_t th2_start_cycle = 0, th2_end_cycle = 0;
uint64_t th3_start_cycle = 0, th3_end_cycle = 0;

uint64_t max4(uint64_t a, uint64_t b, uint64_t c, uint64_t d) {
     uint64_t max_value = 0;

    if (a != 0) {
        max_value = a;
    }
    if (b != 0 && b > max_value) {
        max_value = b;
    }
    if (c != 0 && c > max_value) {
        max_value = c;
    }
    if (d != 0 && d > max_value) {
        max_value = d;
    }

    return max_value;
}
uint64_t min4(uint64_t a, uint64_t b, uint64_t c, uint64_t d) {
    uint64_t min_value = UINT64_MAX;

    if (a != 0) {
        min_value = a;
    }
    if (b != 0 && b < min_value) {
        min_value = b;
    }
    if (c != 0 && c < min_value) {
        min_value = c;
    }
    if (d != 0 && d < min_value) {
        min_value = d;
    }

    return min_value;
}

int run(struct thread_info *tinfo) {
    int threadid = tinfo->thread_num;
    int cpumask = tinfo->cpumask;
    int accelid = tinfo->accelid;
    test_struct *tests = tinfo->tests;
    size_t num_tests = tinfo->num_tests;

    
    // Get the current thread ID and set the affinity
    pthread_t thread = pthread_self();
    set_thread_affinity(thread, cpumask);

    int64_t task_done = 0;
    int num_test_passed = 0;


    uint64_t *data1 = (uint64_t *)malloc(sizeof(uint64_t) * tests[0].data1_packs);
    uint64_t *data2 = (uint64_t *)malloc(sizeof(uint64_t) * tests[0].data2_packs);

    printf("[thread %d] Starting tests\n", threadid);

    record_start_time(tinfo);

    for (size_t i = 0; i < num_tests; i++) {

        uint64_t local_param = ceil(((double)(tests[i].data2_len) / 11));
        uint64_t data1_len_size = concatenate_ints(tests[i].data1_len, tests[i].data1_packs);
        uint64_t data2_len_size_var = concatenate_3ints(tests[i].data2_len, tests[i].data2_packs, local_param);

        if (tests[i].data1 != (i > 0 ? tests[i - 1].data1 : NULL)) {
            for (size_t j = 0; j < tests[i].data1_packs; j++) {
                data1[j] = tests[i].data1[j];
            }
            switch (accelid) {
                case 0:
                    ROCC_INSTRUCTION_SS(0, data1, data1_len_size, 0);
                    break;
                case 1:
                    ROCC_INSTRUCTION_SS(1, data1, data1_len_size, 0);
                    break;
                case 2:
                    ROCC_INSTRUCTION_SS(2, data1, data1_len_size, 0);
                    break;
                case 3:
                    ROCC_INSTRUCTION_SS(3, data1, data1_len_size, 0);
                    break;
                default:
                    break;
            }
        }

        for (size_t j = 0; j < tests[i].data2_packs; j++) {
            data2[j] = tests[i].data2[j];
        }

        switch (accelid) {
            case 0:
                th0_start_cycle = rdcycle();
                ROCC_INSTRUCTION_DSS(0, task_done, data2, data2_len_size_var, 1);
                th0_end_cycle = rdcycle();
                total_tasks++;
                break;
            case 1:
                th1_start_cycle = rdcycle();
                ROCC_INSTRUCTION_DSS(1, task_done, data2, data2_len_size_var, 1);
                th1_end_cycle = rdcycle();
                total_tasks++;
                break;
            case 2:
                th2_start_cycle = rdcycle();
                ROCC_INSTRUCTION_DSS(2, task_done, data2, data2_len_size_var, 1);
                th2_end_cycle = rdcycle();
                total_tasks++;
                break;
            case 3:
                th3_start_cycle = rdcycle();
                ROCC_INSTRUCTION_DSS(3, task_done, data2, data2_len_size_var, 1);
                th3_end_cycle = rdcycle();
                total_tasks++;
                break;
            default:
                break;
        }
        
    }

    uint64_t th_start_time = min4(th0_start_cycle, th1_start_cycle, th2_start_cycle, th3_start_cycle);
    uint64_t th_end_time   = max4(th0_end_cycle,   th1_end_cycle,   th2_end_cycle,   th3_end_cycle);
    
    printf( "th%d start = %ld, th1 start = %ld, th2 start = %ld, th2 start = %ld  => min = %ld \n", threadid, th0_start_cycle, th1_start_cycle, th2_start_cycle, th3_start_cycle, th_start_time);
    printf( "th%d end   = %ld, th1 end   = %ld, th2 end   = %ld, th2 end   = %ld  => max = %ld \n", threadid, th0_end_cycle, th1_end_cycle, th2_end_cycle, th3_end_cycle, th_end_time);
    
    total_clk_cycles += (th_end_time - th_start_time); 
    record_end_time(tinfo);

    free(data2);
    free(data1);


    printf("\n[thread %d] Tests finished.\n", threadid, num_test_passed);
    return 0;
}
/*---------------------------------------------------------------*/
/*---------------------MAIN FUNCTION-----------------------------*/
/*---------------------------------------------------------------*/

int main(int argc, char *argv[]) {
    static int num_tests, num_threads;

    printf("start\n");
    if (argc < 2) {
        printf("Usage: %s <num_tests> <num_threads> <cpumask1> <cpumask2> ... <accelid1> <accelid2> ...\n", argv[0]);
        exit(EXIT_FAILURE);
    }
    num_tests = atoi(argv[1]);
    num_threads = atoi(argv[2]);
    printf("num_tests = %d & num_threads = %d \n", num_tests, num_threads);

    struct thread_info *tinfo = calloc(num_threads, sizeof(*tinfo));
    if (tinfo == NULL) {
        handle_error("calloc");
    }

    clock_gettime(CLOCK_MONOTONIC, &start_time);


    initialize_threads(tinfo, num_threads, argv, tests, num_tests);

    join_threads(tinfo, num_threads);

    clock_gettime(CLOCK_MONOTONIC, &end_time);
    total_time = (end_time.tv_sec - start_time.tv_sec) + (end_time.tv_nsec - start_time.tv_nsec) / 1e9;

    print_measurments(total_time);

    // Print the start and end times of each thread
    for (int i = 0; i < num_threads; i++) {
        printf("Thread %d started at %lu and ended at %lu\n", tinfo[i].thread_num, tinfo[i].start_time, tinfo[i].end_time);
    }

    free(tinfo);

    return EXIT_SUCCESS;
}

И отпечатки внутри функции run() показывают (опять же для 10 тестов):

[thread 0] Starting tests
[thread 1] Starting tests
th0 start = 12620313755, th1 start = 16425843909, th2 start = 0, th2 start = 0  => min = 12620313755 
th0 end   = 12621128480, th1 end   = 16425836178, th2 end   = 0, th2 end   = 0  => max = 16425836178 
th1 start = 12620313755, th1 start = 16425843909, th2 start = 0, th2 start = 0  => min = 12620313755 
th1 end   = 12621128480, th1 end   = 16426650923, th2 end   = 0, th2 end   = 0  => max = 16426650923 

[thread 0] Tests finished. 

[thread 1] Tests finished.

Теоретическое количество циклов для каждой задачи составляет около 80 000 для одного теста. Для 10 испытаний оно должно составлять 800 000 циклов. Так, например, для потока 0 выше, если я вычитаю конечные циклы (12621128480) и начальные циклы (12620313755), я получаю 814725, что близко к 800 000.

В конечном итоге я хочу запустить 16 потоков (почти) одновременно на 4 ядрах для 16 ускорителей и измерить время между временем начала 1-го потока и временем окончания 16-го потока. Также отметим, что количество выполняемых задач в этом случае будет в 16 раз больше (при 10 тестах выполненных задач = 160).

Пожалуйста, дайте мне знать, если вам нужна дополнительная информация.

Обновление (tests.h):

uint64_t data2[] = {
    0x5e54c42e48f95681,
    0x6817f0ac737ce56c,
    0x2c25c96e2f08878e,
    0x76e63b8e74593080,
    0x5495a7174a9738cb,
    0x766b718558de643e,
    0x112af5be56f0419d,
    0x39dade335211fa50,
    0x2da48dcf3d051c37,
    0x0c7b1f97089158d8,
    0x51c71c37647dd215,
    0x193c2e614ee73ee9,
    0x6369f4c63ea7bf2d,
    0x1340c33d264fad65,
    0x705d49c404b1a154,
    0x266694b14eb20df2,
    0x4daaf7d50e7e855d,
    0x422ef35f79d0c143,
    0x3d870ceb39152eed,
    0x6e29f1c3121cb402,
    0x03ac67b864956348,
    0x6afb184014d75d76,
    0x3b85a4e524d5f673,
    0x66e957c6692a32b5,
    0x61db12aa7364775d,
    0x71bb8b8d33a22ee2,
    0x57e249720af7b9ee,
    0x02896dcb3b4c3e39,
    0x499f791b15ca3108,
    0x619beb9e39fcc2df,
    0x1a7bd25c0802804f,
    0x08aed0d26826ca32
};
uint64_t data1[] =
{
    0xBBE6139D23887ACF,
    0xA5C3E51BDB1C7101,
    0xE5E8E8377D210A02,
    0xCE1D0EB443C754E8,
    0x52FE52BE649D946B,
    0x841A847EABE22232,
    0xFB6DD72788419F4A,
    0xDAB563352538E68C,
    0xCD809B2A690DD264,
    0x767A3E544BD27E8,
    0xC43B11A46691E3F1,
    0x9D709F1788087B54,
    0x233AC62E6F14D8F0,
    0x9B31EFD15B268F5E,
    0x911AA32FFC2C5485,
    0x52E9C7B33B6B1A8D,
    0xE590F5241B3E896A,
    0x8F15C4A2891F1BD0,
    0xA1831064E81917BF,
    0xA66EE7901ADE7A05,
    0x499600463D2E2DBE,
    0x8FBAFAC1084EA740,
    0x7A3CDBD78AA1F6F8,
    0x229EB0F8F889B710,
    0xB4E48DBDF6F101E1,
    0x1DC8989C5D226B6C,
    0xBC49E8E6570BDB0D,
    0x45446206D4087C,
    0xA945EA2DA761AC00,
    0xC544955BD883EAD7,
    0x2A3772AFA29009F1,
    0xA28C1A9DD2648D10,
    0xDEC2D76F0EEB931C
    };

//structure for the test
typedef struct {
    uint64_t *data1;   
    uint64_t *data2; 
    int data1_packs;   
    int data2_packs;   
    int data1_len;     
    int data2_len;     
} test_struct;
// 6 tests
test_struct tests[] = {
    {data1, data2, 1566, 32, 50000, 1000},
    {data1, data2, 1566, 32, 50000, 1000},
    {data1, data2, 1566, 32, 50000, 1000},
    {data1, data2, 1566, 32, 50000, 1000},
    {data1, data2, 1566, 32, 50000, 1000},
    {data1, data2, 1566, 32, 50000, 1000}
   };

Многоядерный тест:

Я использовал приведенный ниже сценарий, чтобы использовать набор задач для запуска многопоточной программы на 4 процессорах:

#!/bin/bash

gcc -o fix1 fix1.c -pthread -lm

for i in 0 1 2 3; do
  (taskset -c $i ./fix1 -g 300 4 2>&1 | sed "s/^/CPU$i: /") &
done

wait

И вывод такой (я показываю только 7 самых правильных цифр для времени окончания и начала, а минимальное и максимальное время для каждого процессора — это время начала первого запущенного потока и время окончания последнего завершенного потока соответственно):

CPU0: start
CPU0: t=1 g=1
CPU0: num_tests = 300 & num_threads = 4 
CPU0: tests = 2500 elements
CPU0: d1cnt=1566 d2cnt=32
CPU0: T1 start 4403660 end 4526744 ELAPSED: 123084
CPU0: T2 start 4598096 end 4684928 ELAPSED: 86832
CPU0: T3 start 4717724 end 4804556 ELAPSED: 86832
CPU0: T0 start 4880624 end 4967708 ELAPSED: 87084
CPU0: Minimum start time: 3602931164403660
CPU0: Maximum end time: 3602931164967708
CPU0: Time difference: 564048
CPU0: 
CPU0: Total number of instructions: 0 
CPU0: Total number of cycles: 564048 
CPU0: Total number of Tasks finished: 1204 
CPU0: Tasks per second with gettime(): 4344359.00859
CPU0: Tasks per second with csr_cycles(): 53364.252688
CPU2: start
CPU2: t=1 g=1
CPU2: num_tests = 300 & num_threads = 4 
CPU2: tests = 2500 elements
CPU2: d1cnt=1566 d2cnt=32
CPU2: T1 start 4659980 end 4770068 ELAPSED: 110088
CPU2: T2 start 4854488 end 4941212 ELAPSED: 86724
CPU2: T3 start 4977392 end 5063900 ELAPSED: 86508
CPU2: T0 start 5122508 end 5209736 ELAPSED: 87228
CPU2: Minimum start time: 3602931164659980
CPU2: Maximum end time: 3602931165209736
CPU2: Time difference: 549756
CPU2: 
CPU2: Total number of instructions: 0 
CPU2: Total number of cycles: 549756 
CPU2: Total number of Tasks finished: 1204 
CPU2: Tasks per second with gettime(): 4423364.47568
CPU2: Tasks per second with csr_cycles(): 54751.562511
CPU3: start
CPU3: t=1 g=1
CPU3: num_tests = 300 & num_threads = 4 
CPU3: tests = 2500 elements
CPU3: d1cnt=1566 d2cnt=32
CPU3: T1 start 4951220 end 5055224 ELAPSED: 104004
CPU3: T2 start 5119988 end 5206748 ELAPSED: 86760
CPU3: T3 start 5239832 end 5326556 ELAPSED: 86724
CPU3: T0 start 5390636 end 5477720 ELAPSED: 87084
CPU3: Minimum start time: 3602931164951220
CPU3: Maximum end time: 3602931165477720
CPU3: Time difference: 526500
CPU3: 
CPU3: Total number of instructions: 0 
CPU3: Total number of cycles: 526500 
CPU3: Total number of Tasks finished: 1204 
CPU3: Tasks per second with gettime(): 4134033.32635
CPU3: Tasks per second with csr_cycles(): 57169.990503
CPU1: start
CPU1: t=1 g=1
CPU1: num_tests = 300 & num_threads = 4 
CPU1: tests = 2500 elements
CPU1: d1cnt=1566 d2cnt=32
CPU1: T1 start 991928 end 1097804 ELAPSED: 105876
CPU1: T2 start 1170452 end 1257428 ELAPSED: 86976
CPU1: T3 start 1297820 end 1384400 ELAPSED: 86580
CPU1: T0 start 1448336 end 1535456 ELAPSED: 87120
CPU1: Minimum start time: 3602931190991928
CPU1: Maximum end time: 3602931191535456
CPU1: Time difference: 543528
CPU1: 
CPU1: Total number of instructions: 0 
CPU1: Total number of cycles: 543528 
CPU1: Total number of Tasks finished: 1204 
CPU1: Tasks per second with gettime(): 4486845.04733
CPU1: Tasks per second with csr_cycles(): 55378.931720

Нам нужен MRE или SSCCE. Для C — один файл .c: со всеми (например) #include <stdio.h>, но без #include "myprogram.h" (для них скопируйте и вставьте содержимое .h). Ошибки могут быть там, где вы не подозреваете, поэтому опубликуйте полный код (с main). В случае ошибок во время выполнения он должен скомпилироваться без ошибок. Мы хотели бы проверить/создать/запустить код в наших системах. Мы/вы можете: (1) скомпилировать с помощью -Wall и -fsanitize=address -O0 -g (2) запустить его под gdb или strace (3) добавить отладку printf

Craig Estey 26.05.2024 19:35

Вы были довольно близки к MRE. Но у нас нет ваших трех нестандартных .h файлов, поэтому мы не можем попытаться запустить вашу программу в наших системах. Тогда даже их комментирование ROCC* не определено. И ваш код, связанный с тестированием (например, test_struct), также не определен.

Craig Estey 26.05.2024 19:38

@CraigEstey Спасибо за ваш комментарий. Вы можете найти rocc.h иcoding.h в github.com/riscv-software-src/riscv-isa-sim/tree/master/risc‌​v Я отредактирую сообщение для другого заголовка, надеюсь, часть тестов будет достаточно.

student_11 26.05.2024 23:13

@CraigEstey, извините, это неправильная ссылка, правильные заголовки здесь: github.com/ucb-bar/hwacha-template/blob/master/tests

student_11 27.05.2024 01:00
Стоит ли изучать PHP в 2023-2024 годах?
Стоит ли изучать PHP в 2023-2024 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
0
4
137
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Предостережение: это немного хрупко, потому что мне пришлось адаптировать код для работы на моем ПК.

Ваша проблема больше связана с запуском/запуском потоков «последовательно», а не с фактической выполняемой работой. Таким образом, это не особо зависит от выполняемой работы (симуляция инструкций), поэтому проблему можно диагностировать, запустив ее на ПК.

Проблемы с кодом:

  1. Если время выполнения фактической работы потока меньше времени выполнения pthread_create, выполнение потока является (будет казаться) последовательным.

  2. То есть своего рода состояние гонки. После того, как основной поток вызывает pthread_create, порожденный поток может запуститься и завершиться до возвращения pthread_create в основной поток.

  3. Можно улучшить, увеличив время работы/количество тестов.

  4. Можно улучшить, установив флаг «выпуск» (например, gonow в коде ниже). Тогда потоки будут «запускаться» только после того, как все потоки будут созданы и запущены.

  5. Наличие любого printf внутри функции потока искажает время (т. е. мы измеряем больше времени printf, чем реального времени. И printf выполняет межпоточную блокировку.

  6. В run вызовы malloc предполагают, что все data1_packs и data2_packs имеют одно и то же значение (верно на данный момент). Лучше использовать максимальные значения.

  7. num_tests может превышать количество тестов в массиве tests.

  8. Вам не совсем удобно использовать указатели на struct. В циклах их много (например) tinfo[tnum].whatever Лучше иметь «текущий» указатель (например, struct thread_info *tcur = &tinfo[tnum]; и использовать tcur->whatever).

  9. Наличие \\ в конце комментария помечается компилятором как попытка создать многострочное продолжение комментария (т. е. \ в конце строки сообщает препроцессору о необходимости объединения строк).

Изменения для запуска на ПК:

  1. Я заменил ROCC_* на простой nop, чтобы он работал на ПК (например, взломал rocc.h).
  2. Добавлена ​​простая/фальшивая версия rdcycle.

ФАЙЛ: fix1.c

#define _GNU_SOURCE

#include <pthread.h>
#include <sched.h>
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <errno.h>
#include <math.h>
#include <limits.h>
#include <stdbool.h>
#include <time.h>
#include <stdatomic.h>

#include "encoding.h"
#include "rocc.h"
#if 0
#include "data/tests.h"
#else
#include "tests.h"
#endif

#ifdef DEBUG
#define dbgprt(_fmt...) \
    printf(_fmt)
#else
#define dbgprt(_fmt...) \
    do { } while (0)
#endif

#if 1
#define ARRCOUNT(_arr) \
    (sizeof(_arr) / sizeof(_arr[0]))

typedef long long tsc_t;
tsc_t tsczero;
tsc_t rdzero;

tsc_t
tscget(void)
{
    struct timespec ts;
    tsc_t tsc;

    clock_gettime(CLOCK_MONOTONIC,&ts);
    tsc = ts.tv_sec;
    tsc *= 1000000000;
    tsc += ts.tv_nsec;

    tsc -= tsczero;

    return tsc;
}

double
tscsec(tsc_t tsc)
{
    double sec = tsc;
    return sec / 1e9;
}

int gonow;
int opt_g = 0;
int opt_t = 1;
int d1cnt;
int d2cnt;
size_t tests_max;
#endif

//----------------------------------------------------------------
//------------------  UTILITY-------------------------------------
//----------------------------------------------------------------

uint64_t freq = 25000000;               // 25 MHz
volatile uint64_t total_tasks = 0;
volatile double total_time = 0.0;
tsc_t total_clk_cycles = 0;
uint64_t total_insts = 0;
double inst_per_cycles = 0.0;

#if 1
tsc_t
rdcycle(void)
{
    tsc_t ret;

    if (opt_t)
        ret = tscget();
    else {
        static tsc_t ctr;
        ret = atomic_fetch_add(&ctr,1);
    }

    return ret;
}
#endif

void
print_measurments(double total_time)
{
    double tasks_per_second = total_tasks / total_time;

    printf("\nTotal number of instructions: %lu \n", total_insts);
    printf("Total number of cycles: %lld \n", total_clk_cycles);
    printf("Total number of tasks finished: %lu \n", total_tasks);
    printf("Tasks per second with gettime(): %.5f\n", tasks_per_second);
    printf("Tasks per second with csr_cycles(): %f\n",
        (double) total_tasks / ((double) total_clk_cycles / (double) freq));
}

uint64_t
concatenate_ints(unsigned int x, unsigned int y)
{
    return ((uint64_t) x << 32) | y;
}

uint64_t
concatenate_3ints(uint64_t x, uint64_t y, uint64_t z)
{
    return ((uint64_t) x << 48) | (((uint64_t) y << 32) | z);
}

uint64_t
concatenate_arrays(int64_t * x, int64_t * y)
{
    return ((uint64_t) x << 32) | (uint64_t) y;
}

//################################################################

//----------------------------------------------------------------
//------------------  THREAD -------------------------------------
//----------------------------------------------------------------

#define handle_error_en(en, msg) \
       do { errno = en; perror(msg); exit(EXIT_FAILURE); } while (0)

#define handle_error(msg) \
       do { perror(msg); exit(EXIT_FAILURE); } while (0)

pthread_mutex_t timing_mutex = PTHREAD_MUTEX_INITIALIZER;

tsc_t start_time, end_time;

struct thread_info {
    pthread_t thread_id;
    int thread_num;
    int res;
    int cpumask;
    int accelid;
    test_struct *tests;
    size_t num_tests;
    tsc_t start_time;
    tsc_t end_time;
};

int run(struct thread_info *tinfo);

int thread_first;

static void *
thread_start(void *arg)
{
    struct thread_info *tinfo = arg;

// NOTE/FIX -- add a "release" flag
#if 1
    while (opt_g) {
        int go = atomic_load(&gonow);
        if (go)
            break;
    }

    // make the cycle counter relative (optional)?
    if (opt_g) {
        if (atomic_fetch_add(&thread_first,1) == 0)
            rdzero = rdcycle();
    }
#endif

    tinfo->res = run(tinfo);

    return (void *) &(tinfo->res);
}

void
initialize_threads(struct thread_info *tinfo, int nthreads, char *argv[],
    test_struct *tests, size_t num_tests)
{
    pthread_attr_t attr;
    int s;

    s = pthread_attr_init(&attr);
    if (s != 0) {
        handle_error_en(s, "pthread_attr_init");
    }

    size_t size = PTHREAD_STACK_MIN + 0x1000000;

    s = pthread_attr_setstacksize(&attr, size);
    if (s != 0) {
        handle_error_en(s, "pthread_attr_setstacksize");
    }

    for (int tnum = 0; tnum < nthreads; tnum++) {
        struct thread_info *tcur = &tinfo[tnum];

        tcur->thread_num = tnum;
#if 0
        tcur->cpumask = atoi(argv[tnum]);
#else
        tcur->cpumask = 0;
#endif
        tcur->accelid = tnum;       // Match accelid with thread_num
        tcur->tests = tests;
        tcur->num_tests = num_tests;

        s = pthread_create(&tcur->thread_id, &attr, &thread_start, tcur);
        if (s != 0) {
            handle_error_en(s, "pthread_create");
        }
    }

    s = pthread_attr_destroy(&attr);
    if (s != 0) {
        handle_error_en(s, "pthread_attr_destroy");
    }
}

void
join_threads(struct thread_info *tinfo, int nthreads)
{
    int s;
    void *tres;

    for (int tnum = 0; tnum < nthreads; tnum++) {
        s = pthread_join(tinfo[tnum].thread_id, &tres);
        if (s != 0) {
            handle_error_en(s, "pthread_join");
        }
    }
}

void
set_thread_affinity(pthread_t thread, int cpumask)
{
#if 0
    cpu_set_t cpuset;

    CPU_ZERO(&cpuset);

    for (int i = 0; i < 3; ++i) {
        if (cpumask & (1 << i)) {
            CPU_SET(i, &cpuset);
        }
    }

    int result = pthread_setaffinity_np(thread, sizeof(cpu_set_t), &cpuset);

    if (result != 0) {
        perror("pthread_setaffinity_np");
    }
#endif
}

void
record_start_time(struct thread_info *tinfo)
{
    tinfo->start_time = rdcycle();
}

void
record_end_time(struct thread_info *tinfo)
{
    tinfo->end_time = rdcycle();
#if 0
    pthread_mutex_lock(&timing_mutex);
    // total_clk_cycles += (tinfo->end_time - tinfo->start_time);
    pthread_mutex_unlock(&timing_mutex);
#endif
}

tsc_t th0_start_cycle = 0,
    th0_end_cycle = 0;
tsc_t th1_start_cycle = 0,
    th1_end_cycle = 0;
tsc_t th2_start_cycle = 0,
    th2_end_cycle = 0;
tsc_t th3_start_cycle = 0,
    th3_end_cycle = 0;

tsc_t
max4(tsc_t a, tsc_t b, tsc_t c, tsc_t d)
{
    tsc_t max_value = 0;

    if (a != 0) {
        max_value = a;
    }
    if (b != 0 && b > max_value) {
        max_value = b;
    }
    if (c != 0 && c > max_value) {
        max_value = c;
    }
    if (d != 0 && d > max_value) {
        max_value = d;
    }

    return max_value;
}

tsc_t
min4(tsc_t a, tsc_t b, tsc_t c, tsc_t d)
{
    tsc_t min_value = INT64_MAX;

    if (a != 0) {
        min_value = a;
    }
    if (b != 0 && b < min_value) {
        min_value = b;
    }
    if (c != 0 && c < min_value) {
        min_value = c;
    }
    if (d != 0 && d < min_value) {
        min_value = d;
    }

    return min_value;
}

int
run(struct thread_info *tinfo)
{
#if DEBUG
    int threadid = tinfo->thread_num;
#endif
    int cpumask = tinfo->cpumask;
    int accelid = tinfo->accelid;
    test_struct *tests = tinfo->tests;
    size_t num_tests = tinfo->num_tests;

    // Get the current thread ID and set the affinity
    pthread_t thread = pthread_self();

    set_thread_affinity(thread, cpumask);

    int64_t task_done = 0;
#if 0
    int num_test_passed = 0;
#endif

#if 0
    // NOTE/BUG: this assumes all data1_packs/data2_packs are the same
    uint64_t *data1 = malloc(sizeof(uint64_t) * tests[0].data1_packs);
    uint64_t *data2 = malloc(sizeof(uint64_t) * tests[0].data2_packs);
#else
    // NOTE/FIX: use maximum counts that we need
    uint64_t *data1 = malloc(sizeof(uint64_t) * d1cnt);
    uint64_t *data2 = malloc(sizeof(uint64_t) * d2cnt);
#endif

    dbgprt("[thread %d] Starting tests\n", threadid);

    record_start_time(tinfo);

// NOTE/BUG: num_tests can exceed number of tests in the "tests" array
    for (size_t i = 0; i < num_tests; i++) {
        size_t tidx = i % tests_max;
        test_struct *tcur = &tests[tidx];

        uint64_t local_param = ceil(((double) (tcur->data2_len) / 11));
        uint64_t data1_len_size = concatenate_ints(tcur->data1_len,
            tcur->data1_packs);
        uint64_t data2_len_size_var = concatenate_3ints(tcur->data2_len,
            tcur->data2_packs, local_param);

#if 0
// NOTE/BUG: there is a cleaner way to express this
        if (tests[i].data1 != (i > 0 ? tests[i - 1].data1 : NULL)) {
#else
        test_struct *tprev = tcur - 1;
        if ((tidx > 0) && (tcur->data1 != tprev->data1)) {
#endif
            for (size_t j = 0; j < tcur->data1_packs; j++) {
                data1[j] = tcur->data1[j];
            }
            switch (accelid) {
            case 0:
                ROCC_INSTRUCTION_SS(0, data1, data1_len_size, 0);
                break;
            case 1:
                ROCC_INSTRUCTION_SS(1, data1, data1_len_size, 0);
                break;
            case 2:
                ROCC_INSTRUCTION_SS(2, data1, data1_len_size, 0);
                break;
            case 3:
                ROCC_INSTRUCTION_SS(3, data1, data1_len_size, 0);
                break;
            default:
                break;
            }
        }

#if 1
        tprev = tcur;
#endif

        for (size_t j = 0; j < tcur->data2_packs; j++) {
            data2[j] = tcur->data2[j];
        }

        switch (accelid) {
        case 0:
            th0_start_cycle = rdcycle();
            ROCC_INSTRUCTION_DSS(0, task_done, data2, data2_len_size_var, 1);
            th0_end_cycle = rdcycle();
            total_tasks++;
            break;
        case 1:
            th1_start_cycle = rdcycle();
            ROCC_INSTRUCTION_DSS(1, task_done, data2, data2_len_size_var, 1);
            th1_end_cycle = rdcycle();
            total_tasks++;
            break;
        case 2:
            th2_start_cycle = rdcycle();
            ROCC_INSTRUCTION_DSS(2, task_done, data2, data2_len_size_var, 1);
            th2_end_cycle = rdcycle();
            total_tasks++;
            break;
        case 3:
            th3_start_cycle = rdcycle();
            ROCC_INSTRUCTION_DSS(3, task_done, data2, data2_len_size_var, 1);
            th3_end_cycle = rdcycle();
            total_tasks++;
            break;
        default:
            break;
        }
    }

    tsc_t th_start_time = min4(th0_start_cycle, th1_start_cycle, th2_start_cycle, th3_start_cycle);
    tsc_t th_end_time = max4(th0_end_cycle, th1_end_cycle, th2_end_cycle, th3_end_cycle);

    dbgprt("th%d start = %ldd, th1 start = %ldd, th2 start = %ldd, th2 start = %ldd  => min = %ldd \n",
        threadid, th0_start_cycle, th1_start_cycle, th2_start_cycle,
        th3_start_cycle, th_start_time);

    dbgprt("th%d end   = %lld, th1 end   = %lld, th2 end   = %lld, th2 end   = %lld  => max = %lld \n",
        threadid, th0_end_cycle, th1_end_cycle, th2_end_cycle,
        th3_end_cycle, th_end_time);

    total_clk_cycles += (th_end_time - th_start_time);
    record_end_time(tinfo);

    do {
        if (0)
            break;
        if (task_done)
            ++task_done;
    } while (0);

    free(data2);
    free(data1);

    dbgprt("\n[thread %d] Tests finished.\n", threadid, num_test_passed);
    return 0;
}

/*---------------------------------------------------------------*/
/*---------------------MAIN FUNCTION-----------------------------*/
/*---------------------------------------------------------------*/

static int num_tests, num_threads;

void
show_elapsed(struct thread_info *tinfo,int sorted)
{

    printf("\n");

    struct thread_info *tprev = tinfo;
    for (int i = 0; i < num_threads; i++, tinfo++) {
        printf("T%d start %lld end %lld",
            tinfo->thread_num, tinfo->start_time, tinfo->end_time);

        printf(" ELAPSED: %lld",tinfo->end_time - tinfo->start_time);

        printf(" (%lld / %lld)",
            tinfo->start_time - tprev->start_time,
            tinfo->end_time - tprev->end_time);

        printf("\n");
        tprev = tinfo;
    }
}

int
tinfocmp(const void *vlhs,const void *vrhs)
{
    const struct thread_info *tlhs = vlhs;
    const struct thread_info *trhs = vrhs;
    return tlhs->start_time - trhs->start_time;
}

int
main(int argc, char **argv)
{

    tsczero = tscget();

    printf("start\n");
    if (argc < 2) {
        printf("Usage: %s <num_tests> <num_threads> <cpumask1> <cpumask2> ... <accelid1> <accelid2> ...\n", argv[0]);
        exit(EXIT_FAILURE);
    }

    --argc;
    ++argv;

    for (;  argc > 0;  --argc, ++argv) {
        char *cp = *argv;
        if (*cp != '-')
            break;

        cp += 2;
        switch (cp[-1]) {
        case 'g':
            opt_g = ! opt_g;
            break;
        case 't':
            opt_t = ! opt_t;
            break;
        }
    }
    printf("t=%d g=%d\n",opt_t,opt_g);

    num_tests = atoi(argv[0]);
    num_threads = atoi(argv[1]);
    printf("num_tests = %d & num_threads = %d \n", num_tests, num_threads);

    tests_max = ARRCOUNT(tests);
    printf("tests = %zu elements\n",tests_max);
    if (num_tests > tests_max)
        printf("WARNING num_tests too large -- test will wrap the index\n");

    struct thread_info *tinfo = calloc(num_threads, sizeof(*tinfo));
    if (tinfo == NULL) {
        handle_error("calloc");
    }

    // NOTE/FIX: get maximum counts that we need
#if 1
    d1cnt = 0;
    d2cnt = 0;
    for (size_t i = 0; i < tests_max; i++) {
        test_struct *tcur = &tests[i];

        if (tcur->data1_packs > d1cnt)
            d1cnt = tcur->data1_packs;

        if (tcur->data2_packs > d2cnt)
            d2cnt = tcur->data2_packs;
    }
    printf("d1cnt=%d d2cnt=%d\n",d1cnt,d2cnt);
#endif

    start_time = tscget();

    initialize_threads(tinfo, num_threads, argv, tests, num_tests);

    atomic_store(&gonow,1);

    join_threads(tinfo, num_threads);

    end_time = tscget();
    total_time = tscsec(end_time - start_time);

    print_measurments(total_time);

    // Print the start and end times of each thread
    show_elapsed(tinfo,0);

    qsort(tinfo,num_threads,sizeof(*tinfo),tinfocmp);
    show_elapsed(tinfo,1);

    free(tinfo);

    return EXIT_SUCCESS;
}

ФАЙЛ: rocc.h

// Based on code by Schuyler Eldridge. Copyright (c) Boston University
// https://github.com/seldridge/rocket-rocc-examples/blob/master/src/main/c/rocc.h

#ifndef SRC_MAIN_C_ROCC_H
#define SRC_MAIN_C_ROCC_H

#include <stdint.h>

#define STR1(x) #x
#define STR(x) STR1(x)
#define EXTRACT(a, size, offset) (((~(~0 << size) << offset) & a) >> offset)

#define CUSTOMX_OPCODE(x) CUSTOM_ ## x
#define CUSTOM_0 0b0001011
#define CUSTOM_1 0b0101011
#define CUSTOM_2 0b1011011
#define CUSTOM_3 0b1111011

#define CUSTOMX(X, xd, xs1, xs2, rd, rs1, rs2, funct) \
  CUSTOMX_OPCODE(X)                     |             \
  (rd                 << (7))           |             \
  (xs2                << (7+5))         |             \
  (xs1                << (7+5+1))       |             \
  (xd                 << (7+5+2))       |             \
  (rs1                << (7+5+3))       |             \
  (rs2                << (7+5+3+5))     |             \
  (EXTRACT(funct, 7, 0) << (7+5+3+5+5))

// Standard macro that passes rd, rs1, and rs2 via registers
#define ROCC_INSTRUCTION_DSS(X, rd, rs1, rs2, funct) \
    ROCC_INSTRUCTION_R_R_R(X, rd, rs1, rs2, funct, 10, 11, 12)

#define ROCC_INSTRUCTION_DS(X, rd, rs1, funct) \
    ROCC_INSTRUCTION_R_R_I(X, rd, rs1, 0, funct, 10, 11)

#define ROCC_INSTRUCTION_D(X, rd, funct) \
    ROCC_INSTRUCTION_R_I_I(X, rd, 0, 0, funct, 10)

#define ROCC_INSTRUCTION_SS(X, rs1, rs2, funct) \
    ROCC_INSTRUCTION_I_R_R(X, 0, rs1, rs2, funct, 11, 12)

#define ROCC_INSTRUCTION_S(X, rs1, funct) \
    ROCC_INSTRUCTION_I_R_I(X, 0, rs1, 0, funct, 11)

#define ROCC_INSTRUCTION(X, funct) \
    ROCC_INSTRUCTION_I_I_I(X, 0, 0, 0, funct)

// rd, rs1, and rs2 are data
// rd_n, rs_1, and rs2_n are the register numbers to use
#define ROCC_INSTRUCTION_R_R_R(X, rd, rs1, rs2, funct, rd_n, rs1_n, rs2_n) { \
    uint64_t rd_ ;                                 \
    uint64_t rs1_ = (uint64_t) rs1;               \
    uint64_t rs2_ = (uint64_t) rs2;               \
    asm volatile (                                                           \
        "\tnop\n"  \
        : "=r" (rd_)                                                         \
        : [_rs1] "r" (rs1_), [_rs2] "r" (rs2_));                             \
    rd = rd_;                                                                \
  }

#define ROCC_INSTRUCTION_R_R_I(X, rd, rs1, rs2, funct, rd_n, rs1_n) {     \
    uint64_t rd_ ;                              \
    uint64_t rs1_ = (uint64_t) rs1;            \
    asm volatile (                                                        \
        "\tnop\n" \
        : "=r" (rd_) : [_rs1] "r" (rs1_));                                \
    rd = rd_;                                                             \
  }

#define ROCC_INSTRUCTION_R_I_I(X, rd, rs1, rs2, funct, rd_n) {           \
    uint64_t rd_ ;                             \
    asm volatile (                                                       \
        "\tnop\n"  \
        : "=r" (rd_));                                                   \
    rd = rd_;                                                            \
  }

#define ROCC_INSTRUCTION_I_R_R(X, rd, rs1, rs2, funct, rs1_n, rs2_n) {    \
    uint64_t rs1_ = (uint64_t) rs1;            \
    uint64_t rs2_ = (uint64_t) rs2;            \
    asm volatile (                                                        \
        "\tnop\n" \
        :: [_rs1] "r" (rs1_), [_rs2] "r" (rs2_));                         \
  }

#define ROCC_INSTRUCTION_I_R_I(X, rd, rs1, rs2, funct, rs1_n) {         \
    uint64_t rs1_ = (uint64_t) rs1;          \
    asm volatile (                                                      \
        "\tnop\n" \
        :: [_rs1] "r" (rs1_));                                          \
  }

#define ROCC_INSTRUCTION_I_I_I(X, rd, rs1, rs2, funct) {                 \
    asm volatile (                                                       \
        "\tnop\n" ); \
  }

#endif  // SRC_MAIN_C_ACCUMULATOR_H

В приведенном выше коде я использовал условные обозначения cpp для обозначения старого и нового кода:

#if 0
// old code
#else
// new code
#endif

#if 1
// new code
#endif

Примечание: это можно исправить, пропустив файл через unifdef -k.


Вот результат ./fix1 -g 300 8. Обратите внимание, что из-за изменений в rdcycle отсчет циклов составляет наносекунды:

start
t=1 g=1
num_tests = 300 & num_threads = 8
tests = 6 elements
WARNING num_tests too large -- test will wrap the index
d1cnt=1566 d2cnt=32

Total number of instructions: 0
Total number of cycles: 104387
Total number of tasks finished: 878
Tasks per second with gettime(): 1800253.83989
Tasks per second with csr_cycles(): 210275.225842

T0 start 377562 end 443625 ELAPSED: 66063 (0 / 0)
T1 start 379604 end 455869 ELAPSED: 76265 (2042 / 12244)
T2 start 379700 end 466956 ELAPSED: 87256 (96 / 11087)
T3 start 378780 end 466888 ELAPSED: 88108 (-920 / -68)
T4 start 373805 end 404777 ELAPSED: 30972 (-4975 / -62111)
T5 start 380184 end 418223 ELAPSED: 38039 (6379 / 13446)
T6 start 438384 end 464627 ELAPSED: 26243 (58200 / 46404)
T7 start 500486 end 515786 ELAPSED: 15300 (62102 / 51159)

T4 start 373805 end 404777 ELAPSED: 30972 (0 / 0)
T0 start 377562 end 443625 ELAPSED: 66063 (3757 / 38848)
T3 start 378780 end 466888 ELAPSED: 88108 (1218 / 23263)
T1 start 379604 end 455869 ELAPSED: 76265 (824 / -11019)
T2 start 379700 end 466956 ELAPSED: 87256 (96 / 11087)
T5 start 380184 end 418223 ELAPSED: 38039 (484 / -48733)
T6 start 438384 end 464627 ELAPSED: 26243 (58200 / 46404)
T7 start 500486 end 515786 ELAPSED: 15300 (62102 / 51159)

ОБНОВЛЯТЬ:

Только один вопрос, почему вы закомментировали функцию set_affinity и cpu_mask?

По нескольким причинам:

  1. Тогда я бегло взглянул на ваш код сходства и не понял, как он работает (поэтому я его закомментировал).
  2. Теперь, после более тщательного изучения, в вашем коде привязки есть ошибка (подробнее об этом ниже).
  3. Удобство отсутствия необходимости указывать дополнительные N аргументов для cpumask и N аргументов для accelid. Ускоритель не имел никакого значения на моем компьютере.
  4. Планировщик Linux (особенно в наши дни) довольно хорошо распределяет нагрузку равномерно по N процессорам.
  5. Из-за динамического характера планирования с «небольшим» набором доступных/возможных процессоров и однородным набором потоков привязка потока к определенному подмножеству ядер может привести к снижению пропускной способности и/или задержки.

потому что, когда я хочу использовать их и потоки на двух процессорах (каждый процессор будет иметь 4 потока), потоки снова будут последовательными.

Не совсем так. Если мы не укажем привязку к процессору, планировщик запустит потоки на любом процессоре, который выберет. Он мог бы запустить их все на одном и том же процессоре, но, скорее всего, он назначит их в каком-то порядке (например, по кругу (?)).

Привязка к процессору имеет все больший и лучший эффект:

  1. Процессоры неоднородны. Например, некоторые ЦП имеют больше кэш-памяти, чем другие, система является NUMA (а некоторые ЦП имеют более быстрый доступ к ОЗУ или больший объем оперативной памяти), подмножество ЦП имеет подключенные сопроцессоры и т. д.
  2. Набор потоков для задания неоднороден. Например, декодер видео реального времени. Входные и выходные потоки должны иметь высокий приоритет и минимизировать задержку. Потоки декодирования (видео и/или аудио) должны иметь высокую пропускную способность, но задержка не вызывает особого беспокойства.
  3. Задержка прерывания должна быть минимизирована. Привязка IRQ данного аппаратного устройства привязана к подмножеству ядер ЦП (обычно только к одному), и поток, который должен проснуться для обработки данных, аналогично ограничен.
  4. Большое количество процессоров (например, 256), и мы хотим обеспечить переменное качество обслуживания в зависимости от политики. То есть определенные пользователи «доплачивают» (имеют приоритет) за исключительное использование подмножества ЦП.

Теперь о вашем коде близости... На самом деле он сломан.

Я попробовал такую ​​команду: ./program -g 8 0 0 0 0 1 1 1 1 0 1 2 3 0 1 2 3 Это означает, что я использую 8 потоков: первые четыре на ЦП 0, вторые четыре на ЦП 1, затем первый поток отправит инструкции для ускорения 0, а второй поток отправит инструкции для ускорения 1 и так далее.

Примечание: в только что приведенном вами примере вы исключили [первый] аргумент «количество тестов».

Может быть проще указать маску сходства в шестнадцатеричном формате. strtol (с основанием 0) разрешит/поймет 0xFF, но atoi вернет за это ноль.

Вот тестовая программа:

// mask.c -- cpu mask test

#include <stdio.h>
#include <stdlib.h>

void
orig(char *cp)
{

    int mask = strtol(cp,&cp,0);
    printf(" %8.8X %d --",mask,mask);

    for (int i = 0; i < 3; ++i) {
        if (mask & (1 << i))
            printf(" SET(%d)",i);
    }
}

void
fix1(char *cp)
{

    int i = strtol(cp,&cp,0);
    printf(" %8.8X %d --",i,i);

    printf(" SET(%d)",i);
}

void
fix2(char *cp)
{

    unsigned int mask = strtol(cp,&cp,0);
    printf(" %8.8X %d --",mask,mask);

    for (int i = 0;  mask != 0;  ++i, mask >>= 1) {
        if (mask & 1)
            printf(" SET(%d)",i);
    }
}

void
fix3(char *cp)
{

    while (*cp != 0) {
        int i = strtol(cp,&cp,10);

        printf(" SET(%d)",i);

        if (*cp != ',')
            break;

        ++cp;
    }
}

void
dofnc(void (*fnc)(char *),const char *sym,char *cp)
{

    printf("%s: argv='%s'",sym,cp);
    fnc(cp);
    printf("\n");
}

void
dotest(void (*fnc)(char *),const char *sym,char **argv)
{

    for (char **av = argv;  *av != NULL;  ++av)
        dofnc(fnc,sym,*av);
}

int
main(int argc,char **argv)
{
    int opt_f = 0;

    --argc;
    ++argv;

    for (;  argc > 0;  --argc, ++argv) {
        char *cp = *argv;
        if (*cp != '-')
            break;

        cp += 2;
        switch (cp[-1]) {
        case 'f':
            opt_f = (*cp != 0) ? atoi(cp) : 0;
            break;
        }
    }

    switch (opt_f) {
    case 1:
        dotest(fix1,"fix1",argv);
        break;
    case 2:
        dotest(fix2,"fix2",argv);
        break;
    case 3:
        dotest(fix3,"fix3",argv);
        break;
    default:
        dotest(orig,"orig",argv);
        break;
    }

    return 0;
}

#if 0
void
set_thread_affinity(pthread_t thread, int cpumask)
{
    cpu_set_t cpuset;

    CPU_ZERO(&cpuset);

    for (int i = 0; i < 3; ++i) {
        if (cpumask & (1 << i)) {
            CPU_SET(i, &cpuset);
        }
    }

    int result = pthread_setaffinity_np(thread, sizeof(cpu_set_t), &cpuset);

    if (result != 0) {
        perror("pthread_setaffinity_np");
    }
}
#endif

Вот вывод программы для ./mask -f0 0 1 2 3 4, который является вашим исходным кодом:

orig: argv='0' 00000000 0 --
orig: argv='1' 00000001 1 -- SET(0)
orig: argv='2' 00000002 2 -- SET(1)
orig: argv='3' 00000003 3 -- SET(0) SET(1)
orig: argv='4' 00000004 4 -- SET(2)

Обратите внимание, что аргумент, равный 0, должен устанавливать привязку к процессору 0. Но он ничего не устанавливает. Маска родства — все нули!

Это... может... быть... плохо...

В вашем понимании это не «маска» процессоров, а «номер» процессора, которому можно назначить поток. То есть мы можем назначить поток только одному процессору, а не его подмножеству (т. е. истинной маске).

И ваш код этого не делает.

Вот вывод ./mask -f1 0 1 2 3 4 (который устанавливает один процессор):

fix1: argv='0' 00000000 0 -- SET(0)
fix1: argv='1' 00000001 1 -- SET(1)
fix1: argv='2' 00000002 2 -- SET(2)
fix1: argv='3' 00000003 3 -- SET(3)
fix1: argv='4' 00000004 4 -- SET(4)

Улучшение заключается в использовании шестнадцатеричной маски. Вот результат ./mask -f2 0x0003 0x000C 0x0300 0x0C00:

fix2: argv='0x0003' 00000003 3 -- SET(0) SET(1)
fix2: argv='0x000C' 0000000C 12 -- SET(2) SET(3)
fix2: argv='0x0300' 00000300 768 -- SET(8) SET(9)
fix2: argv='0x0C00' 00000C00 3072 -- SET(10) SET(11)

Но указание маски процессора в шестнадцатеричном формате таким способом является обременительным. Было бы проще, если бы мы могли предоставить список процессоров для назначения. Вот результат: ./mask -f3 0,1 2,3 4,5 6,7:

fix3: argv='0,1' SET(0) SET(1)
fix3: argv='2,3' SET(2) SET(3)
fix3: argv='4,5' SET(4) SET(5)
fix3: argv='6,7' SET(6) SET(7)

Я мог бы изменить организацию аргументов. Трудно сопоставить сходство процессора с идентификатором его ускорителя. То есть:

0,1 2,3 4,5 6,7 0 1 2 3

Немного натянуто видеть, что поток 3 должен работать на процессорах 6 и 7 и что он должен выполняться на ускорителе 3. Возможно, нам нужен синтаксис вроде:

0,1@0 2,3@1 4,5@2 6,7@3

Или лучше использовать файл конфигурации:

[config_1]
    thread 0 cpu=0,1 accel=0
    thread 1 cpu=2,3 accel=1
    thread 2 cpu=4,5 accel=2
    thread 3 cpu=6,7 accel=3

[config_2]
    thread 0 cpu=0,1 accel=3
    thread 1 cpu=2,3 accel=2
    thread 2 cpu=4,5 accel=1
    thread 3 cpu=6,7 accel=0

@student_11 Всегда пожалуйста. Я только что заметил, что rocc.h — это (C) Бостонский университет. Есть ли шанс, что ты там студент? (Я получил там степень бакалавра/CE в 70-х).

Craig Estey 28.05.2024 00:19

Нет, к сожалению, я учился где-то в другом месте. Я всего лишь пользователь их проектов.

student_11 28.05.2024 00:58

Только один вопрос, почему вы закомментировали функцию set_affinity и cpu_mask? потому что, когда я хочу использовать их и потоки на двух процессорах (каждый процессор будет иметь 4 потока), потоки снова будут последовательными. Я попробовал команду вроде: ./program -g 8 0 0 0 0 1 1 1 1 0 1 2 3 0 1 2 3 Это означает, что я использую 8 потоков: первые четыре на ЦП 0, вторые четыре на ЦП 1, затем первый поток отправит инструкции для ускорения 0, второй поток отправит инструкции для ускорения 1 и так далее.

student_11 29.05.2024 04:14

@student_11 Сначала я думал, что мое изменение будет провальным, но, посмотрев еще раз на ваш код привязки, я думаю, что в вашем коде есть ошибка. Я обновил свой ответ.

Craig Estey 29.05.2024 18:56

Другие вопросы по теме