stream: Introduce PIPE type

Main reason to have this at all is to make char-by-char reading
feasible.  This occurs at `stream_chunk`, and previously if we passed
in STDIN for `stream_init_file`, STDIN will only terminate once
STREAM_DEFAULT_CHUNK number of characters have been fed into the pipe.

This isn't desirable for STDIN (we really want to read char-by-char
for expressions), nor would it necessarily be desirable in network
applications.  So any stream marked STREAM_TYPE_PIPE will only chunk
character-by-character rather than genuine chunks.
This commit is contained in:
2025-10-19 22:22:23 +01:00
parent 030a289497
commit cbfcf24ca2
2 changed files with 49 additions and 10 deletions

View File

@@ -100,6 +100,7 @@ void sym_table_cleanup(sym_table_t *);
typedef enum typedef enum
{ {
STREAM_TYPE_STRING, STREAM_TYPE_STRING,
STREAM_TYPE_PIPE,
STREAM_TYPE_FILE, STREAM_TYPE_FILE,
} stream_type_t; } stream_type_t;
@@ -133,6 +134,7 @@ typedef struct
#define STREAM_DEFAULT_CHUNK 64 #define STREAM_DEFAULT_CHUNK 64
stream_err_t stream_init_string(stream_t *, char *, sv_t); stream_err_t stream_init_string(stream_t *, char *, sv_t);
stream_err_t stream_init_pipe(stream_t *, char *, FILE *);
stream_err_t stream_init_file(stream_t *, char *, FILE *); stream_err_t stream_init_file(stream_t *, char *, FILE *);
void stream_stop(stream_t *); void stream_stop(stream_t *);

View File

