- Obtención de los datos del juego.
- Normalización de los datos (con Normalizer).
- Indexado de los datos (en Druid).
- Visualización (con Wizz-Vis).
Para resolver cada problema utilizaremos distintas herramientas del stack de WCS, como se muestra en el esquema, donde mostramos un resumen del flujo de procesamiento de los datos.
- Un equipo con, al menos, 16 GB de RAM y Ubuntu 16.04+ o Centos 7.
- WCS instalado (puedes consultar este post)
- Otro equipo con el juego Counter Strike: Global Offensive (tiene versión gratuita).
Obtención de los datos del juego
Para obtener datos de una partida de Counter Strike: Global Offensiveaprovecharemos una funcionalidad que permite enviar información sobre el estado de la partida mediante mensajes HTTP POST con JSON. Puedes obtener más información sobre esta funcionalidad en la documentación del juego.
Dado que la forma de enviar los datos es a mediante el protocolo HTTP, necesitamos un componente que sea capaz de recibir estas peticiones y enviarlas a un topic de Kafka. Para ello podemos utilizar el servicio n2kafka. La forma más sencilla es levantar un docker en el mismo equipo que en el que se ha desplegado WCS, utilizando el siguiente comando:
docker run --restart always -d --net=host -e KAFKA_BROKERS=localhost --name=n2kafka wizzieio/n2kafka:2.1.2
Este servicio escucha el protocolo HTTP en el puerto puerto 7980. Al enviar mensajes POST con JSON al path /v1/data/<topic>, enviará ese mensaje JSON al topic especificado en <topic>.
Para que esto funcione correctamente, primero deberemos crear el topic en el que queremos recibir los datos. Concretamente crearemos el topic csgo_input, utilizando el siguiente comando:
wcs kafka topics --create --topic csgo_input --replication-factor 1 --partitions 1
Una vez preparado el servicio y el topic, podemos activar la funcionalidad de envío de eventos del juego, añadiendo el siguiente archivo de configuración al directorio de configuración de Counter Strike:
"Console Sample v.1" { "uri" "http://:7980/v1/data/csgo_input" "timeout" "5.0" "buffer" "0.5" "throttle" "0.5" "heartbeat" "60.0" "data" { "provider" "1" "bomb" "1" "map" "1" "round" "1" "allplayers_id" "1" "allplayers_state" "1" "allplayers_match_stats" "1" "allplayers_weapons" "1" "allplayers_position" "1" "phase_countdowns" "1" "allgrenades" "1" } }
En el parámetro uri se debe sustituir <WCS-IP> por la IP del equipo donde se ejecuta WCS. El resto de parámetros se pueden dejar igual que en el ejemplo.
Si iniciamos el juego, deberíamos empezar a recibir mensajes JSON en el topic csgo_input. Para comprobarlo podemos utilizar el comando:
wcs kafka consume --topic csgo_input
Normalización de los datos
Una vez completado el apartado de adquisición de datos, tendremos los eventos que envía el juego en el topic csgo_input por lo que ya podremos empezar a trabajar con ellos. El juego envía los datos que nos interesan cuando activamos el modo espectador en una partida. Como ejemplo, pondremos a varios bots a pelearse entre ellos en el mítico mapa Dust II.
Cuando empieza la partida, comenzamos a recibir mensajes como este cada segundo:
{ "provider": { "name": "Counter-Strike: Global Offensive", "appid": 730, "version": 13656, "steamid": "76561198157701279", "timestamp": 1540034617 }, "bomb": { "state": "carried", "position": "1073.00, 2361.19, 126.67", "player": 76561197960265736 }, "map": { "mode": "casual", "name": "de_dust2", "phase": "live", "round": 1, "team_ct": { "score": 0, "timeouts_remaining": 1, "matches_won_this_series": 0 }, "team_t": { "score": 1, "timeouts_remaining": 1, "matches_won_this_series": 0 }, "num_matches_to_win_series": 0, "current_spectators": 0, "souvenirs_total": 0 }, "round": { "phase": "live" }, "allplayers": { "2": { "name": "Yogi", "observer_slot": 6, "team": "T", "state": { "health": 0, "armor": 0, "helmet": false, "flashed": 0, "burning": 0, "money": 900, "round_kills": 0, "round_killhs": 0, "round_totaldmg": 0, "equip_value": 4150 }, "match_stats": { "kills": 0, "assists": 0, "deaths": 1, "mvps": 0, "score": 1 }, "weapons": { }, "position": "-410.94, -163.14, -0.74", "forward": "-0.09, 1.00, -0.03" }, "2": { ... }, }, "phase_countdowns": { "phase": "live", "phase_ends_in": "72.4" }, "grenades": {}, "previously": { ... } }
Nuestro objetivo es obtener una serie de dimensiones y métricas que pueda entender la base de datos Druid. En concreto, Druid necesita que el JSON sólo tenga un único nivel de anidamiento, y que sólo haya parejas clave-valor, sin arrays ni objetos. Para lograr eso, procesaremos el mensaje utilizando el componente Normalizer.
En principio queremos conseguir la siguiente información:
- Timestamp de cada evento (imprescindible para su indexado en Druid).
- Estado de los jugadores (vida, armadura, etc).
- Position of the players.
- Información sobre el mapa (nombre, ronda, modo de juego, etc).
- Estadísticas de los jugadores (eliminaciones, muertes, mvps, etc).
Con estos requisitos ya podemos comenzar a elaborar el stream-plan del normalizador. Vamos a generar un mensaje con información sobre cada jugador y sobre la bomba, por lo que habrá un flujo de procesamiento para cada elemento. Como resultado queremos obtener para cada jugador un mensaje con esta forma:
{ "kills": 0, "round_killhs": 0, "type": "player", "mode": "casual", "state": "alive", "score": 0, "player_id": "76561197960265733", "assists": 0, "flashed": 0, "map": "de_dust2", "deaths": 0, "timestamp": 1540224174, "mvps": 0, "phase": "live", "equip_value": 1200, "health": 93, "team": "T", "steamid": "111111111111", "burning": 0, "position_x": -422.78, "armor": 100, "position_y": 235.91, "round": 0, "money": 1000, "position_z": -0.55, "name": "Toby", "round_kills": 0, "round_totaldmg": 0 }
Y para la bomba otro mensaje con esta forma:
{ "phase": "live", "bomb_state": "carried", "latlong": "-562.92,-220.67", "type": "bomb", "steamid": "111111111111", "mode": "casual", "position_x": -220.67, "player_id": 76561197960265732, "position_y": -562.92, "round": 0, "position_z": 0.26, "name": "Bomb", "map": "de_dust2", "timestamp": 1540224174 }
Básicamente, procesamos cada mensaje de tal forma que se obtengan mensajes diferenciados para cada jugador y para la bomba en la salida. En la rama de los jugadores, filtramos los mensajes para quedarnos sólo aquellos que tengan la información sobre los jugadores y después creamos un mensaje por cada jugador (en lugar de uno con todos los jugadores). Posteriormente, seleccionamos las métricas que queremos conservar y las colocamos todas en el primer nivel del JSON.
Con la información sobre la bomba se realiza un procesamiento similar. Por último adaptamos el campo de posición, que es un string con las coordenadas separadas por comas, para obtener cada dimensión con una clave distinta y lo convertimos a número.
El stream plan que hace esto lo adjuntamos a continuación:
{ "inputs": { "csgo_input": [ "players", "bomb"] }, "streams": { "bomb": { "funcs" : [ { "name": "ContainsDimensionFilter", "className": "io.wizzie.normalizer.funcs.impl.ContainsDimensionFilter", "properties": { "dimensions": ["bomb"] } },{ "name":"SimpleMapper", "className":"io.wizzie.normalizer.funcs.impl.SimpleMapper", "properties": { "maps": [ {"dimPath":["provider","timestamp"], "as":"timestamp"}, {"dimPath":["provider","steamid"], "as":"steamid"}, {"dimPath":["map","name"], "as":"map"}, {"dimPath":["map","round"], "as":"round"}, {"dimPath":["map","mode"], "as":"mode"}, {"dimPath":["map","phase"], "as":"phase"}, {"dimPath":["bomb", "player"], "as":"player_id"}, {"dimPath":["bomb", "state"], "as":"bomb_state"}, {"dimPath":["bomb", "position"], "as": "position"} ] } },{ "name": "TypeIdentification", "className": "io.wizzie.normalizer.funcs.impl.FieldMapper", "properties": { "dimensions": [ { "dimension": "type", "value": "bomb", "overwrite": true } ] } } ], "sinks":[ {"topic":"common", "type":"stream"} ] }, "players" : { "funcs" : [ { "name": "ContainsDimensionFilter", "className": "io.wizzie.normalizer.funcs.impl.ContainsDimensionFilter", "properties": { "dimensions": ["allplayers"] } }, { "name":"FromMapToArray", "className":"io.wizzie.normalizer.funcs.impl.MapFlattenMapper", "properties" : { "flat_dimension": "allplayers", "key_dimension": "player_id", "output_dimension": "allplayers" } },{ "name":"MessagesPerPlayer", "className":"io.wizzie.normalizer.funcs.impl.ArrayFlattenMapper", "properties": { "flat_dimension": "allplayers" } },{ "name":"SimpleMapper", "className":"io.wizzie.normalizer.funcs.impl.SimpleMapper", "properties": { "maps": [ {"dimPath":["provider","timestamp"], "as":"timestamp"}, {"dimPath":["provider","steamid"], "as":"steamid"}, {"dimPath":["map","name"], "as":"map" }, {"dimPath":["map","round"], "as":"round" }, {"dimPath":["map","mode"], "as":"mode" }, {"dimPath":["map","phase"], "as":"phase" }, {"dimPath":["name"]}, {"dimPath":["player_id"]}, {"dimPath":["team"], "as":"team_id"}, {"dimPath":["team"], "as":"team"}, {"dimPath":["state","health"]}, {"dimPath":["state","armor"]}, {"dimPath":["state","helmet"]}, {"dimPath":["state","flashed"]}, {"dimPath":["state","burning"]}, {"dimPath":["state","money"]}, {"dimPath":["state","round_kills"]}, {"dimPath":["state","round_killhs"]}, {"dimPath":["state","round_totaldmg"]}, {"dimPath":["state","equip_value"]}, {"dimPath":["match_stats","kills"]}, {"dimPath":["match_stats","assists"]}, {"dimPath":["match_stats","deaths"]}, {"dimPath":["match_stats","mvps"]}, {"dimPath":["match_stats","score"]}, {"dimPath":["position"]} ] } },{ "name": "TypeIdentification", "className": "io.wizzie.normalizer.funcs.impl.FieldMapper", "properties": { "dimensions": [ { "dimension": "type", "value": "player", "overwrite": true } ] } }, { "name":"StateClassification", "className":"io.wizzie.normalizer.funcs.impl.ClassificationMapper", "properties": { "dimension": "health", "new_dimension": "state", "classification": ["dead", "alive"], "intervals": [0], "unknown_value": -1 } } ], "sinks":[ {"topic":"common", "type":"stream"} ] }, "common": { "funcs": [ { "name":"PositionSplitter", "className":"io.wizzie.normalizer.funcs.impl.StringSplitterMapper", "properties": { "dimension": "position", "delete_dimension": true, "delimitier": ", ", "fields": ["position_x", "position_y", "position_z"] } }, { "name":"TypeConverter", "className":"io.wizzie.normalizer.funcs.impl.FieldTypeConverterMapper", "properties": { "conversions": [ { "dimension": "position_x", "from": "string", "to": "number" }, { "dimension": "position_y", "from": "string", "to": "number" }, { "dimension": "position_z", "from": "string", "to": "number" } ] } } ], "sinks": [ {"topic":"csgo_norm", "type":"kafka", "partitionBy": "steamid"} ] } } }
Para aplicar este stream-plan tenemos que copiar su contenido al fichero <WCS-installation-dir>/etc/wcs/normalizer-stream-plan.json donde sustituimos WCS-installation-dir por el directorio de instalación de WCS. Por defecto es /usr/local/etc/wcs/normalizer-stream-plan.json. Una vez copiado, reiniciamos el Normalizer para aplicar los cambios:
wcs stop normalizer wcs start normalizer
De nuevo, podemos comprobar si está funcionando consultando el topic de kafka en el que escribe el Normalizer.
wcs kafka consume --topic csgo_norm
Indexación de los datos
Una vez normalizados y aplanados los mensajes, para que puedan ser procesados por Druid, podemos proceder a su indexación. En WCS esto se hace configurando un supervisor de Druid, en el que se declaran las dimensiones y agregaciones que Druid deberá indexar.
Dado que el normalizador escribe los mensajes procesados en el topic csgo_norm, se configurará el supervisor para que las tareas de indexación lean de ese topic. El spec del supervisor es el siguiente:
{ "type": "kafka", "dataSchema": { "dataSource": "csgo", "parser": { "type": "string", "parseSpec": { "format": "json", "timestampSpec": { "column": "timestamp", "format": "ruby" }, "dimensionsSpec": { "dimensions": [ "player_id", "type", "name", "team", "round", "mode", "state", "bomb_state", "phase" ], "dimensionExclusions": [], "spatialDimensions": [ { "dimName": "coordinates", "dims": [ "position_y", "position_x" ] } ] } } }, "metricsSpec": [ { "type": "count", "name": "events" }, { "type": "hyperUnique", "name": "players", "fieldName": "name", "isInputHyperUnique": false, "round": true }, { "name": "health", "type": "doubleSum", "fieldName": "health" }, { "name": "armor", "type": "doubleSum", "fieldName": "armor" }, { "name": "helmet", "type": "doubleSum", "fieldName": "helmet" }, { "name": "flashed", "type": "doubleSum", "fieldName": "flashed" }, { "name": "burning", "type": "doubleSum", "fieldName": "burning" }, { "name": "money", "type": "doubleSum", "fieldName": "money" }, { "name": "round_kills", "type": "doubleSum", "fieldName": "round_kills" }, { "name": "round_killhs", "type": "doubleSum", "fieldName": "round_killhs" }, { "name": "round_totaldmg", "type": "doubleSum", "fieldName": "round_totaldmg" }, { "name": "equip_value", "type": "doubleSum", "fieldName": "equip_value" }, { "name": "kills", "type": "doubleSum", "fieldName": "kills" }, { "name": "assists", "type": "doubleSum", "fieldName": "assists" }, { "name": "deaths", "type": "doubleSum", "fieldName": "deaths" }, { "name": "mvps", "type": "doubleSum", "fieldName": "mvps" }, { "name": "score", "type": "doubleSum", "fieldName": "score" }, { "name": "position_z", "type": "doubleSum", "fieldName": "height" } ], "granularitySpec" : { "type": "uniform", "segmentGranularity": "HOUR", "queryGranularity": "SECOND" } }, "ioConfig": { "topic": "csgo_norm", "consumerProperties": { "bootstrap.servers": "kafka:9092" }, "taskCount": 1, "replicas": 1, "taskDuration": "PT1H" } }
Para aplicar este spec tenemos que hacer un POST al overlord de druid. Se puede hacer con el siguiente comando en el propio equipo donde se ejecuta WCS, teniendo el spec en un archivo llamado index.json:
curl -X POST -H 'Content-Type: application/json' -d @index.json http://localhost:8084/druid/indexer/v1/supervisor
Por simplicidad, hemos comenzado utilizando agregaciones de tipo “DoubleSum”, que básicamente suma los valores de las métricas en cada tiempo de granularidad. Es importante destacar que vamos a utilizar una granularidad de segundo, ya que estamos obteniendo analíticas en tiempo real de un juego y queremos obtener la información de este tan pronto como sea posible.
Visualización
Llegados a este punto tenemos todo el pipeline de procesamiento de los datos hasta tenerlos en la base de datos Druid. Con esto ya podemos comenzar a hacer queries a Druid desde Wizz-vis para elaborar dashboards y visualizar la información.
Vamos a comenzar creando el dashboard como tal en Wizz-vis. Si bien se puede hacer directamente desde la interfaz de Wizz-vis, en este caso vamos a utilizar la API. Está documentada usando Swagger, accesible mediante el path /swagger-ui.
{ "name": "CS GO", "theme": "light", "interval": 1, "locked": false, "widgets": [] }
Hemos especificado un intervalo de refresco interval de 1 segundo para lograr una experiencia en tiempo real.
Una vez que tenemos el dashboard, podemos comenzar a crear widgets. Vamos a empezar con el más sencillo: una tabla con estadísticas de cada jugador. En cada una de las filas de la tabla representaremos los valores de las métricas asociadas a un valor de la dimensión “name” (nombre del jugador). Como métricas elegiremos por ejemplo las siguientes:
- kills
- deaths
- assists
- mvps
Dado que la agregación que tenemos configurada para estas métricas es “DoubleSum”, habrá que hacer una post-agregación dividiendo entre el número de eventos para poder tener el valor medio de las distintas mediciones, y no la suma. Además, haremos esto en ventanas de 3 segundos para tener cierta garantía de que hayan llegado eventos en ese intervalo.
A continuación se muestra la configuración del widget en JSON. Para entender los detalles de su configuración es conveniente echar un vistazo a la documentación de Wizz-vis..
{ "type": "WidgetTable", "title": "Player statistics", "dashboard_id": 1, "row": 0, "col": 6, "size_x": 6, "size_y": 3, "range": "last_3_seconds", "granularity": "PT1S", "start_time": null, "end_time": null, "limit": 10, "options": { "metrics": [ "Kills", "Assists", "Deaths", "Mvps" ] }, "datasource_name": "csgo", "dimensions": [ "name" ], "aggregators": [ { "aggregator": "kills", "aggregator_name": "kills", "filters": [] }, { "aggregator": "deaths", "aggregator_name": "deaths", "filters": [] }, { "aggregator": "assists", "aggregator_name": "assists", "filters": [] }, { "aggregator": "mvps", "aggregator_name": "mvps", "filters": [] }, { "aggregator": "events", "aggregator_name": "events", "filters": [] } ], "post_aggregators": [ { "output_name": "Kills", "operator": "/", "field_1": "kills", "field_2": "events" }, { "output_name": "Deaths", "operator": "/", "field_1": "deaths", "field_2": "events" }, { "output_name": "Assists", "operator": "/", "field_1": "assists", "field_2": "events" }, { "output_name": "Mvps", "operator": "/", "field_1": "mvps", "field_2": "events" } ], "filters": [] }
Ahora vamos a hacer un widget más interesante: por ejemplo un widget de plano con la posición de los jugadores en tiempo real. Para poder hacer esto necesitamos dos cosas:
- Una imagen del plano sobre el que representar los puntos.
- Establecer, al menos, 3 puntos de referencia para poder mapear las coordenadas que reporta el juego a una posición en la imagen.
Los puntos se pueden obtener para cada mapa del juego por simple experimentación, colocando a un jugador en puntos de referencia del mapa y viendo las coordenadas dentro del juego en los eventos que se reportan. La configuración del widget sería la siguiente:
{ "type": "WidgetPlaneLocation", "title": "Player location", "dashboard_id": 1, "row": 0, "col": 0, "size_x": 3, "size_y": 3, "range": "last_3_seconds", "granularity": "pt1s", "start_time": null, "end_time": null, "limit": 30000, "options": { "image": "https://s3-us-west-1.amazonaws.com/static.wizzie.io/devops/de_dust2_radar.jpg", "keep_ratio": true, "gps_markers": [ { "x": 80, "y": 14, "latitude": 3118, "longitude": -2093 }, { "x": 56, "y": 970, "latitude": -1032, "longitude": -2203 }, { "x": 922, "y": 34, "latitude": 3059, "longitude": 1561 } ] }, "datasource_name": "csgo", "dimensions": [ "coordinates", "name" ], "aggregators": [ { "aggregator": "events", "aggregator_name": "events", "filters": [] } ], "post_aggregators": [], "filters": [] }
{ "type": "WidgetBar", "title": "Player Health", "dashboard_id": 1, "row": 3, "col": 0, "size_x": 12, "size_y": 3, "range": null, "granularity": "all", "start_time": false, "end_time": false, "limit": 10, "options": { "metrics": "Health" }, "datasource_name": "csgo", "dimensions": [ "name" ], "aggregators": [ { "aggregator": "health", "aggregator_name": "health", "filters": [] }, { "aggregator": "events", "aggregator_name": "events", "filters": [] } ], "post_aggregators": [ { "output_name": "Health", "operator": "/", "field_1": "health", "field_2": "events" } ], "filters": [ { "dimension_name": "type", "operator": "eq", "value": "player" } ] } { "type": "WidgetPlane", "title": "Death HeatMap", "dashboard_id": 1, "row": 0, "col": 3, "size_x": 3, "size_y": 3, "range": "last_10_minutes", "granularity": "all", "start_time": null, "end_time": null, "limit": 100000, "options": { "image": "https://s3-us-west-1.amazonaws.com/static.wizzie.io/devops/de_dust2_radar.jpg", "metrics": [ "players" ], "max_value": "max", "keep_ratio": true, "gps_markers": [ { "x": 80, "y": 14, "latitude": 3118, "longitude": -2093 }, { "x": 56, "y": 970, "latitude": -1032, "longitude": -2203 }, { "x": 922, "y": 34, "latitude": 3059, "longitude": 1561 } ] }, "datasource_name": "csgo", "dimensions": [ "coordinates" ], "aggregators": [ { "aggregator": "players", "aggregator_name": "players", "filters": [] } ], "post_aggregators": [], "filters": [ { "dimension_name": "state", "operator": "eq", "value": "dead" } ] }
Conclusiones
Con WCS (Wizzie Community Stack) hemos podido montar todo un stack para análisis de métricas de un videojuego sin necesidad de programar nada, únicamente mediante configuración. Esto le da una gran versatilidad y potencia a WCS, ya que, al igual que se ha podido configurar para este caso de uso, es posible configurarlo para muchos otros, utilizando los mismos componentes y servicios que hemos visto.
Además, todo esto es sólo con la versión Community. Si utilizamos WDP (Wizzie Data Platform) , podemos tener no sólo este caso de uso, sino varios en la misma plataforma, en un mismo clúster, además de ventajas añadidas, como por ejemplo:
- Autenticación y gestión de acceso.
- Configuración de stream plans mediante API.
- Acceso a funcionalidades avanzadas de procesamiento en streaming.
- Escalabilidad y alta disponibilidad.
- Soporte avanzado.
Y más funcionalidades, que puedes consultar en la web de Wizzie.
Esperamos que os haya resultado interesante este post y os animamos a que estéis atentos, porque próximamente habrá nuevo contenido mejorando el análisis de las partidas del Counter Strike utilizando enriquecimiento, correlación… y mucho más.
Recent Comments