TI AWR1642 ミリ波センサの入門


TechForum Digikey Employee

作成者:Taylor Roorda、最終更新日:2019年10月1日

イントロダクション


Texas Instrumentsの XWR1642 は、シングルパッケージの統合型ミリ波レーダシステムです。このデバイスには、76~81GHzのFMCW(周波数変調連続波)信号を送受信するためのすべてのRF回路が搭載されており、TI C674x DSPとARM Cortex-R4Fの両方が、それぞれすべてのRF信号処理とアプリケーションコードを処理します。この高周波数帯域は、他のレーダ装置と比較して多くの利点を提供します。例えば、ミリメートルレベルの精度と、プラスチックや衣服のような物体や雨や霧のような妨害要素を「透過する」能力などです。このICは、TIからオートモーティブバージョン(AWR)とインダストリアルバージョン(IWR)の2種類が発売されており、AWRバージョンは車載認証とCANインターフェースを備えています。このチュートリアルは、AWRバージョンのセンサを使用して作成されていますが、IWRバージョンともほぼ同じですので、完全に互換性があります。

このガイドでは、TIがmmWave Toolboxで提供するラボをベースにして、 AWR1642 で開発を開始するために必要な手順を説明します。現時点ではシミュレーションですが、対象アプリケーションは自律型ロボットのナビゲーションシステムです。Automotive ToolboxにあるTIの短距離レーダ(SRR)ラボは、すべてのレーダ構成と処理コードがすでに含まれているため、ベースファームウェアとして使用されます。

リソース

このプロジェクトは、 AWR1642BOOST 開発キットとCode Composer Studio v9.1を使用して開発されました。Code Composerには追加のソフトウェア要件が多数ありますが、これについては次のセクションで説明します。

前提条件の設定

プロジェクトを始める前に、セットアップやダウンロードするものがたくさんあります。このガイドでは、Automotive Toolboxのプロジェクトを使用するため、最初のステップはそれをダウンロードしてドキュメントを入手することです。各プロジェクト例の事前要件はすべてこの中に含まれています。

  1. Code ComposerでView → Resource Explorerを選択します。
  2. Resource Explorerタブ内でSoftware → mmWave Sensorsを選択します。
  3. Automotive Toolboxをダブルクリックし、右端にあるインストールボタンを選択します。

:white_check_mark: オートモーティブ(AWR)キットをお持ちの場合でも、Industrial Toolboxのラボはご利用いただけます。より多くの例については、両方のツールボックスをダウンロードしてください。ただし、 AWR1642BOOST-ODS キットはアンテナ構造が異なるため、AWR ODSデモでのみ動作します。

  1. Toolboxのダウンロードが完了したら、Obstacle Detectionラボを開きます。

  2. 2つのCCS Project(1つはDSP用、もう1つはCortex用)をそれぞれダブルクリックし、右上のImport to IDEボタンをクリックします。

  3. デフォルトでは、ToolboxファイルはC:/TI/mmwave_automotive_toolbox_にインストールされます。そこに移動し、labs/lab0002_short_range_radar/docs/AutoSrr_usersguide.htmllを開きます。このマニュアルには、プロジェクトとそれを構築するためのすべての要件が記載されています。

  4. Software Requirements and Firmware Requirementsからすべてをダウンロードしてください。このチュートリアルでは MATLAB Runtimeは必要ありませんが、TIのGUIを実行する場合は必要です。様々なソフトウェアパッケージの最新バージョンは、特にmmWave SDKと互換性があるとは限らないため、記載されているバージョンに注意してください。このチュートリアルで使用する重要なパッケージとバージョンの簡単な要約を以下に示します。CCSとmmWave SDKと一緒にインストールされる他のパッケージは、正しいバージョンである必要があります。

パッケージ バージョン
mmWave Automotive Toolbox 2.5.0
mmWave SDK 2.0.0.04
SYS/BIOS 6.53.2.00
TI DSP Compiler 8.1.3
TI ARM Compiler* 18.12.1.LTS

:exclamation: このバージョンのARMコンパイラは、指定されたバージョンの16.9.6.LTSとは異なりますが、それでもサンプルでは動作しました。古いコンパイラをダウンロードできる場所は見つかりませんでした。

  1. プロジェクトを右クリックし、Properties → Generalを選択して、プロジェクトがすべての正しいバージョンのソフトウェアパッケージを使用していることを確認してください。最初のタブにコンパイラのバージョンが表示されます。Productsタブをクリックすると、使用されている追加パッケージが表示されます。

  2. この時点で、右クリックしてRebuild Projectを選択すれば、両方のプロジェクトをビルドできるはずです。

デバッグのセットアップ

AWR1642でデバッグを行うには、いくつかの特別な手順が必要ですが、ラボのユーザーガイドに記載されて いますので、ここでは詳しく説明しません。セクション4(Execute the Lab)を参照し、Expand for help with Debug modeボタンをクリックして、Code Composer Studioによるプログラミングとデバッグを有効にしてください。

:warning: デバッグを有効にするプロセスでは、デバイス上でバイナリを書き込む必要があるため、CCSを使ってデバイスをリセットしないように注意してください。デバッグ中にCCSでデバイスをリセット/再プログラムしようとすると、前述の手順で書き 込んだデバッグバイナリが上書きされます。この場合、プロセッサは起動時にクラッシュし、デバッグウィンドウにフォルトが表示されます。これを修正するには、上記のバイナリを再書き込みして、デバッグモードに戻してください。

問題を避けるため、私は以下の方法でデバッグ中にデバイスをリセットしています。

  1. Debugウィンドウで、Group 1を右クリックし、Disconnect Targetを選択します。
  2. ターゲットボードのリセットボタンを押します。これにより、デバッグプログラムが再実行されます。
  3. Debugウィンドウで、Group 1を右クリックし、 Connect Targetを選択します。
  4. プログラムをロードし、通常通りデバッグします。

サンプルコードの解析

2つのサンプルプロジェクトには、ふるいにかけるべきコードがたくさんあります。広範なコメントやドキュメントを含めると、両プロジェクトのいくつかの主要ファイルの間には、約1万行のコードがあります。このセクションでは、デモの主要機能のうち、最も注目すべきセクションの概要を、プロセッサ、コアごとにまとめます。プロジェクトのより完全な説明については、srr demo_16xx_mss/mss_main.cの先頭にあるコメントブロックも参照してください。

この例ではShort Range Radarラボを使用していますが、mmWave Automotive and Industrial Toolboxesに含まれる他の多くのラボも同様の構成になっています。以下の情報の一部は、それらに当てはまるかもしれません。

MSS(ARM)

ARMコアは、このデモでは主に次の3つの役割を担っています。ホストからCLIを実行して設定を行い、レーダ結果をシリアルポートに書き出し、後で追加するロボットナビゲーションアプリケーションを実行します。

コマンドラインインターフェース

CLIの初期化は、mss_srr_cli.cファイルのSRR_MSS_CLIInit()で定義されています。ここで定義されているコマンドのほとんどは、コンフィギュレーション設定をDSPに渡すために使用されます。

アプリケーションにカスタムコマンドを使いたい場合は、ここに追加することができます。各エントリには、コマンド文字列、ヘルプ文字列、ハンドラ関数が必要です。以下は、ナビゲーションコードの移動先を更新する、このアプリケーションのエントリ例です。

cli.c - スニペット例

/* Custom CLI command */
cliCfg.tableEntry[cnt].cmd            = "navSetDestination";
cliCfg.tableEntry[cnt].helpString     = "<x> <y>";
cliCfg.tableEntry[cnt].cmdHandlerFxn  = navigation_CLISetDestination;
cnt++;

ログ結果

UARTへのレーダ結果の書き込みは、mss_main.c.MmwDemo_mboxReadTask()で行われます。この関数は、2つのプロセッサ間でメモリ共有されているメールボックスをチェックします。メッセージにオブジェクト(物体)検出結果が含まれている場合は、UART経由で直接送信されます。これらのパケットのフォーマットはラボのユーザーズガイドに記載されているので、ここでは改めて説明しません。チュートリアルの後半で、これらのパケットを使用します。

