elhacker.net cabecera Bienvenido(a), Visitante. Por favor Ingresar o Registrarse
¿Perdiste tu email de activación?.

 

 


Tema destacado: Sigue las noticias más importantes de seguridad informática en el Twitter! de elhacker.NET


+  Foro de elhacker.net
|-+  Programación
| |-+  Programación C/C++ (Moderadores: Eternal Idol, Littlehorse, K-YreX)
| | |-+  condicion de carrera c++ threads
0 Usuarios y 1 Visitante están viendo este tema.
Páginas: [1] Ir Abajo Respuesta Imprimir
Autor Tema: condicion de carrera c++ threads  (Leído 2,301 veces)
julio1

Desconectado Desconectado

Mensajes: 9


Ver Perfil
condicion de carrera c++ threads
« en: 3 Abril 2019, 18:35 pm »

Estoy mirando el problema del productor/consumidor con threads en c+11. Pues bien, el siguiente código me da un problema con helgrind detecta una condicion de carrera y debuggeando un poco creo que el problema esta en la funcion consumer pero no estoy seguro y no se como resolverlo.
Código:
#include <iostream>
#include <sstream>
#include <vector>
#include <stack>
#include <thread>
#include <mutex>
#include <atomic>
#include <condition_variable>
#include <chrono>
using namespace std;
 
// print function for "thread safe" printing using a stringstream
void print(ostream& s) { cout << s.rdbuf(); cout.flush(); s.clear(); }
 
const int num_producers = 5;
const int num_consumers = 10;
const int producer_delay_to_produce = 10;   // in miliseconds
const int consumer_delay_to_consume = 30;   // in miliseconds
const int consumer_max_wait_time = 200;     // in miliseconds - max time that a consumer can wait for a product to be produced.
const int max_production = 10;              // When producers has produced this quantity they will stop to produce
const int max_products = 10;                // Maximum number of products that can be stored
 
atomic<int> num_producers_working(0);       // When there's no producer working the consumers will stop, and the program will stop.
stack<int> products;                        // The products stack, here we will store our products
mutex xmutex;                               // Our mutex, without this mutex our program will cry
 
condition_variable is_not_full;             // to indicate that our stack is not full between the thread operations
condition_variable is_not_empty;            // to indicate that our stack is not empty between the thread operations
 

void produce(int producer_id)
{
        unique_lock<mutex> lock(xmutex);
        int product;
 
        is_not_full.wait(lock, [] { return products.size() != max_products; });
        product = products.size();
        products.push(product);
 
        print(stringstream() << "Producer " << producer_id << " produced " << product << "\n");
        is_not_empty.notify_one();
}
 
//      Consume function, consumer_id will consume a product
void consume(int consumer_id)
{
        unique_lock<mutex> lock(xmutex);
        int product;
 
        if(is_not_empty.wait_for(lock, chrono::milliseconds(consumer_max_wait_time),
                [] { return products.size() > 0; }))
        {
                product = products.top();
                products.pop();
 
                print(stringstream() << "Consumer " << consumer_id << " consumed " << product << "\n");
                is_not_full.notify_one();
        }
}
 
void producer(int id)
{
        ++num_producers_working;
        for(int i = 0; i < max_production; ++i)
        {
                produce(id);
                this_thread::sleep_for(chrono::milliseconds(producer_delay_to_produce));
        }
 
        print(stringstream() << "Producer " << id << " has exited\n");
        --num_producers_working;
}
 
void consumer(int id)
{
        // Wait until there is any producer working
        while(num_producers_working == 0) this_thread::yield();
 
        while(num_producers_working != 0 || products.size() > 0)
        {
                consume(id);
                this_thread::sleep_for(chrono::milliseconds(consumer_delay_to_consume));
        }
 
        print(stringstream() << "Consumer " << id << " has exited\n");
}
 
int main()
{
        vector<thread> producers_and_consumers;
 
        // Create producers
        for(int i = 0; i < num_producers; ++i)
                producers_and_consumers.push_back(thread(producer, i));
 
        // Create consumers
        for(int i = 0; i < num_consumers; ++i)
                producers_and_consumers.push_back(thread(consumer, i));
 
        // Wait for consumers and producers to finish
        for(auto& t : producers_and_consumers)
                t.join();
}


« Última modificación: 3 Abril 2019, 18:37 pm por julio1 » En línea

Loretz

Desconectado Desconectado

Mensajes: 117


Ver Perfil
Re: condicion de carrera c++ threads
« Respuesta #1 en: 3 Abril 2019, 21:06 pm »

