Patience is overrated – embrace the thrill of parsing streaming data as it arrives!

Author

Samuel Calderon

Published

December 14, 2023

This post has two objectives: to present some projects that use ChatGPT-like technology to help R developers, and to present a package that can help all of them.

One year ago, with the launch of ChatGPT, the world found itself astonished by the profound impact this conversational AI model started to make across various domains. The tech community was particularly surprised by the model’s versatility, successfully powering chatbots, virtual assistants, and aiding developers in diverse applications. How did the R community take advantage of this?

LLMs for the R developer’s workflow

Once OpenAI allowed developers to use its API, the R community started to see different initiatives for incorporating this service, and similar ones, into their workflows. OpenAI lists the {rgpt3} package on its community libraries page, but we can also find the {openai} package, which supports more recent endpoints. There is also the {chatgpt} package, which provides a chat interface that runs in the R console itself.

For VSCode users, the Genie extension integrates the OpenAI service with the contents of the files in your projects, and provides a UI where you can interact with the AI models exactly as you would in ChatGPT. For Docker enthusiasts, chatbot-ui lets you easily host your own chat UI using OpenAI's API under the hood.

All the services mentioned above require you to have an OpenAI account and a valid API key, which allows the AI giant to charge you for your usage.
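If you have never configured such a key, a common pattern (and the one these packages conventionally rely on) is to store it in your ~/.Renviron file, so every R session can read it without hard-coding secrets:

# In ~/.Renviron (not an R script), add a line like:
# OPENAI_API_KEY=sk-your-key-here

# Packages then retrieve it in R with:
Sys.getenv("OPENAI_API_KEY")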

For RStudio users, we have two packages providing similar functionality for using a chat interface without ever needing to leave the RStudio IDE itself. The {chattr} package provides a chat interface that can be accessed in the "Viewer" pane of RStudio. You can also use the chattr() function in a source file to directly give instructions to the AI assistant. The package supports using the OpenAI API or a self-hosted (free) LLamaGPT executable. Even though it is not yet on CRAN, with Posit PBC as the copyright holder of the package, you can expect it to keep receiving updates.
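As a rough sketch of that workflow (function names as documented by {chattr} at the time of writing; check its documentation for details):

library(chattr)

# Launch the chat interface in the Viewer pane
chattr_app()

# Or send a prompt directly from a source file
chattr("Write a function that computes the coefficient of variation")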

The second package is called {gptstudio} and requires that I make a disclaimer: I'm one of its co-authors. This package also provides a chat interface as an RStudio addin, but in this case it runs as a background job. This means that you don't have to close the chat when you need to use the R console. Sadly, this currently also means that you can't directly use your documents' content as context for the chat assistant, as background jobs don't run in the same R session that the RStudio IDE uses.

While we work on overcoming this challenge, we provide the following features:

  • Every code chunk produced by the AI assistant can be saved to the clipboard with a single click.
  • You can save your conversations to be continued later.
  • You can change the chat settings per session, or save a default configuration. All without leaving the chat UI.
  • You can add R help pages as chat context for every package you have installed locally. This is very useful for receiving assistance on the latest developments in the R ecosystem instead of having to wait for AI giants to update the cutoff date of their models. Use a "package::object" string anywhere in your prompt to accomplish this (e.g. "Help me with dplyr::join_by").
  • The UI has support for internationalization. We currently support English, Spanish and German. We are open to receiving more translations.
  • We support streaming messages, meaning you can start reading a response before it has fully arrived (just like in ChatGPT).
  • The chat UI inherits your RStudio IDE theme, to give a more “built-in” look. You can start the chat as an RStudio addin (even set up a custom keyboard shortcut).
  • You can choose your model. While the default option is to use the OpenAI’s “gpt-3.5-turbo” model, you can choose any of the current OpenAI models, such as “gpt-4” or “gpt-3.5-turbo-16k”.
  • You can choose your service. While OpenAI offers many good models, we also support "Huggingface", "Anthropic", "Azure OpenAI" and "Palm", each of which provides many models.

We are also working on supporting self-hosted services/models, such as {ollama}. For all these services, we use R's functional OOP system, which I'm currently reworking to use the more explicit S7 system.

Each one of these features has meant great effort on our side, so we hope they help your workflow. We also hope you can let us know if you find any issue or bug.

Addressing a small but common pain

Using APIs means that you have to make HTTP requests. Expecting to receive streaming data adds complexity, as you need to do something with the incoming data before it has fully arrived. OpenAI uses Server-Sent Events (SSE) for streaming responses, enabling a unidirectional communication channel based on a single, long-lived HTTP connection that allows the server to send periodic updates to the connected clients.

Server-Sent Events come in chunks. Here is an example:

data: This is the first chunk, it has one line

data: This is the second chunk
extra: It has two lines

data: This is the third chunk, it has an id field. This is common.
id: 123

: Lines that start with a colon are comments, they don't hold data
data: This is the fourth chunk, it has a comment

data: This is the fifth chunk. Normally you will receive a data field
custom: But the server can send custom field names.

You can imagine that in order to use the incoming data, we need to parse the text received. While this might seem trivial, until now each package that streams data from OpenAI has implemented its own way of parsing the incoming chunks. There should be a standard way. In fact, Server-Sent Events date back to at least 2006, and there is an HTML specification that provides instructions for browsers and other clients on how to parse them.
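To give a flavor of what the specification asks from a client, here is a deliberately naive sketch of those rules (my own illustration, not the implementation presented below): split the stream into blank-line-separated chunks, drop comment lines, and split each remaining line into a field name and a value at the first colon.

parse_sse_naive <- function(text) {
    chunks <- strsplit(text, "\n\n", fixed = TRUE)[[1]]
    lapply(chunks, function(chunk) {
        lines <- strsplit(chunk, "\n", fixed = TRUE)[[1]]
        lines <- lines[nzchar(lines) & !startsWith(lines, ":")] # drop comments
        field <- sub(":.*$", "", lines)      # text before the first colon
        value <- sub("^[^:]*: ?", "", lines) # text after it (one optional space stripped)
        stats::setNames(as.list(value), field)
    })
}

A real parser needs more than this (chunks can arrive split at arbitrary points, fields can repeat, and so on), which is exactly why a dedicated package is worth having.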

After taking into account those instructions, I’m excited to announce the release of version 0.1.0 of the SSEparser R package, designed to provide robust functionality for parsing Server-Sent Events (SSE) and building upon them. This package is a valuable tool for data analysts and software engineers working with real-time streaming data.

Installation

You can easily install the SSEparser package from CRAN using the following command:

install.packages("SSEparser")

For those who prefer to live on the bleeding edge, the development version can be installed with the pak package:

pak::pak("calderonsamuel/SSEparser")

Example Usage

Let’s delve into a simple example to showcase the power of the SSEparser package. The parse_sse() function takes a string containing a server-sent event and converts it into an R list. Check out the example below:

library(SSEparser)

event <- "data: test\nevent: message\nid: 123\n\n"

parse_sse(event)
#> [[1]]
#> [[1]]$data
#> [1] "test"
#> 
#> [[1]]$event
#> [1] "message"
#> 
#> [[1]]$id
#> [1] "123"

The package also handles comments in the event stream gracefully, ensuring they are not parsed.
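For instance, a chunk that mixes a comment with a data field should yield only the data field (expected output shown, following the format above):

event_with_comment <- ": this is a comment\ndata: hello\n\n"

parse_sse(event_with_comment)
#> [[1]]
#> [[1]]$data
#> [1] "hello"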

Use in HTTP Requests

SSEparser goes beyond simple event parsing; it integrates seamlessly with HTTP requests for real-time streaming data. The code snippet below creates an HTTP request for the stream, whose response will have the MIME type "text/event-stream". We ask for 3 events from Postman's dummy API.

library(httr2)

sse_request <- request("https://postman-echo.com/server-events/3") %>% 
    req_body_json(data = list(
        event = "message",
        request = "POST"
    ))

Now, we can use SSEparser inside the callback function of the stream. This example illustrates parsing multiple events from a streaming data source.

parser <- SSEparser$new()

response <- sse_request %>%
    req_perform_stream(callback = \(x) {
        event <- rawToChar(x) # the incoming chunk arrives as a raw vector
        parser$parse_sse(event)
        TRUE # returning TRUE keeps the stream open
    })

str(parser$events)
#> List of 3
#>  $ :List of 3
#>   ..$ event: chr "message"
#>   ..$ data : chr "{\"event\":\"message\",\"request\":\"POST\"}"
#>   ..$ id   : chr "1"
#>  $ :List of 3
#>   ..$ event: chr "message"
#>   ..$ data : chr "{\"event\":\"message\",\"request\":\"POST\"}"
#>   ..$ id   : chr "2"
#>  $ :List of 3
#>   ..$ event: chr "ping"
#>   ..$ data : chr "{\"event\":\"message\",\"request\":\"POST\"}"
#>   ..$ id   : chr "3"

Look at the data fields: they are JSON strings. We should be able to parse them as they come too.
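For instance, any stored data field can already be parsed after the fact with {jsonlite}:

jsonlite::fromJSON(parser$events[[1]]$data)
#> $event
#> [1] "message"
#> 
#> $request
#> [1] "POST"

But doing this manually for every event is exactly the kind of repetition the next section removes.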

Extending SSEparser

One of the strengths of SSEparser is its extensibility. Suppose you want to parse the content of every data field into an R list instead of keeping it as a JSON string. In that case, you can easily create a custom parser by inheriting from the SSEparser class. Here's an example:

CustomParser <- R6::R6Class(
    classname = "CustomParser",
    inherit = SSEparser,
    public = list(
        initialize = function() {
            super$initialize()
        },
        append_parsed_sse = function(parsed_event) {
            # convert the data field from a JSON string to an R list
            parsed_event$data <- jsonlite::fromJSON(parsed_event$data)
            # then store the event, just as the parent class does
            self$events <- c(self$events, list(parsed_event))
            invisible(self)
        }
    )
)

Now you can use your custom parser for streaming data with the same ease. We reuse the same request as before.

parser <- CustomParser$new()

response <- sse_request %>%
    httr2::req_perform_stream(callback = \(x) {
        event <- rawToChar(x)
        parser$parse_sse(event)
        TRUE
    })

str(parser$events)
#> List of 3
#>  $ :List of 3
#>   ..$ event: chr "info"
#>   ..$ data :List of 2
#>   .. ..$ event  : chr "message"
#>   .. ..$ request: chr "POST"
#>   ..$ id   : chr "1"
#>  $ :List of 3
#>   ..$ event: chr "notification"
#>   ..$ data :List of 2
#>   .. ..$ event  : chr "message"
#>   .. ..$ request: chr "POST"
#>   ..$ id   : chr "2"
#>  $ :List of 3
#>   ..$ event: chr "ping"
#>   ..$ data :List of 2
#>   .. ..$ event  : chr "message"
#>   .. ..$ request: chr "POST"
#>   ..$ id   : chr "3"

With SSEparser v0.1.0, you have a powerful tool at your disposal for handling and parsing Server-Sent Events efficiently. Feel free to explore its features and enhance your real-time streaming data workflows.

Extending for OpenAI chat completions

For OpenAI, the parser can't just convert every data field to an R list, because the data field of the last chunk is the literal string "[DONE]", which is not valid JSON. So we need to slightly modify how we parse the data fields. We can also add a buffer to save just the actual content of the chat response, as it comes deeply nested inside every chunk.

ChatParser <- R6::R6Class(
  classname = "ChatParser",
  inherit = SSEparser,
  public = list(
    chat_response = NULL, # this will be our buffer
    initialize = function() {
      self$chat_response <- ""
      super$initialize()
    },
    append_parsed_sse = function(parsed_event) {
      # ----- here you can do whatever you want with the event data -----
      if (parsed_event$data == "[DONE]") return()
      parsed_event$data <- jsonlite::fromJSON(parsed_event$data, simplifyDataFrame = FALSE)
      
      content <- parsed_event$data$choices[[1]]$delta$content
      self$chat_response <- paste0(self$chat_response, content)
      # ----- END ----
      
      self$events <- c(self$events, list(parsed_event))
      invisible(self)
    }
  )
)

The next code chunk defines the request. We are just asking "What is 2 + 2", and specifying that we want a streaming response.

openai_chat_request <- request("https://api.openai.com/v1") %>% # base url
  req_url_path_append("chat/completions") %>% # endpoint
  req_auth_bearer_token(Sys.getenv("OPENAI_API_KEY")) %>%
  req_body_json(data = list(
    model = "gpt-3.5-turbo",
    messages = list(
      list(role = "user", content = "What is 2 + 2")
    ),
    stream = TRUE
  ))

Now we can request the stream, just as we did before. In this case, we can access the chat response.

parser <- ChatParser$new()

response <- openai_chat_request %>%
    httr2::req_perform_stream(callback = \(x) {
        event <- rawToChar(x)
        parser$parse_sse(event)
        TRUE
    })

parser$chat_response
#> [1] "2 + 2 equals 4."

And just like that, you have a fully functional parser for OpenAI streaming requests.

Final words, very off topic

2023 has been an amazing year, R-wise, for me. One of my resolutions at the start of the year was to go through the process of sending my first package to CRAN. I had read R Packages cover to cover, multiple times, in preparation for that moment. When I finally gathered the courage, I started polishing my in-development package to be ready for CRAN.

It was during that time that I stumbled upon the {gptstudio} project, whose version 0.1.0 was already on CRAN. I assisted the authors (James Wade and Michel Nivard) in implementing the streaming functionality, running the app as a background job, and providing some CSS styling. For that effort, they decided to include me as a co-author when they submitted gptstudio v0.2.0 to CRAN.

I decided to write about this experience in a blog post, which marked my first time writing a post in English. I also chose to share it in the R Weekly repository, and it was featured as a highlighted post on the web and the podcast! It was very amusing to hear Eric Nantz (someone I listen to every Thursday while I get ready for work) successfully pronouncing "Ministerio del Interior", the place where I work here in Peru.

Around the same time, the LatinR conference call for papers was open, and I decided to submit two projects. Both of them got approved, and I found myself presenting R-related work in Uruguay. Talking and hearing about R projects for three full days with many amazing people was a very refreshing experience. It was truly an honor.

I feel very thankful for everything the R community has provided me. Open source really does have the power to impact our lives in many unexpected ways. Looking forward to more growth and learning in the coming year.