Setting up the Kafka listener#
Attention: this notebook must be run locally or on Colab, because Binder does not allow secure connections.

To run locally, inside this repository do:

```
python -m venv alerts
source alerts/bin/activate
pip install gcn-kafka ligo.skymap xmltodict
```

To run on Colab, download the notebook, upload it to Colab, then add an initial cell with:

```
! pip install gcn-kafka ligo.skymap xmltodict
```
Swift#
| Instruments | Energy Range | Field of View | Localization |
|---|---|---|---|
| Burst Alert Telescope (BAT) | 15-350 keV | 2 ster | \(\leq 1-3^{\prime}\) radius (statistical, 90%) |
| X-ray Telescope (XRT) | 0.3-10 keV | \(23.6^{\prime} \times 23.6^{\prime}\) | \(1-3^{\prime \prime}\) |
| Ultraviolet/Optical Telescope (UVOT) | 170-650 nm | \(17^{\prime} \times 17^{\prime}\) | \(< 1^{\prime \prime}\) |
| Type | Typical Latency Since Burst | Location Precision | Description |
|---|---|---|---|
| SWIFT_BAT_QL_POS | 13-30 seconds | \(1-3^{\prime}\) | QuickLook Position Notice (subset of BAT_POS info) |
| SWIFT_BAT_GRB_POS_ACK | 13-30 seconds | \(1-3^{\prime}\) | First Position Notice, the BAT position |
| SWIFT_XRT_POSITION | 30-80 seconds | \(< 7^{\prime \prime}\) | XRT afterglow position |
| SWIFT_UVOT_POS | 1-3 hours | \(< 2^{\prime \prime}\) | UVOT afterglow position |
Check https://gcn.gsfc.nasa.gov/swift.html for further details on the content of each notice.
Fermi#
| Instruments | Energy Range | Field of View | Localization |
|---|---|---|---|
| Large Area Telescope (LAT) | \(20~\mathrm{MeV}\) to \(> 300~\mathrm{GeV}\) | 2.5 ster | \(\leq 1^{\circ}\) radius (statistical, 90%) |
| Gamma-ray Burst Monitor (GBM) | \(8~\mathrm{keV} - 30~\mathrm{MeV}\) | 8.8 ster | \(\gtrsim 1-10^{\circ}\) radius (statistical + systematic) |
| Type | Contents | Latency |
|---|---|---|
| FERMI_GBM_ALERT | Trigger info | ~5 seconds |
| FERMI_GBM_FLT_POS | Flight localization, classification | ~10 seconds |
| FERMI_GBM_GND_POS | Updated ground localization using finer lookup tables | 20-300 seconds |
| FERMI_GBM_FIN_POS | Final trigger localization | 15 minutes |
| FERMI_LAT_POS_INI | Onboard LAT detection, initial position | 2-4 seconds |
| FERMI_LAT_POS_UPD | Updated onboard localization with more data | 2-32 seconds |
| FERMI_LAT_GND | Ground localization of onboard trigger | 8-12 hours |
Check https://gcn.gsfc.nasa.gov/fermi.html for further details on the content of each notice.
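Each classic notice type maps to a Kafka topic with the prefix `gcn.classic.text.`; for example, `FERMI_GBM_ALERT` notices are served on the topic `gcn.classic.text.FERMI_GBM_ALERT`, which is the topic consumed in the code below.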
Get familiar with notice content#
Let’s try our first Kafka connection#
Set `"auto.offset.reset": "earliest"` to receive all the messages you missed since the last time the listener was active; set `"auto.offset.reset": "latest"` to receive only the new messages published after the listener starts.
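For example, the two behaviours differ only in this one configuration key (these are the same dictionaries used in the cells below):

```python
# Replay the retained backlog for a consumer with no committed offsets
CONFIG = {"group.id": "", "auto.offset.reset": "earliest"}

# Or: start from the head of the stream, receiving only new alerts
CONFIG = {"group.id": "", "auto.offset.reset": "latest"}
```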
```python
from credentials import load_credentials, get_credential, list_credentials

# Load credentials from credentials.txt
load_credentials()

# Check what's configured
print("Configured credentials:")
for key, status in list_credentials().items():
    print(f"  {key}: {status}")

# Access specific credentials when needed
try:
    gcn_client_id = get_credential('GCN_CLIENT_ID')
    gcn_client_secret = get_credential('GCN_CLIENT_SECRET')
    print(f"\n✅ GCN Client ID: {gcn_client_id[:10]}...")
except ValueError as e:
    print(f"\n⚠️ {e}")
```
```
📖 Loading credentials from: /Users/samueleronchini/Desktop/acme_tutorials/alerts/credentials.txt
✅ Loaded 17 credentials
Configured credentials:
  GCN_CLIENT_ID: ✅ Configured
  GCN_CLIENT_SECRET: ✅ Configured
  FINK_USERNAME: ⚠️ Empty
  FINK_GROUP_ID: ⚠️ Empty
  FINK_SERVERS: ✅ Configured
  SLACK_WEBHOOK: ✅ Configured
  SLACK_CHANNEL_ID: ✅ Configured
  EMAIL_SENDER: ✅ Configured
  EMAIL_PASSWORD: ✅ Configured
  EMAIL_SMTP_SERVER: ✅ Configured
  EMAIL_SMTP_PORT: ✅ Configured
  EMAIL_RECIPIENT: ✅ Configured
  TELEGRAM_BOT_TOKEN: ✅ Configured
  GRACEDB_USERNAME: ⚠️ Empty
  GRACEDB_PASSWORD: ⚠️ Empty
  GCN_CLIENT_ID_PROD: ✅ Configured
  GCN_CLIENT_SECRET_PROD: ✅ Configured

✅ GCN Client ID: 5h42prl0v1...
```
```python
from gcn_kafka import Consumer
from confluent_kafka import TopicPartition, KafkaException
import os
import json
import datetime
import email
import xmltodict

CONFIG = {"group.id": "", "auto.offset.reset": "earliest"}

consumer = Consumer(config=CONFIG,
                    client_id=gcn_client_id,
                    client_secret=gcn_client_secret,
                    domain='gcn.nasa.gov')
```
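`gcn_kafka.Consumer` wraps the standard `confluent_kafka` consumer and takes care of authenticating to `gcn.nasa.gov` with the client credentials; the `CONFIG` dictionary is forwarded to the underlying consumer, so standard `confluent_kafka` configuration keys work here.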
```python
def write_json(text_data, output_dir="./gcn_alerts"):
    """Parse the GCN text format, extract the header fields, and save them to JSON."""
    # Convert bytes to string if necessary
    if isinstance(text_data, bytes):
        text_data = text_data.decode('utf-8')

    # Classic text notices are formatted like email headers
    # (KEY: value lines), so the email module parses them directly
    msg = email.message_from_string(text_data)
    result = dict(msg)

    try:
        # Create the output directory if it doesn't exist
        os.makedirs(output_dir, exist_ok=True)

        # Name the file after the trigger number, or fall back to a timestamp
        if 'TRIGGER_NUM' in result:
            filename = f"GRB_{result['TRIGGER_NUM']}.json"
        else:
            timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = f"GRB_{timestamp}.json"

        filepath = os.path.join(output_dir, filename)
        with open(filepath, 'w') as f:
            json.dump(result, f, indent=2)
        print(f"Saved to: {filepath}")
    except Exception as e:
        print("Error processing GCN text notice:", e)

    return result
```
```python
def define_time_range(t1, t2):
    """Return (start, end) timestamps in milliseconds since the Unix epoch:
    the window starts t1 days ago and lasts t2 days."""
    timestamp1 = int((datetime.datetime.now() - datetime.timedelta(days=t1)).timestamp() * 1000)
    timestamp2 = timestamp1 + t2 * 86400000  # 86,400,000 ms per day
    return timestamp1, timestamp2
```
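For instance, since one day is \(24 \times 3600 \times 1000 = 86{,}400{,}000\) ms, `define_time_range(1, 1)` returns a window that starts 24 hours ago and is exactly one day wide:

```python
t_start, t_end = define_time_range(1, 1)
assert t_end - t_start == 86_400_000  # the window spans exactly one day
```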
```python
timestamp1, timestamp2 = define_time_range(1, 1)
print(f"Fetching messages between {timestamp1} and {timestamp2}")

# Subscribe to topics and receive alerts
topics = 'gcn.classic.text.FERMI_GBM_ALERT'
# topics = 'gcn.classic.text.SWIFT_BAT_GRB_POS_ACK'
# topics = 'gcn.classic.text.SWIFT_XRT_POSITION'
# topics = 'gcn.classic.text.SWIFT_UVOT_POS'

# Translate the time window into partition offsets
start = consumer.offsets_for_times(
    [TopicPartition(topics, 0, timestamp1)])
end = consumer.offsets_for_times(
    [TopicPartition(topics, 0, timestamp2)])
consumer.assign(start)

# Number of messages between the two offsets
num_messages = end[0].offset - start[0].offset

# Alternative: run indefinitely instead of consuming a fixed batch:
#   while True:
#       for message in consumer.consume(timeout=1): ...
for message in consumer.consume(abs(num_messages), timeout=1):
    if message.error():
        print(message.error())
        continue
    # Print the topic and message offset
    print(f'topic={message.topic()}, offset={message.offset()}')
    value = message.value()
    write_json(value)
```
```
Fetching messages between 1763462992372 and 1763549392372
topic=gcn.classic.text.FERMI_GBM_ALERT, offset=3537
Saved to: ./gcn_alerts/GRB_785185339.json
topic=gcn.classic.text.FERMI_GBM_ALERT, offset=3538
Saved to: ./gcn_alerts/GRB_785190695.json
topic=gcn.classic.text.FERMI_GBM_ALERT, offset=3539
Saved to: ./gcn_alerts/GRB_785198410.json
```
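To sanity-check what was written, we can read the saved notices back. A quick sketch, assuming the `./gcn_alerts` directory produced above (`TRIGGER_NUM` is the header field used for the filenames; other fields vary by notice type):

```python
import glob
import json

# Read back each saved text notice and print its trigger number
for path in sorted(glob.glob("./gcn_alerts/GRB_*.json")):
    with open(path) as f:
        notice = json.load(f)
    print(path, "-> trigger", notice.get("TRIGGER_NUM"))
```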
SVOM#
| Instruments | Energy Range | Field of View | Localization |
|---|---|---|---|
| ECLAIRs | \(4-250~\mathrm{keV}\) | 2 ster | \(\sim 10^{\prime}\) radius (statistical + systematic) |
| GRM | \(15-5000~\mathrm{keV}\) | 2.6 ster | \(\sim 10^{\circ}\) |
| MXT | 0.2-10 keV | \(1.1^{\circ} \times 1.1^{\circ}\) | \(\sim 10-100^{\prime \prime}\) radius (statistical + systematic) |
| VT | 450-650 nm (blue band) | \(26^{\prime} \times 26^{\prime}\) | \(\sim 1^{\prime \prime}\) radius |
| Kafka topic | Notice types |
|---|---|
| gcn.notices.svom.voevent.grm | grm-trigger |
| gcn.notices.svom.voevent.eclairs | eclairs-wakeup, eclairs-catalog, slewing / not-slewing |
| gcn.notices.svom.voevent.mxt | mxt-initial, mxt-update |
| gcn.notices.svom.voevent.vt | Available soon |
| Notice Type | Content | Latency |
|---|---|---|
| grm-trigger | GRM trigger notice with possible rough localization | \(\sim 15\) sec |
| eclairs-wakeup | ECLAIRs localization of a GRB candidate | \(\sim 15\) sec |
| eclairs-catalog | ECLAIRs localization of a flaring source from the on-board catalogue | \(\sim 15\) sec |
| slewing / not-slewing | Platform slew status and target coordinates | \(\sim 18\) min |
| mxt-initial_qf# | MXT localization of a found counterpart (only if the platform slews) | \(\sim 3\) min |
| mxt-update_qf# | MXT updated localization | \(\sim 3\) min |
```python
def voevent_to_json(xml_file):
    """Extract position and trigger time from a parsed SVOM VOEvent and save them to JSON."""
    # Convert the xmltodict OrderedDict into a plain dict
    json_data = json.loads(json.dumps(xml_file))

    # Navigate through the XML structure safely
    voevent = json_data.get("voe:VOEvent", {})
    wherewhen = voevent.get("WhereWhen", {})

    result = {}
    astro_coords = {}
    try:
        obs_location = wherewhen.get("ObsDataLocation", {}).get("ObservationLocation", {})
        astro_coords = obs_location.get("AstroCoords", {})
        pos = astro_coords.get("Position2D", {}).get("Value2", {})
        # The coordinates are already RA/Dec (equatorial, not galactic)
        ra = float(pos.get("C1"))
        dec = float(pos.get("C2"))
        result['ra'] = ra
        result['dec'] = dec
        print("RA:", ra)
        print("Dec:", dec)
    except Exception:
        print("No coordinates found")

    trigger_time = astro_coords.get("Time", {}).get("TimeInstant", {}).get("ISOTime")
    result['trigger_time'] = trigger_time
    print("Trigger time:", trigger_time)

    # Save to a JSON file
    output_dir = "./gcn_alerts"
    os.makedirs(output_dir, exist_ok=True)

    # Extract the burst ID for the filename
    burst_id = voevent.get("What", {}).get("Group", [{}])[0].get("Param", [{}])[1].get("@value", "unknown")
    filename = f"SVOM_{burst_id}.json"
    filepath = os.path.join(output_dir, filename)
    with open(filepath, 'w') as f:
        json.dump(result, f, indent=2)
    print(f"Saved to: {filepath}")

    return json_data
```
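For reference, here is an illustrative, heavily trimmed sketch of the nested dictionary that `xmltodict` produces from an ECLAIRs VOEvent (coordinates taken from the output below; real notices carry many more fields), showing the path `voevent_to_json` walks:

```python
# Illustrative only: the minimal structure voevent_to_json expects
example = {
    "voe:VOEvent": {
        "WhereWhen": {
            "ObsDataLocation": {
                "ObservationLocation": {
                    "AstroCoords": {
                        "Position2D": {"Value2": {"C1": "18.832", "C2": "-41.0854"}},
                        "Time": {"TimeInstant": {"ISOTime": "2025-11-16T15:32:26.019000"}},
                    }
                }
            }
        }
    }
}
```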
```python
# Subscribe to topics and receive alerts
topics = 'gcn.notices.svom.voevent.eclairs'
# topics = 'gcn.notices.svom.voevent.grm'

timestamp1, timestamp2 = define_time_range(5, 5)

start = consumer.offsets_for_times(
    [TopicPartition(topics, 0, timestamp1)])
end = consumer.offsets_for_times(
    [TopicPartition(topics, 0, timestamp2)])
num_messages = end[0].offset - start[0].offset
consumer.assign(start)

for message in consumer.consume(abs(num_messages), timeout=1):
    if message.error():
        print(message.error())
        continue
    # Print the topic and message offset
    print(f'topic={message.topic()}, offset={message.offset()}')
    value = message.value()
    xml_dict = xmltodict.parse(value)
    voevent_to_json(xml_dict)
```
```
topic=gcn.notices.svom.voevent.eclairs, offset=277
RA: 18.832
Dec: -41.0854
Trigger time: 2025-11-16T15:32:26.019000
Saved to: ./gcn_alerts/SVOM_sb25111604.json
topic=gcn.notices.svom.voevent.eclairs, offset=278
RA: 335.2652
Dec: -10.4105
Trigger time: 2025-11-16T15:57:15.919000
Saved to: ./gcn_alerts/SVOM_sb25111605.json
topic=gcn.notices.svom.voevent.eclairs, offset=279
RA: 335.3182
Dec: -10.4239
Trigger time: 2025-11-16T15:57:36.399000
Saved to: ./gcn_alerts/SVOM_sb25111605.json
topic=gcn.notices.svom.voevent.eclairs, offset=280
RA: 84.4289
Dec: 12.2431
Trigger time: 2025-11-19T08:47:23.892000
Saved to: ./gcn_alerts/SVOM_sb25111901.json
```
Check https://fsc.svom.org/readthedocs/svom/notices_and_circulars/index.html for more details about circulars and notices.
Einstein Probe#
| Instruments | Energy Range | Field of View | Localization |
|---|---|---|---|
| Wide-field X-ray Telescope (WXT) | 0.5-4 keV | \(3600~\mathrm{deg}^2\) | 2 arcmin |
| Follow-up X-ray Telescope (FXT) | \(0.5-10~\mathrm{keV}\) | 60 arcmin diameter | 5-15 arcsec (90% c.l.) |
| Topic | Contents | Latency |
|---|---|---|
| gcn.notices.einstein_probe.wxt.alert | Localization, count rate, significance | \(\sim 1\) min |
```python
def to_json(text_data, output_dir="./gcn_alerts"):
    """Parse a GCN JSON-format notice, extract position and trigger time, and save to JSON."""
    try:
        # Parse the JSON payload
        data = json.loads(text_data)

        # Extract only position and trigger time
        alert_info = {
            "trigger_time": data.get("trigger_time"),
            "ra": data.get("ra"),
            "dec": data.get("dec")
        }

        # Create the output directory if it doesn't exist
        os.makedirs(output_dir, exist_ok=True)

        # Name the file after the trigger ID (the "id" field is a list)
        trigger_id = data.get("id", ["unknown"])[0]
        filename = f"einstein_probe_{trigger_id}.json"
        filepath = os.path.join(output_dir, filename)

        # Save to a JSON file
        with open(filepath, 'w') as f:
            json.dump(alert_info, f, indent=2)
        print(f"Saved alert to {filepath}")
        return alert_info

    except json.JSONDecodeError as e:
        print(f"Error parsing JSON: {e}")
        return None
    except Exception as e:
        print(f"Error processing alert: {e}")
        return None
```
```python
timestamp1, timestamp2 = define_time_range(10, 10)

# Subscribe to topics and receive alerts
topics = 'gcn.notices.einstein_probe.wxt.alert'

start = consumer.offsets_for_times(
    [TopicPartition(topics, 0, timestamp1)])
end = consumer.offsets_for_times(
    [TopicPartition(topics, 0, timestamp2)])
consumer.assign(start)

# Number of messages between the two offsets (see the note on -1 below)
num_messages = end[0].offset - start[0].offset
print(end[0].offset, start[0].offset)

for message in consumer.consume(abs(num_messages), timeout=5):
    if message.error():
        print(message.error())
        continue
    # Print the topic and message offset
    print(f'topic={message.topic()}, offset={message.offset()}')
    value = message.value()
    to_json(value)
```
```
-1 182
topic=gcn.notices.einstein_probe.wxt.alert, offset=182
Saved alert to ./gcn_alerts/einstein_probe_01709248221.json
topic=gcn.notices.einstein_probe.wxt.alert, offset=183
Saved alert to ./gcn_alerts/einstein_probe_11900480769.json
```
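Note the `-1` in the first output line: `offsets_for_times` returns an offset of `-1` when no message exists at or after the requested timestamp (here the end of the window lies in the future), which is why the code takes `abs(num_messages)`. A more explicit alternative, sketched here with `confluent_kafka`'s watermark query on the same `consumer` and `topics`, resolves the sentinel to the current end of the partition:

```python
# Sketch: replace the -1 sentinel with the partition's high watermark
low, high = consumer.get_watermark_offsets(TopicPartition(topics, 0))
end_offset = end[0].offset if end[0].offset >= 0 else high
num_messages = max(end_offset - start[0].offset, 0)
```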
Let’s switch on our Kafka listener#
```python
import threading
import time

consumer.subscribe(['gcn.notices.einstein_probe.wxt.alert',
                    'gcn.notices.svom.voevent.grm',
                    'gcn.notices.svom.voevent.eclairs',
                    'gcn.classic.text.SWIFT_BAT_GRB_POS_ACK',
                    'gcn.classic.text.FERMI_GBM_ALERT'])

def kafka_listener():
    try:
        while not stop_event.is_set():
            message = consumer.poll(timeout=1.0)  # poll frequently so stop_event is checked
            if message is None:
                continue
            if message.error():
                print(f"Consumer error: {message.error()}")
                continue
            print(f'topic={message.topic()}, offset={message.offset()}')
            value = message.value()
            # Dispatch to the parser matching the topic
            if message.topic() == 'gcn.notices.einstein_probe.wxt.alert':
                to_json(value)
            elif 'svom' in message.topic():
                xml_dict = xmltodict.parse(value)
                voevent_to_json(xml_dict)
            elif 'SWIFT' in message.topic() or 'FERMI' in message.topic():
                write_json(value)
    except KafkaException as e:
        print(f"Kafka exception: {e}")
    finally:
        consumer.close()
        print("Consumer closed.")
```
Start the listener as a background process#
```python
# Use "latest" so the listener only processes newly published alerts
CONFIG = {"group.id": "", "auto.offset.reset": "latest"}

consumer = Consumer(config=CONFIG,
                    client_id=gcn_client_id,
                    client_secret=gcn_client_secret,
                    domain='gcn.nasa.gov')

# This fresh consumer replaces the one subscribed above, so it needs
# its own subscription before the listener thread starts
consumer.subscribe(['gcn.notices.einstein_probe.wxt.alert',
                    'gcn.notices.svom.voevent.grm',
                    'gcn.notices.svom.voevent.eclairs',
                    'gcn.classic.text.SWIFT_BAT_GRB_POS_ACK',
                    'gcn.classic.text.FERMI_GBM_ALERT'])
```

```python
import threading

stop_event = threading.Event()

# --- Start listener in a daemon thread ---
listener_thread = threading.Thread(target=kafka_listener, daemon=True)
listener_thread.start()
```
Check if the listener is alive#
```python
listener_thread.is_alive()
```

```
True
```
Stop the listener#
```python
stop_event.set()        # signal kafka_listener to exit its polling loop safely
listener_thread.join()  # wait for the thread to finish
```
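Because the `finally` block in `kafka_listener` closes the consumer, restarting the listener requires a fresh `Consumer`, a new subscription, a cleared event (`stop_event.clear()`), and a new thread.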