implement Oxport;
include "sys.m";
sys: Sys;
fprint, create, stat, sprint, QTDIR, pwrite, fwstat, OTRUNC, fildes, FD, ORCLOSE, Dir,
read, DMDIR, NEWPGRP, FORKNS,
open, pctl, sleep, nulldir, fstat, pread,
dial, remove, write, OREAD, OWRITE: import sys;
include "op.m";
op: Op;
OSTAT, ODATA, NOFD, OREMOVEC, OCREATE, OMORE, Tmsg, Rmsg, MAXDATA: import op;
include "draw.m";
include "arg.m";
arg: Arg;
usage: import arg;
include "names.m";
names: Names;
isprefix, basename, cleanname, rooted : import names;
include "error.m";
err: Error;
checkload, stderr, error, kill: import err;
include "env.m";
env: Env;
getenv: import env;
include "netutil.m";
util: Netutil;
netmkaddr, authfd, Client: import util;
Oxport: module
{
init: fn(nil: ref Draw->Context, nil: list of string);
};
debug:= 0;
uname, pname : string;
debuglatency:= 0;
xfspid := -1;
readall(fd: ref FD) : array of byte
{
max : con int 128*1024; # BUG: max dir size
data := array[max] of byte;
tot := nr := 0;
do {
nr = read(fd, data[tot:], len data - tot);
if (nr > 0)
tot += nr;
} while(nr > 0 && tot < len data);
return data[0:tot];
}
fds2text(fds: array of ref FD): string
{
s := "";
for (i := 0; i < len fds; i++)
if (fds[i] != nil)
s += sprint("%d:%d ", i, fds[i].fd);
return s;
}
# Process keeping file descriptors open, during Tputs/Tgets with OMORE
# A word of caution about the protocol:
# Note that fds are a cache, to try to alert applications about gone clients.
# A Tput/Tget with an invalid fd would still work, and recreate (and return)
# a different fd. They are NOT fids.
fdsc: chan of (ref FD, chan of int); # fd start channel
fdc: chan of (int, chan of ref FD); # fd lookup channel
fdec: chan of int; # fd end channel
fdproc()
{
Incr: con 8;
nfds := 0;
fds := array[Incr] of ref FD;
for(;;){
alt {
(fd, rc) := <- fdsc =>
if (fd == nil || rc == nil)
exit;
for (i:= 0; i < len fds && fds[i] != nil; i++)
;
if (i == len fds){
newfds := array[Incr + len fds] of ref FD;
newfds[0:] = fds;
fds = newfds;
}
fds[i] = fd; nfds++;
rc <-= i;
if (debug)
fprint(stderr, "fds: %s\n", fds2text(fds));
else if (nfds > 0 && (nfds%10) == 0)
fprint(stderr, "oxport: more than %d fds\n", nfds);
(i, rc) := <- fdc =>
if (i >= 0 && i < len fds && fds[i] != nil)
rc <-= fds[i];
else
rc <-= nil;
i := <- fdec =>
if (i >= 0 && i < len fds){
fds[i] = nil; nfds--;
}
if (debug)
fprint(stderr, "fds: %s\n", fds2text(fds));
}
}
}
# To keep arg lists reasonable.
Fdsc: type chan of (ref FD, chan of int); # FD start channel
Fdc: type chan of (int, chan of ref FD); # FD channel
Fdec: type chan of int; # FD end channel.
# BUG: mode for create should come from the put, and errors on wstat
# ignored for create.
serveput(dir: string, m : ref Tmsg.Put, fdsc: Fdsc, fdc: Fdc, fdec: Fdec) : ref Rmsg
{
fd : ref FD;
mode := 8r664;
m.mode &=(OSTAT|ODATA|OCREATE|OMORE|OREMOVEC);
repfd := NOFD;
isdir := 0;
path := dir + m.path;
if ((m.mode&OSTAT) && (m.stat.mode&DMDIR) != 0 && (m.stat.mode != ~0))
isdir = 1;
# 1. setup fd
if (m.fd != NOFD){
rc := chan of ref FD;
fdc <-= (m.fd, rc);
fd = <-rc;
rc = nil;
if (m.mode&OREMOVEC){
if (fd != nil)
fdec <-= m.fd;
return ref Rmsg.Error(m.tag, "put: remove on close not in first put");
}
if (m.mode&OCREATE){ # create always releases the old fd
if (fd != nil)
fdec <-= m.fd;
fd = nil;
m.fd = NOFD;
} else if (fd == nil) # fd was lost. recreate.
m.fd = NOFD;
else if (m.mode&OMORE) # keep fd valid
repfd = m.fd;
else # last Tput for file.
fdec <-= m.fd; # Release fd. Won't close while we keep a ref.
}
if (m.fd == NOFD){
# either fd was not set (first put) or it was lost (recovered link)
# use path to setup fd.
if (m.path == nil || m.path == "" || m.path[0] != '/')
return ref Rmsg.Error(m.tag, "put: bad op file name");
omode:= 0;
if (m.mode&OREMOVEC){
if ((m.mode&OMORE) == 0)
return ref Rmsg.Error(m.tag, "put: remove on close on single put: pointless");
omode |= ORCLOSE;
}
if (m.mode&OCREATE){
if ((m.mode&OSTAT) != 0 && isdir){
mode |= DMDIR;
fd = create(path, OREAD|omode, mode);
} else {
fd = open(path, OTRUNC|OWRITE|omode);
if (fd == nil)
fd = create(path, OWRITE|omode, mode);
}
} else if (isdir)
fd = open(path, OREAD|omode);
else
fd = open(path, OWRITE|omode);
if (fd == nil)
return ref Rmsg.Error(m.tag, sprint("put: fd: %r"));
if ((m.mode&OMORE) && !isdir){
rc := chan of int;
fdsc <-= (fd, rc);
repfd = <- rc;
rc = nil;
}
}
# 2. Data and Stat I/O. Errors close the fd used for further puts.
m.mode &= (OSTAT|ODATA); # Paranoia
cnt := 0;
if ((m.mode&ODATA) != 0 && !isdir){
cnt = pwrite(fd, m.data, len m.data, m.offset);
if (cnt < 0){
if (repfd != NOFD)
fdec <-= repfd;
return ref Rmsg.Error(m.tag, sprint("pwrite: %r"));
}
}
if (m.mode&OSTAT){
d := nulldir;
d.mode = m.stat.mode;
d.name = m.stat.name;
d.uid = m.stat.uid;
d.gid = m.stat.gid;
if (fwstat(fd, d) < 0){
# Try again without chown/chgrp
# BUG: but not if it is a create. It should have updated mode.
d.uid = nil;
d.gid = nil;
if (fwstat(fd, d) < 0){
if (repfd != NOFD)
fdec <-= repfd;
return ref Rmsg.Error(m.tag, sprint("wstat: %r"));
}
}
}
(e, d) := fstat(fd); # ouch! we must issue a Tstat to get the reply qid and mtime,
if (e < 0){ # with appropriate values after any write made by us.
if (repfd != NOFD)
fdec <-= repfd;
return ref Rmsg.Error(m.tag, sprint("put: I'm finding nemo"));
}
fd = nil;
return ref Rmsg.Put(m.tag, repfd, cnt, d.qid, d.mtime);
}
serveget(dir: string, m : ref Tmsg.Get, fdsc: Fdsc, fdc: Fdc, fdec: Fdec, outc: chan of ref Rmsg)
{
fd : ref FD;
m.mode &=(OSTAT|ODATA|OMORE);
repfd := NOFD;
# 1. setup fd
path := dir + m.path;
if (m.fd != NOFD){
rc := chan of ref FD;
fdc <-= (m.fd, rc);
fd = <-rc;
rc = nil;
if (fd == nil) # fd lost, recreate
m.fd = NOFD;
else if (m.mode&OMORE) # keep fd valid
repfd = m.fd;
else
fdec <-= m.fd;
}
if (m.fd == NOFD){
# either fd was not set (first get) or it was lost (recovered link)
# use path to setup fd.
if (m.path == nil || m.path == "" || m.path[0] != '/'){
outc <-= ref Rmsg.Error(m.tag, "bad Op file name");
return;
}
fd = open(path, OREAD);
# This may report a permission denied for -wx-wx-wx files
# if this is OSTAT, we should try to send just OSTAT back, because we
# could not open the file. Only for ODATA-only messages should we report an
# error back.
if (fd == nil){
if ((m.mode&OSTAT) == 0){
outc <-= ref Rmsg.Error(m.tag, sprint("%r"));
return;
} else
m.mode = OSTAT; # clear others
}
if (m.mode&OMORE){
rc := chan of int;
fdsc <-= (fd, rc);
repfd = <-rc;
rc = nil;
}
}
d := nulldir;
e: int;
m.mode &= (ODATA|OSTAT);
if (fd == nil) # may happen for -wx files and OSTAT gets
(e, d) = stat(path);
else
(e, d) = fstat(fd);
if (e < 0){
if (repfd != NOFD)
fdec <-= repfd;
outc <-= ref Rmsg.Error(m.tag, sprint("%r"));
return;
}
d.name = basename(path, nil);
if (d.name == "")
d.name = "/";
if (m.mode == OSTAT){
outc <-= ref Rmsg.Get(m.tag, repfd, OSTAT, d, array [0] of byte);
return;
}
# We must respond with up to m.nmsgs,
# considering that m.nmsgs is infinite for directories.
# The entire sequence of directory gets must be atomic.
# OMORE must be send when there's more data
# awating for further gets.
if (m.count > MAXDATA)
m.count = MAXDATA;
if ((d.qid.qtype&QTDIR) != 0){
if (repfd != NOFD)
fdec <-= repfd;
repfd = NOFD;
data := readall(fd);
sent := 0;
rest := len data;
mode : int;
do {
nr := m.count;
mode = m.mode;
if (nr > rest)
nr = rest;
else
mode |= OMORE;
m.mode &= ~OSTAT;
outc <-= ref Rmsg.Get(m.tag, repfd, mode, d, data[sent:sent+nr]);
sent += nr;
rest -= nr;
} while(mode&OMORE);
} else {
mode : int;
do {
data := array[m.count] of byte;
nr := pread(fd, data, m.count, m.offset);
if (nr < 0){
if (repfd != NOFD)
fdec <-= NOFD;
outc <-= ref Rmsg.Error(m.tag, sprint("%r"));
return;
}
if (nr == 0){
if (repfd != NOFD)
fdec <-= NOFD;
outc <-= ref Rmsg.Get(m.tag, NOFD, m.mode, d, data[0:nr]);
return;
}
m.offset += big nr;
mode = m.mode;
if (m.offset < d.length && nr > 0)
mode |= OMORE;
m.mode &= ~OSTAT;
outc <-= ref Rmsg.Get(m.tag, repfd, mode, d, data[0:nr]);
} while(--m.nmsgs != 0 && (mode&OMORE));
}
}
Xc: type chan of (string, ref Tmsg, chan of ref Rmsg);
xc: Xc;
xabort := 0; # make xprocs exit when done
xctlproc()
{
tprocs: list of Xc;
tprocs = nil;
idlec := chan of Xc;
for(;;){
alt {
(s, t, rc) := <- xc =>
tpc: Xc;
if (t == nil){ # abort
xabort = 1;
for (; tprocs != nil; tprocs = tl tprocs)
(hd tprocs) <-= (nil, nil, nil);
exit;
}
if (tprocs != nil){
tpc = hd tprocs;
tprocs = tl tprocs;
} else {
tpc = chan of (string, ref Tmsg, chan of ref Rmsg);
spawn xproc(tpc, idlec);
}
tpc <-= (s, t, rc);
tpc := <- idlec =>
tprocs = tpc::tprocs;
}
}
}
xproc(tpc: Xc, idlec: chan of Xc)
{
for(;;){
(s, t, rc) := <-tpc;
if (t == nil)
exit;
serve(s, t, rc);
if (xabort)
exit;
idlec <-= tpc;
}
}
serve(dir: string, t : ref Tmsg, outc: chan of ref Rmsg)
{
if (debuglatency > 0)
sleep(debuglatency);
if (dir == "/")
dir = ""; # so that dir + path makes sense.
pick m := t {
Attach =>
outc <-= ref Rmsg.Error(m.tag, "already attached");
Flush =>
outc <-= ref Rmsg.Flush(m.tag);
Remove =>
if (m.path == nil || m.path == "" || m.path[0] != '/'){
outc <-= ref Rmsg.Error(m.tag, "bad Op file name");
return;
}
path := dir + m.path;
if (remove(path) < 0){
outc <-= ref Rmsg.Error(m.tag, sprint("%r"));
return;
}
outc <-= ref Rmsg.Remove(m.tag);
Put =>
outc <-= serveput(dir, m, fdsc, fdc, fdec);
Get =>
serveget(dir, m, fdsc, fdc, fdec, outc);
}
}
outproc(fd: ref FD, outc: chan of ref Rmsg)
{
for(;;){
r := <- outc;
if (r == nil)
break;
if (debug)
fprint(stderr, "<= %s\n", r.text());
b := r.pack();
nw := write(fd, b, len b);
if (nw != len b){
if (debug)
fprint(stderr, "outproc: write error: %r\n");
kill(xfspid, "kill");
raise "fail: write error";
}
}
}
getmsg(fd: ref FD) : (ref Tmsg, string)
{
m := Tmsg.read(fd, 0);
if (m == nil)
return (nil, nil);
pick mm := m {
Readerror =>
fprint(stderr, "oxport: read error: %s\n", mm.error);
return (m, "read: " + mm.error);
}
if (debug)
fprint(stderr, "=> %s\n", m.text());
return (m, nil);
}
xfs(dir: string, fd : ref FD, outc: chan of ref Rmsg)
{
xfspid = pctl(0, nil);
attached := 0;
(am, ae) := getmsg(fd);
if (am == nil)
return;
if (ae != nil){
outc <-= nil;
raise "fail:"+ ae;
}
pick mm := am {
Attach =>
uname = mm.uname;
pname = mm.path;
if (mm.uname == nil){
outc <-= ref Rmsg.Error(am.tag, "no uname");
raise "fail: attach";
} else if (mm.path != "/"){
outc <-= ref Rmsg.Error(am.tag, "permission denied");
raise "fail: attach";
} else {
uname = mm.uname;
pname = mm.path;
outc <-= ref Rmsg.Attach(am.tag);
attached = 1;
}
* =>
outc <-= ref Rmsg.Error(am.tag, "not attached");
raise "fail: attach";
}
fdsc = chan of (ref FD, chan of int);
fdc = chan of (int, chan of ref FD);
fdec = chan of int;
spawn fdproc();
xc = chan of (string, ref Tmsg, chan of ref Rmsg);
spawn xctlproc();
while(attached){
(m, e) := getmsg(fd);
if (m == nil)
break;
if (e != nil){
outc <-= nil;
fdsc <-= (nil, nil);
raise "fail:"+ e;
}
xc <-= (dir, m, outc);
}
if (debug)
fprint(stderr, "oxport: eof\n");
outc <-= nil;
fdsc <-= (nil, nil);
xc <-= (nil, nil, nil);
}
export(fd: ref FD)
{
s := getenv("sysname");
if (s == nil)
s = "terminal";
data := array of byte s;
if (fprint(fd, "%08d\n", len data) < 0)
error("export failed: %r");
if (write(fd, data, len data) != len data)
error("export failed: %r");
if (debug){
fprint(stderr, "%08d\n", len data);
write(stderr, data, len data);
}
}
init(nil: ref Draw->Context, args: list of string)
{
sys = load Sys Sys->PATH;
err = load Error Error->PATH;
err->init();
names = checkload(load Names Names->PATH, Names->PATH);
op = checkload(load Op Op->PATH, Op->PATH);
util = checkload(load Netutil Netutil->PATH, Netutil->PATH);
env = checkload(load Env Env->PATH, Env->PATH);
arg = checkload(load Arg Arg->PATH, Arg->PATH);
arg->init(args);
arg->setusage("oxport [-Ad] [-L ms] [-x addr] dir");
calladdr: string;
doauth := 1;
while((opt := arg->opt()) != 0) {
case opt{
'A' =>
doauth = 0;
'L' =>
debuglatency = int arg->earg();
'd' =>
debug = 1;
'x' =>
calladdr = arg->earg();
* =>
usage();
}
}
args = arg->argv();
if (len args != 1)
usage();
dir := cleanname(hd args);
srvfd := fildes(0);
if (calladdr != nil){
# Only under "-x" do we call pctl(FORKNS,nil) to avoid deadlocks.
# To export the PC ns we do NOT want the ns to be forked.
# because we want to see mounts made after exporting the name space.
# However, terminals exporting devices should export a frozen copy
# of the ns that cannot deadlock with the files imported from the pc.
pctl(FORKNS, nil);
calladdr = netmkaddr(calladdr, "tcp", "16699");
(rc, c) := dial(calladdr, nil);
if (rc < 0)
error(sprint("%s: %r\n", calladdr));
if (doauth){
(fd, e) := authfd(c.dfd, Client, nil, nil, calladdr);
if (fd == nil)
error("dial: " + e);
srvfd = fd;
} else
srvfd = c.dfd;
c.dfd = c.cfd = nil;
export(srvfd);
}
op->init();
outc := chan of ref Rmsg;
spawn outproc(srvfd, outc);
xfs(dir, srvfd, outc);
}
|