Foro de elhacker.net

Programación => Programación C/C++ => Mensaje iniciado por: julio1 en 3 Abril 2019, 18:35 pm



Título: condicion de carrera c++ threads
Publicado por: julio1 en 3 Abril 2019, 18:35 pm
Estoy mirando el problema del productor/consumidor con threads en c+11. Pues bien, el siguiente código me da un problema con helgrind detecta una condicion de carrera y debuggeando un poco creo que el problema esta en la funcion consumer pero no estoy seguro y no se como resolverlo.
Código:
#include <iostream>
#include <sstream>
#include <vector>
#include <stack>
#include <thread>
#include <mutex>
#include <atomic>
#include <condition_variable>
#include <chrono>
using namespace std;
 
// print function for "thread safe" printing using a stringstream
void print(ostream& s) { cout << s.rdbuf(); cout.flush(); s.clear(); }
 
const int num_producers = 5;
const int num_consumers = 10;
const int producer_delay_to_produce = 10;   // in miliseconds
const int consumer_delay_to_consume = 30;   // in miliseconds
const int consumer_max_wait_time = 200;     // in miliseconds - max time that a consumer can wait for a product to be produced.
const int max_production = 10;              // When producers has produced this quantity they will stop to produce
const int max_products = 10;                // Maximum number of products that can be stored
 
atomic<int> num_producers_working(0);       // When there's no producer working the consumers will stop, and the program will stop.
stack<int> products;                        // The products stack, here we will store our products
mutex xmutex;                               // Our mutex, without this mutex our program will cry
 
condition_variable is_not_full;             // to indicate that our stack is not full between the thread operations
condition_variable is_not_empty;            // to indicate that our stack is not empty between the thread operations
 

void produce(int producer_id)
{
        unique_lock<mutex> lock(xmutex);
        int product;
 
        is_not_full.wait(lock, [] { return products.size() != max_products; });
        product = products.size();
        products.push(product);
 
        print(stringstream() << "Producer " << producer_id << " produced " << product << "\n");
        is_not_empty.notify_one();
}
 
//      Consume function, consumer_id will consume a product
void consume(int consumer_id)
{
        unique_lock<mutex> lock(xmutex);
        int product;
 
        if(is_not_empty.wait_for(lock, chrono::milliseconds(consumer_max_wait_time),
                [] { return products.size() > 0; }))
        {
                product = products.top();
                products.pop();
 
                print(stringstream() << "Consumer " << consumer_id << " consumed " << product << "\n");
                is_not_full.notify_one();
        }
}
 
void producer(int id)
{
        ++num_producers_working;
        for(int i = 0; i < max_production; ++i)
        {
                produce(id);
                this_thread::sleep_for(chrono::milliseconds(producer_delay_to_produce));
        }
 
        print(stringstream() << "Producer " << id << " has exited\n");
        --num_producers_working;
}
 
void consumer(int id)
{
        // Wait until there is any producer working
        while(num_producers_working == 0) this_thread::yield();
 
        while(num_producers_working != 0 || products.size() > 0)
        {
                consume(id);
                this_thread::sleep_for(chrono::milliseconds(consumer_delay_to_consume));
        }
 
        print(stringstream() << "Consumer " << id << " has exited\n");
}
 
int main()
{
        vector<thread> producers_and_consumers;
 
        // Create producers
        for(int i = 0; i < num_producers; ++i)
                producers_and_consumers.push_back(thread(producer, i));
 
        // Create consumers
        for(int i = 0; i < num_consumers; ++i)
                producers_and_consumers.push_back(thread(consumer, i));
 
        // Wait for consumers and producers to finish
        for(auto& t : producers_and_consumers)
                t.join();
}


Título: Re: condicion de carrera c++ threads
Publicado por: Loretz en 3 Abril 2019, 21:06 pm
No tengo el valgrind y estoy con el Visual Studio, pero veo dos cosas:

1) [que no tiene nada que ver con tu pregunta, pero...] La función void print(ostream& s) no debería poder invocarse con un argumento temporal (un prvalue), como en
Código:
print(stringstream() << "Producer " << producer_id << " produced " << product << "\n");
Por alguna extraña razón tu compilador lo permite. Supongo que puedes sobrecargar la función print con una versión que acepte una rvalue reference:
Citar
void print(ostream&& s) {
   cout << s.rdbuf() << flush;
   s.clear();
}


Y 2) en
Código
  1. is_not_full.wait(lock, [] { return products.size() != max_products; });
el predicado
Citar
products.size() != max_products;
no garantiza que el lock no se levante debido a un wakeup espurio, haciendo que se intente ejecutar el push() por dos o más threads simultáneamente.



Título: Re: condicion de carrera c++ threads
Publicado por: julio1 en 3 Abril 2019, 21:18 pm
Con respecto a la primera estoy de acuerdo contigo, la verdad es que no me había fijado, es curioso porque no estoy en un compilador raro estoy usando g++ en su versión 7, y porque g++ permitirá esto? Si mal no recuerdo al tomar una referencia el parametro de una función se puede llamar como un rvalue si el parámetro es const.
Y yendo a la segunda nunca había oído hablar de un wakeup espurio.


