Título: condicion de carrera c++ threads
Publicado por: julio1 en 3 Abril 2019, 18:35 pm
Estoy mirando el problema del productor/consumidor con threads en c+11. Pues bien, el siguiente código me da un problema con helgrind detecta una condicion de carrera y debuggeando un poco creo que el problema esta en la funcion consumer pero no estoy seguro y no se como resolverlo. #include <iostream> #include <sstream> #include <vector> #include <stack> #include <thread> #include <mutex> #include <atomic> #include <condition_variable> #include <chrono> using namespace std; // print function for "thread safe" printing using a stringstream void print(ostream& s) { cout << s.rdbuf(); cout.flush(); s.clear(); } const int num_producers = 5; const int num_consumers = 10; const int producer_delay_to_produce = 10; // in miliseconds const int consumer_delay_to_consume = 30; // in miliseconds const int consumer_max_wait_time = 200; // in miliseconds - max time that a consumer can wait for a product to be produced. const int max_production = 10; // When producers has produced this quantity they will stop to produce const int max_products = 10; // Maximum number of products that can be stored atomic<int> num_producers_working(0); // When there's no producer working the consumers will stop, and the program will stop. stack<int> products; // The products stack, here we will store our products mutex xmutex; // Our mutex, without this mutex our program will cry condition_variable is_not_full; // to indicate that our stack is not full between the thread operations condition_variable is_not_empty; // to indicate that our stack is not empty between the thread operations
void produce(int producer_id) { unique_lock<mutex> lock(xmutex); int product; is_not_full.wait(lock, [] { return products.size() != max_products; }); product = products.size(); products.push(product); print(stringstream() << "Producer " << producer_id << " produced " << product << "\n"); is_not_empty.notify_one(); } // Consume function, consumer_id will consume a product void consume(int consumer_id) { unique_lock<mutex> lock(xmutex); int product; if(is_not_empty.wait_for(lock, chrono::milliseconds(consumer_max_wait_time), [] { return products.size() > 0; })) { product = products.top(); products.pop(); print(stringstream() << "Consumer " << consumer_id << " consumed " << product << "\n"); is_not_full.notify_one(); } } void producer(int id) { ++num_producers_working; for(int i = 0; i < max_production; ++i) { produce(id); this_thread::sleep_for(chrono::milliseconds(producer_delay_to_produce)); } print(stringstream() << "Producer " << id << " has exited\n"); --num_producers_working; } void consumer(int id) { // Wait until there is any producer working while(num_producers_working == 0) this_thread::yield(); while(num_producers_working != 0 || products.size() > 0) { consume(id); this_thread::sleep_for(chrono::milliseconds(consumer_delay_to_consume)); } print(stringstream() << "Consumer " << id << " has exited\n"); } int main() { vector<thread> producers_and_consumers; // Create producers for(int i = 0; i < num_producers; ++i) producers_and_consumers.push_back(thread(producer, i)); // Create consumers for(int i = 0; i < num_consumers; ++i) producers_and_consumers.push_back(thread(consumer, i)); // Wait for consumers and producers to finish for(auto& t : producers_and_consumers) t.join(); }
Título: Re: condicion de carrera c++ threads
Publicado por: Loretz en 3 Abril 2019, 21:06 pm
No tengo el valgrind y estoy con el Visual Studio, pero veo dos cosas: 1) [que no tiene nada que ver con tu pregunta, pero...] La función void print(ostream& s) no debería poder invocarse con un argumento temporal (un prvalue), como en print(stringstream() << "Producer " << producer_id << " produced " << product << "\n"); Por alguna extraña razón tu compilador lo permite. Supongo que puedes sobrecargar la función print con una versión que acepte una rvalue reference: void print(ostream&& s) { cout << s.rdbuf() << flush; s.clear(); } Y 2) en is_not_full.wait(lock, [] { return products.size() != max_products; });
el predicado products.size() != max_products; no garantiza que el lock no se levante debido a un wakeup espurio, haciendo que se intente ejecutar el push() por dos o más threads simultáneamente.
Título: Re: condicion de carrera c++ threads
Publicado por: julio1 en 3 Abril 2019, 21:18 pm
Con respecto a la primera estoy de acuerdo contigo, la verdad es que no me había fijado, es curioso porque no estoy en un compilador raro estoy usando g++ en su versión 7, y porque g++ permitirá esto? Si mal no recuerdo al tomar una referencia el parametro de una función se puede llamar como un rvalue si el parámetro es const. Y yendo a la segunda nunca había oído hablar de un wakeup espurio.
Título: Re: condicion de carrera c++ threads
Publicado por: Loretz en 3 Abril 2019, 21:31 pm
wait causes the current thread to block until the condition variable is notified or a spurious wakeup occurs, https://en.cppreference.com/w/cpp/thread/condition_variable/wait
Título: Re: condicion de carrera c++ threads
Publicado por: julio1 en 3 Abril 2019, 21:36 pm
Gracias, lo miro ahora. Lo que dijiste de la funcion print en el visual studio te da error al compilar o te compila sin problemas?
Título: Re: condicion de carrera c++ threads
Publicado por: Loretz en 3 Abril 2019, 21:43 pm
Lo que dijiste de la funcion print en el visual studio te da error al compilar o te compila sin problemas? Me da error al compilar.
Título: Re: condicion de carrera c++ threads
Publicado por: julio1 en 4 Abril 2019, 00:00 am
He solucionado el problema de la condición de carrera, helgrind ya no se queja, la culpa la tenía el método size en la línea while(num_producers_working != 0 || products.size() > 0)la solución más simple pasa por meter a size dentro de una sección crítica: size_t stack_size(const stack<int> &obj ){ lock_guard<mutex> guardian(xmutex); return obj.size();
} Modificando así la línea anterior ahora: while(num_producers_working != 0 || stack_size(products) > 0)
Título: Re: condicion de carrera c++ threads
Publicado por: dijsktra en 4 Abril 2019, 15:01 pm
Que bueno que la gente se meta con problemas de concurrente! He solucionado el problema de la condición de carrera, helgrind ya no se queja, la culpa la tenía el método size en la línea while(num_producers_working != 0 || products.size() > 0)...
Cuidado! Yo no tengo ni idea de helgrind, no lo había oido en mi vida, pero la depuración en concurrente es algo impracticable! Necesitas un cálculo para razonar sobre la corrección de un programa, con invariantes, etc... Conviene tener princpios sólidos sobre los que montar tus soluciones concurrentes. Un buen libro: "Concurrent Programming. Gregory R. Andrews". No vas a aprender nada de C++, pero si mucho de programaci'on concurrente! Algunas observaciones a tu programa: - Productores consumidores se ejecutan indefinidamente. De otro modo, puedes llegar a bloqueos, (imagina 1 productor y 1000 consumidores, a 1 items cada uno... Es obvio que los consumidores se quedarán esperando indefinidamente
- El buffer, o es un escalar, o un buffer limitado. Nada se gana con los stacks... Interesa FIFO, no LIFO
- Nada se gana leyendo los manuales de la libreria <thread> o <condition variable> si no se conoce en que consisten los monitores
#include <cassert> #include <iostream> #include <vector> // as bounded buffer. #include <thread> #include <mutex> #include <condition_variable> #include <chrono> #include <atomic> #include <cassert> #define MAX_THREADS 1000 #define SIZE_BUFFER 5 #define TIMEOUT 200 using namespace std; // Patemeters overwritten on input. static int P = 5; // Producers static int C = 10; // Consumers static int P_DELAY=1000 ; // Prod-Delay static int C_DELAY=500; // MONITOR VARIABLES static mutex xmutex; // Our mutex, monitor methods in exclusion static condition_variable if_full; // to indicate that our stack is // not full between the thread operations static condition_variable if_empty; // to indicate that our stack is // not empty between the thread operations static int front = 0; static int rear = 0; static vector<int> buf(SIZE_BUFFER); static int holes = SIZE_BUFFER; static int items = 0; // Monitor invariants: // inv. items = (front "-" rear) // inv. holes = SIZE_BUFFER - (front "-" rear) // inv. 0 <= front < SIZE_BUFFER, 0 <= rear < SIZE_BUFFER static atomic<int> m(0); // simulating a fresh message; // MONITOR METHODS void init() { front=rear=items=0; holes = SIZE_BUFFER; } void deposit(const int id,const int msg) { unique_lock<mutex> lock(xmutex); // await holes > 0 if_full.wait(lock, [] { return holes > 0; }); buf[rear] = msg; rear = ((rear+1) % SIZE_BUFFER); cout << "p[" << id << "]\tdeposit\t:\t" << msg << endl; holes--; items++; if_empty.notify_all(); } // Consume function, consumer_id will consume a product int fetch(const int id) { unique_lock<mutex> lock(xmutex); int mess; // await items > 0 if_empty.wait(lock, [] { return items > 0; }); mess = buf[front]; front = ((front+1) % SIZE_BUFFER); cout << "c[" << id << "]\t\t\t\t\tfetch\t:\t" << mess << endl; holes++;items--; if_full.notify_all(); return mess; } // Real code for threads. void producer(int id) { for (; 1; ) { deposit(id, ++m); this_thread::sleep_for(chrono::milliseconds(P_DELAY)); } return; } void consumer(int id) { int msg; for (; 1; ) { msg = fetch(id); this_thread::sleep_for(chrono::milliseconds(C_DELAY)); } return ; } void simulation() { vector<thread> threads; init(); // Create producers for (int p = 0; p < P; ++p) threads.push_back(thread(producer, p)); // Create consumers for (int c = 0; c < C; ++c) threads.push_back(thread(consumer, c)); // Wait for consumers and producers to finish for (auto& t : threads) t.join(); } // IO staff. int main(int argc, char *args[]) { cout << "C P P_DELAY C_DELAY " << endl; for( ; cin >> C >> P >> P_DELAY >> C_DELAY; ) { assert((C<MAX_THREADS) && (P<MAX_THREADS) ); assert((P_DELAY > 0) && (C_DELAY > 0) ); simulation(); } return 0; }
Aqui un ejemplo de salida. Tu puedes experimientar variando , el numero de p/c, la velocidad relativa... Con independencia de la velocidad relativa, dentro de un mismo grupo no está determinado el orden de acceso al monitor. Así, es posible que el productor del mensaje 9 tenga acceso al monitor antes que el productor del mensaje 8. Eso sí, entre dos consumidores el primero en acceder extraerá el mnesaje 9, y el segundo el 8, por ser FIFO. g++ conc.cpp -o main -lpthread && ./main C P P_DELAY C_DELAY 5 5 500 500 p[1] deposit : 1 p[2] deposit : 2 p[0] deposit : 3 p[3] deposit : 4 c[0] fetch : 1 p[4] deposit : 5 c[1] fetch : 2 c[3] fetch : 3 c[2] fetch : 4 c[4] fetch : 5 p[1] deposit : 6 p[3] deposit : 7 p[0] deposit : 9 p[2] deposit : 8 c[0] fetch : 6 p[4] deposit : 10 c[1] fetch : 7 c[3] fetch : 9 c[2] fetch : 8 c[4] fetch : 10 p[1] deposit : 11 p[3] deposit : 12 p[0] deposit : 13 p[2] deposit : 14 c[0] fetch : 11 p[4] deposit : 15 c[1] fetch : 12 c[3] fetch : 13 c[2] fetch : 14 c[4] fetch : 15 p[1] deposit : 16 p[0] deposit : 18 p[3] deposit : 17 p[2] deposit : 19 c[0] fetch : 16 p[4] deposit : 20 c[1] fetch : 18 c[3] fetch : 17 c[2] fetch : 19 c[4] fetch : 20 p[0] deposit : 21 p[1] deposit : 22 p[3] deposit : 23 p[2] deposit : 24 p[4] deposit : 25 c[0] fetch : 21 c[1] fetch : 22 c[3] fetch : 23 c[2] fetch : 24 c[4] fetch : 25 p[0] deposit : 26 c[0] fetch : 26 p[3] deposit : 29 c[4] fetch : 29 p[4] deposit : 30 c[2] fetch : 30 p[1] deposit : 27 c[1] fetch : 27 p[2] deposit : 28 c[3] fetch : 28 p[0] deposit : 31 p[3] deposit : 32 c[4] fetch : 31 c[0] fetch : 32 p[4] deposit : 33 c[2] fetch : 33 p[1] deposit : 34 c[1] fetch : 34 p[2] deposit : 35 c[3] fetch : 35 p[4] deposit : 37 c[0] fetch : 37 p[3] deposit : 38 c[2] fetch : 38 p[2] deposit : 40 p[0] deposit : 36 p[1] deposit : 39 c[4] fetch : 40 c[1] fetch : 36 c[3] fetch : 39 p[4] deposit : 41 c[0] fetch : 41 p[3] deposit : 42 c[2] fetch : 42 p[2] deposit : 43 p[0] deposit : 44 p[1] deposit : 45 c[4] fetch : 43 c[1] fetch : 44 c[3] fetch : 45 ... C-c C-c
|