Making an Annotator ∞
Request to Lasair team ∞
Annotation means that external users push information to the Lasair database.
Therefore it requires that user to inform the Lasair team and be approved
before it will work. The team will use the admin interface to create an annotator
object in the database, which is a conneciton between the API token of that user
with the name (topic
) assigned to the annotator.
In the first case, write to Lasair Team to propose your annotator.
Here we see that admin interface that the Lasair team uses to make your annotator, if it is approved.:
You can see that your new annotator is a name (or topic) and description.
It is bound to your own Lasair account, meaning that you must use your own API token
to run your annotator code (see below). There is also a URL for further information.
If the annotator is active
, it can receive annotations, meaning that if the external system
goes wild, the annotator can be switched off by setting active=0
. Finally, the annotator
can be public
or not, meaning that it is visible or not to others building Lasair filters.
Make the code ∞
The following code reads the stream from a Lasair filter, and for each objectId
,
it pulls the complete object information so it could analyse the lightcurve
and other information before making a classification decision.
This information is collected up as the annotation and sent back to Lasair,
where it will be available for others to query.
There should be a file settings.py
file to accomany the code below with these variables defined::
TOPIC_IN
: The name of your streaming query as seen in the filter detail, where it says “The filter is streamed via kafka with the topic name”GROUP_ID
: Choose a new one evey run when testing; keep it constant for long-term runningAPI_TOKEN
: As found in ‘My Profile’ top right of the web pageTOPIC_OUT
: The name of your annotator as agreed with the Lasair team (above)
If the code below is not clear, it would be good for you to read about how the (Lasair client)[rest-api.html] works.
For more information about what is returned as objectInfo
, a complete example
is shown here.
For testing purposes, the GROUP_ID
will change frequently, and you get all of the alerts
the come from the given stream. Then you will set up your annotator program to run continuously,
perhaps in a screen
session on a server machine, or started every hour by cron
.
In that case, the GROUP_ID
will remain constant, so you won’t get any alerts twice.
A much simpler code is possible if for example the annotation is the classification
results from another broker. In that case, only the call to L.annotator()
is necessary.
import json, sys, settings
import lasair
# This function deals with an object once it is received from Lasair
def handle_object(objectId, L, topic_out):
# from the objectId, we can get all the info that Lasair has
objectInfo = L.objects([objectId])[0]
print(objectInfo.keys())
if not objectInfo:
return 0
# objectInfo.keys():
# -- objectData: about the object and its features
# -- candidates: the lightcurve of detections and nondetections
# -- sherlock: the sherlock information
# -- TNS: any crossmatch with the TNS database
# analyse object here. The following is a toy annotation
classdict = {'fruit': 'apple'}
classification = 'ripe'
explanation = 'another nice apple'
# now we annotate the Lasair data with the classification
L.annotate(
topic_out,
objectId,
classification,
version='0.1',
explanation=explanation,
classdict=classdict,
url='')
print(objectId, '-- annotated!')
return 1
#####################################
# first we set up pulling the stream from Lasair
# a fresh group_id gets all, an old group_id starts where it left off
group_id = settings.GROUP_ID
# a filter from Lasair, example 'lasair_2SN-likecandidates'
topic_in = settings.TOPIC_IN
# kafka consumer that we can suck from
consumer = lasair.lasair_consumer('kafka.lsst.ac.uk:9092', group_id, topic_in)
# the lasair client will be used for pulling all the info about the object
# and for annotating it
L = lasair.lasair_client(settings.API_TOKEN)
# TOPIC_OUT is an annotator owned by a user. API_TOKEN must be that users token.
topic_out = settings.TOPIC_OUT
# just get a few to start
max_alert = 5
n_alert = n_annotate = 0
while n_alert < max_alert:
msg = consumer.poll(timeout=20)
if msg is None:
break
if msg.error():
print(str(msg.error()))
break
jsonmsg = json.loads(msg.value())
objectId = jsonmsg['objectId']
n_alert += 1
n_annotate += handle_object(objectId, L, topic_out)
print('Annotated %d of %d objects' % (n_annotate, n_alert))