Título: Re: condicion de carrera c++ threads
Publicado por: Loretz en 3 Abril 2019, 21:31 pm
Citar
wait causes the current thread to block until the condition variable is notified or a spurious wakeup occurs,

https://en.cppreference.com/w/cpp/thread/condition_variable/wait


Título: Re: condicion de carrera c++ threads
Publicado por: julio1 en 3 Abril 2019, 21:36 pm
Gracias, lo miro ahora. Lo que dijiste de la funcion print en el visual studio te da error al compilar o te compila sin problemas?


Título: Re: condicion de carrera c++ threads
Publicado por: Loretz en 3 Abril 2019, 21:43 pm
Citar
Lo que dijiste de la funcion print en el visual studio te da error al compilar o te compila sin problemas?
Me da error al compilar.


Título: Re: condicion de carrera c++ threads
Publicado por: julio1 en 4 Abril 2019, 00:00 am
He solucionado el problema de la condición de carrera, helgrind ya no se queja, la culpa la tenía el método size en la línea while(num_producers_working != 0 || products.size() > 0)
la solución más simple pasa por meter a size dentro de una sección crítica:
Código:
size_t stack_size(const stack<int> &obj ){
    lock_guard<mutex> guardian(xmutex);
    return obj.size();

}
Modificando así la línea anterior ahora: while(num_producers_working != 0 || stack_size(products) > 0)


Título: Re: condicion de carrera c++ threads
Publicado por: dijsktra en 4 Abril 2019, 15:01 pm
Que bueno que la gente se meta con problemas de concurrente!

He solucionado el problema de la condición de carrera, helgrind ya no se queja, la culpa la tenía el método size en la línea while(num_producers_working != 0 || products.size() > 0)...

Cuidado! Yo no tengo ni idea de helgrind, no lo había oido en mi vida, pero la depuración en concurrente es algo impracticable! Necesitas un cálculo para razonar sobre la corrección de un programa, con invariantes, etc...
Conviene tener princpios sólidos sobre los que montar tus soluciones concurrentes.
Un buen libro: "Concurrent Programming. Gregory R. Andrews".
No vas a aprender nada de C++, pero si mucho de programaci'on concurrente!