No tengo el valgrind y estoy con el Visual Studio, pero veo dos cosas:

1) [que no tiene nada que ver con tu pregunta, pero...] La función void print(ostream& s) no debería poder invocarse con un argumento temporal (un prvalue), como en
Código:
print(stringstream() << "Producer " << producer_id << " produced " << product << "\n");
Por alguna extraña razón tu compilador lo permite. Supongo que puedes sobrecargar la función print con una versión que acepte una rvalue reference:
Citar
void print(ostream&& s) {
   cout << s.rdbuf() << flush;
   s.clear();
}


Y 2) en
Código
  1. is_not_full.wait(lock, [] { return products.size() != max_products; });
el predicado
Citar
products.size() != max_products;
no garantiza que el lock no se levante debido a un wakeup espurio, haciendo que se intente ejecutar el push() por dos o más threads simultáneamente.



En línea

julio1

Desconectado Desconectado

Mensajes: 9


Ver Perfil
Re: condicion de carrera c++ threads
« Respuesta #2 en: 3 Abril 2019, 21:18 pm »

Con respecto a la primera estoy de acuerdo contigo, la verdad es que no me había fijado, es curioso porque no estoy en un compilador raro estoy usando g++ en su versión 7, y porque g++ permitirá esto? Si mal no recuerdo al tomar una referencia el parametro de una función se puede llamar como un rvalue si el parámetro es const.
Y yendo a la segunda nunca había oído hablar de un wakeup espurio.
« Última modificación: 3 Abril 2019, 21:23 pm por julio1 » En línea

Loretz

Desconectado Desconectado

Mensajes: 117


Ver Perfil
Re: condicion de carrera c++ threads
« Respuesta #3 en: 3 Abril 2019, 21:31 pm »

Citar
wait causes the current thread to block until the condition variable is notified or a spurious wakeup occurs,

https://en.cppreference.com/w/cpp/thread/condition_variable/wait
En línea

julio1

Desconectado Desconectado

Mensajes: 9


Ver Perfil
Re: condicion de carrera c++ threads
« Respuesta #4 en: 3 Abril 2019, 21:36 pm »

Gracias, lo miro ahora. Lo que dijiste de la funcion print en el visual studio te da error al compilar o te compila sin problemas?
En línea

Loretz

Desconectado Desconectado

Mensajes: 117


Ver Perfil
Re: condicion de carrera c++ threads
« Respuesta #5 en: 3 Abril 2019, 21:43 pm »

Citar
Lo que dijiste de la funcion print en el visual studio te da error al compilar o te compila sin problemas?
Me da error al compilar.
En línea

julio1

Desconectado Desconectado

Mensajes: 9


Ver Perfil
Re: condicion de carrera c++ threads
« Respuesta #6 en: 4 Abril 2019, 00:00 am »

He solucionado el problema de la condición de carrera, helgrind ya no se queja, la culpa la tenía el método size en la línea while(num_producers_working != 0 || products.size() > 0)
la solución más simple pasa por meter a size dentro de una sección crítica:
Código:
size_t stack_size(const stack<int> &obj ){
    lock_guard<mutex> guardian(xmutex);
    return obj.size();

}
Modificando así la línea anterior ahora: while(num_producers_working != 0 || stack_size(products) > 0)
« Última modificación: 4 Abril 2019, 00:09 am por julio1 » En línea

dijsktra

Desconectado Desconectado

Mensajes: 110


Mr Edsger Dijsktra (Tribute to)


Ver Perfil
Re: condicion de carrera c++ threads
« Respuesta #7 en: 4 Abril 2019, 15:01 pm »

Que bueno que la gente se meta con problemas de concurrente!

He solucionado el problema de la condición de carrera, helgrind ya no se queja, la culpa la tenía el método size en la línea while(num_producers_working != 0 || products.size() > 0)...

Cuidado! Yo no tengo ni idea de helgrind, no lo había oido en mi vida, pero la depuración en concurrente es algo impracticable! Necesitas un cálculo para razonar sobre la corrección de un programa, con invariantes, etc...
Conviene tener princpios sólidos sobre los que montar tus soluciones concurrentes.
Un buen libro: "Concurrent Programming. Gregory R. Andrews".
No vas a aprender nada de C++, pero si mucho de programaci'on concurrente!

