Linux 非阻塞 fifo(按需日志记录)

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/7360473/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-08-05 06:06:04  来源:igfitidea点击:

Linux non-blocking fifo (on demand logging)

linuxbashloggingfifo

提问by dronus

I like to log a programs output 'on demand'. Eg. the output is logged to the terminal, but another process can hook on the current output at any time.

我喜欢记录“按需”输出的程序。例如。输出记录到终端,但另一个进程可以随时挂接当前输出。

The classic way would be:

经典的方法是:

myprogram 2>&1 | tee /tmp/mylog

and on demand

和按需

tail /tmp/mylog

However, this would create a ever growing log file even if not used until the drive runs out of space. So my attempt was:

但是,即使在驱动器空间用完之前不使用,这也会创建一个不断增长的日志文件。所以我的尝试是:

mkfifo /tmp/mylog
myprogram 2>&1 | tee /tmp/mylog

and on demand

和按需

cat /tmp/mylog

Now I can read /tmp/mylog at any time. However, any output blocks the program until the /tmp/mylog is read. I like the fifo to flush any incoming data not read back. How to do that?

现在我可以随时阅读 /tmp/mylog。但是,在读取 /tmp/mylog 之前,任何输出都会阻止程序。我喜欢 fifo 刷新任何未读回的传入数据。怎么做?

采纳答案by racic

Inspired by your question I've written a simple program that will let you do this:

受您问题的启发,我编写了一个简单的程序,可以让您执行以下操作:

$ myprogram 2>&1 | ftee /tmp/mylog

$ myprogram 2>&1 | ftee /tmp/mylog

It behaves similarly to teebut clones the stdin to stdout and to a named pipe (a requirement for now) without blocking. This means that if you want to log this way it may happen that you're gonna lose your log data, but I guess it's acceptable in your scenario. The trick is to block SIGPIPE signal and to ignore error on writing to a broken fifo. This sample may be optimized in various ways of course, but so far, it does the job I guess.

它的行为类似于tee但将 stdin 克隆到 stdout 和命名管道(目前需要)而不会阻塞。这意味着如果您想以这种方式记录日志,您可能会丢失日志数据,但我想这在您的场景中是可以接受的。诀窍是阻止 SIGPIPE 信号并忽略写入损坏的 fifo 时的错误。当然,这个示例可能会以各种方式进行优化,但到目前为止,我猜它可以完成这项工作。

/* ftee - clone stdin to stdout and to a named pipe 
(c) racic@stackoverflow
WTFPL Licence */

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <errno.h>
#include <signal.h>
#include <unistd.h>

int main(int argc, char *argv[])
{
    int readfd, writefd;
    struct stat status;
    char *fifonam;
    char buffer[BUFSIZ];
    ssize_t bytes;

    signal(SIGPIPE, SIG_IGN);

    if(2!=argc)
    {
        printf("Usage:\n someprog 2>&1 | %s FIFO\n FIFO - path to a"
            " named pipe, required argument\n", argv[0]);
        exit(EXIT_FAILURE);
    }
    fifonam = argv[1];

    readfd = open(fifonam, O_RDONLY | O_NONBLOCK);
    if(-1==readfd)
    {
        perror("ftee: readfd: open()");
        exit(EXIT_FAILURE);
    }

    if(-1==fstat(readfd, &status))
    {
        perror("ftee: fstat");
        close(readfd);
        exit(EXIT_FAILURE);
    }

    if(!S_ISFIFO(status.st_mode))
    {
        printf("ftee: %s in not a fifo!\n", fifonam);
        close(readfd);
        exit(EXIT_FAILURE);
    }

    writefd = open(fifonam, O_WRONLY | O_NONBLOCK);
    if(-1==writefd)
    {
        perror("ftee: writefd: open()");
        close(readfd);
        exit(EXIT_FAILURE);
    }

    close(readfd);

    while(1)
    {
        bytes = read(STDIN_FILENO, buffer, sizeof(buffer));
        if (bytes < 0 && errno == EINTR)
            continue;
        if (bytes <= 0)
            break;

        bytes = write(STDOUT_FILENO, buffer, bytes);
        if(-1==bytes)
            perror("ftee: writing to stdout");
        bytes = write(writefd, buffer, bytes);
        if(-1==bytes);//Ignoring the errors
    }
    close(writefd); 
    return(0);
}