ナビゲーションアルゴリズムの実行

最後のステップのMmwDemo_mboxReadTask()関数は、ARMがDSPからレーダデータを取得する場所であるため、レーダデータを使用するアプリケーションコードを追加する場所としても最適です。検出されたオブジェクトデータは、構造体のmessage.body要素にあります。

mss_main.c - スニペット

/* Process the received message: */
switch (message.type)
{
    case MMWDEMO_DSS2MSS_DETOBJ_READY:
        /* Use radar data to update and recalculate mapping data */
        updateMapFromRadar(&message.body.detObj);
 
        /* Modify/hijack message to append path data */
        appendPathTLV(&message.body.detObj);
 
        /* ... */
}

DSS(C674x)

ご想像のとおり、DSPの仕事は計算を行うことです。 dss_data_path.cには、レーダ処理コードの大部分が含まれています。MmwDemo_interFrameProcessingは主要な処理関数であり、2番目のレーダFFT、CFAR、追跡、およびその他の処理が含まれています。

dss_main.cファイルに戻ると、MmwDemo_dssDataPathProcessEvents()がメインイベントハンドラで、データパス関数を呼び出して処理を行います。MmwDemo_dssSendProcessOutputToMSS()は、レーダの結果がすべて共有メールボックスにコピーされる関数です。結果のメッセージは、上記のMmwDemo_mboxReadTask()関数で示したMMWDEMO_DSS2MSS_DETOBJ_READYケースをトリガします。

DSPにはこのチュートリアルで必要なオブジェクト検出処理がセットアップされているので、他に追加したり変更したりすることはありません。

ナビゲーション

障害物検出がTIのファームウェアによって処理されるようになったので、今度はそのデータで何か有用なことをしましょう。このチュートリアルでは、仮想のロボットが障害物を回避するための最短経路を探索することを目標とします。探索の実行にDijkstraのアルゴリズムが使われます。

まず、ロボットの特性をいくつか定義する必要があります。

  • TIのデフォルトSRRレーダ設定では、最大探知距離は約80m、視野角は120度です。しかし、これはこの用途に必要な距離よりはるかに長いため、20mのUSRR(超短距離レーダ)設定を使用します。
  • ロボットがナビゲーションのために1mのグリッドを使うと仮定します。実際のロボットの大きさはもっと小さくてもよいですが、1mであれば余裕を持たせることができます(また、ステップが1であれば物事を簡単に進めることができます)。
  • グリッド間の移動は、直交移動と斜め移動の両方が認められています。
    • 前進も後退も可能ですが、レーダが「見ている」のは前方のみです。

この情報を使って、ナビゲーションのコードを作り始めることができます。これは3つのレイヤで構成されます。それはグラフ、マップ、インターフェースレイヤです。プロジェクトのソースファイルをダウンロードしていれば、これらのファイルはすでにnavigationディレクトリの下に含まれています。

:white_check_mark: 通常、この種のアルゴリズムはmallo()を使用してグラフノードを動的に割り当てますが、これはメモリがあまり豊富ではない組み込みシステムで実行されているため、また動的メモリ割り当てに関するその他の問題があるため、静的実装が使用されます。

グラフレイヤ(Graph Layer)

グラフレベルはナビゲーションのアプリケーションの最下位レベルです。グラフはノードとノード間の接続で構成されます。この場合、ロボットは隣接するどのグリッド空間にも移動できるため、各ノードはすべての隣接ノードと接続されています。例えば、3x3のグラフは次のようになります。

graph_diagram

このレベルでは、グラフには座標に関する情報がありません。あるのは、ノードの配列があり、それらがどのように互いに接続されているかだけです。

graph.h

#ifndef GRAPH_H_
#define GRAPH_H_
 
#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>
#include <limits.h>
 
#define MAX_GRAPH_SIZE      250
#define MAX_EDGES_PER_NODE  8
#define UNINITIALZED        UINT_MAX
 
typedef enum node_pos {MID_NODE, LEFT_NODE, RIGHT_NODE, BOTTOM_NODE} NodePos;
 
typedef struct Graph {
  unsigned int rows;
  unsigned int cols;
  unsigned int size;
  unsigned int edges[MAX_GRAPH_SIZE][MAX_EDGES_PER_NODE];
} graph_t;
 
void createGraph(graph_t *graph, unsigned int rows, unsigned int cols);
void createEdge(graph_t *graph, int src, int dest);
NodePos checkNode(int rows, int cols, unsigned int index);
 
#endif //GRAPH_H_

graph.c

#include "graph.h"
 
void createGraph(graph_t *graph, unsigned int rows, unsigned int cols)
{
  unsigned int i, j = 0;
  unsigned int n;
  unsigned int c;
  NodePos current_node_pos;
 
  // Save dimensions
  graph->rows = rows;
  graph->cols = cols;
  n = graph->rows * graph->cols;
  graph->size = n;
 
  // Initialize edges for each node
  for(i=0; i < MAX_GRAPH_SIZE; i++)
  {
    for(j = 0; j < MAX_EDGES_PER_NODE; j++)
    {
      graph->edges[i][j] = UNINITIALZED;
    }
  }
 
  // Column offset used below
  c = graph->cols;
 
  // Create edges between nodes
  for(i=0; i < n; i++)
  {
    current_node_pos = checkNode(graph->rows, graph->cols, i);
    switch(current_node_pos)
    {
      case MID_NODE:
        // Contains 8 edges
        createEdge(graph, i, i-1);
        createEdge(graph, i, i+1);
        createEdge(graph, i, i+c);
        createEdge(graph, i, (i+c)-1);
        createEdge(graph, i, (i+c)+1);
        createEdge(graph, i, i-c);
        createEdge(graph, i, (i-c)-1);
        createEdge(graph, i, (i-c)+1);
        break;
      case LEFT_NODE:
        // Contains 5 edges
        createEdge(graph, i, i+1);
        createEdge(graph, i, i+c);
        createEdge(graph, i, (i+c)+1);
        createEdge(graph, i, i-c);
        createEdge(graph, i, (i-c)+1);
        break;
      case RIGHT_NODE:
        // Contains 5 edges
        createEdge(graph, i, i-1);
        createEdge(graph, i, i+c);
        createEdge(graph, i, (i+c)-1);
        createEdge(graph, i, i-c);
        createEdge(graph, i, (i-c)-1);
        break;
    }
  }
}
 
void createEdge(graph_t *graph, int src, int dest)
{
  unsigned int idx = 0;
 
  // Ignore requested edge if it is out of bounds
  if(dest < 0 || dest >= graph->size)
  {
    return;
  }
 
  // Get to the first unused entry in the array
  while(graph->edges[src][idx] != UNINITIALZED && idx < MAX_EDGES_PER_NODE)
  {
    idx++;
  }
 
  graph->edges[src][idx] = dest;
}
 
NodePos checkNode(int rows, int cols, unsigned int index)
{
 
  if(index % cols == 0)
  {
    return LEFT_NODE;
  }
  else if(index % cols == cols-1)
  {
    return RIGHT_NODE;
  }
  else
  {
    return MID_NODE;
  }
 
}

マップレイヤ(Map Layer)

マップレイヤはグラフレイヤの上に構築され、(-1, 3)のような実世界の座標を基礎となるグラフノードにマッピングします。グラフノードが接続以外の情報を含まないのに対し、マップレイヤのノードはx,y座標とその位置の障害物の状態も含みます。外側のマップ構造には、現在の目的地と、その目的地に対して最後に計算された最短経路も含まれます。

したがって、マップ レイヤにおける上記の3x3の例は、赤いノードが遮られ、緑のノードがクリアになっている下の図のようになります。

image

ロボットが(0, 0)から(0, 2)に移動しようとしたとき、(0, 1)が妨げられていることがわかれば、Dijkstraのアルゴリズムが実行され、(0, 0)→(1, 1)→(0, 2)という経路を取ります。

