# Streaming API

The RIPE Atlas streaming API allows you to tap into the real-time data flow of all the collected public results. Every time our system receives a measurement result, a new measurement is scheduled, or a probe connects or disconnects, a message is delivered to the clients who are subscribed to that event.

The messages can be streamed from the /stream/ endpoint using either a simple GET request or by opening a WebSocket. In addition, there is legacy Socket.IO support so that old clients and visualizations are not broken.

# Quick start

You can start streaming right away by opening a WebSocket and sending a subscribe message.

In both of these examples, one for JavaScript and one for Python, you will start to see messages containing RIPE Atlas results almost immediately.

JavaScript:

```javascript
const socket = new WebSocket("wss://atlas-stream.ripe.net/stream/");
const params = { streamType: "result", msm: 1001 };
// Print every message received from the stream
socket.onmessage = function (event) {
  console.log(event.data);
};
// Subscribe as soon as the connection is open
socket.onopen = function (event) {
  this.send(JSON.stringify(["atlas_subscribe", params]));
};
```

Python:

```python
import json
import websocket  # provided by the websocket-client package

ws = websocket.WebSocket()
ws.connect("wss://atlas-stream.ripe.net/stream/")
params = {"streamType": "result", "msm": 1001}
ws.send(json.dumps(["atlas_subscribe", params]))
# Print every message received from the stream
for data in ws:
    print(data)
```

# Format

Messages to and from the streaming API follow a common JSON-based format. Each message is a JSON array of the form:

[type,payload]

where type is a string and payload is some valid JSON value.

For example, to subscribe to a particular measurement you would send a message like:

["atlas_subscribe",{"streamType":"result","msm":1001}]

and receive a confirmation message like:

["atlas_subscribed",{"streamType":"result","msm":1001}]

In general, the message type names and RIPE Atlas result payloads use snake_case keys with underscores, and subscription parameters use camelCase.
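
As a rough illustration of this format, the following Python sketch encodes and decodes messages. The helper names `encode_message` and `decode_message` are hypothetical and are not part of any RIPE Atlas library.

```python
import json


def encode_message(msg_type, payload):
    """Serialise a message as a two-element JSON array: [type, payload]."""
    return json.dumps([msg_type, payload], separators=(",", ":"))


def decode_message(raw):
    """Parse a raw message and return a (type, payload) tuple.

    Raises ValueError if the text is not a two-element JSON array
    whose first element is a string.
    """
    message = json.loads(raw)
    if not (isinstance(message, list) and len(message) == 2
            and isinstance(message[0], str)):
        raise ValueError("expected a JSON array of the form [type, payload]")
    return message[0], message[1]
```

Validating the shape on receipt keeps the rest of a client free to assume that every message has exactly a type and a payload.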

# WebSocket

You can listen to multiple subscriptions by opening a WebSocket pointing to the API endpoint. You can also add and remove subscriptions throughout the lifetime of the socket by sending atlas_subscribe or atlas_unsubscribe messages.

Messages to and from the server follow the format given above, with one message sent per WebSocket frame.

Example using JavaScript:

```javascript
const socket = new WebSocket("wss://atlas-stream.ripe.net/stream/");
const params = { streamType: "result", msm: 1001 };
// Each frame carries one [type, payload] message
socket.onmessage = function (event) {
  const [type, payload] = JSON.parse(event.data);
  console.log(type, payload);
};
socket.onopen = function (event) {
  this.send(JSON.stringify(["atlas_subscribe", params]));
};
// Unsubscribe in 5 seconds
setTimeout(() => socket.send(JSON.stringify(["atlas_unsubscribe", params])), 5000);

/* output:

atlas_subscribed, Object { streamType: "result", msm: 1001 }
atlas_result, Object { msm_id: 1001, ... }
...
atlas_unsubscribed, Object { streamType: "result", msm: 1001 }
*/
```
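
The same subscribe and unsubscribe flow can be sketched in Python. This is a minimal example, not a reference client: the function name `collect_results` is hypothetical, it assumes the `websocket-client` package used in the quick start, and it treats any atlas_error other than a warning as fatal, which is a simplification.

```python
import json


def collect_results(ws, params, limit):
    """Subscribe, gather up to `limit` result payloads, then unsubscribe.

    `ws` is any connected object with send() and recv() methods, such as
    a websocket.WebSocket from the websocket-client package (assumed here).
    """
    results = []
    ws.send(json.dumps(["atlas_subscribe", params]))
    while len(results) < limit:
        # One [type, payload] message arrives per WebSocket frame
        msg_type, payload = json.loads(ws.recv())
        if msg_type == "atlas_result":
            results.append(payload)
        elif msg_type == "atlas_error":
            # Assumption of this sketch: only warnings are safe to ignore
            if isinstance(payload, dict) and payload.get("code") == "warning":
                continue
            raise RuntimeError(f"stream error: {payload}")
    ws.send(json.dumps(["atlas_unsubscribe", params]))
    return results


# Usage against the live stream (requires network access):
#   import websocket
#   ws = websocket.WebSocket()
#   ws.connect("wss://atlas-stream.ripe.net/stream/")
#   print(collect_results(ws, {"streamType": "result", "msm": 1001}, 3))
```

Because the function only relies on `send()` and `recv()`, it can be exercised offline with a stand-in object that replays recorded frames.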

# HTTP GET

You can listen to a single subscription by making a GET request to the endpoint and specifying subscription parameters in the query string.

If the subscription parameters are valid then the request will stay open for as long as data is flowing (see note below), outputting messages separated by newline characters. If the parameters are invalid or there is a fatal error for the subscription then an error line will be sent and the connection will be closed.

Example using curl:

```
$ curl -s "https://atlas-stream.ripe.net/stream/?streamType=result&msm=1001"
["atlas_subscribed",{"streamType":"result","msm":1001}]
["atlas_result",{"msm_id": 1001, ...}]
...
```

Note on timeouts: the HTTP connection will automatically close after some time of inactivity (currently 60 seconds). If you want a long-lived connection with subscriptions that don't generate a continuous flow of messages, consider using the WebSocket interface instead.
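
For comparison with the curl example, here is a minimal Python sketch of the same GET request using only the standard library. The function names are hypothetical, and the sketch only covers simple scalar parameters such as streamType and msm; how list-valued parameters are encoded in the query string is not covered here.

```python
import json
import urllib.parse
import urllib.request

STREAM_URL = "https://atlas-stream.ripe.net/stream/"


def build_stream_url(params):
    """Encode subscription parameters into the query string."""
    return STREAM_URL + "?" + urllib.parse.urlencode(params)


def parse_lines(lines):
    """Yield (type, payload) for each non-empty newline-delimited message."""
    for line in lines:
        if isinstance(line, bytes):
            line = line.decode("utf-8")
        line = line.strip()
        if line:
            msg_type, payload = json.loads(line)
            yield msg_type, payload


def stream(params):
    """Open the GET stream and yield messages until the connection closes."""
    with urllib.request.urlopen(build_stream_url(params)) as response:
        # The HTTP response object can be iterated line by line
        yield from parse_lines(response)


# Usage (requires network access):
#   for msg_type, payload in stream({"streamType": "result", "msm": 1001}):
#       print(msg_type, payload)
```

The loop ends when the server closes the connection, for example after the inactivity timeout described above, so a long-running script would need to reconnect or use the WebSocket interface.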

# Socket.IO

A Socket.IO interface is maintained for backwards compatibility with legacy use cases, conforming to revision 4 of the Socket.IO protocol.

This interface is not recommended for new use cases. It will be maintained for as long as the Socket.IO project maintains the revision 4-compatible 2.x.x branch, after which it will be decommissioned.

From a client's point of view, the Socket.IO interface works very similarly to the WebSocket interface detailed above. The one exception is that atlas_error messages contain a single string instead of an object, as they did in the previous version.

The following features have been dropped due to complexity and zero or very low usage:

  • historic replay
  • arbitrary filtering based on field values (equalsTo, lessThan, greaterThan)

# Stream backpressure

Whatever method you use to connect to the stream, there is a limit to how much data will be buffered on the server if your script or connection is too slow to receive it all in real time.

If the maximum buffer size is exceeded then some messages will be dropped, and a single warning message will be sent to you:

["atlas_error",{"code":"warning","detail":"Some results have been dropped because either your client or network connection is too slow for the current subscription(s)."}]

The connection will remain open and the server will continue to send you as many messages as you can handle.
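
A client may want to notice this warning so that it knows its data has gaps. The sketch below is one way to recognise it, assuming that any atlas_error whose code is "warning" is non-fatal; the function name `is_warning` is hypothetical.

```python
import json


def is_warning(msg_type, payload):
    """Return True for an atlas_error message whose code is "warning".

    The payload is checked to be an object first, because the legacy
    Socket.IO interface sends atlas_error payloads as plain strings.
    """
    return (
        msg_type == "atlas_error"
        and isinstance(payload, dict)
        and payload.get("code") == "warning"
    )


# Example with a shortened version of the warning message shown above
raw = '["atlas_error",{"code":"warning","detail":"Some results have been dropped"}]'
msg_type, payload = json.loads(raw)
if is_warning(msg_type, payload):
    print("results were dropped; consider narrowing the subscription")
```

Since only a single warning is sent, a client cannot tell from the stream alone how many messages were dropped.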

# Subscriptions

You can subscribe to different streams using the atlas_subscribe message type. In each case you must specify a set of parameters, including a streamType which selects the data type of the subscription.

When streamType is set to "result", the client will receive measurement results.

When streamType is set to "probestatus", the client will receive probe connection and disconnection events.

When streamType is set to "metadata", the client will receive measurement metadata every time a measurement is created or edited.

# Parameters

The following parameters can be included in an atlas_subscribe message to specify what you want to receive on the stream. Which parameters you can use depends partially on the streamType.

| Name | Description | streamType |
| --- | --- | --- |
| streamType | result, metadata, probestatus | all |
| prb | A specific probe ID. If you don't set this parameter, you will receive results from all the probes. | result, probestatus |
| acceptedFields | A list of accepted field names; the messages will be pruned server side. If you don't set this parameter you will receive all the fields. | all, unless subscribing to result without msm or prb |
| enrichProbes | Enrich results with the full source probe API info. | result, probestatus |
| msm | A specific measurement ID. | result, metadata |
| type | Stream only ping, traceroute, dns, ssl, http or ntp | result, metadata |
| sourceAddress | IP address or CIDR prefix of probe performing a measurement. | result |
| destinationAddress | IP address or CIDR prefix of the measurement target. | result, metadata |
| sendBacklog | Fetch the last few minutes of results before streaming. Mostly intended to cover the gap between the last available data in the REST API and the live results, and to recover results that might have been missed during brief disconnections. | result if msm is also specified; not compatible with sourceAddress, destinationAddress or type |
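
The compatibility rules in the table can be checked on the client before subscribing. The following is a sketch of such a check, derived only from the table above; the name `check_params` is hypothetical, the server remains the authority on what is accepted, and flagging unlisted parameters as unknown is an assumption of this example.

```python
STREAM_TYPES = {"result", "metadata", "probestatus"}

# Which streamType values each parameter can be used with
ALLOWED_STREAM_TYPES = {
    "prb": {"result", "probestatus"},
    "acceptedFields": {"result", "metadata", "probestatus"},
    "enrichProbes": {"result", "probestatus"},
    "msm": {"result", "metadata"},
    "type": {"result", "metadata"},
    "sourceAddress": {"result"},
    "destinationAddress": {"result", "metadata"},
    "sendBacklog": {"result"},
}


def check_params(params):
    """Return a list of problems found in a dict of subscription parameters."""
    stream_type = params.get("streamType")
    if stream_type not in STREAM_TYPES:
        return ["streamType must be one of result, metadata, probestatus"]
    problems = []
    for name in params:
        if name == "streamType":
            continue
        allowed = ALLOWED_STREAM_TYPES.get(name)
        if allowed is None:
            problems.append(f"unknown parameter: {name}")
        elif stream_type not in allowed:
            problems.append(f"{name} is not available for streamType {stream_type}")
    # acceptedFields on a result stream needs msm or prb
    if (stream_type == "result" and "acceptedFields" in params
            and "msm" not in params and "prb" not in params):
        problems.append("acceptedFields on a result stream requires msm or prb")
    # sendBacklog needs msm and excludes the address and type filters
    if stream_type == "result" and "sendBacklog" in params:
        if "msm" not in params:
            problems.append("sendBacklog requires msm")
        for other in ("sourceAddress", "destinationAddress", "type"):
            if other in params:
                problems.append(f"sendBacklog is not compatible with {other}")
    return problems
```

An empty list means no problems were found by these rules; it does not guarantee that the server will accept the subscription.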

# Events

The following message types may be sent by the server.

| Type | Description |
| --- | --- |
| atlas_error | Error objects containing code, which will not vary for the same type of error, and detail, which is a human-readable string with more information but should not be parsed by machines |
| atlas_result | Measurement results for a result subscription, see the measurement result format docs for details |
| atlas_metadata | Measurement specifications for a metadata subscription, see the REST API reference for details |
| atlas_probestatus | Connection and disconnection events for a probestatus subscription |
| atlas_subscribed | Sent when a subscription to a stream is successful, including the specified params |
| atlas_unsubscribed | Sent when the client has been unsubscribed from a stream, including the original params |
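
One way to handle these message types in a client is to route each one to its own handler. The sketch below shows that pattern; `make_dispatcher` and the handlers are hypothetical names chosen for illustration.

```python
import json


def make_dispatcher(handlers, default=None):
    """Return a function that routes raw messages to per-type handlers.

    `handlers` maps a message type such as "atlas_result" to a callable
    taking the payload. Types without a handler go to `default`, if given.
    """
    def dispatch(raw):
        msg_type, payload = json.loads(raw)
        handler = handlers.get(msg_type, default)
        if handler is not None:
            return handler(payload)
        return None
    return dispatch


# Example: record subscription confirmations and results separately
confirmed = []
results = []
dispatch = make_dispatcher({
    "atlas_subscribed": confirmed.append,
    "atlas_result": results.append,
})
dispatch('["atlas_subscribed",{"streamType":"result","msm":1001}]')
dispatch('["atlas_result",{"msm_id":1001}]')
```

Keeping a default handler for unrecognised types makes it easier to spot atlas_error messages or any message type a client did not plan for.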

Last Updated: Friday 14 June 2024