#include <u.h>
#include <libc.h>
#include <ureg.h>
#include "linuxsys.h"
#include "linux.h"
/* Short type names for the buffered-fd structures defined below. */
typedef struct Buf Buf;
typedef struct Bufq Bufq;
typedef struct Buffd Buffd;
/*
 * One chunk of data read from the underlying fd.
 * The bytes not yet consumed are the range [bp, ep) inside data.
 */
struct Buf
{
Buf *next;	/* next chunk in the queue, nil for the last one */
uchar *bp;	/* first unread byte */
uchar *ep;	/* one past the last valid byte */
uchar data[];	/* payload; its size is chosen when the Buf is allocated */
};
/*
 * Singly linked FIFO of Buf chunks.
 */
struct Bufq
{
Buf *head;	/* oldest chunk, nil when the queue is empty */
Buf **tail;	/* where the next chunk gets linked: &head when empty, else &last->next */
};
/*
 * Per-fd buffering state.  A reader proc (readproc) fills rq from fd;
 * consumers drain it through peekbuffd/readbuffd.
 */
struct Buffd
{
QLock lock;	/* taken around accesses to the fields below; also backs the three Rendez */
int fd;	/* fd being read; set to -1 by killbuffd to tell the reader proc to stop */
int pid;	/* value returned by createxproc for the reader proc; -1 when there is none */
int shutdown;	/* tested as a bitmask of (1<<SHUT_RD), (1<<SHUT_WR), (1<<SHUT_RDWR) */
int eof;	/* set by readproc on end of file */
int error;	/* negative error code recorded by readproc, 0 if none */
Bufq rq;	/* data read but not yet consumed */
Rendez queuewait;	/* readproc sleeps here while rq is over MAXQUEUESIZE */
Rendez readwait;	/* consumers sleep here waiting for data, eof or error */
Rendez killwait;	/* killbuffd sleeps here until readproc has exited */
};
/* Sizing limits for the reader proc. */
enum
{
MAXREADSIZE = 1024*8,	/* bytes requested per read() in readproc */
MAXQUEUESIZE = 1024*64,	/* readproc pauses when the queued bytes exceed this */
};
/*
 * Return the total number of unread bytes held in queue q,
 * i.e. the sum of (ep - bp) over every chunk.
 */
static int
queuesize(Bufq *q)
{
	Buf *cur;
	int total;

	total = 0;
	cur = q->head;
	while(cur != nil){
		total += cur->ep - cur->bp;
		cur = cur->next;
	}
	return total;
}
/*
 * Free every chunk in q and leave it as a valid empty queue
 * (head nil, tail pointing back at head).
 */
static void
flushqueue(Bufq *q)
{
	Buf *victim;

	/* pop chunks off the front until nothing is left */
	while((victim = q->head) != nil){
		q->head = victim->next;
		free(victim);
	}
	q->head = nil;
	q->tail = &q->head;
}
/*
 * Stop the reader proc of bfd, if any, and wait for it to exit.
 *
 * Marks the Buffd dead by setting fd to -1, wakes anything sleeping on
 * queuewait/readwait, posts the "rocktimer" note to the reader proc and,
 * when the note was posted successfully, sleeps on killwait until
 * readproc signals its exit.  Does nothing if fd is already -1.
 *
 * NOTE(review): rsleep is used on a Rendez tied to bfd->lock, so the
 * caller is expected to hold bfd->lock (destroybuffdtag does).
 */
static void
killbuffd(Buffd *bfd)
{
	int readerpid;

	if(bfd->fd == -1)
		return;
	bfd->fd = -1;

	readerpid = bfd->pid;
	if(readerpid == -1)
		return;

	rwakeup(&bfd->queuewait);
	rwakeup(&bfd->readwait);
	if(postnote(PNPROC, readerpid, "rocktimer") == 0)
		rsleep(&bfd->killwait);
}
/*
 * Destroy callback registered by buffd(): detach the Buffd from the tag,
 * stop its reader proc, drop any queued data and free the structure.
 */
static void
destroybuffdtag(void *tag)
{
	Buffd *dying;

	/* take ownership of the Buffd and clear the tag's slot */
	dying = *fdtagp(tag);
	*fdtagp(tag) = nil;

	qlock(&dying->lock);
	killbuffd(dying);
	flushqueue(&dying->rq);
	qunlock(&dying->lock);

	free(dying);
}
/*
 * Fork callback registered by buffd(): detach the Buffd from the tag,
 * discard its queued data, free it and unlink the tag.
 * Unlike destroybuffdtag, no lock is taken and no reader proc is killed.
 */