map.h

#ifndef MAP_H_
#define MAP_H_
 
#include <stdio.h>
#include <stdbool.h>
#include <stdint.h>
#include <limits.h>
 
#include "graph.h"
 
// Map builds on the underlying graph by mapping real coordinates to the nodes
#define MAX_MAP_SIZE  MAX_GRAPH_SIZE
#define INF           UINT_MAX
#define UNDEFINED     UINT_MAX
 
typedef struct MapNode {
  // Map node contains real coordinates
  int n;
  int x;
  int y;
  bool traversable;             // Permanent condition ex: walls
  bool obstructed;              // Temporary condition ex: person walking by
  unsigned int *graph_node;     // Link to the associated graph node list
} map_node_t;
 
typedef struct PathNode {
  // Points to a specific node of the map
  map_node_t *node;
  // Points to the next node in the path
  struct PathNode *next;
} path_t;
 
typedef struct Map {
  // Coordinate system
  int left_bound;
  int right_bound;
  int upper_bound;
  int lower_bound;
 
  unsigned int rows;
  unsigned int cols;
  unsigned int size;
 
  // nodes mirrors the underlying graph and can be used to lookup the mode nodes
  // from the indices resulting from a graph search
  map_node_t nodes[MAX_MAP_SIZE];
  map_node_t *origin;
  map_node_t *destination;
 
  // Array of pointers to keep track of the optimal path
  map_node_t *path[MAX_MAP_SIZE];
  unsigned int path_length;
 
  // Underlying graph structure
  graph_t graph;
} map_t;
 
// Map functions
void createMap(map_t *map, int left, int right, int down, int up);
void addMapNode(map_t *map, int x, int y);
map_node_t* getNodeAt(map_t *map, int x, int y);
void setOrigin(map_t *map, int x, int y);
void setDestination(map_t *map, int x, int y);
void setTraversableStatus(map_t *map, int x, int y, bool traversable);
void setObstructionStatus(map_t *map, int x, int y, bool obstructed);
void clearObstructions(map_t *map);
int8_t findOptimalPath(map_t *map);
 
// Utility functions
int8_t dijkstra(map_t *map, map_node_t *src, map_node_t *dest, unsigned int *prev);
unsigned int distance(map_node_t *p1, map_node_t *p2);
unsigned int find_min(bool *unvisited_nodes, unsigned int *distances);
unsigned int find_next_unvisited(bool *unvisited_nodes, unsigned int *graph_nodes, unsigned int start);
bool isPathObstructed(map_t *map);
 
#endif //MAP_H_

map.c

#include "map.h"
 
/* Map functions */
 
void createMap(map_t *map, int left, int right, int down, int up)
{
  unsigned int i;
  int x, y;
 
  map->left_bound = left;
  map->right_bound = right;
  map->lower_bound = down;
  map->upper_bound = up;
 
  // Keep track of size
  map->rows = (map->upper_bound - map->lower_bound) + 1;
  map->cols = (map->right_bound - map->left_bound) + 1;
  map->size = map->rows * map->cols;
 
  // Initialize navigation parameters
  map->origin = NULL;
  map->destination = NULL;
  map->path_length = 0;
  for(i = 0; i < MAX_MAP_SIZE; i++)
  {
      map->path[i] = NULL;
  }
 
  // Create a graph with matching dimensions
  createGraph(&map->graph, map->rows, map->cols);
 
  // Assign x, y values to  nodes
  for(y = down; y <= up; y++)
  {
    for(x = left; x <= right; x++)
    {
      // Create a new unobstructed map node at (x,y)
      addMapNode(map, x, y);
    }
  }
}
 
void addMapNode(map_t *map, int x, int y)
{
  unsigned int n = (x - map->left_bound) + (y - map->lower_bound) * map->cols;
 
  map->nodes[n].n = n;
  map->nodes[n].x = x;
  map->nodes[n].y = y;
  map->nodes[n].obstructed = false;
  map->nodes[n].traversable = true;
  map->nodes[n].graph_node = map->graph.edges[n];
}
 
map_node_t* getNodeAt(map_t *map, int x, int y)
{
  unsigned int n = (x - map->left_bound) + (y - map->lower_bound) * map->cols;
 
  if(n > map->size)
  {
    return NULL;
  }
 
  return &map->nodes[n];
}
 
void setOrigin(map_t *map, int x, int y)
{
  map_node_t *node = getNodeAt(map, x, y);
  map->origin = node;
}
 
void setDestination(map_t *map, int x, int y)
{
  map_node_t *node = getNodeAt(map, x, y);
  map->destination = node;
}
 
void setTraversableStatus(map_t *map, int x, int y, bool traversable)
{
  map_node_t *node = getNodeAt(map, x, y);
  node->traversable = traversable;
}
 
void setObstructionStatus(map_t *map, int x, int y, bool obstructed)
{
  map_node_t *node = getNodeAt(map, x, y);
  node->obstructed = obstructed;
}
 
void clearObstructions(map_t *map)
{
  unsigned int i;
 
  // Clear all obstructions from the map
  for(i = 0; i < map->size; i++)
  {
    map->nodes[i].obstructed = false;
  }
}
 
int8_t findOptimalPath(map_t *map)
{
  unsigned int i;
  unsigned int u = map->destination->n;
  unsigned int hops[MAX_MAP_SIZE];
 
  if(map->destination == NULL || map->origin == NULL)
  {
      return -1;
  }
 
  // Delete the last path
  for(i = 0; i < map->path_length; i++)
  {
      map->path[i] = NULL;
  }
  map->path_length = 0;
 
  dijkstra(map, map->origin, map->destination, hops);
 
  if(hops[u] != UNDEFINED || u == map->origin->n)
  {
      // Determine the path length so we can reverse iterate
      while(hops[u] != UNDEFINED)
      {
        map->path_length++;
        u = hops[u];
      }
 
      // Rebuild the path by reverse iteration
      u = map->destination->n;
      for(i = map->path_length; i > 0; i--)
      {
          map->path[i] = &map->nodes[u];
          u = hops[u];
      }
  }
 
  // Add the origin to the path for completeness
  map->path[0] = map->origin;
  map->path_length++;
 
  return 0;
}
 
/* Utility functions */
 
int8_t dijkstra(map_t *map, map_node_t *src, map_node_t *dest, unsigned int *prev)
{
  // Dijkstra's Algorithm
  graph_t *graph = &map->graph;
 
  unsigned int i;
  unsigned int alt;
  unsigned int current_index, neighbor_index;
  map_node_t *current_node, *current_neighbor;
 
  unsigned int remaining_nodes;
 
  unsigned int graph_idx;
  unsigned int *graph_edges;
 
  // Create set of unvisited nodes
  bool unvisited[MAX_MAP_SIZE] = {0};
  unsigned int num_unvisited = 0;
 
  unsigned int dist[MAX_MAP_SIZE];
 
  /* Initialize temporary distance values & prev node list */
  for(i = 0; i < graph->size; i++)
  {
    // Distance to all non-source nodes is initialized to infinity
    dist[i] = INF;
    // Previous nodes are undefined because none have been visited
    prev[i] = UNDEFINED;
    // Only allow a node to be visited if it's unobstructed
    if(map->nodes[i].obstructed || !map->nodes[i].traversable)
    {
      continue;
    }
    unvisited[i] = true;
    num_unvisited++;
  }
  // Distance to the intial node is 0
  dist[src->n] = 0;
 
  remaining_nodes = num_unvisited;
  while(remaining_nodes > 0)
  {
    // Set node index with the smallest distance as current
    current_index = find_min(unvisited, dist);
    if(current_index == UNDEFINED)
    {
      prev = NULL;
      return -1;
    }
 
    // Expects a pointer
    current_node = &map->nodes[current_index];
 
    // Mark current node visited and remove from unvisited set
    unvisited[current_index] = false;
    remaining_nodes--;
 
    if(current_node == dest)
    {
      // Dest has been reached and algorithm can stop
      return 0;
    }
 
    // Get the array of connected nodes from the graph
    graph_edges = map->graph.edges[current_index];
 
    // Get the first connected, unvisited node index
    graph_idx = find_next_unvisited(unvisited, graph_edges, -1);
 
    while(graph_idx != UNDEFINED && graph_idx < MAX_EDGES_PER_NODE)
    {
      // neighbor_index = graph_node->dest;
      neighbor_index = graph_edges[graph_idx];
      current_neighbor = &map->nodes[neighbor_index];
 
      // Calculate distances through current node and all unvisited neighbors
      alt = dist[current_index] + distance(current_node, current_neighbor);
 
      // If new distance is smaller, assign it
      if(alt < dist[neighbor_index])
      {
        dist[neighbor_index] = alt;
        prev[neighbor_index] = current_index;
      }
 
      // First check if we've reached the end of the list, if not see if the next edge is defined
      // Second if statement will cause a run time error if the first condition isn't checked first
      if(graph_idx >= MAX_EDGES_PER_NODE)
      {
        break;
      }
      else if(graph_edges[graph_idx + 1] == UNDEFINED)
      {
        break;
      }
 
      // Follow the next edge in the graph
      graph_idx = find_next_unvisited(unvisited, graph_edges, graph_idx);
    }
  }
 
  // If the function reaches this point, something probably went wrong
  return -1;
}
 
