This is my first issue, so sorry if I missed something.
I have run into a problem when reading data from a stream using a pull query (I'm new to KSQL, so sorry if the problem is actually the query).
I have a test stream with only 4 rows. When I try to read it with the code below, it outputs the rows but then throws a JSONDecodeError.
import os
from typing import Generator

from dotenv import find_dotenv, load_dotenv
from ksql import KSQLAPI


def consume_stream(stream: str, n_messages: int = 10) -> Generator[dict, None, None]:
    load_dotenv(find_dotenv())
    url = os.environ["KSQL_HOST"]
    client = KSQLAPI(url)
    client.query("SET 'auto.offset.reset' = 'earliest';")
    query = f"select * from {stream}"
    return client.query(query, return_objects=True)
The traceback is below (I use rich, hence the pretty output; I can show the raw one if needed). The problem is that the row returned is empty.

A simple check like this fixes the issue:
row = row.replace(",\n", "").replace("]\n", "")
if not row:
    return None
As a note, if I use client.query(query) instead (without return_objects=True), it still raises an error, but now it is StopIteration, probably for the same reason.
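To make the proposed check easier to try in isolation, here is a minimal sketch of the same guard outside the library. The names `parse_row` and `iter_rows` are hypothetical helpers I made up for illustration, not functions from the client, and I'm assuming each chunk is either a single JSON object followed by a separator, or just a separator or empty string.

```python
import json
from typing import Iterable, Iterator, Optional


def parse_row(raw: str) -> Optional[dict]:
    """Decode one streamed chunk, or return None if nothing is left to decode.

    Hypothetical helper: mirrors the replace() calls from the check above.
    """
    # Remove the row separator and the closing bracket of the response array.
    row = raw.replace(",\n", "").replace("]\n", "")
    # After stripping, the final chunk can be empty; json.loads("") would raise.
    if not row.strip():
        return None
    return json.loads(row)


def iter_rows(chunks: Iterable[str]) -> Iterator[dict]:
    """Yield only the chunks that decode to a row, skipping empty ones."""
    for chunk in chunks:
        parsed = parse_row(chunk)
        if parsed is not None:
            yield parsed


if __name__ == "__main__":
    # Made-up chunks shaped like the tail of a response: one row, then "]\n".
    sample = ['{"row": {"columns": [1, "a"]}},\n', "]\n", ""]
    for row in iter_rows(sample):
        print(row)
```

Without the `if not row.strip()` guard, the last two chunks in the sample would reach `json.loads` as empty strings, which I think is what happens in my case.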