Dev

그리드 분할을 이용한 Weighted A* 성능 최적화

칼퇴시켜주세요 2023. 4. 1. 20:32
728x90

지난 포스팅에서 A*, Weighted A*에 대해 설명하고 왜 경로찾는 알고리즘에서 Weighted A*가 조금더 성능이 좋은지 알아보았습니다. 사실 A*부터 weighted A* 까지 오게된 이유는 개발중인 프로젝트에서 장애물을 피하면서 특정 경유지를 필수로 지나 목적지 까지 NxN크기의 그리드에서 배관을 자동으로 설계 할수 있도록 경로를 구해야하는 이슈가 있었습니다.

 

일반적인 A*알고리즘을 이용하면 dijkstra 보다 큰 그리드에 대해 출발지와 목적지 사이의 경로를 빠르게 구할수 있습니다. 하지만 이 또한 그리드 크기가 커지거나 경유지가 많아질 경우 불필요한 확장을 많이 하여 탐색 속도가 저하됩니다.

 

기존의 A*알고리즘의 성능을 향상시키기 위해 weighted A*알고리즘을 도입하였고 휴리스틱 함수에 가중치를 곱하여 목적지와 가까운 노드는 상대적으로 목적지와 먼 노드보다 적은 fCost를 갖도록 하였습니다. 이전 포스팅(WeightedA*)에서 성능 향상을 확인 할 수 있었습니다.

 

A Star Algorithm(A*, Weighted A*)

A Star 알고리즘은 기본으로 Djikstra 알고리즘을 확장한 개념으로 다음 노드로 이동하는 과정에서 가중치가 가장 작은 노드를 우선으로 탐색하는 개념은 동일하다. 우선순위 큐에서 Top을 뽑을 때 d

devconf.tistory.com

이후 100x100, 200x200, 300x300, 400x400, 500x500 크기의 그리드, 경유지 20개, 장애물 그리드 크기의 30%를 기준으로 성능 프로파일링을 진행하였습니다.

 

경유지가 많지 않다면 weighted A*알고리즘으로 충분히 커버가 가능한 부분이였습니다. 하지만 이전에 지나온 경유지를 관통해서 다음 경유지로 경로를 탐색할 수 없다는 가정하에  경유지가 많아질 수록 성능 많이 떨어지는것을 확인하였습니다.

 

성능이 떨어진 이유는 경유지를 무조건 지나가는데 자동으로 현재 위치에서 가장 가까운(경로 길이가 가장 짧은) 경유지를 선택하여 이동하는 방식을 적용하였기 때문입니다. 예를들어 현재 위치에서 가장 가까운 경유지를 선택한다고 가정을 해보겠습니다. 경유지가 a,b,c가 존재할때 a까지의 맨해튼 거리가 다른 b,c보다 가장 짧다고 하였을때 단순하게 현재 위치에서 a까지 weighted A*알고리즘을 이용하여 경로를 구하면 될것 같습니다. 

 

하지만 실제로 현재 위치에서 a까지의 경로는 중간에 장애물 때문에 b,c보다 길 수 있습니다. 이를 해결하기 위해 현재 위치에서 모든 경유지까지 weighted A*를 이용해 가장 짧은 경유지를 구하고 해당 경유지를 먼저 방문하도록 구현할 수 있습니다. 

 

처음에는 위의 방법을 이용하여 모든 경유지를 방문하는 경로를 구하였습니다. 그러나 실제 성능 프로파일 결과 모든 경유지 사이에서 가장 짧은 경로를 weighted A*로 구할 경우 만약 경유지가 20개라면 210번의 weighted A*를 수행해야 하기 때문에 성능 저하가 발생하는 것은 당연한 것이였습니다.


그리드 분할과 Weighted A*

어떻게 하면 더욱 빠르게 구할수 있을까 고민하고 찾아본 결과 몇가지 방법이 존재하였는데 다음과 같습니다.

1. 그리드 분할

2. 분산 처리

3. 휴리스틱 함수 조정

 

이중에서 가장 간단한 그리드 분할을 이용해 보겠습니다.

 

그리드 분할이란, 그리드 크기를 일정 크기로 분할하여 분할된 범위 안에서만 노드를 확장하여 확장으로 발생하는 탐색 속도 저하를 막을수 있습니다.

 

하지만 무조건 분할을 크게 한다고 하여 좋은 결과를 내는것은 아닙니다. 만약 분할 크기가 너무 크다면 정확한 경로를 얻을수 없을 수 있습니다. 반면 분할 크기를 너무 작게 할 경우 비교적 정확한 경로는 얻을 수 있지만 분할을 함으로써 얻을 수 있는 탐색 속도 향상이 적을수 있습니다.

 