static void
forkbuffdtag(void *tag)
{
	Buffd *stale;

	stale = *fdtagp(tag);
	*fdtagp(tag) = nil;

	flushqueue(&stale->rq);
	free(stale);

	unlinkfdtag(tag);
}
/*
 * Note handler installed by buffdproc.
 * Returns 1 only when running in a reader proc (threadp->pid == 0, as set
 * by buffdproc) and the note is exactly "rocktimer"; returns 0 otherwise.
 */
static int
readprocnote(void *, char *msg)
{
	return threadp->pid == 0 && strcmp(msg, "rocktimer") == 0;
}
/*
 * Body of the reader proc: read from bfd->fd in MAXREADSIZE chunks and
 * append each chunk to bfd->rq until end of file, an error, or until
 * killbuffd sets bfd->fd to -1.
 *
 * aux is the Buffd*.  The blocking read() is done without bfd->lock;
 * the lock is retaken before any shared state is touched.  On exit the
 * proc sets bfd->pid to -1 and wakes killwait and readwait.
 */
static void
readproc(void *aux)
{
	Buffd *bfd;
	Buf *b, *nb;
	int n;
	int z;
	int fd;

	bfd = aux;
	z = 0;
	b = nil;

	qlock(&bfd->lock);
	fd = bfd->fd;
	if(bfd->fd < 0)
		goto die;
	qunlock(&bfd->lock);

	for(;;){
		/* reuse the buffer left over from a previous zero-length read */
		if(b == nil){
			b = malloc(sizeof(*b) + MAXREADSIZE);
			if(b == nil){
				qlock(&bfd->lock);
				bfd->error = -ENOMEM;
				goto die;
			}
		}

		n = read(fd, b->data, MAXREADSIZE);

		qlock(&bfd->lock);
		if(bfd->fd < 0)
			goto die;

		if(n == 0){
			/* tolerate up to three consecutive empty reads before declaring eof */
			if(++z <= 3){
				qunlock(&bfd->lock);
				continue;
			}
			bfd->eof = 1;
			epollevent(fd, POLLIN, 0);
			goto die;
		}
		z = 0;

		if(n < 0){
			int e;

			switch(e = mkerror()){
			case -ESHUTDOWN:
				bfd->eof = 1;
				break;
			default:
				bfd->error = e;
			}
			epollevent(fd, POLLIN, 0);
			goto die;
		}

		/*
		 * Trim the buffer to the bytes actually read.  realloc may
		 * move the block, so the returned pointer must replace b;
		 * on failure b is still valid and is freed at die.
		 */
		nb = realloc(b, sizeof(*b) + n);
		if(nb == nil){
			bfd->error = -ENOMEM;
			goto die;
		}
		b = nb;

		b->ep = b->bp = b->data;
		b->ep += n;
		b->next = nil;

		/* append to the queue; ownership of b passes to bfd->rq */
		*bfd->rq.tail = b;
		bfd->rq.tail = &b->next;
		b = nil;

		epollevent(fd, POLLIN, 0);
		rwakeup(&bfd->readwait);

		/* back off until readbuffd has drained some data */
		if(queuesize(&bfd->rq) > MAXQUEUESIZE)
			rsleep(&bfd->queuewait);
		if(bfd->fd < 0)
			goto die;
		qunlock(&bfd->lock);
	}

die:
	/* reached with bfd->lock held */
	bfd->pid = -1;
	rwakeup(&bfd->killwait);
	rwakeup(&bfd->readwait);
	qunlock(&bfd->lock);
	if(b)
		free(b);
}
/*
 * Entry point of the reader proc created by buffd().
 * Sets threadp->pid to 0 (the value readprocnote checks for), installs
 * readprocnote as note handler, then runs readproc on the Buffd in aux.
 */
static void
buffdproc(void *aux)
{
	threadp->pid = 0;
	atnotify(readprocnote, 1);
	readproc(aux);
}
/*
 * Turn fd into a buffered fd: attach a Buffd to its TAG_BUFFD tag and
 * start a reader proc that fills the read queue.
 *
 * Does nothing if the tag cannot be opened/created or already carries a
 * Buffd.  If the Buffd cannot be allocated the tag is left without one
 * and the function returns without starting a reader proc.
 */