unsigned int distance(map_node_t *p1, map_node_t *p2)
{
  // Returns distance squared, only need a relative value
  int d_x = p2->x - p1->x;
  int d_y = p2->y - p1->y;
  return (unsigned int) d_x*d_x + d_y*d_y;
}
 
unsigned int find_min(bool *unvisited_nodes, unsigned int *distances)
{
  unsigned int current_idx;
  float current_dist;
  float min = INF;
  unsigned int index = UNDEFINED;
 
  for(current_idx = 0; current_idx < MAX_MAP_SIZE; current_idx++)
  {
      if(unvisited_nodes[current_idx])
      {
          current_dist = distances[current_idx];
          if(current_dist < min)
          {
              min = current_dist;
              index = current_idx;
          }
      }
  }
 
  return index;
}
 
unsigned int find_next_unvisited(bool *unvisited_nodes, unsigned int *graph_edges, unsigned int start)
{
  unsigned int edge_idx = start + 1;
  unsigned int current_graph_idx = graph_edges[edge_idx];
 
  while(edge_idx < MAX_EDGES_PER_NODE)
  {
    if(current_graph_idx != UNINITIALZED && unvisited_nodes[current_graph_idx])
    {
      return edge_idx;
    }
 
    edge_idx++;
    current_graph_idx = graph_edges[edge_idx];
  }
 
  return UNDEFINED;
}
 
bool isPathObstructed(map_t *map)
{
  unsigned int i;
 
  for(i = 0; i < map->path_length; i++)
  {
    if(map->path[i]->obstructed)
    {
      return true;
    }
  }
 
  return false;
}

インターフェースレイヤ(Interface Layer)

インターフェースレイヤは、TI検出コードがマップレイヤとやり取りする場所です。ここの関数は、マップノードの障害ステータスを更新したり、計算されたパスをUARTデータパケットに追加したり、CLIを使用してマップの目的地を更新したりするために使用されます。

navigationInit()関数は、マップとグラフの構造が初期化される場所です。 レーダの最大検知距離は約20mに設定されていますが、このチュートリアルの仮想ロボットはそこまで遠くを見る必要はありません。 したがって、ナビゲーションの座標範囲は、X 軸で-10~10、Y 軸で-1~10に設定されます。ロボットは常にレーダの原点の真後ろにある空間から移動を開始するため、原点は常に(0, -1)に設定する必要があります。

updateMapFromRadar()は、レーダからの障害物データをマップにロードします。 この関数はメールボックスメッセージ内にある検出結果へのポインタを受け取りますが、実際のデータはDSPのメモリ内にあります。 このデータへのポインタはDSPによって作成されたため、MSSはデータにアクセスする前にアドレスを変換する必要があります。

:exclamation: 対応するデータにアクセスするには、DSPからのポインタに対して SOC_translateAddress(, SOC_TranslateAddr_Dir_FROM_OTHER_CPU, NULL)を使用する必要があります。これは見落としがちですが、ポインタの中身にアクセスする前にポインタを変換しないと、エラー結果を招いたり、他の問題を引き起こしたりします。

appendPathTLV()は、DSPによって作成されたフレームにパスの計算結果を書き込むために使用されます。これにより、TIの既存のシリアルフレーム構造をほとんど混乱させることなく拡張することができます。最後のステップのアドレス変換と同様に、ここにも「厄介なこと」があります。

:exclamation: 検出結果ヘッダのtotalPacketLenの値は32バイトの倍数でなければなりません。DSPは最も近い倍数に丸めますが、MSSで変更する場合は、追加を行った後にこの手順をやり直す必要があります。適切な長さに丸めないと、シリアルフレームでデータが欠落していることに気づくかもしれません。

navigate.h

#ifndef NAVIGATE_H_
#define NAVIGATE_H_
 
#include <stdint.h>
 
#include <ti/drivers/soc/soc.h>
#include "../common/mmw_messages.h"
 
#include "map.h"
 
/* Pick some number not used by TI for the path TLV ID */
#define MMWDEMO_OUTPUT_MSG_BEST_PATH    42
 
/* Clone of the structure used by the DSP */
typedef struct detected_object
{
    int16_t   speed;        /*!< @brief Doppler index */
    uint16_t   peakVal;     /*!< @brief Peak value */
    int16_t  x;             /*!< @brief x - coordinate in meters. Q format provides the bitwidth. */
    int16_t  y;             /*!< @brief y - coordinate in meters. Q format provides the bitwidth. */
} detected_object_t;
 
/* Structure for transmitting path data over UART */
typedef struct path_point
{
    int8_t x;
    int8_t y;
} path_point_t;
 
/* Function prototypes */
void navigationInit(void);
void updateMapFromRadar(MmwDemo_detInfoMsg *detInfo);
void appendPathTLV(MmwDemo_detInfoMsg *detInfo);
 
/* CLI command functions */
int32_t navigation_CLIBuildMap(int32_t argc, char *argv[]);
int32_t navigation_CLISetDestination(int32_t argc, char *argv[]);
 
#endif /* NAVIGATE_H_*/

navigate.c

#include "navigate.h"
 
static map_t radar_map;
static path_point_t points[MAX_MAP_SIZE];
 
/* Initialize structures and calculate initial path */
void navigationInit(void)
{
  createMap(&radar_map, -9, 9, -1, 10);
  setOrigin(&radar_map, 0, -1);
  setDestination(&radar_map, -1, 4);
  findOptimalPath(&radar_map);
}
 