그러면 그리드 분할을 이용한 코드를 보고 간단한 설명을 하겠습니다.

private fun autoFindPath(start:Node,finish:Node,obstacles: MutableSet<Node>,stepOvers: MutableSet<Node>,grid: Array<Array<Double>>): MutableList<Node>{
        //visited 초기화
        var visited = mutableSetOf<Node>()
        val path = mutableListOf<Node>()
        val weight =4.7
        var current = start
        //partitionFinish 대신에 일반 finish하면?
        obstacles.add(finish)
        while (stepOvers.isNotEmpty()) {
            try {
                var highLevelStart = Node(current.x / PARTITION_SIZE, current.z / PARTITION_SIZE)
                var minPartialHighLevelPath = mutableListOf<Node>()

                var realEnd =current
                var currentIdx:Int =0

                for ((idx,stepOver) in stepOvers.withIndex()) {
                    var end = stepOver
                    // high-level로 그리드를 크기를 10x10으로 분할한다.
                    visited.clear()
                    var highLevelEnd = Node(end.x / PARTITION_SIZE, end.z / PARTITION_SIZE)

                    val highLevelPath = weightedAStar(
                        highLevelStart,
                        highLevelEnd,
                        grid,
                        visited,
                        obstacles,
                        stepOvers,
                        weight,
                        mutableSetOf((0 until grid.size/PARTITION_SIZE) to (0 until grid[0].size/PARTITION_SIZE)),
                        true
                    )
                    if (highLevelPath.isNotEmpty()) {
                        if (minPartialHighLevelPath.isEmpty()) {
                            highLevelPath.map { node -> minPartialHighLevelPath.add(node) }
                            realEnd = stepOver
                            currentIdx=idx
                        } else {
                            if (minPartialHighLevelPath.size > highLevelPath.size) {
                                minPartialHighLevelPath = highLevelPath
                                realEnd = stepOver
                                currentIdx=idx
                            } else if (minPartialHighLevelPath.size == highLevelPath.size) {
                                val minPartialDistance = euclideanDistance(current,realEnd)

                                val partialDistance = euclideanDistance(current,stepOver)

                                if (minPartialDistance > partialDistance) {
                                    minPartialHighLevelPath = highLevelPath
                                    realEnd = stepOver
                                    currentIdx=idx
                                }
                            }
                        }
                    } else {
                        println("Path Not found")
                    }
                }

                var gridBound = mutableSetOf<Pair<IntRange, IntRange>>()
                for (i in 0 until minPartialHighLevelPath.size) {
                    val partitionStart = minPartialHighLevelPath[i]

                    val partitionXRange =
                        (partitionStart.x * PARTITION_SIZE) until ((partitionStart.x + 1) * PARTITION_SIZE + 1)
                    val partitionZRange =
                        (partitionStart.z * PARTITION_SIZE) until ((partitionStart.z + 1) * PARTITION_SIZE + 1)

                    gridBound.add(partitionXRange to partitionZRange)
                }

                stepOvers.remove(realEnd)

                var partitionPath =
                    weightedAStar(
                        current,
                        realEnd,
                        grid,
                        visited,
                        obstacles,
                        stepOvers,
                        weight,
                        gridBound
                    )

                visited.clear()
                if(partitionPath.isEmpty()){
                    partitionPath =
                        weightedAStar(
                            current,
                            realEnd,
                            grid,
                            visited,
                            obstacles,
                            stepOvers,
                            weight,
                            mutableSetOf((0 until grid.size) to (0 until grid[0].size))
                        )
                }

                if(partitionPath.isEmpty()){
                    throw IllegalArgumentException("${currentIdx}Path_Not-Found")
                }

                current = realEnd
                path.addAll(if (path.isEmpty()) partitionPath else partitionPath.drop(1))
                partitionPath.map { node -> obstacles.add(node) }
            } catch (e: Exception) {
                println("Error: ${e.message}")
            }
            visited.clear()
        }

        obstacles.remove(finish)

위의 코드는 현재 위치에서 모든 경유지 까지의 최단 경로를 단순 weighted A*가 아닌 그리드 분할을 이용한 방법입니다.

 

  • highLevelStart
    • highLevelStart는 현재 위치를 그리드 분할을 통해 압축된 좌표를 의미합니다.
  • highLevelEnd
    • highLevelEnd는 경유지 위치를 그리드 분할을 통해 압축된 좌표를 의미합니다.
  • highLevelPath
    • highLevelPath은 압축된 현재 위치에서 압축된 경유지 까지의 경로를 의미합니다. 예를드러 10x10크기로 압축을 하였을 경우  시작 좌표가 (5,12) 경유지 좌표가 (38,51)이라면 압축된 좌표는 각각 (0,1) (3,5)가 됩니다. 
    • 이제 highLevel을 이용하여 weighted A*를 수행할경우 기존의 (5,12) (38,51)사이의 경로 갯수보다  (0,1) (3,5)를 이용한 경로 갯수가 현저하게 적어지는것을 알 수 있습니다.

하지만 위에서 그리드 분할로 구한 경로를 백프로 신뢰를 할수 있을까요?

만약에 압축된 그리드 안에 장애물이 존재하여 실제로는 지나갈 수 없지만 압축된 좌표로 이동이 가능하다고하여 오류를 범할 수 있습니다.

 

이를 해결하기 위해 몇가지 트릭을 사용하여 문제를 해결 할 수 있습니다. 

1. highLevelPath를 기반으로 실제 경로를 구해봅니다.

2. 실제로 그리드 범위를 벗어나지 않고 장애물을 피해 경로를 구할 수 있다면 해당 highLevelPath를 장애물 처리 합니다.

3. 이후 weighted A*내부 구현에서 highLevelPath가 장애물로 들어가 있는지 없는지 확인을 합니다.

 

 private fun weightedAStar(
        start: Node,
        end: Node,
        grid: Array<Array<Double>>,
        visited: MutableSet<Node> = mutableSetOf(),
        obstacles: MutableSet<Node> = mutableSetOf(),
        stepOvers: MutableSet<Node> = mutableSetOf(),
        weight: Double,
        gridBound: MutableSet<Pair<IntRange, IntRange>> = mutableSetOf(),
        highLevelPath: Boolean = false
    ): MutableList<Node> {
        val openList = PriorityQueue<Node>(compareBy { it.fCost })
        val startNode = Node(start.x, start.z, manhattanDistance(start, end) * weight, 0.0)
        openList.add(startNode)

        val closedList = mutableSetOf<Node>()

        while (openList.isNotEmpty()) {
            val current = openList.poll()
            visited.add(current)
            if (current.equals(end)) {
                return reconstructPath(current)
            }

            closedList.add(current)

            val neighbors = mutableListOf<Node>()
            val dx = intArrayOf(-1, 0, 1, 0)
            val dz = intArrayOf(0, -1, 0, 1)

            for (i in dx.indices) {
                val nx = current.x + dx[i]
                val nz = current.z + dz[i]

                val neighbor = Node(nx, nz)

                if(!highLevelPath) {
                    if (gridBound.any { (xRange, zRange) -> nx in xRange && nz in zRange } &&
                        grid.indices.contains(neighbor.x) &&
                        grid[0].indices.contains(neighbor.z) &&
                        !visited.contains(neighbor) &&
                        !obstacles.contains(neighbor) &&
                        !stepOvers.contains(neighbor)) {
                            neighbors.add(Node(nx, nz))
                    }
                }else{
                    if (gridBound.any { (xRange, zRange) -> nx in xRange && nz in zRange } &&
                        grid.indices.contains(neighbor.x) &&
                        grid[0].indices.contains(neighbor.z) &&
                        !visited.contains(neighbor)) {
                            neighbors.add(Node(nx, nz))
                    }
                }

            }

            for (neighbor in neighbors) {
                val tentativeG = current.gCost + manhattanDistance(current, neighbor) * weight
                val tentativeF = tentativeG + manhattanDistance(neighbor, end) * weight

                if (closedList.any { it.x == neighbor.x && it.z == neighbor.z } && tentativeF >= neighbor.fCost) {
                    continue
                }

                val openNode = openList.find { it.x == neighbor.x && it.z == neighbor.z }
                if (openNode == null || tentativeF < openNode.fCost) {
                    neighbor.parent = current
                    neighbor.gCost = tentativeG
                    neighbor.fCost = tentativeF

                    if (openNode != null) {
                        openList.remove(openNode)
                    }

                    openList.add(neighbor)
                }
            }
        }

        return mutableListOf()
    }

위의 코드를 보면 현재 수행하고있는 weighted A* 가 highLevel인지 아닌지 판단하고 조건에 맞게 수행하는 것을 확인 할 수 있습니다. 

 

이 방식을 이용한다면 highLevelPath를 고려하여 현재 위치에서 장애물까지 관통하지 않고 경로를 빠르게 구할 수 있습니다. 하지만 앞서 설명했듯이 그리드 분할 크기를 너무 크게 하였을경우 highLevelPath에서 장애물로 처리하는 그리드 크기가 커지게 되고, 따라서 정확한 결과를 얻을 수 없습니다.

반응형