void
buffd(int fd)
{
	void *tag;
	Buffd *bfd;

	if((tag = openfdtag(fd, TAG_BUFFD, 1))==nil)
		return;
	if(*fdtagp(tag) != nil){
		/* already buffered */
		closefdtag(tag);
		return;
	}

	bfd = malloc(sizeof(*bfd));
	if(bfd == nil){
		/* out of memory: don't memset/dereference a nil pointer */
		closefdtag(tag);
		return;
	}
	memset(bfd, 0, sizeof(*bfd));

	/* hold the lock so the new proc cannot proceed until setup is complete */
	qlock(&bfd->lock);
	bfd->fd = fd;
	bfd->pid = -1;
	bfd->shutdown = 0;
	bfd->eof = 0;
	bfd->error = 0;
	bfd->rq.head = nil;
	bfd->rq.tail = &bfd->rq.head;
	bfd->queuewait.l = &bfd->lock;
	bfd->readwait.l = &bfd->lock;
	bfd->killwait.l = &bfd->lock;
	bfd->pid =
		createxproc(buffdproc, bfd, RFMEM|RFPROC, 16 * 1024);

	*fdtagp(tag) = bfd;
	atdestroyfdtag(tag, destroybuffdtag);
	atforkfdtag(tag, forkbuffdtag);
	qunlock(&bfd->lock);
	closefdtag(tag);
}
/*
 * Recompute the poll state of buffered fd and report it via epollevent.
 *
 * POLLOUT is reported unless the write side has been shut down,
 * POLLIN when an error, eof or queued data is pending, and POLLRDHUP
 * when an error is recorded.  Does nothing if fd has no Buffd.
 */
void
buffdpoll(int fd)
{
	void *tag;
	Buffd *bfd;
	ulong e;

	if((tag = openfdtag(fd, TAG_BUFFD, 0))==nil)
		return;
	if((bfd = *fdtagp(tag)) == nil){
		/* release the tag on this path too, like every other exit */
		closefdtag(tag);
		return;
	}
	qlock(&bfd->lock);
	closefdtag(tag);

	e = 0;
	if(!((bfd->shutdown&(1<<SHUT_WR)) || (bfd->shutdown&(1<<SHUT_RDWR))))
		e |= POLLOUT;
	if(bfd->error || bfd->eof || bfd->rq.head)
		e |= POLLIN;
	if(bfd->error)
		e |= POLLRDHUP;
	epollevent(fd, e, 0);
	qunlock(&bfd->lock);
}
/*
 * Return the number of bytes that can be read from buffered fd without
 * blocking: the size of the read queue, or 0 if the read side has been
 * shut down.  Returns -1 if fd has no buffd tag or no Buffd attached.
 */
int
buffdionread(int fd)
{
	void *tag;
	int n;
	Buffd *bfd;

	tag = openfdtag(fd, TAG_BUFFD, 0);
	if(tag == nil)
		return -1;
	bfd = (Buffd*)*fdtagp(tag);
	if(bfd == nil){
		/* tag exists but carries no Buffd; same check as buffdpoll/readbuffd */
		closefdtag(tag);
		return -1;
	}
	qlock(&bfd->lock);
	closefdtag(tag);

	if((bfd->shutdown&(1<<SHUT_RDWR)) || (bfd->shutdown&(1<<SHUT_RD))){
		n = 0;
	} else {
		n = queuesize(&bfd->rq);
	}
	qunlock(&bfd->lock);
	return n;
}
/*
 * Record a shutdown of kind how (SHUT_RD, SHUT_WR or SHUT_RDWR) on
 * buffered fd.  Shutting down the read side also discards queued data.
 * Does nothing if fd has no buffd tag or no Buffd attached.
 *
 * NOTE(review): how is used as a shift count; callers are assumed to
 * pass only the SHUT_* constants.
 */
void
shutdownbuffd(int fd, int how)
{
	void *tag;
	Buffd *bfd;

	tag = openfdtag(fd, TAG_BUFFD, 0);
	if(tag == nil)
		return;
	bfd = (Buffd*)*fdtagp(tag);
	if(bfd == nil){
		/* tag exists but carries no Buffd; same check as buffdpoll/readbuffd */
		closefdtag(tag);
		return;
	}
	qlock(&bfd->lock);
	closefdtag(tag);

	/*
	 * shutdown is tested as a bitmask everywhere else, so accumulate
	 * bits: a later SHUT_WR must not undo an earlier SHUT_RD.
	 */
	bfd->shutdown |= (1<<how);
	if(how==SHUT_RD || how==SHUT_RDWR)
		flushqueue(&bfd->rq);
	qunlock(&bfd->lock);
}
/*
 * Copy up to len bytes from the read queue of buffered fd into data
 * without consuming them.
 *
 * Blocks until data, eof or an error is available unless noblock is set.
 * Returns the number of bytes copied, 0 on end of file, or a negative
 * value: -EBADF (no Buffd, or read side shut down), -EAGAIN (noblock and
 * nothing queued), the error recorded by the reader proc, or -1 when the
 * Buffd was killed without a recorded error.
 */