/* Update map with obstacle data from radar */
void updateMapFromRadar(MmwDemo_detInfoMsg *detInfo)
{
  uint16_t i;
  map_t *map = &radar_map;
 
  uint8_t *bytes_ptr;
  MmwDemo_msgTlv current_tlv;
  MmwDemo_output_message_dataObjDescr *desc;
  detected_object_t *detected_objs;
  detected_object_t current_obj;
 
  uint32_t num_objs;
  int16_t x, y;
 
  /* Get detected point data from the TLVs */
  for(i = 0; i < detInfo->header.numTLVs; i++)
  {
      current_tlv = detInfo->tlv[i];
 
      if(current_tlv.type == MMWDEMO_OUTPUT_MSG_DETECTED_POINTS)
      {
          /* Address has to be translated from the other core */
          bytes_ptr = (uint8_t*) SOC_translateAddress(current_tlv.address, SOC_TranslateAddr_Dir_FROM_OTHER_CPU, NULL);
 
          /* Extract only the detected object structure */
          desc = (MmwDemo_output_message_dataObjDescr *) bytes_ptr;
          detected_objs = (detected_object_t *) (bytes_ptr + sizeof(MmwDemo_output_message_dataObjDescr));
 
          num_objs = desc->numDetetedObj;
 
          // Don't loop any longer than needed
          break;
      }
  }
 
  // Sanity check
  if(num_objs > 50)
  {
      return;
  }
 
  // Clear the detected obstructions from the last frame
  clearObstructions(map);
 
  // For each object detected by the radar, mark its coordinates on the map as obstructed
  // Distances are in meters (might need to be scaled for Q format)
  // Assume that each map grid space is 1 square meter
  for(i = 0; i < num_objs; i++)
  {
    current_obj = detected_objs[i];
 
    /* Scale for Q format of x, y coords */
    x = current_obj.x >> desc->xyzQFormat;
    y = current_obj.y >> desc->xyzQFormat;
 
    /* Coordinates are simply integers for now
     * Would be helpful to handle fractional values in the future
     * Block the whole space if something is in it*/
    if(x > map->right_bound || x < map->left_bound)
    {
        continue;
    }
 
    if(y > map->upper_bound || y < map->lower_bound)
    {
        continue;
    }
 
    // Don't allow the origin to be obstructed because we're already there and the resulting path would always be null
    if(x == map->origin->x && y == map->origin->y)
    {
        continue;
    }
 
    setObstructionStatus(map, x, y, true);
  }
 
  // If the original path has been obstructed, calculate a new one
  if(isPathObstructed(map) || map->path_length <= 1)
  {
    findOptimalPath(map);
  }
 
}
 
/* Hijack the TLV structure from the DSS to transmit the path */
void appendPathTLV(MmwDemo_detInfoMsg *detInfo)
{
    unsigned int i;
 
    uint32_t new_tlvIdx = detInfo->header.numTLVs;
    uint32_t new_tlvLength = radar_map.path_length * sizeof(path_point_t);
    uint32_t totalPacketLen;
 
    /* Fill out the point array used to transmit the path */
    for(i = 0; i < radar_map.path_length; i++)
    {
        points[i].x = radar_map.path[i]->x;
        points[i].y = radar_map.path[i]->y;
    }
 
    /* Add the new TLV to the message */
    detInfo->tlv[new_tlvIdx].length = new_tlvLength;
    detInfo->tlv[new_tlvIdx].type = MMWDEMO_OUTPUT_MSG_BEST_PATH;
    detInfo->tlv[new_tlvIdx].address = (uint32_t) &points;
 
    // Increment the number of TLVs
    detInfo->header.numTLVs++;
 
    // Increment total packet length
    totalPacketLen = detInfo->header.totalPacketLen + new_tlvLength;
 
    /* Round total length up to a multiple of 32 */
    detInfo->header.totalPacketLen = MMWDEMO_OUTPUT_MSG_SEGMENT_LEN * ((totalPacketLen + (MMWDEMO_OUTPUT_MSG_SEGMENT_LEN-1))/MMWDEMO_OUTPUT_MSG_SEGMENT_LEN);
}
 
/* Command line functions for run-time path management */
int32_t navigation_CLIBuildMap(int32_t argc, char *argv[])
{
    int xmin, xmax, ymin, ymax;
 
    if(argc != 5)
    {
        return -1;
    }
 
    xmin = (int) atoi(argv[1]);
    xmax = (int) atoi(argv[2]);
    ymin = (int) atoi(argv[3]);
    ymax = (int) atoi(argv[4]);
 
    createMap(&radar_map, xmin, xmax, ymin, ymax);
    return 0;
}
 
int32_t navigation_CLISetDestination(int32_t argc, char *argv[])
{
    int x, y;
 
    if(argc != 3)
    {
        return -1;
    }
 
    x = (int) atoi(argv[1]);
    y = (int) atoi(argv[2]);
 
    setDestination(&radar_map, x, y);
    findOptimalPath(&radar_map);
    return 0;
}

ビジュアル化

実物のロボットがない場合、ロボットがたどる経路を可視化するためにプロットが使用されます。TIは MATLAB を使ってGUIを実装し、スクリプトのコンパイルバージョンを提供しています。 MATLAB のライセンスを持っていない場合、これを変更するのは簡単ではありません。そのため、このチュートリアルでは、無料で簡単にアクセスできる Python を使って同様の結果を得ることにします。以下に可視化結果の例を示します。

フレーム構造

Pythonプログラムの最下位レベルでは、組み込みデバイスが使用する構造をミラーリングして、バイナリ データを人間が読みやすい形式に変換します。Frames.pyは、レーダからの1つのパケット全体を表すFrameクラスを定義します。 クラスコンストラクタへの入力は、シリアル ポートから受信した全バイト文字列です。 次に、文字列は、struct.unpackを使用してヘッダのさまざまなパラメータと、含まれる各TLV(Type Length Value)データベクトルに分析され、シリアル化されたC構造体が構造体の要素のPythonタプル(tuple)に変換されます。

:white_check_mark: AWR1642 からのシリアルフレームはリトルエンディアン形式であるため、適宜変換する必要があります。

frames.py

import struct
 
 
class FrameError(Exception):
    def __init__(self, value):
        self.value = value
 
 
class TLVData:
    def __init__(self, serial_tlv):
        pass
 
    @staticmethod
    def preparsing(serial_tlv):
        # Only used if there is extra data in a frame outside of the regular objects
        return None, serial_tlv
 
    def __str__(self):
        result = ''
        for key in self.__dict__.keys():
            result += '{}: {}\n'.format(key, self.__dict__[key])
        return result
 
 
class DataObjDescriptor:
    def __init__(self, serial_tlv):
        elements = struct.unpack('<2h', serial_tlv)
        self.numDetectedObj = elements[0]
        self.xyzQFormat = elements[1]
 
 
class DetectedPoints(TLVData):
    NAME = 'DETECTED_POINTS'
    SIZE_BYTES = 8
 
    def __init__(self, serial_tlv):
        super(DetectedPoints, self).__init__(serial_tlv)
        # Unpack elements from the structure
        elements = struct.unpack('<hHhh', serial_tlv)
        self.speedIdx = elements[0]
        self.peakVal = elements[1]
        self.x = elements[2]
        self.y = elements[3]
 
    @staticmethod
    def preparsing(serial_tlv):
        # Strip off the 4 byte description header
        descriptor = DataObjDescriptor(serial_tlv[0:4])
        stripped_tlv = serial_tlv[4:]
        return descriptor, stripped_tlv
 
 
class ClusteringResults(TLVData):
    NAME = 'CLUSTERING_RESULTS'
    SIZE_BYTES = 8
 
    def __init__(self, serial_tlv):
        super(ClusteringResults, self).__init__(serial_tlv)
        # Unpack elements from the structure
        elements = struct.unpack('<4h', serial_tlv)
        self.xCenter = elements[0]
        self.yCenter = elements[1]
        self.xSize = elements[2]
        self.ySize = elements[3]
 
    @staticmethod
    def preparsing(serial_tlv):
        # Strip off the 4 byte description header
        descriptor = DataObjDescriptor(serial_tlv[0:4])
        stripped_tlv = serial_tlv[4:]
        return descriptor, stripped_tlv
 
 
class TrackedObjects(TLVData):
    NAME = 'TRACKED_OBJECTS'
    SIZE_BYTES = 12
 
    def __init__(self, serial_tlv):
        super(TrackedObjects, self).__init__(serial_tlv)
        # Unpack elements from the structure
        elements = struct.unpack('<6h', serial_tlv)
        self.x = elements[0]
        self.y = elements[1]
        self.xd = elements[2]
        self.yd = elements[3]
        self.xsize = elements[4]
        self.ysize = elements[5]
 
    @staticmethod
    def preparsing(serial_tlv):
        # Strip off the 4 byte description header
        descriptor = DataObjDescriptor(serial_tlv[0:4])
        stripped_tlv = serial_tlv[4:]
        return descriptor, stripped_tlv
 
 
