diff options
| author | Aryadev Chavali <aryadev@aryadevchavali.com> | 2025-10-19 22:22:23 +0100 | 
|---|---|---|
| committer | Aryadev Chavali <aryadev@aryadevchavali.com> | 2025-10-19 22:25:10 +0100 | 
| commit | cbfcf24ca27df81f51c0f17f6de0730d03e74655 (patch) | |
| tree | 855ad69b679bb7685296af4c88fa3e7988e7e45d | |
| parent | 030a289497db1fcbd22eafb99d7d8827bca94563 (diff) | |
| download | alisp-cbfcf24ca27df81f51c0f17f6de0730d03e74655.tar.gz alisp-cbfcf24ca27df81f51c0f17f6de0730d03e74655.tar.bz2 alisp-cbfcf24ca27df81f51c0f17f6de0730d03e74655.zip  | |
stream: Introduce PIPE type
Main reason to have this at all is to make char-by-char reading
feasible.  This occurs at `stream_chunk`, and previously if we passed
in STDIN for `stream_init_file`, STDIN will only terminate once
STREAM_DEFAULT_CHUNK number of characters have been fed into the pipe.
This isn't desirable for STDIN (we really want to read char-by-char
for expressions), nor would it necessarily be desirable in network
applications.  So any stream marked STREAM_TYPE_PIPE will only chunk
character-by-character rather than genuine chunks.
| -rw-r--r-- | alisp.h | 2 | ||||
| -rw-r--r-- | impl/stream.c | 57 | 
2 files changed, 49 insertions, 10 deletions
@@ -100,6 +100,7 @@ void sym_table_cleanup(sym_table_t *);  typedef enum  {    STREAM_TYPE_STRING, +  STREAM_TYPE_PIPE,    STREAM_TYPE_FILE,  } stream_type_t; @@ -133,6 +134,7 @@ typedef struct  #define STREAM_DEFAULT_CHUNK 64  stream_err_t stream_init_string(stream_t *, char *, sv_t); +stream_err_t stream_init_pipe(stream_t *, char *, FILE *);  stream_err_t stream_init_file(stream_t *, char *, FILE *);  void stream_stop(stream_t *); diff --git a/impl/stream.c b/impl/stream.c index 878ca6c..7b5e4e7 100644 --- a/impl/stream.c +++ b/impl/stream.c @@ -30,6 +30,22 @@ stream_err_t stream_init_string(stream_t *stream, char *name, sv_t contents)    return STREAM_ERR_OK;  } +stream_err_t stream_init_pipe(stream_t *stream, char *name, FILE *pipe) +{ +  if (!stream || !pipe) +    return STREAM_ERR_INVALID_PTR; +  name = name ? name : "<stream>"; +  memset(stream, 0, sizeof(*stream)); + +  stream->type      = STREAM_TYPE_PIPE; +  stream->name      = name; +  stream->pipe.file = pipe; + +  vec_init(&stream->pipe.cache, STREAM_DEFAULT_CHUNK); + +  return STREAM_ERR_OK; +} +  stream_err_t stream_init_file(stream_t *stream, char *name, FILE *pipe)  {    if (!stream || !pipe) @@ -42,8 +58,6 @@ stream_err_t stream_init_file(stream_t *stream, char *name, FILE *pipe)    stream->pipe.file = pipe;    vec_init(&stream->pipe.cache, STREAM_DEFAULT_CHUNK); -  // try to read an initial chunk -  // stream_chunk(stream);    return STREAM_ERR_OK;  } @@ -57,6 +71,7 @@ void stream_stop(stream_t *stream)    case STREAM_TYPE_STRING:      // Nothing to do, all dealt with outside of stream      break; +  case STREAM_TYPE_PIPE:    case STREAM_TYPE_FILE:      // Must cleanup vector      vec_free(&stream->pipe.cache); @@ -72,6 +87,7 @@ u64 stream_size(stream_t *stream)    {    case STREAM_TYPE_STRING:      return stream->string.size; +  case STREAM_TYPE_PIPE:    case STREAM_TYPE_FILE:      return stream->pipe.cache.size;    default: @@ -87,6 +103,7 @@ bool stream_eos(stream_t *stream)    {    case STREAM_TYPE_STRING:      return stream->position >= stream->string.size; +  case STREAM_TYPE_PIPE:    case STREAM_TYPE_FILE:      return feof(stream->pipe.file);    default: @@ -102,6 +119,7 @@ bool stream_eoc(stream_t *stream)    {    case STREAM_TYPE_STRING:      return stream->position >= stream->string.size; +  case STREAM_TYPE_PIPE:    case STREAM_TYPE_FILE:      return feof(stream->pipe.file) &&             stream->position >= stream->pipe.cache.size; @@ -114,20 +132,32 @@ bool stream_eoc(stream_t *stream)  bool stream_chunk(stream_t *stream)  {    assert(stream); +  u64 to_read = STREAM_DEFAULT_CHUNK;    switch (stream->type)    {    case STREAM_TYPE_STRING:      // vacuously true      return true; +  case STREAM_TYPE_PIPE: +    to_read = 1; +    // fallthrough    case STREAM_TYPE_FILE:    {      if (feof(stream->pipe.file)) +      // We can't read anymore.  End of the line        return false; -    vec_ensure_free(&stream->pipe.cache, STREAM_DEFAULT_CHUNK); +    vec_ensure_free(&stream->pipe.cache, to_read);      int read = fread(vec_data(&stream->pipe.cache) + stream->pipe.cache.size, 1, -                     STREAM_DEFAULT_CHUNK, stream->pipe.file); -    stream->pipe.cache.size += read; -    return true; +                     to_read, stream->pipe.file); + +    // If we read something it's a good thing +    if (read > 0) +    { +      stream->pipe.cache.size += read; +      return true; +    } +    else +      return false;    }    default:      FAIL("Unreachable"); @@ -154,11 +184,15 @@ char stream_peek(stream_t *stream)    {    case STREAM_TYPE_STRING:      return stream->string.data[stream->position]; +  case STREAM_TYPE_PIPE:    case STREAM_TYPE_FILE:    {      // Cached already?  We are done.      if (stream->position < stream->pipe.cache.size) -      return ((char *)vec_data(&stream->pipe.cache))[stream->position]; +    { +      const char *const str = vec_data(&stream->pipe.cache); +      return str[stream->position]; +    }      // Try to read chunks in till we've reached it or we're at the end of the      // file. @@ -204,6 +238,7 @@ bool stream_seek_forward(stream_t *stream, u64 offset)      stream->position += offset;      return true;    } +  case STREAM_TYPE_PIPE:    case STREAM_TYPE_FILE:    {      // Similar principle as stream_peek really... @@ -223,7 +258,7 @@ bool stream_seek_forward(stream_t *stream, u64 offset)        continue;      // Same principle as the stream_eoc(stream) check. -    if (stream->position + offset >= stream->pipe.cache.size) +    if (stream->position + offset > stream->pipe.cache.size)        return false;      stream->position += offset;      return true; @@ -248,7 +283,7 @@ sv_t stream_substr(stream_t *stream, u64 size)    if (stream_eoc(stream))      return SV(NULL, 0); -  // TODO: this is kinda disgusting, any better way of doing this +  // See if I can go forward enough to make this substring    u64 current_position = stream->position;    bool successful      = stream_seek_forward(stream, size);    // Reset the position in either situation @@ -263,6 +298,7 @@ sv_t stream_substr(stream_t *stream, u64 size)    case STREAM_TYPE_STRING:      ptr = stream->string.data;      break; +  case STREAM_TYPE_PIPE:    case STREAM_TYPE_FILE:      ptr = vec_data(&stream->pipe.cache);      break; @@ -282,11 +318,12 @@ sv_t stream_substr_abs(stream_t *stream, u64 index, u64 size)      if (index + size <= stream_size(stream))        return SV(stream->string.data + index, size);      return SV(NULL, 0); +  case STREAM_TYPE_PIPE:    case STREAM_TYPE_FILE:    {      if (index + size <= stream_size(stream))        return SV(vec_data(&stream->pipe.cache) + index, size); -    // stream_size(stream) <= index + size => try reading chunks +    // (index + size > stream_size(stream)) => try reading chunks      for (bool read_chunk = stream_chunk(stream);           read_chunk && index + size >= stream->pipe.cache.size;           read_chunk = stream_chunk(stream))  | 