You can compile it with this standard command:

您可以使用以下标准命令编译它:

$ gcc ftee.c -o ftee

$ gcc ftee.c -o ftee

You can quickly verify it by running e.g.:

您可以通过运行例如:

$ ping www.google.com | ftee /tmp/mylog

$ ping www.google.com | ftee /tmp/mylog

$ cat /tmp/mylog

$ cat /tmp/mylog

Also note - this is no multiplexer. You can only have one process doing $ cat /tmp/mylogat a time.

另请注意 - 这不是多路复用器。一次只能执行一个进程$ cat /tmp/mylog

回答by MattH

However, this would create a ever growing log file even if not used until the drive runs out of space.

但是,即使在驱动器空间用完之前不使用,这也会创建一个不断增长的日志文件。

Why not periodically rotate the logs? There's even a program to do it for you logrotate.

为什么不定期轮换日志?甚至有一个程序可以为你做这件事logrotate

There's also a system for generating log messages and doing different things with them according to type. It's called syslog.

还有一个系统用于生成日志消息并根据类型对它们做不同的事情。它被称为syslog

You could even combine the two. Have your program generate syslog messages, configure syslog to place them in a file and use logrotate to ensure they don't fill the disk.

你甚至可以将两者结合起来。让您的程序生成 syslog 消息,配置 syslog 将它们放在一个文件中,并使用 logrotate 确保它们不会填满磁盘。



If it turned out that you were writing for a small embedded system and the program's output is heavy there are a variety of techniques you might consider.

如果事实证明您正在为一个小型嵌入式系统编写程序并且程序的输出很重,那么您可能会考虑多种技术。

  • Remote syslog: send the syslog messages to a syslog server on the network.
  • Use the severity levels availble in syslog to do different things with the messages. E.g. discard "INFO" but log and forward "ERR" or greater. E.g. to console
  • Use a signal handler in your program to reread configuration on HUP and vary log generation "on demand" this way.
  • Have your program listen on a unix socket and write messages down it when open. You could even implement and interactive console into your program this way.
  • Using a configuration file, provide granular control of logging output.
  • 远程系统日志:将系统日志消息发送到网络上的系统日志服务器。
  • 使用 syslog 中可用的严重性级别对消息执行不同的操作。例如丢弃“INFO”但记录并转发“ERR”或更大的。例如安慰
  • 在您的程序中使用信号处理程序重新读取 HUP 上的配置并以这种方式“按需”改变日志生成。
  • 让您的程序侦听 unix 套接字并在打开时将消息写下来。您甚至可以通过这种方式在您的程序中实现和交互式控制台。
  • 使用配置文件,提供对日志输出的精细控制。

回答by holygeek

If you can install screen on the embedded device then you can run 'myprogram' in it and detach it, and reattach it anytime you want to see the log. Something like:

如果您可以在嵌入式设备上安装屏幕,那么您可以在其中运行“myprogram”并将其分离,并在您想要查看日志的任何时候重新附加它。就像是:

$ screen -t sometitle myprogram
Hit Ctrl+A, then d to detach it.

Whenever you want to see the output, reattach it:

每当您想查看输出时,请重新附加它:

$ screen -DR sometitle
Hit Ctrl-A, then d to detach it again.

This way you won't have to worry about the program output using disk space at all.

这样您就不必担心使用磁盘空间的程序输出。

回答by dronus

BusyBox often used on embedded devices can create a ram buffered log by

嵌入式设备上常用的 BusyBox 可以通过以下方式创建 ram 缓冲日志

syslogd -C

which can be filled by

可以填写

logger

and read by

并阅读

logread

Works quite well, but only provides one global log.

工作得很好,但只提供一个全局日志。

回答by chad

The problem with the given fifoapproach is that the whole thing will hang when the pipe buffer is getting filled up and no reading process is taking place.

给定fifo方法的问题在于,当管道缓冲区被填满并且没有进行读取过程时,整个事情都会挂起。

