バックナンバーはこちら。
https://www.simulationroom999.com/blog/diagnostic-communication-backnumber/
はじめに
can-isotpでCAN-FDのシミュレーションに向けて。
送受信するPythonコードを書く。
登場人物
博識フクロウのフクさん
イラストACにて公開の「kino_k」さんのイラストを使用しています。
https://www.ac-illust.com/main/profile.php?id=iKciwKA9&area=1
エンジニア歴8年の太郎くん
イラストACにて公開の「しのみ」さんのイラストを使用しています。
https://www.ac-illust.com/main/profile.php?id=uCKphAW2&area=1
CAN-FDリクエスト側can-isotpのPythonコード
早速コードを書いてしまおう。
おー!
import isotp
import logging
import time
import threading
from can.interfaces.vector import VectorBus
class ThreadedApp:
def __init__(self):
isotp_params = {
'stmin' : 0,
'blocksize' : 0,
'wftmax' : 0,
'll_data_length' : 64,
'tx_padding' : 0x55,
'rx_flowcontrol_timeout' : 1000,
'rx_consecutive_frame_timeout' : 1000,
'squash_stmin_requirement' : False,
'can_fd' : True,
'tx_data_min_length' : 8
}
self.exit_requested = False
#self.bus = VectorBus(channel='0', bitrate=500000)
self.bus = VectorBus(channel='0', bitrate=500000, fd=True)
addr = isotp.Address(isotp.AddressingMode.NormalFixed_29bits, source_address=0xF1, target_address=0x10)
self.stack = isotp.CanStack(self.bus, address=addr, params=isotp_params, error_handler=self.my_error_handler)
def start(self):
self.exit_requested = False
self.thread = threading.Thread(target = self.thread_task)
self.thread.start()
def stop(self):
self.exit_requested = True
if self.thread.isAlive():
self.thread.join()
def send(self, msg):
self.stack.send(msg)
def my_error_handler(self, error):
logging.warning('IsoTp error happened : %s - %s' % (error.__class__.__name__, str(error)))
def thread_task(self):
while self.exit_requested == False:
self.stack.process() # Non-blocking
#time.sleep(self.stack.sleep_time()) # Variable sleep time based on state machine state
time.sleep(0.001) # Variable sleep time based on state machine state
def shutdown(self):
self.stop()
self.bus.shutdown()
def sendrecv( app, msg ):
print("Send msg : %s" % (msg.hex()))
app.send(msg)
t1 = time.time()
while time.time() - t1 < 5:
if app.stack.available():
payload = app.stack.recv()
print("Recv msg : %s" % (payload.hex()))
break
time.sleep(0.2)
if __name__ == '__main__':
app = ThreadedApp()
app.start()
sendrecv(app, b'\x01\x02\x03\x04\x05\x06\x07')
# 60byteメッセージ
sendrecv(app, b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10 … \x09\x10')
# 120byteメッセージ
sendrecv(app, b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10 … \x09\x10')
# 5000byteメッセージ
sendrecv(app, b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10 … \x09\x10')
print("Exiting")
app.shutdown()
CAN-FDリクエスト側can-isotpのPythonコード解説
とりあえずDLCを64byte。CAN-FDを有効にした感じ。
そして、それをもって、
- 7byte以下のSingleFrame
- 8byte以上のSingleFrame
- 4095byte以下のマルチフレーム
- 4096byte以上のマルチフレーム
を見る感じだ。
なるほど。
CAN-FDになってからの変化点に着目するわけだ。
CAN-FDレスポンス側can-isotpのPythonコード
対して、レスポンス側はこうなる。
import isotp
import logging
import time
import threading
from can.interfaces.vector import VectorBus
class ThreadedApp:
def __init__(self):
isotp_params = {
'stmin' : 0, # Will request the sender to wait 32ms between consecutive frame. 0-127ms or 100-900ns with values from 0xF1-0xF9
'blocksize' : 0, # Request the sender to send 8 consecutives frames before sending a new flow control message
'wftmax' : 0, # Number of wait frame allowed before triggering an error
#'ll_data_length' : 8, # Link layer (CAN layer) works with 8 byte payload (CAN 2.0)
'll_data_length' : 16, # Link layer (CAN layer) works with 8 byte payload (CAN 2.0)
'tx_padding' : 0xCC, # Will pad all transmitted CAN messages with byte 0x00. None means no padding
'rx_flowcontrol_timeout' : 1000, # Triggers a timeout if a flow control is awaited for more than 1000 milliseconds
'rx_consecutive_frame_timeout' : 1000, # Triggers a timeout if a consecutive frame is awaited for more than 1000 milliseconds
'squash_stmin_requirement' : False, # When sending, respect the stmin requirement of the receiver. If set to True, go as fast as possible.
'can_fd' : True,
'tx_data_min_length' : 8,
'max_frame_size' : 6000
}
self.exit_requested = False
#self.bus = VectorBus(channel='0', bitrate=500000)
self.bus = VectorBus(channel='0', bitrate=500000, fd=True)
addr = isotp.Address(isotp.AddressingMode.NormalFixed_29bits, source_address=0x10, target_address=0xF1)
self.stack = isotp.CanStack(self.bus, address=addr, params=isotp_params, error_handler=self.my_error_handler)
def start(self):
self.exit_requested = False
self.thread = threading.Thread(target = self.thread_task)
self.thread.start()
def stop(self):
self.exit_requested = True
if self.thread.isAlive():
self.thread.join()
def send(self, msg):
self.stack.send(msg)
def my_error_handler(self, error):
logging.warning('IsoTp error happened : %s - %s' % (error.__class__.__name__, str(error)))
def thread_task(self):
while self.exit_requested == False:
self.stack.process() # Non-blocking
#time.sleep(self.stack.sleep_time()) # Variable sleep time based on state machine state
time.sleep(0.001) # Variable sleep time based on state machine state
def shutdown(self):
self.stop()
self.bus.shutdown()
def recvsend( app, msg ):
while True:
if app.stack.available():
payload = app.stack.recv()
print("Recv msg : %s" % (payload.hex()))
break
time.sleep(0.0001)
print("Send msg : %s" % (msg.hex()))
app.send(msg)
t1 = time.time()
while time.time() - t1 < 5:
if app.stack.available():
break
if __name__ == '__main__':
app = ThreadedApp()
app.start()
recvsend(app, b'\x01\x02\x03\x04\x05\x06\x07')
recvsend(app, b'\x01\x02\x03\x04\x05\x06\x07')
recvsend(app, b'\x01\x02\x03\x04\x05\x06\x07')
recvsend(app, b'\x01\x02\x03\x04\x05\x06\x07')
print("Exiting")
app.shutdown()
CAN-FDレスポンス側can-isotpのPythonコード解説
あれ?
isotp_paramsにパラメータが増えた?
max_frame_sizeってのがあるけど?
おー、良く気が付いたね。
defaultだと受信可能サイズって4095byteなんだよね。
リクエスト側から5000byteのメッセージを送るんで、
レスポンス側は受信可能サイズをそれ以上にする必要がある。
今回の場合は6000byteまでは受信可能って設定になってる。
レスポンスメッセージ自体は全部SingleFrameになる感じだねー。
まぁリクエスト側で各フレームの違いは見れるんで、
レスポンス側はシンプルでいいかなーって思って。
そうだね。
レスポンス側のメッセージはこれでいいのかも。
まとめ
まとめだよ。
- リクエスト用のPythonコード書いた。
- 8byte以上のSingleFrame。
- 4096以上のMultiFrame。
- レスポンス用のPythonコード書いた。
- max_frame_sizeパラメータを弄って受信できるサイズを拡張した。
バックナンバーはこちら。
コメント