In a previous article we explained the architecture of WCS (Wizzie Community Stack) and its components, which allow us to perform streaming and real-time analytics on very different types of data, simply by configuring the platform's components. In this post we will put it to work on a concrete use case: analyzing Counter-Strike: Global Offensive matches in real time.
To achieve this, we need to solve a series of problems, which are listed below and detailed in their respective sections:
- Obtaining game data.
- Normalization of the data (with our Normalizer module).
- Indexing of the data (in Druid).
- Visualization (with our Wizz-vis module).
To solve each problem we will use different tools from the WCS stack, as shown in the diagram, where you can see a summary of the data processing flow.
Before getting down to work, we need to meet the following requirements in order to reproduce what is explained in this post:
- A computer with at least 16 GB of RAM and Ubuntu 16.04+ or CentOS 7.
- WCS installed (you can check this post)
- Another computer with Counter-Strike: Global Offensive installed (a free version is available).
Obtaining the game data
To obtain data from a Counter-Strike: Global Offensive match, we will take advantage of a built-in feature (Game State Integration) that sends information about the game state through HTTP POST messages with a JSON body. You can get more information about this feature in the game documentation.
Since the data is sent over HTTP, we need a component that is able to receive these requests and forward them to a Kafka topic. For this, we can use the n2kafka service. The easiest way is to start a Docker container on the same computer where WCS has been deployed, using the following command:
docker run --restart always -d --net=host -e KAFKA_BROKERS=localhost --name=n2kafka wizzieio/n2kafka:2.1.2
This service listens for HTTP on port 7980. When a JSON message is POSTed to the path /v1/data/<topic>, it forwards that message to the Kafka topic specified in <topic>.
For this to work properly, we first have to create the topic where we want to receive the data. Specifically we will create the topic csgo_input, using the following command:
wcs kafka topics --create --topic csgo_input --replication-factor 1 --partitions 1
Once the service and the topic are ready, we can enable the sending of game events by adding the following configuration file to Counter-Strike's configuration directory (the file goes in the game's csgo/cfg folder and its name must start with gamestate_integration_ and end in .cfg, for example gamestate_integration_wizzie.cfg):
"Console Sample v.1" { "uri" "http://:7980/v1/data/csgo_input" "timeout" "5.0" "buffer" "0.5" "throttle" "0.5" "heartbeat" "60.0" "data" { "provider" "1" "bomb" "1" "map" "1" "round" "1" "allplayers_id" "1" "allplayers_state" "1" "allplayers_match_stats" "1" "allplayers_weapons" "1" "allplayers_position" "1" "phase_countdowns" "1" "allgrenades" "1" } }
In the uri parameter you have to replace <WCS-IP> with the IP of the computer where WCS is running. The rest of the parameters can be left the same as in the example.
Once we start the game, we should start receiving JSON messages in the topic csgo_input. To check it we can use the following command:
wcs kafka consume --topic csgo_input
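If you want to verify the HTTP-to-Kafka path before launching the game, you can also post an arbitrary test message to n2kafka with curl (assuming it runs on the same host, as in the example above); it should then show up in the consumer:
curl -X POST -H 'Content-Type: application/json' -d '{"test": "message from curl"}' http://localhost:7980/v1/data/csgo_input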
Data normalization
Once the data acquisition part is completed, the events that the game sends will be available in the csgo_input topic, so we can start working with them. The game sends the data we are interested in when we watch a match in spectator mode. As an example, we will have several bots fight each other on the legendary Dust II map.
Once the match starts, we receive messages like this one every second:
{ "provider": { "name": "Counter-Strike: Global Offensive", "appid": 730, "version": 13656, "steamid": "76561198157701279", "timestamp": 1540034617 }, "bomb": { "state": "carried", "position": "1073.00, 2361.19, 126.67", "player": 76561197960265736 }, "map": { "mode": "casual", "name": "de_dust2", "phase": "live", "round": 1, "team_ct": { "score": 0, "timeouts_remaining": 1, "matches_won_this_series": 0 }, "team_t": { "score": 1, "timeouts_remaining": 1, "matches_won_this_series": 0 }, "num_matches_to_win_series": 0, "current_spectators": 0, "souvenirs_total": 0 }, "round": { "phase": "live" }, "allplayers": { "2": { "name": "Yogi", "observer_slot": 6, "team": "T", "state": { "health": 0, "armor": 0, "helmet": false, "flashed": 0, "burning": 0, "money": 900, "round_kills": 0, "round_killhs": 0, "round_totaldmg": 0, "equip_value": 4150 }, "match_stats": { "kills": 0, "assists": 0, "deaths": 1, "mvps": 0, "score": 1 }, "weapons": { }, "position": "-410.94, -163.14, -0.74", "forward": "-0.09, 1.00, -0.03" }, "2": { ... }, }, "phase_countdowns": { "phase": "live", "phase_ends_in": "72.4" }, "grenades": {}, "previously": { ... } }
Our goal is to obtain a series of dimensions and metrics that the Druid database can understand. To be precise, Druid requires the JSON to have a single nesting level and to contain only key-value pairs, without arrays or nested objects. To achieve that, we will process the messages using the Normalizer component.
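For instance, a nested fragment like this one (taken from a player's state):
{ "state": { "health": 93, "armor": 100 } }
has to end up as flat, top-level key-value pairs before Druid can index it:
{ "health": 93, "armor": 100 }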
Initially we want to get the following information:
- Timestamp of each event (essential for its indexing in Druid).
- State of the players (health, armor, etc.).
- Position of the players.
- Information about the map (name, round, game mode, etc).
- Statistics of the players (kills, deaths, MVPs, etc.).
With these requirements we can begin to create the Normalizer stream plan. We will generate one message with information about each player and another about the bomb, so there will be a processing branch for each element. As a result, for each player we want to get a message like the following:
{ "kills": 0, "round_killhs": 0, "type": "player", "mode": "casual", "state": "alive", "score": 0, "player_id": "76561197960265733", "assists": 0, "flashed": 0, "map": "de_dust2", "deaths": 0, "timestamp": 1540224174, "mvps": 0, "phase": "live", "equip_value": 1200, "health": 93, "team": "T", "steamid": "111111111111", "burning": 0, "position_x": -422.78, "armor": 100, "position_y": 235.91, "round": 0, "money": 1000, "position_z": -0.55, "name": "Toby", "round_kills": 0, "round_totaldmg": 0 }
And, for the bomb, another message of this type:
{ "phase": "live", "bomb_state": "carried", "latlong": "-562.92,-220.67", "type": "bomb", "steamid": "111111111111", "mode": "casual", "position_x": -220.67, "player_id": 76561197960265732, "position_y": -562.92, "round": 0, "position_z": 0.26, "name": "Bomb", "map": "de_dust2", "timestamp": 1540224174 }
You can see the flow of the event processing in the diagram on the left.
Basically, we process each message in such a way that, in the end, we get separate messages for each player and for the bomb. In the players’ branch, we filter the messages so that only those containing player information remain, and then we create one message per player (instead of a single message with all the players). Next, we select the metrics we want to keep and place them all at the first level of the output JSON.
A similar process is carried out with the information about the bomb. Finally, we adapt the position field, which is a string with the coordinates separated by commas: we split it so that each coordinate ends up under its own key and convert the values to numbers.
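Conceptually, these last two steps (the StringSplitterMapper plus the FieldTypeConverterMapper in the plan below) do something equivalent to the following jq expression. This is only an illustration of the transformation, not how Normalizer implements it:
echo '{"position": "-410.94, -163.14, -0.74"}' | jq '(.position | split(", ") | map(tonumber)) as $p | del(.position) + {position_x: ($p[0]), position_y: ($p[1]), position_z: ($p[2])}'
which outputs {"position_x": -410.94, "position_y": -163.14, "position_z": -0.74}.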
The stream plan that does this is attached below:
{ "inputs": { "csgo_input": [ "players", "bomb"] }, "streams": { "bomb": { "funcs" : [ { "name": "ContainsDimensionFilter", "className": "io.wizzie.normalizer.funcs.impl.ContainsDimensionFilter", "properties": { "dimensions": ["bomb"] } },{ "name":"SimpleMapper", "className":"io.wizzie.normalizer.funcs.impl.SimpleMapper", "properties": { "maps": [ {"dimPath":["provider","timestamp"], "as":"timestamp"}, {"dimPath":["provider","steamid"], "as":"steamid"}, {"dimPath":["map","name"], "as":"map"}, {"dimPath":["map","round"], "as":"round"}, {"dimPath":["map","mode"], "as":"mode"}, {"dimPath":["map","phase"], "as":"phase"}, {"dimPath":["bomb", "player"], "as":"player_id"}, {"dimPath":["bomb", "state"], "as":"bomb_state"}, {"dimPath":["bomb", "position"], "as": "position"} ] } },{ "name": "TypeIdentification", "className": "io.wizzie.normalizer.funcs.impl.FieldMapper", "properties": { "dimensions": [ { "dimension": "type", "value": "bomb", "overwrite": true } ] } } ], "sinks":[ {"topic":"common", "type":"stream"} ] }, "players" : { "funcs" : [ { "name": "ContainsDimensionFilter", "className": "io.wizzie.normalizer.funcs.impl.ContainsDimensionFilter", "properties": { "dimensions": ["allplayers"] } }, { "name":"FromMapToArray", "className":"io.wizzie.normalizer.funcs.impl.MapFlattenMapper", "properties" : { "flat_dimension": "allplayers", "key_dimension": "player_id", "output_dimension": "allplayers" } },{ "name":"MessagesPerPlayer", "className":"io.wizzie.normalizer.funcs.impl.ArrayFlattenMapper", "properties": { "flat_dimension": "allplayers" } },{ "name":"SimpleMapper", "className":"io.wizzie.normalizer.funcs.impl.SimpleMapper", "properties": { "maps": [ {"dimPath":["provider","timestamp"], "as":"timestamp"}, {"dimPath":["provider","steamid"], "as":"steamid"}, {"dimPath":["map","name"], "as":"map" }, {"dimPath":["map","round"], "as":"round" }, {"dimPath":["map","mode"], "as":"mode" }, {"dimPath":["map","phase"], "as":"phase" }, {"dimPath":["name"]}, {"dimPath":["player_id"]}, {"dimPath":["team"], "as":"team_id"}, {"dimPath":["team"], "as":"team"}, {"dimPath":["state","health"]}, {"dimPath":["state","armor"]}, {"dimPath":["state","helmet"]}, {"dimPath":["state","flashed"]}, {"dimPath":["state","burning"]}, {"dimPath":["state","money"]}, {"dimPath":["state","round_kills"]}, {"dimPath":["state","round_killhs"]}, {"dimPath":["state","round_totaldmg"]}, {"dimPath":["state","equip_value"]}, {"dimPath":["match_stats","kills"]}, {"dimPath":["match_stats","assists"]}, {"dimPath":["match_stats","deaths"]}, {"dimPath":["match_stats","mvps"]}, {"dimPath":["match_stats","score"]}, {"dimPath":["position"]} ] } },{ "name": "TypeIdentification", "className": "io.wizzie.normalizer.funcs.impl.FieldMapper", "properties": { "dimensions": [ { "dimension": "type", "value": "player", "overwrite": true } ] } }, { "name":"StateClassification", "className":"io.wizzie.normalizer.funcs.impl.ClassificationMapper", "properties": { "dimension": "health", "new_dimension": "state", "classification": ["dead", "alive"], "intervals": [0], "unknown_value": -1 } } ], "sinks":[ {"topic":"common", "type":"stream"} ] }, "common": { "funcs": [ { "name":"PositionSplitter", "className":"io.wizzie.normalizer.funcs.impl.StringSplitterMapper", "properties": { "dimension": "position", "delete_dimension": true, "delimitier": ", ", "fields": ["position_x", "position_y", "position_z"] } }, { "name":"TypeConverter", "className":"io.wizzie.normalizer.funcs.impl.FieldTypeConverterMapper", "properties": { "conversions": [ { "dimension": "position_x", 
"from": "string", "to": "number" }, { "dimension": "position_y", "from": "string", "to": "number" }, { "dimension": "position_z", "from": "string", "to": "number" } ] } } ], "sinks": [ {"topic":"csgo_norm", "type":"kafka", "partitionBy": "steamid"} ] } } }
The best way to understand everything that is being done in the stream plan is to look up what each of the functions does in the Normalizer documentation.
To apply this stream plan we have to copy its contents to the <WCS-installation-dir>/etc/wcs/normalizer-stream-plan.json file, replacing <WCS-installation-dir> with the WCS installation directory; by default the file is /usr/local/etc/wcs/normalizer-stream-plan.json. Once copied, we restart the Normalizer to apply the changes:
wcs stop normalizer
wcs start normalizer
Again, we can check that it is working by consuming the Kafka topic the Normalizer writes to:
wcs kafka consume --topic csgo_norm
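Since both the player and the bomb messages end up in the same topic, it can be handy to filter by type while checking. Assuming the consumer prints one JSON message per line, a jq filter does the trick:
wcs kafka consume --topic csgo_norm | jq -c 'select(.type == "bomb")'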
Data indexing
Once the messages are normalized and flattened, so that they can be processed by Druid, we can proceed to index them. In WCS this is done by configuring a Druid supervisor, in which we declare the dimensions and aggregations that Druid should index.
Since the Normalizer writes the processed messages to the topic csgo_norm, the supervisor will be configured so that the indexing tasks read from that topic. The supervisor’s spec is as follows:
{ "type": "kafka", "dataSchema": { "dataSource": "csgo", "parser": { "type": "string", "parseSpec": { "format": "json", "timestampSpec": { "column": "timestamp", "format": "ruby" }, "dimensionsSpec": { "dimensions": [ "player_id", "type", "name", "team", "round", "mode", "state", "bomb_state", "phase" ], "dimensionExclusions": [], "spatialDimensions": [ { "dimName": "coordinates", "dims": [ "position_y", "position_x" ] } ] } } }, "metricsSpec": [ { "type": "count", "name": "events" }, { "type": "hyperUnique", "name": "players", "fieldName": "name", "isInputHyperUnique": false, "round": true }, { "name": "health", "type": "doubleSum", "fieldName": "health" }, { "name": "armor", "type": "doubleSum", "fieldName": "armor" }, { "name": "helmet", "type": "doubleSum", "fieldName": "helmet" }, { "name": "flashed", "type": "doubleSum", "fieldName": "flashed" }, { "name": "burning", "type": "doubleSum", "fieldName": "burning" }, { "name": "money", "type": "doubleSum", "fieldName": "money" }, { "name": "round_kills", "type": "doubleSum", "fieldName": "round_kills" }, { "name": "round_killhs", "type": "doubleSum", "fieldName": "round_killhs" }, { "name": "round_totaldmg", "type": "doubleSum", "fieldName": "round_totaldmg" }, { "name": "equip_value", "type": "doubleSum", "fieldName": "equip_value" }, { "name": "kills", "type": "doubleSum", "fieldName": "kills" }, { "name": "assists", "type": "doubleSum", "fieldName": "assists" }, { "name": "deaths", "type": "doubleSum", "fieldName": "deaths" }, { "name": "mvps", "type": "doubleSum", "fieldName": "mvps" }, { "name": "score", "type": "doubleSum", "fieldName": "score" }, { "name": "position_z", "type": "doubleSum", "fieldName": "height" } ], "granularitySpec" : { "type": "uniform", "segmentGranularity": "HOUR", "queryGranularity": "SECOND" } }, "ioConfig": { "topic": "csgo_norm", "consumerProperties": { "bootstrap.servers": "kafka:9092" }, "taskCount": 1, "replicas": 1, "taskDuration": "PT1H" } }
To apply this spec we have to POST it to the Druid Overlord. With the spec saved in a file called index.json, it can be done with the following command on the computer where WCS runs:
curl -X POST -H 'Content-Type: application/json' -d @index.json http://localhost:8084/druid/indexer/v1/supervisor
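If everything goes well, the Overlord should answer with the supervisor id. You can also list the running supervisors at any time (same host and port as above):
curl http://localhost:8084/druid/indexer/v1/supervisor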
For simplicity’s sake, we have started out using aggregations of type doubleSum, which simply add up the values of the metrics within each granularity bucket. It is important to note that we use a query granularity of one second, since we are obtaining real-time analytics of a match and want to get the information from it as soon as possible.
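To sanity-check that events are actually being indexed, you can also query Druid directly. The command below assumes the Druid Broker is reachable on port 8082 (Druid’s default; the port may differ in your WCS deployment) and simply counts the indexed events per minute:
curl -X POST -H 'Content-Type: application/json' -d '{"queryType": "timeseries", "dataSource": "csgo", "granularity": "minute", "intervals": ["2018-10-20/2025-01-01"], "aggregations": [{"type": "longSum", "name": "events", "fieldName": "events"}]}' http://localhost:8082/druid/v2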
Visualization
At this point we have the entire processing pipeline in place, with the data ending up in the Druid database. Now we can start querying Druid from Wizz-vis to create dashboards and visualize the information.
Let’s start by creating the dashboard itself in Wizz-vis. Although this can be done directly from the Wizz-vis interface, in this case we will use the API, which is documented with Swagger and accessible through the path /swagger-ui.
For a dashboard called “CS GO” you would have to do a POST to /api/v1/dashboards with the following payload:
{ "name": "CS GO", "theme": "light", "interval": 1, "locked": false, "widgets": [] }
We have specified a refresh interval of 1 second to achieve a real-time experience.
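For example, using curl (replace <wizz-vis-host> with the host and port where Wizz-vis is exposed in your deployment; depending on your setup, an authentication header may also be required):
curl -X POST -H 'Content-Type: application/json' -d '{"name": "CS GO", "theme": "light", "interval": 1, "locked": false, "widgets": []}' http://<wizz-vis-host>/api/v1/dashboards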
Once we have the dashboard, we can start creating widgets. Let’s start with the simplest: a table with the statistics of each player. In each row of the table we will show the values of the metrics associated with one value of the dimension “name” (the player name). We will choose the following metrics as an example:
- kills
- deaths
- assists
- mvps
Given that the aggregation we have configured for these metrics is doubleSum, we will have to add a post-aggregation that divides by the number of events, so that we get the average value of the measurements rather than their sum; for example, if a player reports 3 kills in each of three consecutive events, the sum is 9, and dividing by the 3 events recovers the value 3. In addition, we will query 3-second windows to make sure that events have arrived during that interval.
The widget’s JSON configuration is shown below. To understand its details, it is worth taking a look at the Wizz-vis documentation.
{ "type": "WidgetTable", "title": "Player statistics", "dashboard_id": 1, "row": 0, "col": 6, "size_x": 6, "size_y": 3, "range": "last_3_seconds", "granularity": "PT1S", "start_time": null, "end_time": null, "limit": 10, "options": { "metrics": [ "Kills", "Assists", "Deaths", "Mvps" ] }, "datasource_name": "csgo", "dimensions": [ "name" ], "aggregators": [ { "aggregator": "kills", "aggregator_name": "kills", "filters": [] }, { "aggregator": "deaths", "aggregator_name": "deaths", "filters": [] }, { "aggregator": "assists", "aggregator_name": "assists", "filters": [] }, { "aggregator": "mvps", "aggregator_name": "mvps", "filters": [] }, { "aggregator": "events", "aggregator_name": "events", "filters": [] } ], "post_aggregators": [ { "output_name": "Kills", "operator": "/", "field_1": "kills", "field_2": "events" }, { "output_name": "Deaths", "operator": "/", "field_1": "deaths", "field_2": "events" }, { "output_name": "Assists", "operator": "/", "field_1": "assists", "field_2": "events" }, { "output_name": "Mvps", "operator": "/", "field_1": "mvps", "field_2": "events" } ], "filters": [] }
Now let’s make a more interesting widget: a plane widget with the real-time positions of the players. In order to do this, we need two things:
- An image of the map on which to represent the points.
- At least 3 reference points to map the coordinates that the game reports to positions in the image.
The points can be obtained for each game map by simple experimentation: place a player at several recognizable spots on the map and look up the coordinates reported in the events. Three non-collinear points are enough to pin down the mapping between game coordinates and image pixels. The widget configuration would be the following:
{ "type": "WidgetPlaneLocation", "title": "Player location", "dashboard_id": 1, "row": 0, "col": 0, "size_x": 3, "size_y": 3, "range": "last_3_seconds", "granularity": "pt1s", "start_time": null, "end_time": null, "limit": 30000, "options": { "image": "https://s3-us-west-1.amazonaws.com/static.wizzie.io/devops/de_dust2_radar.jpg", "keep_ratio": true, "gps_markers": [ { "x": 80, "y": 14, "latitude": 3118, "longitude": -2093 }, { "x": 56, "y": 970, "latitude": -1032, "longitude": -2203 }, { "x": 922, "y": 34, "latitude": 3059, "longitude": 1561 } ] }, "datasource_name": "csgo", "dimensions": [ "coordinates", "name" ], "aggregators": [ { "aggregator": "events", "aggregator_name": "events", "filters": [] } ], "post_aggregators": [], "filters": [] }
To conclude, below you can see the configuration of the other two widgets that appear in the image, without going into further explanation. We leave it as a challenge for you to figure out how they work, with the help of the Wizz-vis documentation ;).
{ "type": "WidgetBar", "title": "Player Health", "dashboard_id": 1, "row": 3, "col": 0, "size_x": 12, "size_y": 3, "range": null, "granularity": "all", "start_time": false, "end_time": false, "limit": 10, "options": { "metrics": "Health" }, "datasource_name": "csgo", "dimensions": [ "name" ], "aggregators": [ { "aggregator": "health", "aggregator_name": "health", "filters": [] }, { "aggregator": "events", "aggregator_name": "events", "filters": [] } ], "post_aggregators": [ { "output_name": "Health", "operator": "/", "field_1": "health", "field_2": "events" } ], "filters": [ { "dimension_name": "type", "operator": "eq", "value": "player" } ] } { "type": "WidgetPlane", "title": "Death HeatMap", "dashboard_id": 1, "row": 0, "col": 3, "size_x": 3, "size_y": 3, "range": "last_10_minutes", "granularity": "all", "start_time": null, "end_time": null, "limit": 100000, "options": { "image": "https://s3-us-west-1.amazonaws.com/static.wizzie.io/devops/de_dust2_radar.jpg", "metrics": [ "players" ], "max_value": "max", "keep_ratio": true, "gps_markers": [ { "x": 80, "y": 14, "latitude": 3118, "longitude": -2093 }, { "x": 56, "y": 970, "latitude": -1032, "longitude": -2203 }, { "x": 922, "y": 34, "latitude": 3059, "longitude": 1561 } ] }, "datasource_name": "csgo", "dimensions": [ "coordinates" ], "aggregators": [ { "aggregator": "players", "aggregator_name": "players", "filters": [] } ], "post_aggregators": [], "filters": [ { "dimension_name": "state", "operator": "eq", "value": "dead" } ] }
Conclusions
With WCS (Wizzie Community Stack) we have been able to assemble an entire stack to analyze the metrics of a video game without having to program anything, just through configuration. This is the great versatility and power of WCS: just as it could be configured for this use case, it can be configured for many others, using the same components and services we have seen.
Moreover, all of this uses only the Community version. With the full WDP (Wizzie Data Platform) we can run not just this one use case but several on the same platform, in the same cluster, with added advantages such as:
- Authentication and access management.
- Configuration of stream plans through an API.
- Access to advanced streaming processing features.
- Scalability and high availability.
- Advanced support.
And more features, which you can check out on the Wizzie website.
We hope you found this post interesting, and we encourage you to come back often, because there will soon be new content on how to improve the analysis of Counter-Strike matches using enrichment, correlation… and much more.