For the fifoapproach to work I think you would have to implement a named pipe client-server model similar to the one mentioned in BASH: Best architecture for reading from two input streams(see slightly modified code below, sample code 2).

对于fifo工作方法,我认为您必须实现一个类似于BASH 中提到的命名管道客户端-服务器模型:从两个输入流读取的最佳架构(请参阅下面稍微修改的代码,示例代码 2)。

For a workaround you could also use a while ... readconstruct instead of teeing stdout to a named pipe by implementing a counting mechanism inside the while ... readloop that will overwrite the log file periodically by a specified number of lines. This would prevent an ever growing log file (sample code 1).

对于变通方法,您还可以通过在循环内实现计数机制来使用while ... read构造而不是将teestdout 连接到命名管道,该机制while ... read将按指定的行数定期覆盖日志文件。这将阻止不断增长的日志文件(示例代码 1)。

# sample code 1

# terminal window 1
rm -f /tmp/mylog
touch /tmp/mylog
while sleep 2; do date '+%Y-%m-%d_%H.%M.%S'; done 2>&1 | while IFS="" read -r line; do 
  lno=$((lno+1))
  #echo $lno
  array[${lno}]="${line}"
  if [[ $lno -eq 10 ]]; then
    lno=$((lno+1))
    array[${lno}]="-------------"
    printf '%s\n' "${array[@]}" > /tmp/mylog
    unset lno array
  fi
  printf '%s\n' "${line}"
done

# terminal window 2
tail -f /tmp/mylog


#------------------------


# sample code 2

# code taken from: 
# https://stackoverflow.com/questions/6702474/bash-best-architecture-for-reading-from-two-input-streams
# terminal window 1

# server
(
rm -f /tmp/to /tmp/from
mkfifo /tmp/to /tmp/from
while true; do 
  while IFS="" read -r -d $'\n' line; do 
    printf '%s\n' "${line}"
  done </tmp/to >/tmp/from &
  bgpid=$!
  exec 3>/tmp/to
  exec 4</tmp/from
  trap "kill -TERM $bgpid; exit" 0 1 2 3 13 15
  wait "$bgpid"
  echo "restarting..."
done
) &
serverpid=$!
#kill -TERM $serverpid

# client
(
exec 3>/tmp/to;
exec 4</tmp/from;
while IFS="" read -r -d $'\n' <&4 line; do
  if [[ "${line:0:1}" == $'7' ]]; then 
    printf 'line from stdin: %s\n' "${line:1}"  > /dev/null
  else       
    printf 'line from fifo: %s\n' "$line"       > /dev/null
  fi
done &
trap "kill -TERM $"'!; exit' 1 2 3 13 15
while IFS="" read -r -d $'\n' line; do
  # can we make it atomic?
  # sleep 0.5
  # dd if=/tmp/to iflag=nonblock of=/dev/null  # flush fifo
  printf '7%s\n' "${line}"
done >&3
) &
# kill -TERM $!


# terminal window 2
# tests
echo hello > /tmp/to
yes 1 | nl > /tmp/to
yes 1 | nl | tee /tmp/to
while sleep 2; do date '+%Y-%m-%d_%H.%M.%S'; done 2>&1 | tee -a /tmp/to


# terminal window 3
cat /tmp/to | head -n 10

回答by Fabraxias

This is a (very) old thread, but I've run into a similar problem of late. In fact, what I needed is a cloning of stdin to stdout with a copy to a pipe that is non blocking. the proposed ftee in the first answer really helped there, but was (for my use case) too volatile. Meaning I lost data I could have processed if I had gotten to it in time.

这是一个(非常)旧的线程,但我最近遇到了类似的问题。事实上,我需要的是将 stdin 克隆到 stdout 并复制到非阻塞管道。第一个答案中提议的 ftee 确实有帮助,但(对于我的用例)太不稳定了。这意味着我丢失了如果我及时处理本可以处理的数据。

The scenario I was faced with is that I have a process (some_process) that aggregates some data and writes its results every three seconds to stdout. The (simplified) setup looked like this (in the real setup I am using a named pipe):

我遇到的情况是,我有一个进程 (some_process) 聚合一些数据并将其结果每三秒写入一次标准输出。(简化的)设置看起来像这样(在实际设置中,我使用的是命名管道):

