バックナンバーはこちら。
https://www.simulationroom999.com/blog/diagnostic-communication-backnumber/
はじめに
ISO-TPのシミュレーションをしよう。のシリーズ。
Pythonパッケージcan-isotpによる疑似ECU。
登場人物
博識フクロウのフクさん
イラストACにて公開の「kino_k」さんのイラストを使用しています。
https://www.ac-illust.com/main/profile.php?id=iKciwKA9&area=1
エンジニア歴8年の太郎くん
イラストACにて公開の「しのみ」さんのイラストを使用しています。
https://www.ac-illust.com/main/profile.php?id=uCKphAW2&area=1
前回の通信だと不完全?
前回、can-isotpでマルチフレームリクエストをしたんだけど。
うん。FCを送ってなんとか動いたね。
ISO15765-2としての通信としては、これでOKなんだけど、
ISO14229-1まで考えると不完全なんだよね。
どういうこと?
車両診断通信は一般的に
リクエストとレスポンスで1つの通信になるんだよ。
というわけで、前回の通信はリクエストだけなんで不完全ってことになる。
って、ことは今回はpytohn-canでレスポンスメッセージを再現するってこと?
結構大変じゃない??
今回はcan-isotpでレスポンスする。
え?そんなことできるの?
たぶん?
(まじか)
can-isotpでリクエスト-レスポンス
まずは構成を書くね。
恒例の図解だね。
まぁ両方ともcan-isotp使ってリクエストレスポンスしますよって構成だね。
今回のメンドーなポイントとしては以下。
- リクエスト側、レスポンス側の2つのスクリプトが必要。
- 共に送信機能と受信機能が必要。
メンドクサソウってことだけはわかった。
can-isotpリクエスト側のスクリプト
とりあえず、リクエスト側スクリプトはこれ
import isotp
import logging
import time
import threading
from can.interfaces.vector import VectorBus
class ThreadedApp:
def __init__(self):
isotp_params = {
'stmin' : 0,
'blocksize' : 0,
'wftmax' : 0,
'll_data_length' : 8,
'tx_padding' : 0xCC,
'rx_flowcontrol_timeout' : 1000,
'rx_consecutive_frame_timeout' : 1000,
'squash_stmin_requirement' : False,
'can_fd' : False,
'tx_data_min_length' : 8
}
self.exit_requested = False
self.bus = VectorBus(channel='0', bitrate=500000)
addr = isotp.Address(isotp.AddressingMode.NormalFixed_29bits, source_address=0xF1, target_address=0x10)
self.stack = isotp.CanStack(self.bus, address=addr, params=isotp_params, error_handler=self.my_error_handler)
def start(self):
self.exit_requested = False
self.thread = threading.Thread(target = self.thread_task)
self.thread.start()
def stop(self):
self.exit_requested = True
if self.thread.isAlive():
self.thread.join()
def send(self, msg):
self.stack.send(msg)
def my_error_handler(self, error):
logging.warning('IsoTp error happened : %s - %s' % (error.__class__.__name__, str(error)))
def thread_task(self):
while self.exit_requested == False:
self.stack.process() # Non-blocking
time.sleep(0.001)
def shutdown(self):
self.stop()
self.bus.shutdown()
def sendrecv( app, msg ):
print("Send msg : %s" % (msg.hex()))
app.send(msg)
t1 = time.time()
while time.time() - t1 < 5:
if app.stack.available():
payload = app.stack.recv()
print("Recv msg : %s" % (payload.hex()))
break
time.sleep(0.2)
if __name__ == '__main__':
app = ThreadedApp()
app.start()
#sendrecv(app, b'\x01\x02\x03\x04\x05\x06\x07')
sendrecv(app, b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10')
print("Exiting")
app.shutdown()
これはまた結構ややこしくなったねー。
受信用にスレッドを使ってるからねー。
とりあえずはsendrecv関数に電文を渡すと、ISO15765-2に則って送受信すると思って良いよ。
can-isotpレスポンス側のスクリプト
おおよそ構成は一緒だけどレスポンス側のスクリプトがこれ。
import isotp
import logging
import time
import threading
from can.interfaces.vector import VectorBus
class ThreadedApp:
def __init__(self):
isotp_params = {
'stmin' : 0,
'blocksize' : 0,
'wftmax' : 0,
'll_data_length' : 8,
'tx_padding' : 0xCC,
'rx_flowcontrol_timeout' : 1000,
'rx_consecutive_frame_timeout' : 1000,
'squash_stmin_requirement' : False,
'can_fd' : True,
'tx_data_min_length' : 8
}
self.exit_requested = False
self.bus = VectorBus(channel='0', bitrate=500000)
addr = isotp.Address(isotp.AddressingMode.NormalFixed_29bits, source_address=0x10, target_address=0xF1)
self.stack = isotp.CanStack(self.bus, address=addr, params=isotp_params, error_handler=self.my_error_handler)
def start(self):
self.exit_requested = False
self.thread = threading.Thread(target = self.thread_task)
self.thread.start()
def stop(self):
self.exit_requested = True
if self.thread.isAlive():
self.thread.join()
def send(self, msg):
self.stack.send(msg)
def my_error_handler(self, error):
logging.warning('IsoTp error happened : %s - %s' % (error.__class__.__name__, str(error)))
def thread_task(self):
while self.exit_requested == False:
self.stack.process() # Non-blocking
#time.sleep(self.stack.sleep_time()) # Variable sleep time based on state machine state
time.sleep(0.001) # Variable sleep time based on state machine state
def shutdown(self):
self.stop()
self.bus.shutdown()
def recvsend( app, msg ):
while True:
if app.stack.available():
payload = app.stack.recv()
print("Recv msg : %s" % (payload.hex()))
break
time.sleep(0.0001)
print("Send msg : %s" % (msg.hex()))
app.send(msg)
t1 = time.time()
while time.time() - t1 < 5:
if app.stack.available():
break
if __name__ == '__main__':
app = ThreadedApp()
app.start()
#recvsend(app, b'\x01\x02\x03\x04\x05\x06\x07')
recvsend(app, b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10')
print("Exiting")
app.shutdown()
受信スレッドの作りは一緒。
違いは先のsendrecv関数がrecvsendになって、受送信をする関数になったってくらい。
この関数に電文をセットすると、リクエストを受信したことをトリガーにレスポンスを返す振る舞いになる。
なるほど。
ややこしさの大半は受信スレッドを実現するためのものか。
それもあるけど、クラス化したってのもあるね。
本来クラス化した方が可読性があがるんだけど、振る舞いの説明としてはちょっと不向きかもね。
実際の動作は次回かな。
まとめ
まとめだよ。
- 車両診断通信の大半はリクエスト-レスポンスで一つの通信。
- 例外的にリクエストのみ、レスポンスのみもあるが、かなりレアなパターン。
- リクエスト側、レスポンス側ともに受信スレッドの仕組みでスクリプト肥大している。
バックナンバーはこちら。
コメント