class ParkingAssist(TLVData):
    NAME = 'PARKING_ASSIST'
    SIZE_BYTES = 2
 
    def __init__(self, serial_tlv):
        super(ParkingAssist, self).__init__(serial_tlv)
        # elements = struct.unpack('h', serial_tlv)
 
    @staticmethod
    def preparsing(serial_tlv):
        # Strip off the 4 byte description header
        descriptor = DataObjDescriptor(serial_tlv[0:4])
        stripped_tlv = serial_tlv[4:]
        return descriptor, stripped_tlv
 
 
class BestPath(TLVData):
    NAME = 'BEST_PATH'
    SIZE_BYTES = 2
 
    def __init__(self, serial_tlv):
        super(BestPath, self).__init__(serial_tlv)
        elements = struct.unpack('<2b', serial_tlv)
        self.x = elements[0]
        self.y = elements[1]
 
 
# Map type ID to the corresponding class
TLV_TYPES = {
    # Obstacle detection
    1: DetectedPoints,
    2: ClusteringResults,
    3: TrackedObjects,
    4: ParkingAssist,
    # Custom
    42: BestPath,
}
 
 
class TLVHeader:
    SIZE_BYTES = 8
 
    def __init__(self, serial_tlv_header):
        elements = struct.unpack('<2I', serial_tlv_header)
        self.type = elements[0]
        self.length = elements[1]
 
    def __str__(self):
        result = 'TLV Header:\n'
        for key in self.__dict__.keys():
            result += '{}: {}\n'.format(key, self.__dict__[key])
        return result
 
 
class TLV:
    def __init__(self, serial_tlv):
        self.header = TLVHeader(serial_tlv[0:TLVHeader.SIZE_BYTES])
 
        # Lookup constructor for the specific type of object
        self.obj_class = TLV_TYPES[self.header.type]
        self.name = self.obj_class.NAME
        # Note this size is PER OBJECT
        self.obj_size = self.obj_class.SIZE_BYTES
 
        # Strip off excess headers before parsing objects
        headerless_tlv = serial_tlv[TLVHeader.SIZE_BYTES:]
        self.descriptor, processed_tlv = self.obj_class.preparsing(headerless_tlv)
        try:
            self.objects = self.parse_objects(processed_tlv)
        except struct.error as e:
            # Save whole frame from failing if one TLV is bad
            print('Exception while parsing TLV objects: ', e)
            self.objects = []
 
    def __str__(self):
        result = str(self.header) + 'Name: {}\n'.format(self.name)
        for each in self.objects:
            result += str(each)
        return result
 
    def parse_objects(self, serial_tlv):
        objects = []
        num_objects = int(self.header.length / self.obj_size)
        for i in range(0, num_objects):
            new = self.obj_class(serial_tlv[0:self.obj_size])
            objects.append(new)
            serial_tlv = serial_tlv[self.obj_size:]
 
        return objects
 
 
class FrameHeader:
    def __init__(self, serial_header):
        self.full_header = serial_header
 
    def __str__(self):
        result = 'Frame Header:\n'
        for key in self.__dict__.keys():
            result += '{}: {}\n'.format(key, self.__dict__[key])
 
        return result
 
    def verify_checksum(self):
        pass
 
 
class ShortRangeRadarFrameHeader(FrameHeader):
    SIZE_BYTES = 40
    PACKET_LENGTH_END = 16
 
    def __init__(self, serial_header):
        super().__init__(serial_header)
        self.sync = serial_header[0:8]
        self.version = serial_header[8:12]
        self.packetLength = struct.unpack('<I', serial_header[12:16])[0]
        self.platform = serial_header[16:20]
 
        values = struct.unpack('<5I', serial_header[20:40])
        self.frameNumber = values[0]
        self.timeCpuCycles = values[1]
        self.numDetectedObj = values[2]
        self.numTLVs = values[3]
        self.subFrameNumber = values[4]
 
 
class Frame:
    FRAME_START = b'\x02\x01\x04\x03\x06\x05\x08\x07'
 
    def __init__(self, serial_frame, frame_type):
        # Parse serial data into header and TLVs
        # Note that frames are LITTLE ENDIAN
 
        # Length should be > header_size
        frame_length = len(serial_frame)
        if frame_length < frame_type.SIZE_BYTES:
            raise FrameError('Frame is smaller than required header size. '
                             'Expected length {}. Measured length {}.'.format(frame_type.SIZE_BYTES, frame_length))
 
        # Initialize the header
        self.header = frame_type(serial_frame[0:frame_type.SIZE_BYTES])
 
        # Second sanity check
        if frame_length < self.header.packetLength:
            raise FrameError('Frame is too small. Expected {} bytes, '
                             'receieved {} bytes.'.format(self.header.packetLength, frame_length))
 
        # Convert remaining data into TLVs
        full_tlv_data = serial_frame[frame_type.SIZE_BYTES:]
        tlv_data = full_tlv_data
        self.tlvs = []
        for i in range(self.header.numTLVs):
            # Check header to get length of each TLV
            length = TLVHeader.SIZE_BYTES + TLVHeader(tlv_data[0:TLVHeader.SIZE_BYTES]).length
            # Create a 'length' bytes TLV instance
            new_tlv = TLV(tlv_data[0:length])
            self.tlvs.append(new_tlv)
 
            # Slice off the consumed TLV data
            tlv_data = tlv_data[length:]
 
    def __str__(self):
        # Print header followed by TLVs
        result = "START FRAME\n"
        result += str(self.header)
        result += 'TLVs: {\n'
        for each in self.tlvs:
            result += str(each)
        result += '}\n'
        result += 'END FRAME\n'
        return result

シリアルインターフェース

次のレベルは、PySerialを使用して実装された実際のシリアルインターフェースです。 インターフェースは、レーダボードの制御およびデータ シリアルポートを開き、データをキューに読み取ります。 各COM ポートは独自のTxキューとRxキュー、および独自のスレッドを取得するため、プログラムの他の部分から独立して実行できます。

frames.pyで提供されるデータを使って、インターフェースはフレームの開始「magic word」をチェックします。新しいフレームが検出されると、フレームタイプによって指定されたバイト数が、全パケット長を受信するまで読み込まれます。インターフェースはパケットの全長まで読み、結果として得られるフレームをメインアプリケーションによって処理される受信キューに入れます。

serial_interface.py

import serial
import time
import struct
from multiprocessing import Queue
from threading import Thread
 
from frames import Frame
 
 
class SerialInterface:
    def __init__(self, user_port, data_port, frame_type):
        self.control_uart = serial.Serial(user_port, 115200)
        self.data_uart = serial.Serial(data_port, 921600)
        self.frame_type = frame_type
 
        self.uarts_enable = False
 
        # Use Queues to transfer data between processes
        self.control_tx_queue = Queue()
        self.control_rx_queue = Queue()
        self.data_tx_queue = Queue()
        self.data_rx_queue = Queue()
 
        # Spawn processes to handle serial data transfer in the background
        self.control_process = Thread(target=self.process_control_uart)
        self.data_process = Thread(target=self.process_data_uart)
 
    def process_control_uart(self):
        while self.uarts_enable:
            self._read_serial_lines(self.control_uart, self.control_rx_queue)
            self._write_serial(self.control_uart, self.control_tx_queue)
 
            # Sleep briefly to allow other threads to run and data to arrive
            time.sleep(0.001)
 
    def process_data_uart(self):
        while self.uarts_enable:
            self._read_serial_frames(self.data_uart, self.data_rx_queue, self.frame_type)
            self._write_serial(self.data_uart, self.data_tx_queue)
 
            # Sleep briefly to allow other threads to run and data to arrive
            time.sleep(0.001)
 
    @staticmethod
    def _read_serial_lines(ser_inst, rx_q):
        # Read a byte at a time until a \n is reached
        byte_data = b''
        while ser_inst.in_waiting:
            byte = ser_inst.read(1)
            byte_data += byte
            if byte == '\n'.encode():
                break
 
        if byte_data:
            data = byte_data.decode()
            rx_q.put(data)
 
    @staticmethod
    def _read_serial_frames(ser_inst, rx_q, frame_type):
        data = b''
        bytes_read = 0
        sync = False
        while ser_inst.in_waiting:
            byte = ser_inst.read(1)
            data += byte
 
            if not sync and Frame.FRAME_START in data:
                # Start of frame found, discard extra data
                sync = True
                sync_data = data[data.find(Frame.FRAME_START):]
                bytes_read += len(Frame.FRAME_START)
 
                # Read bytes until the packet length is received
                length_data = ser_inst.read(frame_type.PACKET_LENGTH_END - len(Frame.FRAME_START))
                bytes_read += len(length_data)
                packet_length = struct.unpack('<I', length_data[-4:])[0]
                if packet_length > 5000:
                    print("Packet length is {} which seems too large.  Skipping...".format(packet_length))
                    break
 
                # Read remaining bytes specified by the header
                remaining_data = ser_inst.read(packet_length - bytes_read)
 
                full_frame = sync_data + length_data + remaining_data
 
                rx_q.put(full_frame)
 
    @staticmethod
    def _write_serial(ser_inst, tx_q):
        if not tx_q.empty():
            data = tx_q.get()
            data = data.encode()
 
            ser_inst.write(data)
 
    @staticmethod
    def send_item(tx_q, data):
        # Send one item (typically a line ending in '\n') to the tx buffer
        tx_q.put(data)
 
    @staticmethod
    def recv_item(rx_q):
        # Fetch one line or frame from the rx buffer
        return rx_q.get() if not rx_q.empty() else None
 
    def start(self):
        self.uarts_enable = True
 
        self.control_process.start()
        self.data_process.start()
 
    def stop(self):
        self.uarts_enable = False
 
        self.control_process.join()
        self.data_process.join()