some_process | ftee >(onlineAnalysis.pl > results) | gzip > raw_data.gz

Now, raw_data.gz has to be compressed and has to be complete. ftee does this job very well. But the pipe I am using in the middle was too slow to grab the data flushed out - but it was fast enough to process everything if it could get to it, which was tested with a normal tee. However, a normal tee blocks if anything happens to the unnamed pipe, and as I want to be able to hook in on demand, tee is not an option. Back to the topic: It got better when I put a buffer in between, resulting in:

现在,raw_data.gz 必须被压缩并且必须是完整的。ftee 很好地完成了这项工作。但是我在中间使用的管道太慢了,无法获取刷新的数据 - 但是如果可以到达它,它足以处理所有内容,这是用普通 T 恤测试过的。但是,如果未命名的管道发生任何事情,普通的发球台会阻塞,并且由于我希望能够按需连接,发球台不是一种选择。回到主题:当我在两者之间放置一个缓冲区时它变得更好了,结果是:

some_process | ftee >(mbuffer -m 32M| onlineAnalysis.pl > results) | gzip > raw_data.gz

But that was still losing data I could have processed. So I went ahead and extended the ftee proposed before to a buffered version (bftee). It still has all the same properties, but uses an (inefficient ?) internal buffer in case a write fails. It still loses data if the buffer runs full, but it works beautifully for my case. As always there is a lot of room for improvement, but as I copied the code off of here I'd like to share it back to people that might have a use for it.

但这仍然丢失了我可以处理的数据。所以我继续将之前提出的 ftee 扩展为缓冲版本 (bftee)。它仍然具有所有相同的属性,但使用(低效?)内部缓冲区以防写入失败。如果缓冲区已满,它仍然会丢失数据,但它对我的情况来说效果很好。与往常一样,仍有很大的改进空间,但是当我从这里复制代码时,我想将其分享给可能会使用它的人。

