# Streaming API
The RIPE Atlas streaming API allows you to tap into the real-time data flow of all the collected public results. Every time our system receives a measurement result, a new measurement is scheduled, or a probe connects or disconnects, a message is delivered to the clients who are subscribed to that event.
The messages can be streamed from the `/stream/` endpoint either with a simple `GET` request or by opening a WebSocket. In addition, there is legacy Socket.IO support so that old clients and visualizations keep working.
# Quick start
You can start streaming right away by opening a WebSocket and sending a subscribe message.
In both of these examples, one for JavaScript and one for Python, you will start to see messages containing RIPE Atlas results almost immediately.

JavaScript:

    const socket = new WebSocket("wss://atlas-stream.ripe.net/stream/");
    const params = { streamType: "result", msm: 1001 };

    // Print every raw message as it arrives
    socket.onmessage = function (event) {
      console.log(event.data);
    };

    // Subscribe as soon as the connection is open
    socket.onopen = function (event) {
      this.send(JSON.stringify(["atlas_subscribe", params]));
    };

Python:

    # Requires the websocket-client package
    import json
    import websocket

    ws = websocket.WebSocket()
    ws.connect("wss://atlas-stream.ripe.net/stream/")

    params = {"streamType": "result", "msm": 1001}
    ws.send(json.dumps(["atlas_subscribe", params]))

    # Iterating over the socket yields one received message at a time
    for data in ws:
        print(data)

# Format
Messages to and from the streaming API follow a common JSON-based format. Each message is a JSON array of the form:

    [type,payload]

where `type` is a string and `payload` is any valid JSON value.
For example, to subscribe to a particular measurement you would send a message like:

    ["atlas_subscribe",{"streamType":"result","msm":1001}]

and receive a confirmation message like:

    ["atlas_subscribed",{"streamType":"result","msm":1001}]

In general, message type names and the keys of RIPE Atlas result payloads use `snake_case`, while subscription parameters use `camelCase`.
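
The format described above can be sketched in Python as a pair of small helpers. The function names `encode_message` and `decode_message` are hypothetical and only illustrate the `[type, payload]` framing; they are not part of any RIPE Atlas library.

```python
import json


def encode_message(msg_type, payload):
    """Serialize a (type, payload) pair into the JSON array text sent on the wire."""
    return json.dumps([msg_type, payload], separators=(",", ":"))


def decode_message(raw):
    """Parse a raw message into (type, payload).

    Raises ValueError if the text is not a two-element array with a string type.
    """
    message = json.loads(raw)
    if (
        not isinstance(message, list)
        or len(message) != 2
        or not isinstance(message[0], str)
    ):
        raise ValueError(f"unexpected message shape: {raw!r}")
    return message[0], message[1]


subscribe = encode_message("atlas_subscribe", {"streamType": "result", "msm": 1001})
print(subscribe)
```

Checking the shape in one place means the rest of a client can rely on always receiving a string type and a payload.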
# WebSocket
You can listen to multiple subscriptions by opening a WebSocket to the API endpoint. You can also subscribe and unsubscribe dynamically throughout the lifetime of the socket by sending an `atlas_subscribe` or `atlas_unsubscribe` message.
Messages to and from the server follow the format given above, with one message sent per WebSocket frame.
Example using JavaScript:

    const socket = new WebSocket("wss://atlas-stream.ripe.net/stream/");
    const params = { streamType: "result", msm: 1001 };

    // Each frame carries one [type, payload] message
    socket.onmessage = function (event) {
      const [type, payload] = JSON.parse(event.data);
      console.log(type, payload);
    };

    socket.onopen = function (event) {
      this.send(JSON.stringify(["atlas_subscribe", params]));
    };

    // Unsubscribe in 5 seconds
    setTimeout(() => socket.send(JSON.stringify(["atlas_unsubscribe", params])), 5000);

    /* output:
    atlas_subscribed, Object { streamType: "result", msm: 1001 }
    atlas_result, Object { msm_id: 1001, ... }
    ...
    atlas_unsubscribed, Object { streamType: "result", msm: 1001 }
    */

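
When several subscriptions share one socket, it can help to keep track of which ones the server has confirmed. The following is a minimal Python sketch of that bookkeeping. `SubscriptionTracker` is a hypothetical helper, `send` stands for whatever function writes one text frame to your socket (for example `ws.send` from the `websocket-client` package), and the sketch assumes the confirmation payload echoes the parameters you sent, as it does in the examples above.

```python
import json


class SubscriptionTracker:
    """Track confirmed subscriptions on a single socket (hypothetical helper)."""

    def __init__(self, send):
        self._send = send   # callable that writes one text frame
        self.active = []    # params of subscriptions confirmed by the server

    def subscribe(self, params):
        self._send(json.dumps(["atlas_subscribe", params]))

    def unsubscribe(self, params):
        self._send(json.dumps(["atlas_unsubscribe", params]))

    def handle_frame(self, raw):
        """Update state from one incoming frame and return (type, payload)."""
        msg_type, payload = json.loads(raw)
        if msg_type == "atlas_subscribed" and payload not in self.active:
            self.active.append(payload)
        elif msg_type == "atlas_unsubscribed" and payload in self.active:
            self.active.remove(payload)
        return msg_type, payload


# Demonstration with a list standing in for the socket
sent_frames = []
tracker = SubscriptionTracker(sent_frames.append)
tracker.subscribe({"streamType": "result", "msm": 1001})
tracker.handle_frame('["atlas_subscribed",{"streamType":"result","msm":1001}]')
print(tracker.active)
```

Passing `send` in as a callable keeps the tracker independent of any particular WebSocket library.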
# HTTP GET
You can listen to a single subscription by making a GET request to the endpoint and specifying subscription parameters in the query string.
If the subscription parameters are valid, the request stays open for as long as data is flowing (see the note on timeouts below), outputting messages separated by newline characters. If the parameters are invalid or the subscription hits a fatal error, an error line is sent and the connection is closed.
Example using curl:

    $ curl -s "https://atlas-stream.ripe.net/stream/?streamType=result&msm=1001"
    ["atlas_subscribed",{"streamType":"result","msm":1001}]
    ["atlas_result",{"msm_id": 1001, ...}]
    ...

Note on timeouts: the HTTP connection will automatically close after some time of inactivity (currently 60 seconds). If you want a long-lived connection with subscriptions that don't generate a continuous flow of messages, consider using the WebSocket interface instead.
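
The same request can be prepared in Python. This is a minimal sketch using only the standard library: `build_stream_url` and `iter_messages` are hypothetical helper names, and the parsing assumes one complete JSON message per line, as described above.

```python
import json
from urllib.parse import urlencode

STREAM_URL = "https://atlas-stream.ripe.net/stream/"


def build_stream_url(params):
    """Put the subscription parameters into the query string of the endpoint."""
    return STREAM_URL + "?" + urlencode(params)


def iter_messages(lines):
    """Yield (type, payload) for each non-empty line of a newline-delimited body."""
    for line in lines:
        if isinstance(line, bytes):
            line = line.decode("utf-8")
        line = line.strip()
        if line:
            msg_type, payload = json.loads(line)
            yield msg_type, payload


url = build_stream_url({"streamType": "result", "msm": 1001})
print(url)
```

To consume a live stream you could pass the response object returned by `urllib.request.urlopen(url)` to `iter_messages`, since it can be iterated line by line. The loop ends when the server closes the connection, including after the inactivity timeout.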
# Socket.IO
A Socket.IO interface is maintained for backwards compatibility with legacy use cases, conforming to revision 4 of the Socket.IO protocol.
This interface is not recommended for new use cases. It will be maintained for as long as the Socket.IO project maintains the revision 4-compatible 2.x.x branch, after which it will be decommissioned.
From a client's point of view, the Socket.IO interface works very similarly to the WebSocket interface described above, except that `atlas_error` messages contain a single string instead of an object, as they did in the previous version.
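
Code that has to handle errors from both interfaces can normalise the two payload shapes. The sketch below is one possible approach: `normalize_error` is a hypothetical helper, and using `None` as the code for string-only errors is an assumption made for illustration.

```python
def normalize_error(payload):
    """Return an error as a dict with "code" and "detail" keys.

    The WebSocket and GET interfaces send an object; the Socket.IO
    interface sends a plain string, which is stored as the detail.
    """
    if isinstance(payload, str):
        # Assumption: no machine-readable code is available in this case.
        return {"code": None, "detail": payload}
    return {"code": payload.get("code"), "detail": payload.get("detail")}


print(normalize_error("something went wrong"))
print(normalize_error({"code": "warning", "detail": "Some results have been dropped"}))
```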
The following features have been dropped due to complexity and zero or very low usage:
- historic replay
- arbitrary filtering based on field values (equalsTo, lessThan, greaterThan)
# Stream backpressure
Whatever method you use to connect to the stream, there is a limit to how much data will be buffered on the server if your script or connection is too slow to receive it all in real time.
If the maximum buffer size is exceeded then some messages will be dropped, and a single warning message will be sent to you:

    ["atlas_error",{"code":"warning","detail":"Some results have been dropped because either your client or network connection is too slow for the current subscription(s)."}]

The connection will remain open and the server will continue to send you as many messages as you can handle.
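
A client may want to notice when this happens, for example to log it or to narrow its subscriptions. The following minimal sketch counts such warnings in a sequence of raw messages. `count_drop_warnings` is a hypothetical helper, and matching on the `"warning"` code alone is an assumption based on the example message above; other warnings may share that code.

```python
import json


def count_drop_warnings(raw_messages):
    """Count atlas_error messages whose code is "warning"."""
    warnings = 0
    for raw in raw_messages:
        msg_type, payload = json.loads(raw)
        # Socket.IO clients receive a plain string here, so check the type first.
        if (
            msg_type == "atlas_error"
            and isinstance(payload, dict)
            and payload.get("code") == "warning"
        ):
            warnings += 1
    return warnings


sample = [
    '["atlas_result",{"msm_id":1001}]',
    '["atlas_error",{"code":"warning","detail":"Some results have been dropped"}]',
]
print(count_drop_warnings(sample))
```

The check uses `code` rather than `detail`, because the detail text is meant for humans and should not be parsed.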
# Subscriptions
You can subscribe to different streams using the `atlas_subscribe` message type. In each case you must specify a set of parameters, including a `streamType`, which selects the data type of the subscription.
- When `streamType` is set to `"result"`, the client will receive measurement results.
- When `streamType` is set to `"probestatus"`, the client will receive probe connection and disconnection events.
- When `streamType` is set to `"metadata"`, the client will receive measurement metadata every time a measurement is created or edited.
# Parameters
The following parameters can be included in an `atlas_subscribe` message to specify what you want to receive on the stream. Which parameters you can use depends partly on the `streamType`.
Name | Description | streamType |
---|---|---|
`streamType` | `result`, `metadata` or `probestatus` | all |
`prb` | A specific probe ID. If you don't set this parameter, you will receive results from all the probes. | `result`, `probestatus` |
`acceptedFields` | A list of accepted field names; the messages will be pruned server side. If you don't set this parameter, you will receive all the fields. | all, unless subscribing to `result` without `msm` or `prb` |
`enrichProbes` | Enrich results with the full source probe API info. | `result`, `probestatus` |
`msm` | A specific measurement ID. | `result`, `metadata` |
`type` | Stream only measurements of this type: `ping`, `traceroute`, `dns`, `ssl`, `http` or `ntp`. | `result`, `metadata` |
`sourceAddress` | IP address or CIDR prefix of the probe performing a measurement. | `result` |
`destinationAddress` | IP address or CIDR prefix of the measurement target. | `result`, `metadata` |
`sendBacklog` | Fetch the last few minutes of results before streaming. Mostly intended to cover the gap between the last available data in the REST API and the live results, and to recover results that might have been missed during brief disconnections. | `result` if `msm` is also specified; not compatible with `sourceAddress`, `destinationAddress` or `type` |
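
The combinations in the table above can be checked on the client before subscribing. The following is a minimal sketch: `check_params` is a hypothetical helper that encodes only what the table states, and the server remains the authority on what it accepts.

```python
STREAM_TYPES = {"result", "metadata", "probestatus"}

# Parameter name -> stream types it may be used with (taken from the table)
ALLOWED_STREAM_TYPES = {
    "prb": {"result", "probestatus"},
    "enrichProbes": {"result", "probestatus"},
    "msm": {"result", "metadata"},
    "type": {"result", "metadata"},
    "sourceAddress": {"result"},
    "destinationAddress": {"result", "metadata"},
    "sendBacklog": {"result"},
}


def check_params(params):
    """Return a list of human-readable problems; an empty list means no issue found."""
    stream_type = params.get("streamType")
    if stream_type not in STREAM_TYPES:
        return [f"streamType must be one of {sorted(STREAM_TYPES)}"]

    problems = []
    for name in params:
        if name in ("streamType", "acceptedFields"):
            continue  # handled separately
        allowed = ALLOWED_STREAM_TYPES.get(name)
        if allowed is None:
            problems.append(f"unknown parameter: {name}")
        elif stream_type not in allowed:
            problems.append(f"{name} is not available for streamType {stream_type}")

    has_msm_or_prb = "msm" in params or "prb" in params
    if "acceptedFields" in params and stream_type == "result" and not has_msm_or_prb:
        problems.append("acceptedFields on a result stream requires msm or prb")

    if "sendBacklog" in params:
        if "msm" not in params:
            problems.append("sendBacklog requires msm")
        for name in ("sourceAddress", "destinationAddress", "type"):
            if name in params:
                problems.append(f"sendBacklog cannot be combined with {name}")
    return problems


print(check_params({"streamType": "result", "msm": 1001}))
print(check_params({"streamType": "probestatus", "msm": 1001}))
```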
# Events
The following message types may be sent by the server.
Type | Description |
---|---|
`atlas_error` | Error object containing `code`, which does not vary for the same type of error, and `detail`, a human-readable string with more information that should not be parsed by machines |
`atlas_result` | Measurement results for a `result` subscription; see the measurement result format docs for details |
`atlas_metadata` | Measurement specifications for a `metadata` subscription; see the REST API reference for details |
`atlas_probestatus` | Connection and disconnection events for a `probestatus` subscription |
`atlas_subscribed` | Sent when a subscription to a stream is successful, including the specified params |
`atlas_unsubscribed` | Sent when the client has been unsubscribed from a stream, including the original params |
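
One way to handle these message types in a client is a small dispatch table keyed by type. The sketch below is illustrative only: `dispatch` and the handler names are hypothetical, and each handler simply collects payloads in a list.

```python
import json


def dispatch(raw, handlers, default=None):
    """Route one raw message to the handler registered for its type.

    Returns the message type so that callers can log or count it.
    """
    msg_type, payload = json.loads(raw)
    handler = handlers.get(msg_type, default)
    if handler is not None:
        handler(payload)
    return msg_type


results, errors, other = [], [], []
handlers = {
    "atlas_result": results.append,
    "atlas_error": errors.append,
}

dispatch('["atlas_result",{"msm_id":1001}]', handlers, default=other.append)
dispatch('["atlas_subscribed",{"streamType":"result","msm":1001}]', handlers, default=other.append)
print(len(results), len(errors), len(other))
```

A default handler keeps the client tolerant of message types it does not explicitly handle.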
# Available Visualisations
- Visualising which DNS root instance a probe ends up on: DNS root instances
- Visualising probe connections and disconnections: Probe connections/disconnections
- Real-time latency graphs (against AS3333): Real-time latency graphs