Algunas observaciones a tu programa:

  • Productores consumidores se ejecutan indefinidamente. De otro modo, puedes llegar a bloqueos, (imagina 1 productor y 1000 consumidores, a 1 items cada uno... Es obvio que los consumidores se quedarán esperando indefinidamente
  • El buffer, o es un escalar, o un buffer limitado. Nada se gana con los stacks... Interesa FIFO, no LIFO
  • Nada se gana leyendo los manuales de la libreria <thread> o <condition variable> si no se conoce en que consisten los monitores



Código
  1. #include <cassert>
  2. #include <iostream>
  3. #include <vector> // as bounded buffer.
  4. #include <thread>
  5. #include <mutex>
  6. #include <condition_variable>
  7. #include <chrono>
  8. #include <atomic>
  9. #include <cassert>
  10.  
  11. #define MAX_THREADS 1000
  12. #define SIZE_BUFFER 5
  13. #define TIMEOUT  200
  14.  
  15. using namespace std;
  16.  
  17. // Patemeters overwritten on input.
  18. static int P = 5; // Producers
  19. static int C = 10; // Consumers
  20. static int P_DELAY=1000 ; // Prod-Delay
  21. static int C_DELAY=500;
  22.  
  23.  
  24. // MONITOR VARIABLES
  25. static mutex xmutex;                    // Our mutex, monitor methods in exclusion
  26. static condition_variable if_full;             // to indicate that our stack is
  27. // not full  between the thread operations
  28. static condition_variable if_empty;            // to indicate that our stack is
  29. // not empty between the thread operations
  30.  
  31. static int front = 0;
  32. static int rear = 0;
  33. static vector<int>  buf(SIZE_BUFFER);
  34. static int holes = SIZE_BUFFER;
  35. static int items = 0;
  36. // Monitor invariants:
  37. // inv. items =  (front "-" rear)
  38. // inv. holes = SIZE_BUFFER - (front "-" rear)
  39. // inv. 0 <= front < SIZE_BUFFER, 0 <= rear < SIZE_BUFFER
  40.  
  41. static atomic<int> m(0); // simulating a fresh message;
  42.  
  43. // MONITOR METHODS
  44. void init()
  45. {
  46.  front=rear=items=0;
  47.  holes = SIZE_BUFFER;
  48. }
  49.  
  50. void deposit(const int id,const int msg)
  51. {
  52. unique_lock<mutex> lock(xmutex);
  53.  
  54. // await holes > 0
  55. if_full.wait(lock, [] { return holes > 0; });
  56. buf[rear] = msg; rear = ((rear+1) % SIZE_BUFFER);
  57. cout << "p[" << id << "]\tdeposit\t:\t" << msg << endl;
  58. holes--; items++;
  59. if_empty.notify_all();
  60. }
  61.  
  62. //      Consume function, consumer_id will consume a product
  63. int fetch(const int id)
  64. {
  65. unique_lock<mutex> lock(xmutex);
  66. int mess;
  67. // await items > 0
  68. if_empty.wait(lock, [] { return items > 0; });
  69. mess = buf[front]; front = ((front+1) % SIZE_BUFFER);
  70. cout << "c[" << id << "]\t\t\t\t\tfetch\t:\t" << mess << endl;
  71. holes++;items--;
  72. if_full.notify_all();
  73. return mess;
  74. }
  75.  
  76.  
  77. // Real code for threads.
  78. void producer(int id)
  79. {
  80. for (; 1; )
  81. {
  82. deposit(id, ++m);
  83. this_thread::sleep_for(chrono::milliseconds(P_DELAY));
  84. }
  85. return;
  86. }
  87.  
  88. void consumer(int id)
  89. {
  90. int msg;
  91. for (; 1; )
  92. {
  93. msg = fetch(id);
  94. this_thread::sleep_for(chrono::milliseconds(C_DELAY));
  95. }
  96. return ;
  97. }
  98.  
  99.  
  100. void simulation()
  101. {
  102. vector<thread> threads;
  103.  
  104. init();
  105. // Create producers
  106. for (int p = 0; p < P; ++p)
  107.  threads.push_back(thread(producer, p));
  108.  
  109. // Create consumers
  110. for (int c = 0; c < C; ++c)
  111.  threads.push_back(thread(consumer, c));
  112.  
  113. // Wait for consumers and producers to finish
  114. for (auto& t : threads)
  115. t.join();
  116. }
  117.  
  118.  
  119. // IO staff.
  120. int main(int argc, char *args[])
  121. {
  122.  cout << "C P P_DELAY C_DELAY " << endl;
  123.  for( ; cin >> C >> P >> P_DELAY >> C_DELAY; )
  124.    {
  125.      assert((C<MAX_THREADS) && (P<MAX_THREADS) );
  126.      assert((P_DELAY > 0) && (C_DELAY > 0) );
  127.      simulation();
  128.    }
  129.  return 0;
  130. }
  131.  
  132.  

Aqui un ejemplo de salida. Tu puedes experimientar variando , el numero de p/c, la velocidad relativa...

Con independencia de la velocidad relativa, dentro de un mismo grupo no está determinado el orden de acceso al monitor. Así,  es posible que el productor del mensaje 9 tenga acceso al monitor antes que el productor del mensaje 8.  Eso sí, entre dos consumidores el primero en acceder extraerá el mnesaje 9, y el segundo el 8, por ser FIFO.



Código:
g++ conc.cpp -o main -lpthread && ./main
C P P_DELAY C_DELAY
5 5 500 500
p[1] deposit : 1
p[2] deposit : 2
p[0] deposit : 3
p[3] deposit : 4
c[0] fetch : 1
p[4] deposit : 5
c[1] fetch : 2
c[3] fetch : 3
c[2] fetch : 4
c[4] fetch : 5
p[1] deposit : 6
p[3] deposit : 7
p[0] deposit : 9
p[2] deposit : 8
c[0] fetch : 6
p[4] deposit : 10
c[1] fetch : 7
c[3] fetch : 9
c[2] fetch : 8
c[4] fetch : 10
p[1] deposit : 11
p[3] deposit : 12
p[0] deposit : 13
p[2] deposit : 14
c[0] fetch : 11
p[4] deposit : 15
c[1] fetch : 12
c[3] fetch : 13
c[2] fetch : 14
c[4] fetch : 15
p[1] deposit : 16
p[0] deposit : 18
p[3] deposit : 17
p[2] deposit : 19
c[0] fetch : 16
p[4] deposit : 20
c[1] fetch : 18
c[3] fetch : 17
c[2] fetch : 19
c[4] fetch : 20
p[0] deposit : 21
p[1] deposit : 22
p[3] deposit : 23
p[2] deposit : 24
p[4] deposit : 25
c[0] fetch : 21
c[1] fetch : 22
c[3] fetch : 23
c[2] fetch : 24
c[4] fetch : 25
p[0] deposit : 26
c[0] fetch : 26
p[3] deposit : 29
c[4] fetch : 29
p[4] deposit : 30
c[2] fetch : 30
p[1] deposit : 27
c[1] fetch : 27
p[2] deposit : 28
c[3] fetch : 28
p[0] deposit : 31
p[3] deposit : 32
c[4] fetch : 31
c[0] fetch : 32
p[4] deposit : 33
c[2] fetch : 33
p[1] deposit : 34
c[1] fetch : 34
p[2] deposit : 35
c[3] fetch : 35
p[4] deposit : 37
c[0] fetch : 37
p[3] deposit : 38
c[2] fetch : 38
p[2] deposit : 40
p[0] deposit : 36
p[1] deposit : 39
c[4] fetch : 40
c[1] fetch : 36
c[3] fetch : 39
p[4] deposit : 41
c[0] fetch : 41
p[3] deposit : 42
c[2] fetch : 42
p[2] deposit : 43
p[0] deposit : 44
p[1] deposit : 45
c[4] fetch : 43
c[1] fetch : 44
c[3] fetch : 45
...
C-c C-c