/* bftee - clone stdin to stdout and to a buffered, non-blocking pipe 
    (c) racic@stackoverflow
    (c) fabraxias@stackoverflow
    WTFPL Licence */

    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    #include <sys/types.h>
    #include <sys/stat.h>
    #include <fcntl.h>
    #include <errno.h>
    #include <signal.h>
    #include <unistd.h>

    // the number of sBuffers that are being held at a maximum
    #define BUFFER_SIZE 4096
    #define BLOCK_SIZE 2048

    typedef struct {
      char data[BLOCK_SIZE];
      int bytes;
    } sBuffer;

    typedef struct {
      sBuffer *data;  //array of buffers
      int bufferSize; // number of buffer in data
      int start;      // index of the current start buffer
      int end;        // index of the current end buffer
      int active;     // number of active buffer (currently in use)
      int maxUse;     // maximum number of buffers ever used
      int drops;      // number of discarded buffer due to overflow
      int sWrites;    // number of buffer written to stdout
      int pWrites;    // number of buffers written to pipe
    } sQueue;

    void InitQueue(sQueue*, int);              // initialized the Queue
    void PushToQueue(sQueue*, sBuffer*, int);  // pushes a buffer into Queue at the end 
    sBuffer *RetrieveFromQueue(sQueue*);       // returns the first entry of the buffer and removes it or NULL is buffer is empty
    sBuffer *PeakAtQueue(sQueue*);             // returns the first entry of the buffer but does not remove it. Returns NULL on an empty buffer
    void ShrinkInQueue(sQueue *queue, int);    // shrinks the first entry of the buffer by n-bytes. Buffer is removed if it is empty
    void DelFromQueue(sQueue *queue);          // removes the first entry of the queue

    static void sigUSR1(int);                  // signal handled for SUGUSR1 - used for stats output to stderr
    static void sigINT(int);                   // signla handler for SIGKILL/SIGTERM - allows for a graceful stop ?

    sQueue queue;                              // Buffer storing the overflow
    volatile int quit;                         // for quiting the main loop

    int main(int argc, char *argv[])
    {   
        int readfd, writefd;
        struct stat status;
        char *fifonam;
        sBuffer buffer;
        ssize_t bytes;
        int bufferSize = BUFFER_SIZE;

        signal(SIGPIPE, SIG_IGN);
        signal(SIGUSR1, sigUSR1);
        signal(SIGTERM, sigINT);
        signal(SIGINT,  sigINT);

        /** Handle commandline args and open the pipe for non blocking writing **/

        if(argc < 2 || argc > 3)
        {   
            printf("Usage:\n someprog 2>&1 | %s FIFO [BufferSize]\n"
                   "FIFO - path to a named pipe, required argument\n"
                   "BufferSize - temporary Internal buffer size in case write to FIFO fails\n", argv[0]);
            exit(EXIT_FAILURE);
        }

        fifonam = argv[1];
        if (argc == 3) {
          bufferSize = atoi(argv[2]);
          if (bufferSize == 0) bufferSize = BUFFER_SIZE;
        }

        readfd = open(fifonam, O_RDONLY | O_NONBLOCK);
        if(-1==readfd)
        {   
            perror("bftee: readfd: open()");
            exit(EXIT_FAILURE);
        }

        if(-1==fstat(readfd, &status))
        {
            perror("bftee: fstat");
            close(readfd);
            exit(EXIT_FAILURE);
        }

        if(!S_ISFIFO(status.st_mode))
        {
            printf("bftee: %s in not a fifo!\n", fifonam);
            close(readfd);
            exit(EXIT_FAILURE);
        }

        writefd = open(fifonam, O_WRONLY | O_NONBLOCK);
        if(-1==writefd)
        {
            perror("bftee: writefd: open()");
            close(readfd);
            exit(EXIT_FAILURE);
        }

        close(readfd);


        InitQueue(&queue, bufferSize);
        quit = 0;

        while(!quit)
        {
            // read from STDIN
            bytes = read(STDIN_FILENO, buffer.data, sizeof(buffer.data));

            // if read failed due to interrupt, then retry, otherwise STDIN has closed and we should stop reading
            if (bytes < 0 && errno == EINTR) continue;
            if (bytes <= 0) break;

            // save the number if read bytes in the current buffer to be processed
            buffer.bytes = bytes;

            // this is a blocking write. As long as buffer is smaller than 4096 Bytes, the write is atomic to a pipe in Linux
            // thus, this cannot be interrupted. however, to be save this should handle the error cases of partial or interrupted write none the less.
            bytes = write(STDOUT_FILENO, buffer.data, buffer.bytes);
            queue.sWrites++;

            if(-1==bytes) {
                perror("ftee: writing to stdout");
                break;
            }

            sBuffer *tmpBuffer = NULL;

            // if the queue is empty (tmpBuffer gets set to NULL) the this does nothing - otherwise it tries to write
            // the buffered data to the pipe. This continues until the Buffer is empty or the write fails.
            // NOTE: bytes cannot be -1  (that would have failed just before) when the loop is entered. 
            while ((bytes != -1) && (tmpBuffer = PeakAtQueue(&queue)) != NULL) {
               // write the oldest buffer to the pipe
               bytes = write(writefd, tmpBuffer->data, tmpBuffer->bytes);

               // the  written bytes are equal to the buffer size, the write is successful - remove the buffer and continue
               if (bytes == tmpBuffer->bytes) {
                 DelFromQueue(&queue);
                 queue.pWrites++;
               } else if (bytes > 0) {
                 // on a positive bytes value there was a partial write. we shrink the current buffer
                 //  and handle this as a write failure
                 ShrinkInQueue(&queue, bytes);
                 bytes = -1;
               }
            }
            // There are several cases here:
            // 1.) The Queue is empty -> bytes is still set from the write to STDOUT. in this case, we try to write the read data directly to the pipe
            // 2.) The Queue was not empty but is now -> bytes is set from the last write (which was successful) and is bigger 0. also try to write the data
            // 3.) The Queue was not empty and still is not -> there was a write error before (even partial), and bytes is -1. Thus this line is skipped.
            if (bytes != -1) bytes = write(writefd, buffer.data, buffer.bytes);

            // again, there are several cases what can happen here
            // 1.) the write before was successful -> in this case bytes is equal to buffer.bytes and nothing happens
            // 2.) the write just before is partial or failed all together - bytes is either -1 or smaller than buffer.bytes -> add the remaining data to the queue
            // 3.) the write before did not happen as the buffer flush already had an error. In this case bytes is -1 -> add the remaining data to the queue
            if (bytes != buffer.bytes)
              PushToQueue(&queue, &buffer, bytes);
            else 
              queue.pWrites++;
        }

        // once we are done with STDIN, try to flush the buffer to the named pipe
        if (queue.active > 0) {
           //set output buffer to block - here we wait until we can write everything to the named pipe
           // --> this does not seem to work - just in case there is a busy loop that waits for buffer flush aswell. 
           int saved_flags = fcntl(writefd, F_GETFL);
           int new_flags = saved_flags & ~O_NONBLOCK;
           int res = fcntl(writefd, F_SETFL, new_flags);

           sBuffer *tmpBuffer = NULL;
           //TODO: this does not handle partial writes yet
           while ((tmpBuffer = PeakAtQueue(&queue)) != NULL) {
             int bytes = write(writefd, tmpBuffer->data, tmpBuffer->bytes);
             if (bytes != -1) DelFromQueue(&queue);
           }
        }

        close(writefd);

    }


    /** init a given Queue **/
    void InitQueue (sQueue *queue, int bufferSize) {
      queue->data = calloc(bufferSize, sizeof(sBuffer));
      queue->bufferSize = bufferSize;
      queue->start = 0;
      queue->end = 0;
      queue->active = 0;
      queue->maxUse = 0;
      queue->drops = 0;
      queue->sWrites = 0;
      queue->pWrites = 0;
    }

    /** push a buffer into the Queue**/
    void PushToQueue(sQueue *queue, sBuffer *p, int offset)
    {

        if (offset < 0) offset = 0;      // offset cannot be smaller than 0 - if that is the case, we were given an error code. Set it to 0 instead
        if (offset == p->bytes) return;  // in this case there are 0 bytes to add to the queue. Nothing to write

        // this should never happen - offset cannot be bigger than the buffer itself. Panic action
        if (offset > p->bytes) {perror("got more bytes to buffer than we read\n"); exit(EXIT_FAILURE);}

        // debug output on a partial write. TODO: remove this line
        // if (offset > 0 ) fprintf(stderr, "partial write to buffer\n");

        // copy the data from the buffer into the queue and remember its size
        memcpy(queue->data[queue->end].data, p->data + offset , p->bytes-offset);
        queue->data[queue->end].bytes = p->bytes - offset;

        // move the buffer forward
        queue->end = (queue->end + 1) % queue->bufferSize;

        // there is still space in the buffer
        if (queue->active < queue->bufferSize)
        {
            queue->active++;
            if (queue->active > queue->maxUse) queue->maxUse = queue->active;
        } else {
            // Overwriting the oldest. Move start to next-oldest
            queue->start = (queue->start + 1) % queue->bufferSize;
            queue->drops++;
        }
    }

    /** return the oldest entry in the Queue and remove it or return NULL in case the Queue is empty **/
    sBuffer *RetrieveFromQueue(sQueue *queue)
    {
        if (!queue->active) { return NULL; }

        queue->start = (queue->start + 1) % queue->bufferSize;
        queue->active--;
        return &(queue->data[queue->start]);
    }

    /** return the oldest entry in the Queue or NULL if the Queue is empty. Does not remove the entry **/
    sBuffer *PeakAtQueue(sQueue *queue)
    {
        if (!queue->active) { return NULL; }
        return &(queue->data[queue->start]);
    }

    /*** Shrinks the oldest entry i the Queue by bytes. Removes the entry if buffer of the oldest entry runs empty*/
    void ShrinkInQueue(sQueue *queue, int bytes) {

      // cannot remove negative amount of bytes - this is an error case. Ignore it
      if (bytes <= 0) return;

      // remove the entry if the offset is equal to the buffer size
      if (queue->data[queue->start].bytes == bytes) {
        DelFromQueue(queue);
        return;
      };

      // this is a partial delete
      if (queue->data[queue->start].bytes > bytes) {
        //shift the memory by the offset
        memmove(queue->data[queue->start].data, queue->data[queue->start].data + bytes, queue->data[queue->start].bytes - bytes);
        queue->data[queue->start].bytes = queue->data[queue->start].bytes - bytes;
        return;
      }

      // panic is the are to remove more than we have the buffer
      if (queue->data[queue->start].bytes < bytes) {
        perror("we wrote more than we had - this should never happen\n");
        exit(EXIT_FAILURE);
        return;
      }
    }

    /** delete the oldest entry from the queue. Do nothing if the Queue is empty **/
    void DelFromQueue(sQueue *queue)
    {
        if (queue->active > 0) {
          queue->start = (queue->start + 1) % queue->bufferSize;
          queue->active--;
        }
    }

    /** Stats output on SIGUSR1 **/
    static void sigUSR1(int signo) {
      fprintf(stderr, "Buffer use: %i (%i/%i), STDOUT: %i PIPE: %i:%i\n", queue.active, queue.maxUse, queue.bufferSize, queue.sWrites, queue.pWrites, queue.drops);
    }

    /** handle signal for terminating **/
    static void sigINT(int signo) {
      quit++;
      if (quit > 1) exit(EXIT_FAILURE);
    }

