Display Control Plane API Operations using Amazon CloudWatch Logs Insights

Introduction

For small organisations that cannot afford to spend much on their network security, moving to the cloud enables them to easily uplift their security posture. The organisation can concentrate on innovating and scaling their workloads, while AWS provides them with a secure environment to use. More information regarding AWS security and compliance can be found at https://docs.aws.amazon.com/whitepapers/latest/aws-overview/security-and-compliance.html

That said, the paragraph above is not entirely accurate. The security of workloads in AWS is a shared responsibility between AWS and the customer. AWS is responsible for “Security of the cloud”. This means that AWS is responsible for protecting the infrastructure that runs the workloads (hardware, software, networking, physical facilities). The customer is responsible for “Security in the cloud”. This means that the customer, based on the services they select, must perform all the necessary security configuration and management tasks to keep the workload secure. A good place to learn more about this is https://aws.amazon.com/compliance/shared-responsibility-model/

As a good security practice, one must always monitor all the activities that happen in their cloud environment, especially those that involve the management of resources. For instance, if one notices a lot of large Amazon EC2 instances being provisioned, this could possibly be an indication of a breach (or someone authorised is provisioning these without notifying others). The management operations performed on resources in your AWS account are referred to as Control Plane operations.

There are many commercial products that can help you with monitoring your AWS environment. However, you can quite adequately benefit from the tools that are natively provided by AWS as well.

In this blog, I will take you through the steps of using Amazon CloudWatch Logs Insights to display Control Plane operations in a meaningful way.

Implementation

Let’s start:

  1. Open the AWS Console and then navigate to the AWS CloudTrail service page. Change to the appropriate AWS region.
  2. Create a new Trail to record just Management events. Ensure it is applied to all regions and that it delivers events to Amazon CloudWatch Logs as well. Below is a screenshot of the required settings.

  3. Open the Amazon CloudWatch service page and ensure it is in the same region as the AWS CloudTrail that was just configured (in step 2 above)
  4. Click on Dashboards and then click on Create dashboard. Give the dashboard a meaningful name (I called my dashboard ControlPlaneOperations-Dashboard)
  5. In the next screen, from the top menu, click on Add widget. Another screen will open. Select Query results and then click on Configure

  6. In the next screen, use the drop-down arrow (pointed to by the red arrow below) to select the CloudWatch Log group that was configured for the new CloudTrail in step 2 above.

    In the query section (denoted by the red rectangle), delete everything and replace it with the text below (the screenshot above already shows the correct query)

    fields eventTime, userIdentity.userName, userIdentity.accessKeyId, sourceIPAddress, awsRegion, eventSource, eventName | sort eventTime desc
    

    The above query directs Amazon CloudWatch Logs Insights to display the event time, the user name and access key of the identity that performed the control plane operation, the IP address from where the operation was performed, the AWS region inside which the operation was performed, and the event’s source and name. The results are sorted by event time in descending order. (If you would rather run this query from a script instead of the console, see the boto3 sketch after this list.)

    Next, set the time range for events that CloudWatch Logs Insights must process. To configure this, pick the appropriate duration from those displayed on the top right (as displayed inside the green rectangle in the screenshot above). For my setup, I chose 1hr.

    Once completed, click Create widget

  7. The next screen should look similar to the screenshot below. Click Save dashboard

     

    Amazon CloudWatch Logs Insights - Create Dashboard

  8. Your Amazon CloudWatch dashboard is now complete. To refresh the events, you can press the refresh button (pointed to by the red arrow in the screenshot below).

    Amazon CloudWatch Logs Insights - Refresh Events

  9. You can also enable auto refresh of the events by clicking the small arrow beside the refresh button. You will get a menu option similar to the screenshot below.

    Tick Auto refresh and choose the Refresh interval of your choice.

  10. Pro Tip 1 – if you want your dashboard to be displayed under the Favorite section when you open the Amazon CloudWatch service page, go into the Dashboards section of Amazon CloudWatch and click on Favorite (star) beside your dashboard name.
  11. Pro Tip 2 – if you want your dashboard to appear on the default Amazon CloudWatch service page, rename your dashboard to CloudWatch-Default.
  12. Pro Tip 3 – at the beginning of each row in your dashboard, you will notice a small arrow head. If you click on the arrow head, it expands that event and provides additional information.
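If you prefer to run the same query from a script instead of the console, below is a minimal boto3 sketch. This is my own illustration, not part of the console walkthrough above; the log group name is a placeholder, so substitute whatever log group your trail delivers to.

import time

import boto3

logs = boto3.client('logs', region_name='ap-southeast-2')

#the same query that was entered into the dashboard widget
query = ("fields eventTime, userIdentity.userName, userIdentity.accessKeyId, "
         "sourceIPAddress, awsRegion, eventSource, eventName | sort eventTime desc")

now = int(time.time())
response = logs.start_query(
    logGroupName='CloudTrail/DefaultLogGroup',  #placeholder - use your trail's log group
    startTime=now - 3600,                       #the last hour, matching the 1hr widget range
    endTime=now,
    queryString=query
)

#poll until the query finishes, then print each event as a dict
while True:
    results = logs.get_query_results(queryId=response['queryId'])
    if results['status'] in ('Complete', 'Failed', 'Cancelled'):
        break
    time.sleep(1)

for row in results['results']:
    print({column['field']: column['value'] for column in row})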

That’s it! You should now have a dashboard similar to the one below that shows the control plane operations as they happen.

Amazon CloudWatch Logs Insights Dashboard

At times, I found there to be approximately five minutes of delay between when an event happened and when it was displayed on the dashboard. This could be due to the delay between when the event was generated and when the service that generated it delivered it to AWS CloudTrail.

The dashboard should allow you to easily monitor any suspicious control plane activities in your AWS account.

I hope the above was useful. Till the next time, Enjoy!

Using Serverless Framework and AWS to map Near-Realtime Positions of Trains

Background

A couple of months back, I found out about the Open Data initiative from Transport for New South Wales. This is an awesome undertaking, to provide data to developers and other interested parties, so that they can develop great applications. For those interested, the Open Data hub can be accessed at https://opendata.transport.nsw.gov.au.

I had been playing with data for a few months now and when I looked through the various APIs that I could access via the Open Data hub, I became extremely interested.

In this blog, I will take you through one of my mini projects based on the data at the Open Data Hub. I will be using the Public Transport – Vehicle Positions API to plot the near-realtime positions of Sydney trains on a map. The API provides access to more than just train position data, however to keep things simple, I will concentrate on trains only in this blog.

Solution Architecture

As I am a huge fan of serverless, I decided to architect my solution with as many serverless components as possible. The diagram below shows a high-level architecture of how the Transport Positioning System (this is what I will call my solution, TPS for short) will be created.

Let’s go through the steps (as marked in the diagram above)

  1. The lambda function will query the Open Data API every 5 minutes for the position of all trains
  2. After the data has been received, the lambda function will go through each record and assign a label to each train. To ensure the labels are consistent across each lambda invocation, the train to label association will be stored in an Amazon DynamoDB table. The lambda function will query the table to check if a train has already been allocated a label. If it has, then that label will be used. Otherwise, a new label will be created, and the Amazon DynamoDB table will be updated to store this new train to label association.
  3. I found Bing Maps to be much easier (and cheaper) to use for plotting items on a map. The only disadvantage is that it can show, at most, 100 points (called pushpins) on a map. The lambda function will go through the first 100 items returned from the Open Data API and, using the labels that were found/created in step 2, create a pushpin url that will be used to generate a map of the locations of the first 100 trains (an illustration of the pushpin url format is shown after this list). This url will then be sent to Bing Maps to generate the map.
  4. The lambda function will then create a static webpage that displays the map showing the positions of the trains, along with a key that provides more information about the label used for each train (for example, label 1 could have a description of “19:10 Central Station to Penrith Station”). The label description is set to the train’s label, which is obtained from the results of the Open Data API query.
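To make step 3 more concrete, below is a small sketch (my own illustration, not the exact code used later) of how a Bing Maps Imagery REST url with pushpins can be assembled. The coordinates, labels and map size are placeholder values; the actual script builds the pushpin portion from the Open Data results and, because 100 pushpins make for a very long url, sends it to Bing Maps in the request body.

#placeholder coordinates and labels, for illustration only
trains = [
    (-33.8832, 151.2058, 1),  #label 1, near Central
    (-33.7506, 150.6944, 2),  #label 2, near Penrith
]

base_url = "https://dev.virtualearth.net/REST/v1/Imagery/Map/Road?mapsize=900,834&format=jpeg"

#each pushpin is specified as pp=latitude,longitude;iconStyle;label
pushpins = "".join("&pp={},{};4;{}".format(lat, lon, label) for lat, lon, label in trains)

map_url = base_url + pushpins + "&key=<insert your bing maps api key here>"
print(map_url)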

This is what I cooked up earlier

Be warned! This blog is quite lengthy, as a lot was done to make this solution work. However, if you would rather see the final result before delving into the details, check it out at https://sls-tps-website-dev.s3-ap-southeast-2.amazonaws.com/vehiclelocation.html.

Screenshot of TPS

Above is a screenshot of TPS in action. It is a static webpage that is being generated every 5 minutes, showing the position of trains. I will keep the lambda function running for at least three months, so you have plenty of time available to check it out.

Okay, let’s get our hands dirty and start coding.

Prerequisites

Before we start, the following must be in place

  1. You must have an AWS account. If you don’t have one already, you can sign up for a free tier at https://aws.amazon.com/free/
  2. You must have Serverless Framework installed on your computer. If you don’t have it, follow the details at https://serverless.com/framework/docs/getting-started/
  3. Setup the AWS access keys and secret access key that Serverless Framework will use to provision resources into your AWS account. Instructions to get this done can be obtained from https://serverless.com/framework/docs/providers/aws/guide/credentials/
  4. After items 1 – 3 have been completed, create a python runtime Serverless Framework service (my service is called tps)
  5. Within the tps service folder, install the following Serverless Framework plugins
    1. serverless-python-requirements – this plugin adds all the required python modules into a zip file containing our lambda function script, which then gets uploaded to AWS (the required python modules must be defined in the requirements.txt file)
    2. serverless-prune-plugin – this plugin ensures that only the specified number of lambda function versions exist.
  6. Serverless-python-requirements requires a file called requirements.txt. For this solution, create a file called requirements.txt at the root of the service folder and put the following lines inside it
    requests==2.22.0
    gtfs-realtime-bindings==0.0.6
  7. Register an account with Transport for New South Wales Open Data Hub (https://opendata.transport.nsw.gov.au/). This is free. Once registered, login to the Open Data Hub portal and then under My Account, click on Applications and then create an Application that has permissions to Public Transport – Realtime Vehicle Positions API. Note down the API key that is generated as it will be used by the python script later.
  8. Create an account with Bing Maps and use the Website licence plan. This provides 125,000 billable map-generation transactions per calendar year at no charge; for generating a map every 5 minutes, this is more than enough. Note down the API key that is provided. Details on creating a Bing Maps account are available at https://www.microsoft.com/en-us/maps/licensing/options

This project has two parts to it. The first is to create the AWS resources that will host our project. For this, we will be using Serverless Framework to create our AWS Lambda function, Amazon Simple Storage Service (S3) bucket, Amazon DynamoDB table, Amazon CloudWatch Logs and Amazon CloudWatch Events.

The second part is the API query and data ingestion from the Open Data API. The next sections will cover each of these parts.

AWS Resource Creation

As previously mentioned, we will use Serverless Framework to create our AWS resources. Serverless Framework uses serverless.yml to specify which resources need to be created. This file is created by default whenever a Serverless Framework service is created.

In this section, I will take you through the serverless.yml file I used for this project.

The file starts like this.

service: sls-${self:custom.application}

plugins:
  - serverless-python-requirements
  - serverless-prune-plugin

custom:
  pythonRequirements:
    dockerizePip: true
    slim: true
    noDeploy:
      - typing
  stage: ${opt:stage, self:custom.defaults.stage}
  region: ${opt:region, self:custom.defaults.region}
  application: ${env:APPLICATION, self:custom.defaults.application}
  logRetentionInDays: ${opt:logretentionindays, env:logretentionindays, self:custom.defaults.logretentionindays}
  s3:
    websiteBucket: sls-${self:custom.application}-website-${self:custom.stage}
  dynamodb:
    transportPositionTableName: transportPosition
    billingMode: PAY_PER_REQUEST
  defaults:
    application: tps
    stage: dev
    region: ap-southeast-2
    logretentionindays: 14
  prune:
    automatic: true
    number: 1

It defines the service name, plugins and variables that will be used in this file (notice the plugins serverless-python-requirements and serverless-prune-plugin)

The following default values have been configured in the above serverless.yml file.

  • application name is set to tps
  • environment is set to dev
  • Amazon CloudWatch logs is set to 14 days retention
  • the AWS region is set to ap-southeast-2 (Sydney)
  • the Amazon DynamoDB table is set to transportPosition
  • the billing mode for the Amazon DynamoDB table will be set to PAY_PER_REQUEST.

The next section defines the details for the cloud provider where resources will be provisioned.

provider:
  name: aws
  runtime: python3.7
  endpointType: regional
  stage: ${self:custom.stage}
  region: ${self:custom.region}
  memorySize: 256
  timeout: 300
  versionFunctions: false
  deploymentBucket: sls-${self:custom.application}-deploymentbucket
  logRetentionInDays: ${self:custom.logRetentionInDays}
  environment:
    STAGE: ${self:custom.stage}
    REGION: ${self:custom.region}

As previously mentioned, I am using AWS. The lambda function will use the Python 3.7 runtime. The deployment bucket that hosts all artefacts is also defined; this is an Amazon Simple Storage Service (S3) bucket. Ensure that this S3 bucket exists before deploying the serverless service.
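If the deployment bucket doesn’t exist yet, a one-off sketch like the one below can create it (this is my own helper, assuming the default application name of tps and the ap-southeast-2 region; you could just as easily create the bucket via the console).

import boto3

s3 = boto3.client('s3', region_name='ap-southeast-2')

#create the bucket that serverless.yml references as sls-${self:custom.application}-deploymentbucket
s3.create_bucket(
    Bucket='sls-tps-deploymentbucket',
    CreateBucketConfiguration={'LocationConstraint': 'ap-southeast-2'}
)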

The next section defines the IAM role that will be created for the lambda function.

  # define IAM roles
  iamRoleStatements:
    - Effect: Allow
      Action:
        - s3:HeadObject
        - s3:GetObject
        - s3:GetObjectAcl
        - s3:PutObject
        - s3:PutObjectAcl
        - s3:CreateMultiPartUpload
      Resource:
        - arn:aws:s3:::${self:custom.s3.websiteBucket}/*
    - Effect: Allow
      Action:
        - dynamodb:*
      Resource:
        - !GetAtt transportPositionTable.Arn
        - !Join
          - ''
          - - !GetAtt transportPositionTable.Arn
            - '/index/TripDate-GSI'

The specified IAM role will allow the lambda function to carry out all required operations on the Amazon DynamoDB table (currently the IAM role permits all DynamoDB actions; if required, this can be tightened) and to upload objects to the Amazon S3 bucket that will host the static website.

The next section provides instructions on what to include and exclude when creating the serverless package.

# you can add packaging information here
package:
  include:
    - src/csv/*.csv
  exclude:
    - __cache__/**
    - __pycache__/**
    - events/**
    - node_modules/**
    - 'conf/**/*'
    - 'gulp-libs/**/*'
    - 'yarn.lock'
    - 'package.json'
    - '.dotenv'

Now we come to the important sections within serverless.yml. The next section defines the lambda function, its handler and the events that will trigger it.

functions:
  tpsFindVehiclePositionsCron:
    handler: src.tps_vehiclepos.run
    memorySize: 3008
    timeout: 900
    environment:
      BUCKET: ${self:custom.s3.websiteBucket}
      TRANSPORTPOSITION_TABLE: ${self:custom.dynamodb.transportPositionTableName}
    events:
      - schedule:
          rate: cron(0/5 * * * ? *)
          enabled: true

For the tps lambda function, the handler is at src.tps_vehiclepos.run (this means that there is a subfolder within the service folder called src, within which there is a python file called tps_vehiclepos.py. Inside this file is a function called run). The lambda function will be run every 5 minutes; to achieve this, I am using a scheduled Amazon CloudWatch Events rule. Two environment variables are also passed to the lambda function (BUCKET and TRANSPORTPOSITION_TABLE).

The last section defines all the resources that will be created by the Serverless Framework.

resources:
  Description: Transport Positioning System Resources
  Resources:
    websiteBucket:
      Type: AWS::S3::Bucket
      Properties:
        BucketName: ${self:custom.s3.websiteBucket}
    # DynamoDB will be used to store the label that each transport will be given.
    # The label will be used as the pushpin label on the map.
    # DynamoDB will have attributes TripDate, TripId, VehicleId, ExpirationTime (TTL),
    # TimeAdded, Latitude, Longitude, PushpinLabel, PushPinLabelDescr
    transportPositionTable:
      Type: AWS::DynamoDB::Table
      Properties:
        TableName: ${self:custom.dynamodb.transportPositionTableName}
        AttributeDefinitions:
          - AttributeName: TripDate
            AttributeType: S
          - AttributeName: VehicleId
            AttributeType: S
        KeySchema:
          - AttributeName: TripDate
            KeyType: HASH
          - AttributeName: VehicleId
            KeyType: RANGE
        BillingMode: ${self:custom.dynamodb.billingMode}
        GlobalSecondaryIndexes:
          - IndexName: TripDate-GSI
            KeySchema:
              - AttributeName: TripDate
                KeyType: HASH
            Projection:
              ProjectionType: ALL
        TimeToLiveSpecification:
          AttributeName: ExpirationTime
          Enabled: true

The following resources will be created

  • Amazon S3 bucket. This bucket will store the Bing Maps that show the train positions. It will also serve the website which will be used to display the position maps.
  • Amazon DynamoDB table. The table will be used to store details for trains found in the Open Data API query. Note that we are also using the DynamoDB TTL feature. Since we don’t need to retain items older than a day, this will allow us to easily prune the Amazon DynamoDB table, to reduce costs.

The full serverless.yml file can be downloaded from https://gist.github.com/nivleshc/951ff2f235a89d58d4abec6de91ef738

Generating the map

In this section we will go through the script that does all the magic, which is querying the Open Data API, processing the data, generating the map and then displaying the results in a webpage.

The script is called tps_vehiclepos.py. I will discuss the script in parts below.

The first section lists the python modules that will be imported. It also defines the variables that will be used within the script.

#this script will query OpenData API to get the current positions for Sydney Trains. It will then use this information to plot the
#positions on a map
from google.transit import gtfs_realtime_pb2
import requests
import datetime
import boto3
import os
import time
from pytz import timezone
from decimal import Decimal
#list all variables --start
baseURL = 'https://api.transport.nsw.gov.au/v1/gtfs/vehiclepos/' # Define URL for opendata API
headers = {'Authorization': 'apikey <insert your opendata api key here>'} # Define header for opendata API
bucketName = os.environ['BUCKET'] # Set up bucket from environment variable
transportPositionTable = os.environ['TRANSPORTPOSITION_TABLE'] #dynamodb table for pushpin labels
bucketImagesKey = "images/" # set bucket key for images
HtmlLandingpagename = "vehiclelocation.html" #this is the landing html page that will load the image file to show the current position of vehicles
#Define operators for opendata API. Possible operators are ['sydneytrains', 'buses', 'ferries', 'lightrail', 'nswtrains', 'regionbuses', 'metro']
operators = ['sydneytrains']
#we will use bing maps to plot the locations
maps_bing_prefix = "https://dev.virtualearth.net/REST/v1/Imagery/Map/Road?mapsize=20000,20000&"
maps_bing_apikey = "format=jpeg&dcl=1&key=<insert your bing maps api key here>"
maps_error_imageURL = "https://image.shutterstock.com/image-photo/image-white-text-message-will-600w-1012395526.jpg" #show this image when there are errors
#generating map
maps_bing_pushpin_limit = 100 #this is the maximum pushpins that bing maps allows
inactive_vehicle_datetime_limit = 10 #(in minutes) this will be used to identify inactive vehicles. If a vehicle's
#last stop id timestamp is at least inactive_vehicle_datetime_limit minutes old,
# then it is considered to be inactive
maps_active_vehicle_icon = '4' #this is the icon that will be used to show active vehicles on the map
maps_inactive_vehicle_icon = '10' #this is the icon that will be used to show inactive vehicles on the map
legend_vehicle_displayed_on_map_color = "#00bfff" #this color will be used to show legend entries for which the vehicle is displayed on map
lastpushpinLabel = 0 #this will be used to generate labels for pushpins. Each pushpin will have a number, which will be associated with a description.
#the association will be displayed under a legends section on the right hand side of the web page showing the map.
#the vehicle's label will be used as the description
#the dicts below allow generating labels for the pushpins that will be mapped. Each pushpin will have a label and a description.
dict_pushpinLabel_to_desc_map = {} #this dict will associate a pushpin label to its description.
dict_vehicleId_to_pushpinLabel_map = {} #this dict will associate a vehicleId to the pushpin label it has been associated to on the map.
pushpin_url = "" #this is the url for the map with the pushpins containing the vehicle positions
total_vehicles_found = 0 #this is the total vehicles found, mapped or not
dynamodb_item_expiry_in_days = 2 #number of days after which items in dynamodb will be automatically deleted. Using DynamoDB TTL
tz_sydney = timezone('Australia/Sydney') # Set up timezone
#list all variables --end
s3 = boto3.resource('s3') # Define boto3 object to do s3 operations
dynamodb = boto3.resource('dynamodb') # Define boto3 object to do dynamodb operations
pushpinLabelsTable = dynamodb.Table(transportPositionTable)
# Initialize gtfs_realtime_pb2.FeedMessage
feed = gtfs_realtime_pb2.FeedMessage()

Remember to replace <insert your opendata api key here> and <insert your bing maps api key here> with your own Open Data and Bing Maps api keys. Do not include the ‘<‘ and ‘>’ in the script.
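As a side note, instead of hardcoding the api keys in the script, you could supply them the same way BUCKET and TRANSPORTPOSITION_TABLE are supplied, via lambda environment variables. The sketch below shows this variation; OPENDATA_API_KEY and BING_MAPS_API_KEY are hypothetical variable names that you would need to add to the functions section of serverless.yml.

import os

#hypothetical environment variables - these are not part of the original script
headers = {'Authorization': 'apikey ' + os.environ['OPENDATA_API_KEY']}
maps_bing_apikey = "format=jpeg&dcl=1&key=" + os.environ['BING_MAPS_API_KEY']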

Next, I will take you through the various functions that have been created to carry out specific tasks.

First up is the function that initialises the global variables. As you might be aware, AWS Lambda execution environments can get reused across invocations. During my experimentation, I found this happening, and the issue was that my global variables were not being automatically re-initialised, which caused erroneous results. To circumvent this, I wrote an explicit function that initialises all global variables at the beginning of each lambda invocation.

#most of the time, lambda functions get re-used by AWS for consecutive runs. To ensure the global variables are sanitised and do not
#cause issues because they contain values from the last run, we will initialise them
def initialise_global_variables():
    global lastpushpinLabel
    global dict_pushpinLabel_to_desc_map
    global dict_vehicleId_to_pushpinLabel_map
    global pushpin_url
    global total_vehicles_found

    #initialise the values for the above global variables
    print("Initialising global variables")
    lastpushpinLabel = 0
    dict_pushpinLabel_to_desc_map = {}
    dict_vehicleId_to_pushpinLabel_map = {}
    pushpin_url = ""
    total_vehicles_found = 0

When placing each train location on the map (this will be done using pushpins), a label will be used to identify each pushpin. Unfortunately, Bing Maps doesn’t allow more than three characters per label, which is not enough to provide a meaningful label. The solution I devised was to use a consecutive numbering scheme for the labels and then provide a key on the website page. The key provides a description for each label. One complication with this approach is that I need to ensure the same label is used for the same train across lambda invocations. This is achieved by storing the train to label mapping in the Amazon DynamoDB table.

The next function downloads all label to train associations stored in Amazon DynamoDB. This ensures that labels remain consistent across lambda invocations.

#read the contents of pushpinLabelsTable and populate dict_pushpinLabel_to_desc_map and dict_vehicleId_to_pushpinLabel_map
def load_pushpinLabels_from_dynamodb(today):
    global dict_pushpinLabel_to_desc_map
    global dict_vehicleId_to_pushpinLabel_map
    global lastpushpinLabel

    rdate = today.strftime("%Y-%m-%d")
    pushpinLabelsTableContents = pushpinLabelsTable.query(
        IndexName='TripDate-GSI',
        KeyConditionExpression='TripDate = :tripdate',
        ExpressionAttributeValues={":tripdate": rdate}
    )

    #check to see if there were any items returned from the above query
    if len(pushpinLabelsTableContents['Items']) > 0:
        #there were some items returned
        #the vehicle's label could be the same across different vehicles however its id is unique for the day. For this reason
        #we must store the vehicle id along with the pushpinLabel and pushpinLabelDesc
        for item in pushpinLabelsTableContents['Items']:
            pushpinLabel = item['pushpinLabel']
            pushpinLabelDesc = item['pushpinLabelDesc']
            vehicleId = item['VehicleId']
            inDynamodbTable = True  #we will mark all entries that are being populated from dynamodb, so that when we update dynamodb
                                    #we only write items that are missing, not everything. This saves on write capacity units

            #the following fields were added progressively to dynamodb. Ensure that missing items don't break the code. Just add blanks if missing
            try:
                latitude = item['latitude']
            except:
                latitude = ""
            try:
                longitude = item['longitude']
            except:
                longitude = ""
            try:
                tripId = item['tripId']
            except:
                tripId = ""

            dict_pushpinLabel_to_desc_map[str(pushpinLabel)] = {"desc": str(pushpinLabelDesc), "vehicleId": str(vehicleId), "inDynamodbTable": inDynamodbTable, "latitude": latitude, "longitude": longitude, "tripId": tripId}
            dict_vehicleId_to_pushpinLabel_map[str(vehicleId)] = {"label": str(pushpinLabel), "desc": str(pushpinLabelDesc)}

            #if a higher pushpinLabel was found in the pushpinLabels table, then set lastpushpinLabel to it
            if int(pushpinLabel) > int(lastpushpinLabel):
                lastpushpinLabel = int(pushpinLabel)
    #else there were no results for the above query, which means there were no entries in the dynamodb table for today. Let's start fresh

    print("load_pushpinLabels_from_dynamodb: Loaded ", len(pushpinLabelsTableContents['Items']), "pushpinLabels from dynamodb table. lastpushpinLabel:", lastpushpinLabel)

The next function just queries Open Data API for the train positions.

def callOpenData(operator, feed):
    print("callOpenData:Obtaining vehicle location for", operator, end="…")
    response = requests.get(baseURL + operator, headers=headers)
    feed.ParseFromString(response.content)
    print("done")

The following function takes the data from the above function and processes it.

def process_feed(feed, today):
    global dict_pushpinLabel_to_desc_map
    global dict_vehicleId_to_pushpinLabel_map
    global lastpushpinLabel
    global pushpin_url
    global total_vehicles_found

    #we will use the python module 'time' to convert epoch time (this is what gtfsr timestamps are in) to local time
    #set the timezone for time
    os.environ['TZ'] = 'AEST-10AEDT-11,M10.5.0,M3.5.0'
    time.tzset()
    print(f'timezone set to {time.tzname}')

    num_pushpin_assigned = 0
    total_feed_entity = len(feed.entity)
    total_vehicles_found += total_feed_entity
    print('Total feed.entity:', total_feed_entity, " Total Vehicles Found:", total_vehicles_found)

    #As we are using Bing Maps, there is a limit to the number of pushpins we can specify for our map. If we
    #reach this limit, we will break out of the loop below as there is no benefit in continuing processing
    for entity in feed.entity:
        if num_pushpin_assigned >= maps_bing_pushpin_limit:
            break  #we have exceeded the number of pushpins that can be used with Bing Maps. Exit loop

        tripupdatetimestamp_autz = time.ctime(entity.vehicle.timestamp)
        vehicleId = entity.vehicle.vehicle.id

        #we need to make sure that the fields used for dynamodb keys are not null. If they are, skip this record. Currently these are TripDate (which is
        #today's date) and vehicleId. So just check vehicleId for being not null and TripDate won't be null.
        if (vehicleId):  #only go ahead if vehicleId is present. Otherwise just print that the vehicleId for this record is missing
            #lets find out if this vehicle already has a pushpinLabel assigned for today
            if vehicleId in dict_vehicleId_to_pushpinLabel_map.keys():
                #this vehicle already has a pushpinLabel. Get the label
                pushpinLabel = dict_vehicleId_to_pushpinLabel_map[vehicleId]['label']
            else:
                #this vehicle doesn't have any pushpinLabel already assigned. Generate a new pushpinLabel for it
                lastpushpinLabel += 1  #increment the lastpushpinLabel so that it now points to a new number
                pushpinLabel = lastpushpinLabel
                tripId = entity.vehicle.trip.trip_id

                #the pushpinLabelDesc will be set to the vehicle's label. There have been instances where I noticed the vehicle's label is missing/null.
                #In these cases, set the pushpinLabelDesc to tripId
                if not entity.vehicle.vehicle.label:
                    pushpinLabelDesc = tripId
                else:
                    pushpinLabelDesc = entity.vehicle.vehicle.label

                #since this vehicle had not been previously assigned a pushpinLabel for today, add its details to the two dicts
                inDynamodbTable = False  #this item has not been read from or written to dynamodb yet
                latitude = entity.vehicle.position.latitude
                longitude = entity.vehicle.position.longitude
                dict_pushpinLabel_to_desc_map[str(pushpinLabel)] = {"desc": str(pushpinLabelDesc), "vehicleId": str(vehicleId), "inDynamodbTable": inDynamodbTable, "latitude": latitude, "longitude": longitude, "tripId": tripId}
                dict_vehicleId_to_pushpinLabel_map[str(vehicleId)] = {"label": str(pushpinLabel), "desc": str(pushpinLabelDesc)}

            #this vehicle will be displayed on the map. Update dict_pushpinLabel_to_desc_map for this vehicle's entry so that when the legend is
            #generated, it will be coloured differently to show that it is currently displayed on the map
            dict_pushpinLabel_to_desc_map[str(pushpinLabel)]['isDisplayedOnMap'] = True

            #add this vehicle's details to the pushpin url
            pushpin_url += "pp=" + str(entity.vehicle.position.latitude) + "," + str(entity.vehicle.position.longitude)

            #based on how long ago the last stop id timestamp is, calculate if the vehicle is active or inactive and assign the respective icon
            if (datetime.datetime.now() - datetime.datetime.strptime(tripupdatetimestamp_autz, '%a %b %d %H:%M:%S %Y')) > datetime.timedelta(minutes=inactive_vehicle_datetime_limit):
                #this vehicle is inactive
                pushpin_url += ";" + maps_inactive_vehicle_icon + ";"
            else:
                pushpin_url += ";" + maps_active_vehicle_icon + ";"
            pushpin_url += str(pushpinLabel) + "&"

            num_pushpin_assigned += 1  #increment the counter that denotes the number of pushpins added to the pushpin url
        else:
            print("process_feed:MissingVehicleId:VehicleWillBeSkipped:TripId:", entity.vehicle.trip.trip_id, " VehicleLabel:", entity.vehicle.vehicle.label, " Latitude: ", entity.vehicle.position.latitude, " Longitude: ", entity.vehicle.position.longitude)

The function goes through each vehicle record that was returned and checks to see whether the vehicle already has a label associated with it. If there is one, that label is used; otherwise a new one is created, and the Amazon DynamoDB table is later updated to store this newly created label. The function uses the first 100 trains returned by the Open Data API to generate a pushpin URL. This URL will be used to generate the Bing Maps image showing the positions of the trains.

The following function uses the pushpin URL to create a map using Bing Maps.

def generate_vehicle_position_webpage(map_url, body, s3Bucket, s3ImageKey, imagefilename, mainHtmlfilename, today):
    global dict_pushpinLabel_to_desc_map
    global dict_vehicleId_to_pushpinLabel_map
    global lastpushpinLabel
    global total_vehicles_found

    #get a handle on the s3 bucket
    s3 = boto3.resource('s3')
    imgObject = s3.Object(s3Bucket, s3ImageKey + imagefilename)

    map_webrequest = requests.post(map_url, data=body)

    #check if the map was successfully obtained.
    if map_webrequest.status_code != 200:
        #there was an error. Show the error
        print("generate_vehicle_position_webpage:Bing Map request failed")
        print("generate_vehicle_position_webpage:Reason:", map_webrequest.reason)
        print("generate_vehicle_position_webpage:ErrorMessage:", map_webrequest.text)
        print("generate_vehicle_position_webpage:RequestBody:", body)
        #as there has been an error generating the map, display an image denoting that there has been an error
        map_webrequest = requests.get(maps_error_imageURL)
    else:
        print("generate_vehicle_position_webpage:Bing Map request was successful. Status code:", map_webrequest.status_code)

    #upload the image file to the S3 bucket, set it for public read and ensure content-type is image/jpeg
    print("generate_vehicle_position_webpage:Uploading map to s3 bucket")
    upload_img_result = imgObject.put(Body=map_webrequest.content, ACL='public-read', ContentType='image/jpeg')

    print("generate_vehicle_position_webpage:Generating landing html page:", mainHtmlfilename, end="…")
    #Generate the label descriptions using all the pushpin labels for the day. The first entry will be the header field
    pushpinLabels = "<li>Label – Description <font color=" + legend_vehicle_displayed_on_map_color + ">[blue label descriptions show currently displayed vehicles]</font></li>"
    for index in range(1, lastpushpinLabel + 1):
        pushpinLabel_desc = dict_pushpinLabel_to_desc_map[str(index)]['desc']

        #the isDisplayedOnMap attribute was added later to DynamoDB so there might be items that don't have it. This ensures that a call to get this
        #attribute will not break the program
        try:
            isDisplayedOnMap = dict_pushpinLabel_to_desc_map[str(index)]['isDisplayedOnMap']
        except:
            isDisplayedOnMap = False

        #for all vehicles currently displayed on the map, show their label descriptions in a different colour. This makes it easy to differentiate between a vehicle
        #that is currently displayed and one that was previously displayed today however is now not mapped.
        if isDisplayedOnMap:
            pushpinLabels += "<font color=" + legend_vehicle_displayed_on_map_color + "><li>" + str(index) + " – " + str(pushpinLabel_desc) + "</li></font>"
        else:
            pushpinLabels += "<li>" + str(index) + " – " + str(pushpinLabel_desc) + "</li>"

    htmlObject = s3.Object(s3Bucket, mainHtmlfilename)
    htmlContent = """<!DOCTYPE html>
<html>
<head>
<meta name="viewport" content="width=device-width, initial-scale=1">
<style>
body {
  font-family: Arial;
  color: white;
}
.split {
  height: 100%;
  width: 50%;
  position: fixed;
  z-index: 1;
  top: 0;
  overflow-x: hidden;
  padding-top: 20px;
}
.left {
  left: 0;
  width: 70%;
  background-color: #111;
}
.right {
  right: 0;
  width: 30%;
  background-color: black;
}
.centered {
  position: absolute;
  top: 50%;
  left: 50%;
  transform: translate(-50%, -50%);
  text-align: center;
}
</style>
<script>
<!--
function timedRefresh(timeoutPeriod) {
  setTimeout("location.reload(true);", timeoutPeriod);
}
window.onload = timedRefresh(60000);
// -->
</script>
</head>"""
    htmlContent += """
<body>
<div class="split left">
<div class="centered">
<img src={} alt="Vehicle Location Map" width="1250" height="1000">
<p> image source {}</p>
</div>
</div>
<div class="split right">
<p>Map last updated at {} [updated every 5 min].
<br>Total vehicles found {} (at most, only the first 100 will be mapped)
<br>
<br>Icon Description
<br>Blue = active vehicles (location reported within last {} minutes)
<br>Red = inactive vehicles (location last reported at least {} minutes ago)
</p>
<ul style="list-style-type:square;">""".format(s3ImageKey + imagefilename, imagefilename, imagefilename[:-4], total_vehicles_found, inactive_vehicle_datetime_limit, inactive_vehicle_datetime_limit)
    htmlContent += pushpinLabels
    htmlContent += """
</ul>
</div>
</body>
</html>"""
    upload_html_result = htmlObject.put(Body=htmlContent, ACL='public-read', ContentType='text/html')
    print("done")
    print("generate_vehicle_position_webpage:Uploaded map:", imagefilename, " UploadResult:", upload_img_result)
    print("generate_vehicle_position_webpage:Uploaded htmlfile:", mainHtmlfilename, " UploadResult:", upload_html_result)

The function provides the pushpin URL to Bing Maps, which returns a map showing the positions of the trains. The map is then uploaded to the Amazon S3 bucket that serves the website. The function then generates a description page showing the description for each label. As you can imagine, hundreds of labels are created each day. Not all of these labels will be displayed on the map, however they will all be listed in the key area. To provide quick access to the key entries for trains that are currently displayed on the map, the corresponding entries are shown in blue. This ensures that people don’t go on a wild goose chase, trying to locate a train on the map that might not be displayed.

The next function updates the transport position Amazon DynamoDB table with any new labels that were created during this invocation. This ensures that the labels persist for all subsequent lambda invocations.

def update_pushpinLabelsTable(tripdate):
    global lastpushpinLabel

    #update the pushpinLabels dynamodb table with all the pushpinLabels that were created in this invocation
    print("update_pushpinLabelsTable:Uploading new pushpinLabels to dynamodb Table")
    time_now = datetime.datetime.now(tz_sydney)  #each item uploaded to the dynamodb table will have the time it was inserted
    time_now_str = time_now.strftime("%Y-%m-%d %H:%M:%S")
    epoch_time_now = time_now.timestamp()
    expirationTime = int(epoch_time_now + (dynamodb_item_expiry_in_days * 24 * 3600))  #convert expiry days to seconds
    print("update_pushpinLabelsTable:time_now:", time_now_str, " epoch_time_now:", epoch_time_now, " expirationTime:", expirationTime)

    num_items_added_to_dynamodb = 0
    for index in range(1, lastpushpinLabel + 1):
        pushpinLabel = index
        pushpinLabelDesc = dict_pushpinLabel_to_desc_map[str(index)]['desc']
        vehicleId = dict_pushpinLabel_to_desc_map[str(index)]['vehicleId']
        inDynamodbTable = dict_pushpinLabel_to_desc_map[str(index)]['inDynamodbTable']
        latitude = dict_pushpinLabel_to_desc_map[str(index)]['latitude']
        longitude = dict_pushpinLabel_to_desc_map[str(index)]['longitude']
        tripId = dict_pushpinLabel_to_desc_map[str(index)]['tripId']

        #only write back to the dynamodb table those entries that are new. Dynamodb items are regarded as immutable and should not be changed.
        #using TTL (which is set to attribute ExpirationTime) allows for easy cleanup of items, as we don't want items older than 24 hours since they
        #are not being mapped (default expirationTime has been set to 2 days)
        if (not inDynamodbTable):
            try:
                pushpinLabelsTable.put_item(
                    Item={
                        'TripDate': str(tripdate),
                        'VehicleId': str(vehicleId),
                        'tripId': str(tripId),
                        'pushpinLabel': str(pushpinLabel),
                        'pushpinLabelDesc': str(pushpinLabelDesc),
                        'latitude': str(latitude),
                        'longitude': str(longitude),
                        'TimeAdded': time_now_str,
                        'ExpirationTime': expirationTime
                    }
                )
                num_items_added_to_dynamodb += 1
            except Exception as e:
                print("update_pushpinLabelsTable:Error with put_item operation ", str(e))
                print("update_pushpinLabelsTable:TripDate:", str(tripdate), " VehicleId:", str(vehicleId), " pushpinLabel:", str(pushpinLabel), " pushpinLabelDesc:", str(pushpinLabelDesc), " Latitude:", str(latitude), " Longitude:", str(longitude), " TimeAdded:", time_now_str, " ExpirationTime:", expirationTime)

    print("update_pushpinLabelsTable:Uploaded ", num_items_added_to_dynamodb, " new pushpinLabel(s) to dynamodb table. LastpushpinLabel:", lastpushpinLabel)

Now that we know what all the functions do, let’s move on to the handler function that the tps lambda will call. The handler function coordinates all the other functions.

def run(event, context):
    print(datetime.datetime.now(), 'Started')
    today = datetime.datetime.now(tz_sydney)

    initialise_global_variables()
    load_pushpinLabels_from_dynamodb(today)

    #loop through each operator and get their vehicle positions
    for operator in operators:
        callOpenData(operator, feed)
        process_feed(feed, today)

    rdate = today.strftime("%Y-%m-%d")
    time_now = datetime.datetime.now(tz_sydney)
    mapImageName = time_now.strftime("%Y-%m-%dT%H%M") + '.jpg'

    #get the image file and upload it to s3 (strip the trailing "&" off pushpin_url)
    generate_vehicle_position_webpage(maps_bing_prefix + maps_bing_apikey, pushpin_url[:-1], bucketName, bucketImagesKey, mapImageName, HtmlLandingpagename, rdate)
    update_pushpinLabelsTable(rdate)

The handler function calls the respective functions to get the following tasks done (in the order listed below)

  • initialises the global variables
  • downloads the previously associated labels from the Amazon DynamoDB table
  • calls the Open Data API to get the latest positions of the trains
  • processes the train data and generates the pushpin URL
  • uses the pushpin URL to generate the map via Bing Maps
  • creates the landing page, which shows the map along with a key listing the description for each label
  • finally, uploads all labels that were created within this lambda invocation to the Amazon DynamoDB table. This ensures that in all subsequent invocations of the lambda function, the respective trains get the same label assigned to them.
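If you want to test the handler locally before deploying, a hypothetical harness like the one below can be appended to the script (this is my suggestion, not part of the original script). It assumes your shell has AWS credentials plus the BUCKET and TRANSPORTPOSITION_TABLE environment variables set.

#local test harness (not part of the deployed script)
if __name__ == '__main__':
    run({}, None)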

The full tps_vehiclepos.py file can be downloaded from https://gist.github.com/nivleshc/03cb06bd6ae9cb969192af0ee2a1a15b

That’s it! Now you have a good idea about how the AWS resources were generated and how the data was acquired, processed and then visualised.

Cost to run the solution

When I started developing this solution, to see what type of data was being provided by the Open Data API, I tried to ingest everything into DynamoDB. This was a VERY VERY bad idea, as it cost me quite a lot. Since then, I have modified my code to ingest and store only the required fields in the Amazon DynamoDB table. This has drastically reduced my running costs. Currently I am being charged approximately USD 0.05 or less per day. You can easily run this within the free tier without incurring any additional costs (just as a precaution, monitor your costs to ensure there are no surprises).

As mentioned at the beginning of this blog, the final result can be seen at https://sls-tps-website-dev.s3-ap-southeast-2.amazonaws.com/vehiclelocation.html. The webpage refreshes every minute, however the map is generated every five minutes. I will keep this project running for at least three months, so if you want to check it out, you have ample time.

I thoroughly enjoyed creating this project, and I hope you all enjoy it as well. Till the next time, Enjoy!