int
peekbuffd(int fd, void *data, int len, int noblock)
{
	int t;
	void *tag;
	Buffd *bfd;
	Buf *b;

	if((tag = openfdtag(fd, TAG_BUFFD, 0))==nil)
		return -EBADF;
	if((bfd = *fdtagp(tag)) == nil){
		/* release the tag on this path too, like every other exit */
		closefdtag(tag);
		return -EBADF;
	}
	t = 0;
	qlock(&bfd->lock);
	closefdtag(tag);

	if((bfd->shutdown&(1<<SHUT_RD)) || (bfd->shutdown&(1<<SHUT_RDWR))){
		t = -EBADF;
		goto out;
	}
	for(;;){
		/* walk the queue without touching bp, so the data stays queued */
		for(b=bfd->rq.head; (t < len) && b; b=b->next){
			int x;

			x = b->ep - b->bp;
			if(x > len-t)
				x = len-t;
			memcpy(((uchar*)data) + t, b->bp, x);
			t += x;
		}
		if(t > 0){
			break;
		}
		if(bfd->eof){
			t = 0;
			break;
		}
		if(bfd->error || (bfd->fd < 0)){
			t = bfd->error ? bfd->error : -1;
			break;
		}
		if(noblock){
			t = -EAGAIN;
			break;
		}
		/* wait for readproc to queue data or report eof/error */
		rsleep(&bfd->readwait);
	}
out:
	qunlock(&bfd->lock);
	return t;
}
int
readbuffd(int fd, void *data, int len, int noblock)
{
int t;
void *tag;
Buffd *bfd;
Buf *b;
if((tag = openfdtag(fd, TAG_BUFFD, 0))==nil)
return -EBADF;
if((bfd = *fdtagp(tag)) == nil)
return -EBADF;
t = 0;
qlock(&bfd->lock);
closefdtag(tag);
if((bfd->shutdown&(1<<SHUT_RD)) || (bfd->shutdown&(1<<SHUT_RDWR))){
t = -EBADF;
goto out;
}
for(;;){
while((b=bfd->rq.head) && (t < len)){
int x;
x = b->ep - b->bp;
if(x > len-t)
x = len-t;
memcpy(((uchar*)data) + t, b->bp, x);
t += x;
b->bp += x;
if(b->bp == b->ep){
if((bfd->rq.head = b->next) == nil){
bfd->rq.tail = &bfd->rq.head;
}
free(b);
}
}
if(t > 0){
rwakeup(&bfd->queuewait);
break;
}
if(bfd->eof){
t = 0;
break;
}
if(bfd->error || (bfd->fd < 0)){
t = bfd->error ? bfd->error : -1;
break;
}
if(noblock){
t = -EAGAIN;
break;
}
rsleep(&bfd->readwait);
}
if(bfd->rq.head == nil)
epollevent(fd, 0, POLLIN);
out:
qunlock(&bfd->lock);
return t;
}
/*
 * Write len bytes from data to buffered fd.  Writes are not queued; the
 * data goes straight to write() while bfd->lock is held.  noblock is
 * accepted for symmetry with readbuffd but is not used.
 *
 * Returns len on success, -EBADF if fd has no Buffd or its write side
 * has been shut down, or the value of mkerror() when write() does not
 * return len (POLLERR|POLLHUP is reported via epollevent in that case).
 */
int
writebuffd(int fd, void *data, int len, int noblock)
{
	int ret;
	Buffd *bfd;
	void *tag;

	USED(noblock);
	tag = openfdtag(fd, TAG_BUFFD, 0);
	if(tag == nil)
		return -EBADF;
	bfd = (Buffd*)*fdtagp(tag);
	if(bfd == nil){
		/* tag exists but carries no Buffd; same check as peekbuffd/readbuffd */
		closefdtag(tag);
		return -EBADF;
	}
	qlock(&bfd->lock);
	closefdtag(tag);

	if((bfd->shutdown&(1<<SHUT_WR)) || (bfd->shutdown&(1<<SHUT_RDWR))){
		ret = -EBADF;
		goto out;
	}
	ret = write(fd, data, len);
	if(ret != len){
		epollevent(fd, POLLERR|POLLHUP, 0);
		ret = mkerror();
	}
out:
	qunlock(&bfd->lock);
	return ret;
}
|