This version takes one more (optional) argument which specifies the number of the blocks that are to buffered for the pipe. My sample call now looks like this:

此版本采用另一个(可选)参数,该参数指定要为管道缓冲的块数。我的示例调用现在看起来像这样:

some_process | bftee >(onlineAnalysis.pl > results) 16384 | gzip > raw_data.gz

resulting in 16384 blocks to be buffered before discards happen. this uses about 32 Mbyte more memory, but... who cares ?

导致在丢弃发生之前缓冲 16384 个块。这使用了大约 32 MB 的内存,但是...谁在乎?

Of course, in the real environment I am using a named pipe so that I can attach and detach as needed. There is looks like this:

当然,在实际环境中,我使用的是命名管道,以便我可以根据需要进行附加和分离。有这样的:

mkfifo named_pipe
some_process | bftee named_pipe 16384 | gzip > raw_data.gz &
cat named_pipe | onlineAnalysis.pl > results

Also, the process reacts on signals as follows: SIGUSR1 -> print counters to STDERR SIGTERM, SIGINT -> first exits the main loop and flushed the buffer to the pipe, the second terminated the program immediatly.

此外,该进程对信号的反应如下:SIGUSR1 -> 打印计数器到 STDERR SIGTERM,SIGINT -> 首先退出主循环并将缓冲区刷新到管道,第二个立即终止程序。