プロット

検出された点と経路の結果はMatplotlibを使ってプロットされています。障害物がある領域は赤で、経路に沿った領域は緑で示されています。各領域は1m四方です。これは地図の解像度が1mと以前に定義されていたからです。枠で囲まれた領域は、例えば(0, 0)の地図ノードが0と1の間の現実世界の正方形の領域に対応したものになっており、見やすくなっています。

MatplotlibのアニメーションAPIは、プロットを定期的に更新し、変更を描画するために使用されます。最後に、レーダとロボットからのデータは、プロットされる前に最終的にフォーマットされます。

nav_plot.py

import numpy as np
import time
 
import matplotlib
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
from matplotlib import patches
from matplotlib.collections import PatchCollection
 
from math import tan, radians
 
 
class NavPlot:
    def __init__(self, radar_bounds, radar_queue_in=None):
        matplotlib.rcParams['toolbar'] = 'None'
 
        # Copy all the bounding data
        self.radar_left_bound = radar_bounds[0]
        self.radar_right_bound = radar_bounds[1]
        self.radar_back_bound = radar_bounds[2]
        self.radar_front_bound = radar_bounds[3]
 
        # Queues to receive data
        self.radar_queue = radar_queue_in
 
        # Initialize plots
        self.fig = plt.figure()
        self.radar_ax = self.fig.add_subplot(111)
        self.radar_plot_init()
 
        # Initialize the actual plot with empty data
        self.obj_scatter = self.radar_ax.scatter([], [], s=60, marker='o')
        self.radar_path_plot, = self.radar_ax.plot([], [], 'go--')
 
        # Save results from previous update in case there is no new radar data ready, keeps plot smooth
        self.prev_artists = []
 
        # Set refresh rate to ~60 Hz
        self.ani = FuncAnimation(self.fig, self.update, interval=16, blit=True)
 
    def radar_plot_init(self):
        self.radar_ax.set_title('Radar Plot')
        self.radar_ax.set_aspect('equal')
        self.radar_ax.set_xlabel('X (m)')
        self.radar_ax.set_ylabel('Y (m)')
        self.radar_ax.set_xlim(self.radar_left_bound, self.radar_right_bound)
        self.radar_ax.set_ylim(self.radar_back_bound, self.radar_front_bound)
 
        # Invert axes to match current positioning
        # self.radar_ax.invert_yaxis()
        # self.radar_ax.invert_xaxis()
        # self.radar_ax.xaxis.set_label_position('top')
        # self.radar_ax.xaxis.tick_top()
 
        # Configure grid
        minor_xticks = np.arange(self.radar_left_bound, self.radar_right_bound, 0.5)
        minor_yticks = np.arange(self.radar_back_bound, self.radar_front_bound, 0.5)
        self.radar_ax.set_xticks(minor_xticks, minor=True)
        self.radar_ax.set_yticks(minor_yticks, minor=True)
        self.radar_ax.grid(True, which='both')
 
        # Draw an section to make the graph look radar-y
        x_size = abs(self.radar_left_bound) + abs(self.radar_right_bound)
        y_size = 2 * abs(self.radar_front_bound)
        arc = patches.Arc((0, 0), x_size, y_size, 0, theta1=30, theta2=150)
 
        half_rad = self.radar_front_bound / 2
        line_x = half_rad * tan(radians(60))
        left_line = patches.ConnectionPatch((0.5, 0), (-line_x, half_rad), coordsA='data', coordsB='data')
        right_line = patches.ConnectionPatch((0.5, 0), (line_x, half_rad), coordsA='data', coordsB='data')
 
        self.radar_ax.add_patch(arc)
        self.radar_ax.add_patch(left_line)
        self.radar_ax.add_patch(right_line)
 
    def update_obstacles(self):
        # Keep track of things that have changed for the animation
        modified_artists = []
 
        # Obstacle detection data
        radar_data = self.radar_queue.get()
        if len(radar_data) == 0:
            # Keep data from last frame so plot doesn't get choppy
            modified_artists = self.prev_artists
        else:
            # Process each TLV type
            if 'DETECTED_POINTS' in radar_data.keys():
                detections = radar_data['DETECTED_POINTS']
                self.obj_scatter.set_offsets(detections[:, 0:2])
                modified_artists.append(self.obj_scatter)
 
                # Create shaded regions to mark blocked areas
                boxes = []
                for pt in detections:
                    xmin = np.floor(pt[0])
                    ymin = np.floor(pt[1])
 
                    box = patches.Rectangle((xmin, ymin), width=1.0, height=1.0, fill=True)
                    boxes.append(box)
 
                detect_pc = PatchCollection(boxes, facecolor='r', alpha=0.2)
                self.radar_ax.add_collection(detect_pc)
                modified_artists.append(detect_pc)
 
            if 'CLUSTERING_RESULTS' in radar_data.keys():
                clusters = radar_data['CLUSTERING_RESULTS']
 
                for cluster in clusters:
                    xmin = cluster[0] - (cluster[1] / 2)
                    width = cluster[1]
                    ymin = cluster[2] - (cluster[3] / 2)
                    height = cluster[3]
                    rect = patches.Rectangle((xmin, ymin), width=width, height=height, color='#99ff33', fill=False)
 
                    # Do the rectangles individually to preserve the fill=False flag (does't work with PatchCollection)
                    self.radar_ax.add_patch(rect)
                    modified_artists.append(rect)
 
            if 'BEST_PATH' in radar_data.keys():
                path = radar_data['BEST_PATH']
                self.radar_path_plot.set_data(path[:, 0], path[:, 1])
                modified_artists.append(self.radar_path_plot)
 
                # Draw the origin in a different color
                xmin = np.floor(path[0][0])
                ymin = np.floor(path[0][1])
                box = patches.Rectangle((xmin, ymin), width=1.0, height=1.0, fill=False, color='b')
                self.radar_ax.add_patch(box)
                modified_artists.append(box)
 
                boxes = []
                for pt in path[1:]:
                    xmin = np.floor(pt[0])
                    ymin = np.floor(pt[1])
 
                    box = patches.Rectangle((xmin, ymin), width=1.0, height=1.0, fill=False)
                    boxes.append(box)
 
                path_pc = PatchCollection(boxes, facecolors='g', alpha=0.5)
                self.radar_ax.add_collection(path_pc)
                modified_artists.append(path_pc)
 
        self.prev_artists = modified_artists
        return modified_artists
 
    def update(self, frame):
        # Keep track of things that have changed for the animation
        modified_artists = []
 
        # Add any changes from either the obstacle detection or robot movement
        modified_artists.extend(self.update_obstacles())
 
        return modified_artists
 
    def show(self):
        plt.tight_layout()
        plt.show()