@@ -30,6 +30,22 @@ stream_err_t stream_init_string(stream_t *stream, char *name, sv_t contents)
return STREAM_ERR_OK; return STREAM_ERR_OK;
} }
stream_err_t stream_init_pipe(stream_t *stream, char *name, FILE *pipe)
{
if (!stream || !pipe)
return STREAM_ERR_INVALID_PTR;
name = name ? name : "<stream>";
memset(stream, 0, sizeof(*stream));
stream->type = STREAM_TYPE_PIPE;
stream->name = name;
stream->pipe.file = pipe;
vec_init(&stream->pipe.cache, STREAM_DEFAULT_CHUNK);
return STREAM_ERR_OK;
}
stream_err_t stream_init_file(stream_t *stream, char *name, FILE *pipe) stream_err_t stream_init_file(stream_t *stream, char *name, FILE *pipe)
{ {
if (!stream || !pipe) if (!stream || !pipe)
@@ -42,8 +58,6 @@ stream_err_t stream_init_file(stream_t *stream, char *name, FILE *pipe)
stream->pipe.file = pipe; stream->pipe.file = pipe;
vec_init(&stream->pipe.cache, STREAM_DEFAULT_CHUNK); vec_init(&stream->pipe.cache, STREAM_DEFAULT_CHUNK);
// try to read an initial chunk
// stream_chunk(stream);
return STREAM_ERR_OK; return STREAM_ERR_OK;
} }
@@ -57,6 +71,7 @@ void stream_stop(stream_t *stream)
case STREAM_TYPE_STRING: case STREAM_TYPE_STRING:
// Nothing to do, all dealt with outside of stream // Nothing to do, all dealt with outside of stream
break; break;
case STREAM_TYPE_PIPE:
case STREAM_TYPE_FILE: case STREAM_TYPE_FILE:
// Must cleanup vector // Must cleanup vector
vec_free(&stream->pipe.cache); vec_free(&stream->pipe.cache);
@@ -72,6 +87,7 @@ u64 stream_size(stream_t *stream)
{ {
case STREAM_TYPE_STRING: case STREAM_TYPE_STRING:
return stream->string.size; return stream->string.size;
case STREAM_TYPE_PIPE:
case STREAM_TYPE_FILE: case STREAM_TYPE_FILE:
return stream->pipe.cache.size; return stream->pipe.cache.size;
default: default:
@@ -87,6 +103,7 @@ bool stream_eos(stream_t *stream)
{ {
case STREAM_TYPE_STRING: case STREAM_TYPE_STRING:
return stream->position >= stream->string.size; return stream->position >= stream->string.size;
case STREAM_TYPE_PIPE:
case STREAM_TYPE_FILE: case STREAM_TYPE_FILE:
return feof(stream->pipe.file); return feof(stream->pipe.file);
default: default:
@@ -102,6 +119,7 @@ bool stream_eoc(stream_t *stream)
{ {
case STREAM_TYPE_STRING: case STREAM_TYPE_STRING:
return stream->position >= stream->string.size; return stream->position >= stream->string.size;
case STREAM_TYPE_PIPE:
case STREAM_TYPE_FILE: case STREAM_TYPE_FILE:
return feof(stream->pipe.file) && return feof(stream->pipe.file) &&
stream->position >= stream->pipe.cache.size; stream->position >= stream->pipe.cache.size;
@@ -114,20 +132,32 @@ bool stream_eoc(stream_t *stream)
bool stream_chunk(stream_t *stream) bool stream_chunk(stream_t *stream)
{ {
assert(stream); assert(stream);
u64 to_read = STREAM_DEFAULT_CHUNK;
switch (stream->type) switch (stream->type)
{ {
case STREAM_TYPE_STRING: case STREAM_TYPE_STRING:
// vacuously true // vacuously true
return true; return true;
case STREAM_TYPE_PIPE:
to_read = 1;
// fallthrough
case STREAM_TYPE_FILE: case STREAM_TYPE_FILE:
{ {
if (feof(stream->pipe.file)) if (feof(stream->pipe.file))
// We can't read anymore. End of the line
return false; return false;
vec_ensure_free(&stream->pipe.cache, STREAM_DEFAULT_CHUNK); vec_ensure_free(&stream->pipe.cache, to_read);
int read = fread(vec_data(&stream->pipe.cache) + stream->pipe.cache.size, 1, int read = fread(vec_data(&stream->pipe.cache) + stream->pipe.cache.size, 1,
STREAM_DEFAULT_CHUNK, stream->pipe.file); to_read, stream->pipe.file);
stream->pipe.cache.size += read;
return true; // If we read something it's a good thing
if (read > 0)
{
stream->pipe.cache.size += read;
return true;
}
else
return false;
} }
default: default:
FAIL("Unreachable"); FAIL("Unreachable");
@@ -154,11 +184,15 @@ char stream_peek(stream_t *stream)
{ {
case STREAM_TYPE_STRING: case STREAM_TYPE_STRING:
return stream->string.data[stream->position]; return stream->string.data[stream->position];
case STREAM_TYPE_PIPE:
case STREAM_TYPE_FILE: case STREAM_TYPE_FILE:
{ {
// Cached already? We are done. // Cached already? We are done.
if (stream->position < stream->pipe.cache.size) if (stream->position < stream->pipe.cache.size)
return ((char *)vec_data(&stream->pipe.cache))[stream->position]; {
const char *const str = vec_data(&stream->pipe.cache);
return str[stream->position];
}
// Try to read chunks in till we've reached it or we're at the end of the // Try to read chunks in till we've reached it or we're at the end of the
// file. // file.
@@ -204,6 +238,7 @@ bool stream_seek_forward(stream_t *stream, u64 offset)
stream->position += offset; stream->position += offset;
return true; return true;
} }
case STREAM_TYPE_PIPE:
case STREAM_TYPE_FILE: case STREAM_TYPE_FILE:
{ {
// Similar principle as stream_peek really... // Similar principle as stream_peek really...
@@ -223,7 +258,7 @@ bool stream_seek_forward(stream_t *stream, u64 offset)
continue; continue;
// Same principle as the stream_eoc(stream) check. // Same principle as the stream_eoc(stream) check.
if (stream->position + offset >= stream->pipe.cache.size) if (stream->position + offset > stream->pipe.cache.size)
return false; return false;
stream->position += offset; stream->position += offset;
return true; return true;
@@ -248,7 +283,7 @@ sv_t stream_substr(stream_t *stream, u64 size)
if (stream_eoc(stream)) if (stream_eoc(stream))
return SV(NULL, 0); return SV(NULL, 0);
// TODO: this is kinda disgusting, any better way of doing this // See if I can go forward enough to make this substring
u64 current_position = stream->position; u64 current_position = stream->position;
bool successful = stream_seek_forward(stream, size); bool successful = stream_seek_forward(stream, size);
// Reset the position in either situation // Reset the position in either situation
@@ -263,6 +298,7 @@ sv_t stream_substr(stream_t *stream, u64 size)
case STREAM_TYPE_STRING: case STREAM_TYPE_STRING:
ptr = stream->string.data; ptr = stream->string.data;
break; break;
case STREAM_TYPE_PIPE:
case STREAM_TYPE_FILE: case STREAM_TYPE_FILE:
ptr = vec_data(&stream->pipe.cache); ptr = vec_data(&stream->pipe.cache);
break; break;
@@ -282,11 +318,12 @@ sv_t stream_substr_abs(stream_t *stream, u64 index, u64 size)
if (index + size <= stream_size(stream)) if (index + size <= stream_size(stream))
return SV(stream->string.data + index, size); return SV(stream->string.data + index, size);
return SV(NULL, 0); return SV(NULL, 0);
case STREAM_TYPE_PIPE:
case STREAM_TYPE_FILE: case STREAM_TYPE_FILE:
{ {
if (index + size <= stream_size(stream)) if (index + size <= stream_size(stream))
return SV(vec_data(&stream->pipe.cache) + index, size); return SV(vec_data(&stream->pipe.cache) + index, size);
// stream_size(stream) <= index + size => try reading chunks // (index + size > stream_size(stream)) => try reading chunks
for (bool read_chunk = stream_chunk(stream); for (bool read_chunk = stream_chunk(stream);
read_chunk && index + size >= stream->pipe.cache.size; read_chunk && index + size >= stream->pipe.cache.size;
read_chunk = stream_chunk(stream)) read_chunk = stream_chunk(stream))