Maybe this helps someone in the future... Enjoy

也许这可以帮助将来的某个人......享受

回答by Piotr Dobrogost

It seems like bash <>redirection operator (3.6.10 Opening File Descriptors for Reading and WritingSee) makes writing to file/fifo opened with it non-blocking. This should work:

似乎 bash<>重定向操作符(3.6.10 Opening File Descriptors for Reading and WritingSee)使写入文件/fifo 以非阻塞方式打开。这应该有效:

$ mkfifo /tmp/mylog
$ exec 4<>/tmp/mylog
$ myprogram 2>&1 | tee >&4
$ cat /tmp/mylog # on demend

Solution given by gniourf_gniourfon #bash IRC channel.

gniourf_gniourf在#bash IRC 频道上给出的解决方案。

回答by teknopaul

If your process writes to any log file and then wipes the file and starts again every now and again, so it doesn't get too big, or uses logrotate.

如果您的进程写入任何日志文件,然后擦除该文件并时不时地重新启动,则它不会变得太大,或者使用logrotate.

tail --follow=name --retry my.log

Is all you need. You will get as much scroll-back as your terminal.

是你所需要的全部。您将获得与终端一样多的回滚。

Nothing non standard is needed. I've not tried it with small log files but all our logs rotate like this and I have never noticed loosing lines.

不需要任何非标准。我没有用小日志文件尝试过它,但我们所有的日志都像这样旋转,我从来没有注意到丢失的行。

回答by wally

The logging could be directed to a UDP socket. Since UDP is connection-less, it won't block the sending program. Of course logs will be lost if the receiver or network can't keep up.