メインアプリケーション

メインアプリケーションはすべてをつなぎ合わせます。その役割は、シリアルインターフェースからデータを取得し、Frameインスタンスに変換し、Frameデータをプロットに送信することです。すべてが確実に連携して実行されるように、メインアプリケーションはレーダフレームの読み取りと処理を取り扱う別のスレッドを作成します。また、スレッドは、ナビゲーションの目的地を15秒ごとに新しいランダムな場所に更新します。また、すべてのスレッド間でデータを渡すために使用されるキューも作成されます。 最後に、メインスレッドで実行されるプロットを作成します。デモはプロット ウィンドウが閉じられるまで実行されます。

mmwave_nav.py

import time
import struct
import numpy as np
from threading import Thread
from multiprocessing import Queue
 
from serial_interface import SerialInterface
# Use the correct frame format for the firmware
from frames import Frame, ShortRangeRadarFrameHeader, FrameError
from nav_plot import NavPlot
from cli_utils import *
 
# The SRR firmware is configured for ~20m range with 120 degree viewing angle, about half range should be fine
radar_bounds = (-10, 10, -1, 10)
 
 
def generate_new_destination():
    dist = 0
    new_x = 0
    new_y = 0
    while dist < 2:
        new_x = np.random.randint(radar_bounds[0], radar_bounds[1], dtype=np.int8)
        new_y = np.random.randint(0, radar_bounds[3], dtype=np.int8)
 
        dist = np.sqrt(np.power(new_x, 2) + np.power(new_y, 2))
 
    return new_x, new_y
 
 
def randomize_destination(last_time, period, serial_inst):
    now = time.time()
    if now - last_time > period:
        x, y = generate_new_destination()
        send_destination(serial_inst, x, y)
        return now
    else:
        return last_time
 
 
def process_frame(serial_inst, plot_queue):
    # Get a frame from the data queue and parse
    last_dest_update = 0
    while serial_inst.uarts_enable:
        # Update to a new random destination every 15 seconds
        last_dest_update = randomize_destination(last_dest_update, 15, serial_inst)
 
        serial_frame = serial_inst.recv_item(serial_inst.data_rx_queue)
        if serial_frame:
            try:
                frame = Frame(serial_frame, frame_type=serial_inst.frame_type)
                results = dict()
 
                # Frames that don't contain the parking assist data only have 1-2 points
                # Plotting them makes the graph run choppily so just ignore them
                if 'PARKING_ASSIST' in [tlv.name for tlv in frame.tlvs]:
                    # There can be at most one of each type of TLV in the frame
                    for tlv in frame.tlvs:
                        objs = tlv.objects
 
                        if tlv.name == 'DETECTED_POINTS':
                            tuples = [(float(obj.x), float(obj.y)) for obj in objs]
                            coords = np.array(tuples) / 2 ** tlv.descriptor.xyzQFormat
                            results['DETECTED_POINTS'] = coords
 
                        elif tlv.name == 'CLUSTERING_RESULTS':
                            tuples = [(float(obj.xCenter), float(obj.xSize),
                                       float(obj.yCenter), float(obj.ySize)) for obj in objs]
                            data = np.array(tuples) / 2 ** tlv.descriptor.xyzQFormat
                            data = np.around(data, 3)
                            results['CLUSTERING_RESULTS'] = data
 
                        elif tlv.name == 'BEST_PATH':
                            tuples = [(obj.x, obj.y) for obj in objs]
                            # For plotting, offset path by 0.5 so it plots the points in the center of each 1m block
                            path = np.array(tuples) + 0.5
                            if path.size == 0:
                                results['BEST_PATH'] = np.array([[0, 0]])
                            else:
                                results['BEST_PATH'] = path
 
                    plot_queue.put(results)
            except (KeyError, struct.error, IndexError, FrameError, OverflowError) as e:
                # Some data got in the wrong place, just skip the frame
                print('Exception occured: ', e)
                print("Skipping frame due to error...")
 
        # Sleep to allow other threads to run
        time.sleep(0.001)
 
 
def run_demo(control_port, data_port, reconfig=False):
    # Open serial interface to the device
    # Specify which frame/header structure to search for
    interface = SerialInterface(control_port, data_port, frame_type=ShortRangeRadarFrameHeader)
    interface.start()
 
    # Write configs to device and start the sensor
    if reconfig:
        print("Sending configuration command...")
        interface.send_item(interface.control_tx_queue, 'advFrameCfg\n')
        time.sleep(3)
 
        print("Starting sensor...")
        interface.send_item(interface.control_tx_queue, 'sensorStart\n')
 
    # Create queues that will be used to transfer data between processes
    radar2plot_queue = Queue()
 
    # Create a thread to parse the frames
    # Sends data to both the plot and the robot
    processing_thread = Thread(target=process_frame, args=(interface, radar2plot_queue,))
    processing_thread.start()
 
    # Plot instance
    # Receives data from the processing and robot threads
    my_plot = NavPlot(radar_bounds, radar2plot_queue)
    my_plot.show()
 
    # When the plot is closed, stop all the threads and safely end the program
    print("Shutting down...")
 
    interface.send_item(interface.control_tx_queue, 'sensorStop\n')
    interface.stop()
    processing_thread.join()
 
 
if __name__ == "__main__":
    import argparse
 
    parser = argparse.ArgumentParser(description='Run mmWave navigation visualization demo.')
    parser.add_argument('control_port', help='COM port for configuration, control')
    parser.add_argument('data_port', help='COM port for radar data transfer')
    parser.add_argument('--reconfig', help='Send configuration settings to radar', action='store_true')
    args = parser.parse_args()
 
    run_demo(args.control_port, args.data_port, args.reconfig)

デモの実行

  1. 上記のリソースセクションからプロジェクトのソースファイルをダウンロードしてください。
  2. 上記の「前提条件」と「デバッグ」の手順を完了してください。
  3. Code Composer Studioでsrrdemo_16xx_mssを右クリックし、Import → Import… → General → File Systemを選択します。
  4. ステップ1でダウンロードしたソースファイルからすべてをインポートします。



  5. 要求があれば、既存のTIソースファイルを上書きします。
  6. 両方のプロジェクトを選択し、右クリックしてRebuild Projectを選択し、クリーンとビルドを行います。



  7. ラボのユーザーマニュアルにあるデバッグ命令を使用するか、最後のステップで得られた.bin fileとUniflashを使用してボードをプログラムします。プログラムされると、ボードはコンフィギュレーションされるのを待ちます。
  8. 必要なPythonパッケージをインストールします。できれば仮想環境内で。

pip install

cd <path to downloaded source files>
pip install -r requirements.txt
  1. ビジュアライザを実行するには、ボードをPCに接続し、作成するシリアルポートを指定します。reconfigフラグは、AWR1642がまだ起動していない場合のみ必要です。Pythonスクリプトを停止して再実行するだけであれば、ボードを再設定する必要はありません。

Run Demo

cd <path to downloaded source files>
python mmwave_nav.py <Control Port> <Data Port> --reconfig

まとめ

このチュートリアルでは、Texas Instrumentの XWR1642 mmWaveセンサキットを使って開発を始めるための手順を、TIのサンプルに基づいてロボットナビゲーションアプリケーションを構築することによって説明しました。これはかなり単純な経路計画の実装ですが、自分のプロジェクトにmmWaveレーダを組み込もうとしている人にとって、出発点となるはずです。次のアクションとしては、これを実際のロボットに組み込み、1m四方の単純なアプローチではなく、経路結果の粒度を改善することでしょう。

ご質問がある場合、またはフィードバックをお寄せになりたい場合は、Digi-KeyのTechForumにアクセスしてください。




オリジナル・ソース(English)