Python: is os.read() / os.write() on an os.pipe() threadsafe?
Tag : python , By : Jason Jennings
Date : March 29 2020, 07:55 AM
os.read and os.write on the two fds returned from os.pipe are threadsafe, but you appear to demand more than that. Sub (1), yes, there is no "atomicity" guarantee for single reads or writes: the scenario you depict (a single short write ends up producing two reads) is entirely possible. (In general, os.whatever is a thin wrapper on operating system functionality, and it's up to the OS to ensure, or fail to ensure, the kind of functionality you require; in this case, the POSIX standard doesn't require the OS to ensure this kind of "atomicity".) You're guaranteed to get all data that was written, and in the correct order, but that's it. A single write of a large piece of data might stall once it has filled the OS-supplied buffer and only proceed once some other thread has read some of the initial data (beware deadlocks, of course!). Sub (2), yes, the logging module is threadsafe AND "atomic" in that data produced by a single call to logging.info, logging.warning, logging.error, etc. "stays in one piece" in terms of calls to the underlying handler. However, if that handler in turn uses non-atomic means such as os.write, it may still stall in the kernel until the underlying buffer gets unclogged, as above.
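Since one write is not guaranteed to come back as one read, a reader that needs message boundaries has to add its own framing. Here is a minimal sketch, assuming a single writer thread and a 4-byte length prefix; the helper names send_message, read_exactly, recv_message and demo are made up for this illustration and are not part of the os module.

```python
import os
import struct
import threading

# 4-byte big-endian length prefix; this framing is an assumption of the sketch.
HEADER = struct.Struct("!I")


def send_message(fd, payload):
    """Write one length-prefixed message, looping until every byte is accepted."""
    view = memoryview(HEADER.pack(len(payload)) + payload)
    while view:
        written = os.write(fd, view)  # may accept fewer bytes than offered
        view = view[written:]


def read_exactly(fd, count):
    """Call os.read() until `count` bytes have arrived; a read may return less."""
    chunks = []
    while count > 0:
        chunk = os.read(fd, count)
        if not chunk:
            raise EOFError("pipe closed in the middle of a message")
        chunks.append(chunk)
        count -= len(chunk)
    return b"".join(chunks)


def recv_message(fd):
    """Read the length prefix, then exactly that many payload bytes."""
    (length,) = HEADER.unpack(read_exactly(fd, HEADER.size))
    return read_exactly(fd, length)


def demo():
    """Send three messages from a writer thread and check they arrive intact."""
    r, w = os.pipe()
    # The middle message is larger than a typical pipe buffer, so the writer
    # stalls until the reader drains some data.
    messages = [b"short", b"x" * 200_000, b""]

    def writer():
        for message in messages:
            send_message(w, message)
        os.close(w)

    thread = threading.Thread(target=writer)
    thread.start()
    received = [recv_message(r) for _ in messages]
    thread.join()
    os.close(r)
    return received == messages


if __name__ == "__main__":
    print(demo())
```

With more than one writer thread, the send_message calls would additionally need a lock around them, because a large message can be written in several pieces and another writer's bytes could land in between.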
|
How can the child process read stdout from the pipe and the parent process write stdin to the pipe?
Tag : c , By : Gerhard Miller
Date : March 29 2020, 07:55 AM
The result of fork() is that one process becomes two (by asexual reproduction). So while it is still the case that exactly one branch of the if/else block is taken in any given process, there are now two processes, and each takes a different path. More specifically, look at what fork() returns: the child's PID to the parent, and 0 to the new child. Apart from that, the two processes are almost identical. So the if (cpid == 0) check is a common pattern after fork() that lets you proceed with distinct logic in each process. In your case, that's reading in one process and writing in the other.
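The two-way split on fork()'s return value can be sketched briefly in Python, whose os.fork() and os.pipe() wrap the same system calls (POSIX only). The function name fork_and_talk and the message text are made up for this illustration; here the child writes and the parent reads, and the roles could just as well be swapped.

```python
import os


def fork_and_talk():
    """Fork once; the child writes into a pipe and the parent reads from it."""
    read_fd, write_fd = os.pipe()
    pid = os.fork()  # returns 0 in the child, the child's PID in the parent

    if pid == 0:
        # Child branch: only writes, so close the read end first.
        os.close(read_fd)
        os.write(write_fd, b"hello from child")
        os.close(write_fd)
        os._exit(0)  # exit the child right here so it never runs parent code

    # Parent branch: only reads, so close the write end first.
    os.close(write_fd)
    chunks = []
    while True:
        chunk = os.read(read_fd, 1024)
        if not chunk:  # empty bytes means every write end has been closed
            break
        chunks.append(chunk)
    os.close(read_fd)
    _, status = os.waitpid(pid, 0)  # reap the child
    return pid, b"".join(chunks), status


if __name__ == "__main__":
    print(fork_and_talk())
```

Both branches run, but in different processes: the code under pid == 0 executes only in the child, and everything after it executes only in the parent.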
|
Trying to create an asynchronous example using Core.Async.Pipe, but Pipe.write seems to block awaiting a Pipe.read
Date : March 29 2020, 07:55 AM
Indeed, a pipe is a queue, but by default its size budget is set to 0. So when you write with pushback, the producer stops immediately and waits for a reader. You can control the size with the set_size_budget function.
|
Read blocking even closing the Write ends
Date : March 29 2020, 07:55 AM
The problem is that you're not closing all your unused pipe file descriptors. For instance, in your last branch where you exec("/bin/grep", argv), you're closing p1[1] and dup2()ing p1[0], but you're not closing p[0] or p[1]. So when ls has finished writing to uniq, that pipe stays open because you still have dangling references to it. You're also not checking any of your system calls for errors, which you should. A version that closes every unused descriptor and checks each call:
#define _POSIX_C_SOURCE 200809L
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

int main(void)
{
    int p1to2[2];
    int p2to3[2];

    if ( pipe(p1to2) == -1 || pipe(p2to3) == -1 ) {
        perror("error calling pipe()");
        return EXIT_FAILURE;
    }

    pid_t pid;
    if ( (pid = fork()) == -1 ) {
        perror("error calling first fork()");
        return EXIT_FAILURE;
    }
    else if (pid == 0) {
        /* First child: ls, writing into p1to2 */
        if ( close(p1to2[0]) == -1 ) {
            perror("error calling close() on p1to2[0]");
            return EXIT_FAILURE;
        }
        if ( p1to2[1] != STDOUT_FILENO ) {
            if ( dup2(p1to2[1], STDOUT_FILENO) == -1 ) {
                perror("error calling dup2() on p1to2[1]");
                return EXIT_FAILURE;
            }
            if ( close(p1to2[1]) == -1 ) {
                perror("error calling close() on p1to2[1]");
                return EXIT_FAILURE;
            }
        }
        if ( close(p2to3[0]) == -1 || close(p2to3[1]) == -1 ) {
            perror("error calling close() on p2to3");
            return EXIT_FAILURE;
        }
        char *argv[] = {"ls", "-l", NULL};
        if ( execv("/bin/ls", argv) == -1 ) {
            perror("couldn't execute /bin/ls");
            return EXIT_FAILURE;
        }
    } else {
        if ( (pid = fork()) == -1 ) {
            perror("error calling second fork()");
            return EXIT_FAILURE;
        }
        else if ( pid == 0 ) {
            /* Second child: uniq, reading from p1to2 and writing into p2to3 */
            if ( close(p1to2[1]) == -1 ) {
                perror("error calling close() on p1to2[1]");
                return EXIT_FAILURE;
            }
            if ( p1to2[0] != STDIN_FILENO ) {
                if ( dup2(p1to2[0], STDIN_FILENO) == -1 ) {
                    perror("error calling dup2() on p1to2[0]");
                    return EXIT_FAILURE;
                }
                if ( close(p1to2[0]) == -1 ) {
                    perror("error calling close() on p1to2[0]");
                    return EXIT_FAILURE;
                }
            }
            if ( close(p2to3[0]) == -1 ) {
                perror("error calling close() on p2to3[0]");
                return EXIT_FAILURE;
            }
            if ( p2to3[1] != STDOUT_FILENO ) {
                if ( dup2(p2to3[1], STDOUT_FILENO) == -1 ) {
                    perror("error calling dup2() on p2to3[1]");
                    return EXIT_FAILURE;
                }
                if ( close(p2to3[1]) == -1 ) {
                    perror("error calling close() on p2to3[1]");
                    return EXIT_FAILURE;
                }
            }
            char *argv[] = {"uniq", NULL};
            if ( execv("/usr/bin/uniq", argv) == -1 ) {
                perror("couldn't execute /usr/bin/uniq");
                return EXIT_FAILURE;
            }
        } else {
            /* Parent: grep, reading from p2to3 */
            if ( close(p1to2[0]) == -1 || close(p1to2[1]) == -1 ) {
                perror("error calling close() on p1to2");
                return EXIT_FAILURE;
            }
            if ( close(p2to3[1]) == -1 ) {
                perror("error calling close() on p2to3[1]");
                return EXIT_FAILURE;
            }
            if ( p2to3[0] != STDIN_FILENO ) {
                if ( dup2(p2to3[0], STDIN_FILENO) == -1 ) {
                    perror("error calling dup2() on p2to3[0]");
                    return EXIT_FAILURE;
                }
                if ( close(p2to3[0]) == -1 ) {
                    perror("error calling close() on p2to3[0]");
                    return EXIT_FAILURE;
                }
            }
            char *argv[] = {"grep", "pipes", NULL};
            if ( execv("/usr/bin/grep", argv) == -1 ) {
                perror("couldn't execute /usr/bin/grep");
                return EXIT_FAILURE;
            }
        }
    }
}
paul@horus:~/src/sandbox$ ./pipes
-rwxr-xr-x 1 paul staff 8812 Oct 25 12:21 pipes
-rw-r--r-- 1 paul staff 3817 Oct 25 12:21 pipes.c
-rw------- 1 paul staff 660 Oct 25 11:03 pipes.c.BAK
paul@horus:~/src/sandbox$
#define _POSIX_C_SOURCE 200809L
#include <stdio.h>
#include <stdlib.h>
#include <stdarg.h>
#include <unistd.h>

void make_pipes(int * p, ...);
pid_t fork_or_die(void);
void close_pipe_pair(int * p);
void make_std_reader(int * p);
void make_std_writer(int * p);
void execv_or_die(const char * path, char * const argv[]);

/* Main function */
int main(void)
{
    pid_t pid;
    int p1to2[2], p2to3[2];

    make_pipes(p1to2, p2to3, NULL);

    if ( (pid = fork_or_die()) == 0 ) {
        make_std_writer(p1to2);
        close_pipe_pair(p2to3);
        char * args[] = {"ls", "-l", NULL};
        execv_or_die("/bin/ls", args);
    } else {
        if ( (pid = fork_or_die()) == 0 ) {
            make_std_reader(p1to2);
            make_std_writer(p2to3);
            char * args[] = {"uniq", NULL};
            execv_or_die("/usr/bin/uniq", args);
        } else {
            close_pipe_pair(p1to2);
            make_std_reader(p2to3);
            char * args[] = {"grep", "pipes", NULL};
            execv_or_die("/usr/bin/grep", args);
        }
    }
}

/* Creates a pipe for each array in the NULL terminated arg list */
void make_pipes(int * p, ...)
{
    va_list ap;
    va_start(ap, p);
    while ( p ) {
        if ( pipe(p) == -1 ) {
            perror("error calling pipe()");
            exit(EXIT_FAILURE);
        }
        p = va_arg(ap, int *);
    }
    va_end(ap);
}

/* Calls fork() and exits on error */
pid_t fork_or_die(void)
{
    pid_t p = fork();
    if ( p == -1 ) {
        perror("error calling fork()");
        exit(EXIT_FAILURE);
    }
    return p;
}

/* Closes a pipe pair and exits on error */
void close_pipe_pair(int * p)
{
    if ( close(p[0]) == -1 || close(p[1]) == -1 ) {
        perror("error calling close() in close_pipe_pair()");
        exit(EXIT_FAILURE);
    }
}

/* Closes the write end of a pipe and duplicates
 * the read end into STDIN_FILENO, exiting on error */
void make_std_reader(int * p)
{
    static const int read_end = 0;
    static const int write_end = 1;

    if ( close(p[write_end]) == -1 ) {
        perror("error calling close() in make_std_reader()");
        exit(EXIT_FAILURE);
    }
    if ( p[read_end] != STDIN_FILENO ) {
        if ( dup2(p[read_end], STDIN_FILENO) == -1 ) {
            perror("error calling dup2() in make_std_reader()");
            exit(EXIT_FAILURE);
        }
        if ( close(p[read_end]) == -1 ) {
            perror("error calling close() in make_std_reader()");
            exit(EXIT_FAILURE);
        }
    }
}

/* Closes the read end of a pipe and duplicates
 * the write end into STDOUT_FILENO, exiting on error */
void make_std_writer(int * p)
{
    static const int read_end = 0;
    static const int write_end = 1;

    if ( close(p[read_end]) == -1 ) {
        perror("error calling close() in make_std_writer()");
        exit(EXIT_FAILURE);
    }
    if ( p[write_end] != STDOUT_FILENO ) {
        if ( dup2(p[write_end], STDOUT_FILENO) == -1 ) {
            perror("error calling dup2() in make_std_writer()");
            exit(EXIT_FAILURE);
        }
        if ( close(p[write_end]) == -1 ) {
            perror("error calling close() in make_std_writer()");
            exit(EXIT_FAILURE);
        }
    }
}

/* Calls execv() and exits on error */
void execv_or_die(const char * path, char * const argv[])
{
    if ( execv(path, argv) == -1 ) {
        perror("error calling execv()");
        exit(EXIT_FAILURE);
    }
}
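To see why a forgotten write end keeps a reader waiting, here is a small sketch in Python, chosen only for brevity; os.pipe, os.dup, os.read and os.close map onto the same POSIX calls. The function name and the use of os.dup to stand in for a descriptor left open in another process are assumptions of this illustration, not part of the programs above.

```python
import os


def eof_needs_every_writer_closed():
    """Show that a pipe reader sees EOF only after all write ends are closed."""
    r, w = os.pipe()
    extra = os.dup(w)          # stands in for a write end some process forgot to close
    os.set_blocking(r, False)  # report "would block" instead of hanging the demo

    os.write(w, b"data")
    os.close(w)                # the writer is done, but `extra` is still open

    first = os.read(r, 100)    # the buffered data is delivered normally
    try:
        os.read(r, 100)
        blocked = False
    except BlockingIOError:
        blocked = True         # no EOF yet: a write end is still open somewhere

    os.close(extra)            # now the last write end is gone
    at_eof = os.read(r, 100) == b""  # an empty read signals EOF
    os.close(r)
    return first, blocked, at_eof


if __name__ == "__main__":
    print(eof_needs_every_writer_closed())
```

In blocking mode the second read would simply hang, which is the symptom described in the question: the reader waits for data that can never arrive because a stray write descriptor is still open.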
|
read integer from user and write into child pipe and then parent pipe read from it
Tag : cpp , By : Ronnie Carlin
Date : March 29 2020, 07:55 AM
There are several issues here:
You're prompting for N in the child process, however the parent process is the one that receives input from the terminal. So by the time you enter "6", the parent process has exited and you're entering that value to the shell. Move the printf and scanf to before the fork and you should be able to read that value properly.
Your case 1 will not be entered when fork() returns in the parent, which seems to be your intent. fork() returns the pid of the child to the parent. Because the special init process has pid 1, the child's pid will never be 1. Change case 1: to default: to handle the parent process.
You're not using break at the end of your switch cases. In C, switch cases "fall through", meaning that once the statements for one case are complete, execution continues with the statements for the following case. The break statement at the end of each case prevents that from happening.
#include<stdio.h>
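#include<unistd.h>
#include<stdlib.h>
#include<fcntl.h>
#define BUFSIZE 80

int main(void)
{
    int fd[2], n, i, h;
    pid_t pid; // to capture the child pid
    char buf[BUFSIZE];

    // prompt for N before forking
    printf("Enter N :");
    scanf("%d", &n);

    // create the pipe, and stop if that fails
    if (pipe(fd) == -1)
    {
        perror("Pipe Error");
        exit(EXIT_FAILURE);
    }

    switch ((pid = fork())) // saving the child pid in case we want to use it later
    {
    case -1:
        perror("Fork Error");
        exit(EXIT_FAILURE); // no break needed here because of exit
    case 0:
        // child: write N into the pipe
        close(fd[0]);
        write(fd[1], &n, sizeof(n));
        close(fd[1]);
        break; // end of case 0
    default:
        // parent: read the value back and print the odd numbers up to it
        close(fd[1]);
        // check the byte count instead of overwriting n with it
        if (read(fd[0], &h, sizeof(h)) != (ssize_t)sizeof(h))
        {
            fprintf(stderr, "short read from pipe\n");
            exit(EXIT_FAILURE);
        }
        printf("h=%d\n", h);
        for (i = 1; i <= h; i++)
        {
            if (i % 2 == 1)
            {
                printf("%d\n", i);
            }
        }
        close(fd[0]);
        break; // end of default case
    }
    exit(0);
}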
|