日志记录可以定向到 UDP 套接字。由于 UDP 是无连接的,它不会阻止发送程序。当然,如果接收方或网络跟不上,日志就会丢失。

myprogram 2>&1 | socat - udp-datagram:localhost:3333

Then when you want to observe the logging:

然后当你想观察日志时:

socat udp-recv:3333 -

There are some other cool benefits like being able to attach multiple listeners at the same time or broadcast to multiple devices.

还有一些其他很酷的好处,比如能够同时附加多个侦听器或广播到多个设备。

回答by Tox

To follow in Fabraxias foot steps I'm going to share my small modification of racic's code. In one of my use cases I needed to suppress the writes to STDOUT, so I've added another parameter: swallow_stdout. If that is not0, then output to STDOUTwill be turned off.

为了跟随 Fabraxias 的脚步,我将分享我对 racic 代码的小修改。在我的一个用例中,我需要抑制对 的写入STDOUT,因此我添加了另一个参数:swallow_stdout. 如果不是0,则输出 toSTDOUT将被关闭。

Since I'm no Ccoder I've added comments while reading the code, maybe they are useful for others.

由于我不是C编码员,因此我在阅读代码时添加了注释,也许它们对其他人有用。

/* ftee - clone stdin to stdout and to a named pipe 
(c) racic@stackoverflow
WTFPL Licence */

// gcc /tmp/ftee.c -o /usr/local/bin/ftee

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <errno.h>
#include <signal.h>
#include <unistd.h>

int main(int argc, char *argv[])
{
    int readfd, writefd;        // read & write file descriptors
    struct stat status;         // read file descriptor status
    char *fifonam;              // name of the pipe
    int swallow_stdout;         // 0 = write to STDOUT
    char buffer[BUFSIZ];        // read/write buffer
    ssize_t bytes;              // bytes read/written

    signal(SIGPIPE, SIG_IGN);   

    if(3!=argc)
    {
        printf("Usage:\n someprog 2>&1 | %s [FIFO] [swallow_stdout] \n" 
            "FIFO           - path to a named pipe (created beforehand with mkfifo), required argument\n"
            "swallow_stdout - 0 = output to PIPE and STDOUT, 1 = output to PIPE only, required argument\n", argv[0]);
        exit(EXIT_FAILURE);
    }
    fifonam = argv[1];
    swallow_stdout = atoi(argv[2]);

    readfd = open(fifonam, O_RDONLY | O_NONBLOCK);  // open read file descriptor in non-blocking mode

    if(-1==readfd)  // read descriptor error!
    {
        perror("ftee: readfd: open()");
        exit(EXIT_FAILURE);
    }

    if(-1==fstat(readfd, &status)) // read descriptor status error! (?)
    {
        perror("ftee: fstat");
        close(readfd);
        exit(EXIT_FAILURE);
    }

    if(!S_ISFIFO(status.st_mode)) // read descriptor is not a FIFO error!
    {
        printf("ftee: %s in not a fifo!\n", fifonam);
        close(readfd);
        exit(EXIT_FAILURE);
    }

    writefd = open(fifonam, O_WRONLY | O_NONBLOCK); // open write file descriptor non-blocking
    if(-1==writefd) // write file descriptor error!
    {
        perror("ftee: writefd: open()");
        close(readfd);
        exit(EXIT_FAILURE);
    }

    close(readfd); // reading complete, close read file descriptor

    while(1) // infinite loop
    {
        bytes = read(STDIN_FILENO, buffer, sizeof(buffer)); // read STDIN into buffer
        if (bytes < 0 && errno == EINTR)
            continue;   // skip over errors

        if (bytes <= 0) 
            break; // no more data coming in or uncaught error, let's quit since we can't write anything

        if (swallow_stdout == 0)
            bytes = write(STDOUT_FILENO, buffer, bytes); // write buffer to STDOUT
        if(-1==bytes) // write error!
            perror("ftee: writing to stdout");
        bytes = write(writefd, buffer, bytes); // write a copy of the buffer to the write file descriptor
        if(-1==bytes);// ignore errors
    }
    close(writefd); // close write file descriptor
    return(0); // return exit code 0
}