Algunas observaciones a tu programa:

  • Productores consumidores se ejecutan indefinidamente. De otro modo, puedes llegar a bloqueos, (imagina 1 productor y 1000 consumidores, a 1 items cada uno... Es obvio que los consumidores se quedarán esperando indefinidamente
  • El buffer, o es un escalar, o un buffer limitado. Nada se gana con los stacks... Interesa FIFO, no LIFO
  • Nada se gana leyendo los manuales de la libreria <thread> o <condition variable> si no se conoce en que consisten los monitores



Código
  1. #include <cassert>
  2. #include <iostream>
  3. #include <vector> // as bounded buffer.
  4. #include <thread>
  5. #include <mutex>
  6. #include <condition_variable>
  7. #include <chrono>
  8. #include <atomic>
  9. #include <cassert>
  10.  
  11. #define MAX_THREADS 1000
  12. #define SIZE_BUFFER 5
  13. #define TIMEOUT  200
  14.  
  15. using namespace std;
  16.  
  17. // Patemeters overwritten on input.
  18. static int P = 5; // Producers
  19. static int C = 10; // Consumers
  20. static int P_DELAY=1000 ; // Prod-Delay
  21. static int C_DELAY=500;
  22.  
  23.  
  24. // MONITOR VARIABLES
  25. static mutex xmutex;                    // Our mutex, monitor methods in exclusion
  26. static condition_variable if_full;             // to indicate that our stack is
  27. // not full  between the thread operations
  28. static condition_variable if_empty;            // to indicate that our stack is
  29. // not empty between the thread operations
  30.  
  31. static int front = 0;
  32. static int rear = 0;
  33. static vector<int>  buf(SIZE_BUFFER);
  34. static int holes = SIZE_BUFFER;
  35. static int items = 0;
  36. // Monitor invariants:
  37. // inv. items =  (front "-" rear)
  38. // inv. holes = SIZE_BUFFER - (front "-" rear)
  39. // inv. 0 <= front < SIZE_BUFFER, 0 <= rear < SIZE_BUFFER
  40.  
  41. static atomic<int> m(0); // simulating a fresh message;
  42.  
  43. // MONITOR METHODS
  44. void init()
  45. {
  46.  front=rear=items=0;
  47.  holes = SIZE_BUFFER;
  48. }
  49.  
  50. void deposit(const int id,const int msg)
  51. {
  52. unique_lock<mutex> lock(xmutex);
  53.  
  54. // await holes > 0
  55. if_full.wait(lock, [] { return holes > 0; });
  56. buf[rear] = msg; rear = ((rear+1) % SIZE_BUFFER);
  57. cout << "p[" << id << "]\tdeposit\t:\t" << msg << endl;
  58. holes--; items++;
  59. if_empty.notify_all();
  60. }
  61.  
  62. //      Consume function, consumer_id will consume a product
  63. int fetch(const int id)
  64. {
  65. unique_lock<mutex> lock(xmutex);
  66. int mess;
  67. // await items > 0
  68. if_empty.wait(lock, [] { return items > 0; });
  69. mess = buf[front]; front = ((front+1) % SIZE_BUFFER);
  70. cout << "c[" << id << "]\t\t\t\t\tfetch\t:\t" << mess << endl;
  71. holes++;items--;
  72. if_full.notify_all();
  73. return mess;
  74. }
  75.  
  76.  
  77. // Real code for threads.
  78. void producer(int id)
  79. {
  80. for (; 1; )
  81. {
  82. deposit(id, ++m);
  83. this_thread::sleep_for(chrono::milliseconds(P_DELAY));
  84. }
  85. return;
  86. }
  87.  
  88. void consumer(int id)
  89. {
  90. int msg;
  91. for (; 1; )
  92. {
  93. msg = fetch(id);
  94. this_thread::sleep_for(chrono::milliseconds(C_DELAY));
  95. }
  96. return ;
  97. }
  98.  
  99.  
  100. void simulation()
  101. {
  102. vector<thread> threads;
  103.  
  104. init();
  105. // Create producers
  106. for (int p = 0; p < P; ++p)
  107.  threads.push_back(thread(producer, p));
  108.  
  109. // Create consumers
  110. for (int c = 0; c < C; ++c)
  111.  threads.push_back(thread(consumer, c));
  112.  
  113. // Wait for consumers and producers to finish
  114. for (auto& t : threads)
  115. t.join();
  116. }
  117.  
  118.  
  119. // IO staff.
  120. int main(int argc, char *args[])
  121. {
  122.  cout << "C P P_DELAY C_DELAY " << endl;
  123.  for( ; cin >> C >> P >> P_DELAY >> C_DELAY; )
  124.    {
  125.      assert((C<MAX_THREADS) && (P<MAX_THREADS) );
  126.      assert((P_DELAY > 0) && (C_DELAY > 0) );
  127.      simulation();
  128.    }
  129.  return 0;
  130. }
  131.  
  132.  

Aqui un ejemplo de salida. Tu puedes experimientar variando , el numero de p/c, la velocidad relativa...

Con independencia de la velocidad relativa, dentro de un mismo grupo no está determinado el orden de acceso al monitor. Así,  es posible que el productor del mensaje 9 tenga acceso al monitor antes que el productor del mensaje 8.  Eso sí, entre dos consumidores el primero en acceder extraerá el mnesaje 9, y el segundo el 8, por ser FIFO.



Código:
g++ conc.cpp -o main -lpthread && ./main
C P P_DELAY C_DELAY
5 5 500 500
p[1] deposit : 1
p[2] deposit : 2
p[0] deposit : 3
p[3] deposit : 4
c[0] fetch : 1
p[4] deposit : 5
c[1] fetch : 2
c[3] fetch : 3
c[2] fetch : 4
c[4] fetch : 5
p[1] deposit : 6
p[3] deposit : 7
p[0] deposit : 9
p[2] deposit : 8
c[0] fetch : 6
p[4] deposit : 10
c[1] fetch : 7
c[3] fetch : 9
c[2] fetch : 8
c[4] fetch : 10
p[1] deposit : 11
p[3] deposit : 12
p[0] deposit : 13
p[2] deposit : 14
c[0] fetch : 11
p[4] deposit : 15
c[1] fetch : 12
c[3] fetch : 13
c[2] fetch : 14
c[4] fetch : 15
p[1] deposit : 16
p[0] deposit : 18
p[3] deposit : 17
p[2] deposit : 19
c[0] fetch : 16
p[4] deposit : 20
c[1] fetch : 18
c[3] fetch : 17
c[2] fetch : 19
c[4] fetch : 20
p[0] deposit : 21
p[1] deposit : 22
p[3] deposit : 23
p[2] deposit : 24
p[4] deposit : 25
c[0] fetch : 21
c[1] fetch : 22
c[3] fetch : 23
c[2] fetch : 24
c[4] fetch : 25
p[0] deposit : 26
c[0] fetch : 26
p[3] deposit : 29
c[4] fetch : 29
p[4] deposit : 30
c[2] fetch : 30
p[1] deposit : 27
c[1] fetch : 27
p[2] deposit : 28
c[3] fetch : 28
p[0] deposit : 31
p[3] deposit : 32
c[4] fetch : 31
c[0] fetch : 32
p[4] deposit : 33
c[2] fetch : 33
p[1] deposit : 34
c[1] fetch : 34
p[2] deposit : 35
c[3] fetch : 35
p[4] deposit : 37
c[0] fetch : 37
p[3] deposit : 38
c[2] fetch : 38
p[2] deposit : 40
p[0] deposit : 36
p[1] deposit : 39
c[4] fetch : 40
c[1] fetch : 36
c[3] fetch : 39
p[4] deposit : 41
c[0] fetch : 41
p[3] deposit : 42
c[2] fetch : 42
p[2] deposit : 43
p[0] deposit : 44
p[1] deposit : 45
c[4] fetch : 43
c[1] fetch : 44
c[3] fetch : 45
...
C-c C-c



« Última modificación: 8 Abril 2019, 14:18 pm por dijsktra » En línea

Si la depuración es el proceso de eliminar fallos en el software, entonces programar debe ser el proceso de ponerlos dentro. (Edsger Dijsktra)
Páginas: [1] Ir Arriba Respuesta Imprimir 

Ir a:  

Mensajes similares
Asunto Iniciado por Respuestas Vistas Último mensaje
Duda_con condicion if
Java
Tuplado 3 2,078 Último mensaje 28 Diciembre 2012, 17:01 pm
por 1mpuls0
ayuda con condicion en php
PHP
General Dmitry Vergadoski 4 1,607 Último mensaje 25 Septiembre 2015, 02:20 am
por #!drvy
condición IF en consulta SQL
Bases de Datos
JonaLamper 2 4,169 Último mensaje 23 Septiembre 2016, 15:27 pm
por Pablo Videla
Duda en una condición if
Java
JonaLamper 1 1,602 Último mensaje 6 Noviembre 2016, 15:17 pm
por ivancea96
Condicion con cadenas (gets)
Programación C/C++
Reyna_esr 2 1,191 Último mensaje 28 Octubre 2018, 02:28 am
por Beginner Web
WAP2 - Aviso Legal - Powered by SMF 1.1.21 | SMF © 2006-2008, Simple Machines